Creating a Custom Data Task

This tutorial shows how a Data Task component can be created, configured and built. The goal of this tutorial is to give developers a starting point for creating their own Data Task.

The RTC Tk does not provide any final Data Task applications that can be used in a real world RTC out of the box. Instead, the toolkit provides a reference design and Data Task support libraries that can be used by instrument RTC developers to create custom data task applications.

The idea is to have users implement a custom Business Logic class and a Computation class. The former defines the behaviour for the different stages of the component’s life-cycle (i.e. it implements the activity methods) and the latter provides and confines the data task’s specific computation algorithm.

Note

For simplicity, the example uses the file-based implementations of the OLDB, Persistent and Runtime Configuration Repositories, as well as file-based service discovery. This means that the underlying format of configuration and data points differs from the one used when the above-mentioned services are backed by the standard back-ends such as the CII configuration service, CII OLDB, etc.

Provided Examples

This customisation of the Data Task will be discussed in reference to the provided examples. The main aim of these examples is not to give fully fledged Data Tasks that can be used as-is, but to show how the pieces fit together. The following example Data Tasks are provided:

  • Periodic Telemetry Data Task;

  • Measure Telemetry Data Task;

  • Optimise Data Task (with Python support);

  • GPU and Python integration.

This tutorial will focus on the Periodic Telemetry Data Task. Other Data Task flavours will also be mentioned but with less detail and with focus on important differences only.

Periodic Telemetry Data Task. In this section, it is shown how the BusinessLogic can deploy and configure a Computation class to process incoming telemetry data. The example demonstrates how to configure a Data Task that collects N frames of telemetry data and, once they have been collected, triggers a simple computation in the Computation class that emulates the function of a loop monitor. (See section Periodic Telemetry Data Task)

Measure Telemetry Data Task. In this section, it is shown how the Measure interface can be used with the Computation class to perform on-demand calculations from telemetry sources and return a result. This example is similar to measuring a dark map of a wavefront camera. (See section Measure Telemetry Data Task)

Optimise Data Task. In this section, it is shown how the Optimise command can be used to perform on-demand calculation from data that is in the Runtime Configuration Repository. The example also shows how the JSON can be parsed and allows multiple algorithms to be handled. (See section Optimise Data Task)

GPU and Python integration. See section GPU and Python integration for more information.

Periodic Telemetry Data Task

The example shows how to configure the BusinessLogic and interface with the Computation class to process the telemetry data and perform a very simple calculation.

The Computation class implements a callback to copy the wavefront sensor slope vector to a local sample buffer. This buffer is then averaged to calculate an average slope vector, which is projected to an average modes vector using a matrix-vector multiplication with a slopes2modes matrix.

Source Code Location

The example source code can be found in the following sub-directory of the rtctk project:

_examples/exampleDataTask/telemetry

Modules

The provided example is organised into the following waf modules:

  • app - The data task application including default configuration

  • scripts - Helper scripts for deployment and control

The example also uses common libraries that are shared with other examples.

Dependencies

The provided example code depends on the following modules:

  • Component Framework - for RTC Component functionality with Services

  • Reusable Components - for Data Task support libraries

  • Client Application - to steer the application

Running the Periodic Telemetry Data Task Example

The exampleDataTask can be run in isolation or as part of an SRTC system. The only requirement for the Data Task to be brought to Operational is that a shared memory queue writer has been created; this writer can be provided either by the Telemetry Subscriber or by a Shared Memory Publisher. In this section a shmPublisher is used to bring the Data Task to Operational.

To make the example as simple as possible, a script rtctkExampleDataTaskTelemetry.sh is provided to help bring the data task and supporting infrastructure online.

rtctkExampleDataTaskTelemetry.sh

After installing the RTC Tk, run the example using the following sequence of commands:

# Deploy and start the example applications
rtctkExampleDataTaskTelemetry.sh deploy
rtctkExampleDataTaskTelemetry.sh start

# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskTelemetry.sh send Init
rtctkExampleDataTaskTelemetry.sh send Enable
rtctkExampleDataTaskTelemetry.sh send Run
rtctkExampleDataTaskTelemetry.sh send Idle
rtctkExampleDataTaskTelemetry.sh send Disable
rtctkExampleDataTaskTelemetry.sh send Reset

# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskTelemetry.sh stop
rtctkExampleDataTaskTelemetry.sh undeploy

During the deploy action a Python script is invoked to produce the required FITS input files (see section Generating Input Data).

Generating Input Data

As we do not want to supply large FITS files within our git repository, a Python script is provided to generate the required data. The script that generates the data required by the exampleDataTask is found in the following location:

_examples/exampleDataTask/common/genFitsData

It provides a script that is invoked with three arguments as follows:

$ rtctkExampleDataTaskGenFitsData <num_slopes> <num_modes> <filepath>

This creates the identity slopes2modes matrix required by the data task. By default it is called by the script rtctkExampleDataTaskTelemetry.sh during the deploy action.
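
For example, the default configuration shown later in data_task_1.yaml (9232 slopes, 50 modes) would correspond to an invocation along the lines of:

$ rtctkExampleDataTaskGenFitsData 9232 50 $REPO_DIR/runtime_repo/data_task_1.dynamic.slopes2modes.fits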

Data Task Development Guide

This section explains how to create a simple telemetry-based Data Task from scratch, using the exampleDataTask as the reference. See Data Task for more information about the specifics.

Business Logic Class

#include "computation.hpp"

#include <rtctk/componentFramework/rtcComponent.hpp>
#include <rtctk/componentFramework/runnable.hpp>
#include <rtctk/componentFramework/runtimeRepoIf.hpp>
#include <rtctk/componentFramework/typedEventService.hpp>

using namespace rtctk::componentFramework;

using LifeCycle = Runnable<RtcComponent>;

class BusinessLogic : public LifeCycle::BizLogicIf, public Computation::ResultIf {
public:
    using ComponentType = LifeCycle;

    BusinessLogic(const std::string& name, ServiceContainer& services)
        : m_name(name)
        , m_services(services)
        , m_rtr(services.Get<RuntimeRepoIf>())
        , m_esv(services.Get<EventServiceIf>())
        , m_ev_pub_conf_updated(m_esv.MakePublisher<ConfigurationUpdatedEvent>())
        , m_dp_queue_name("/" + name + "/static/queue_name")
        , m_dp_thread_policies("/" + name + "/static/thread_policies")
        , m_dp_samples_to_read("/" + name + "/static/samples_to_read")
        , m_dp_samples_to_skip("/" + name + "/static/samples_to_skip")
        , m_dp_sample_timeout("/" + name + "/static/sample_timeout")
        , m_dp_slopes("/" + name + "/static/slopes")
        , m_dp_modes("/" + name + "/static/modes")
        , m_dp_s2m("/" + name + "/" + "dynamic/slopes2modes")
        , m_dp_avg_slopes("/" + name + "/dynamic/avg_slopes")
        , m_dp_avg_modes("/" + name + "/dynamic/avg_modes") {
    }

    // activities
    void ActivityStarting(StopToken st) override {
        m_computation.reset();
    }

    void ActivityInitialising(StopToken st) override {
        m_computation.reset();

        auto shm_queue_name = m_rtr.GetDataPoint<std::string>(m_dp_queue_name);
        size_t samples_to_read = m_rtr.GetDataPoint<uint32_t>(m_dp_samples_to_read);
        size_t samples_to_skip = m_rtr.GetDataPoint<uint32_t>(m_dp_samples_to_skip);
        auto sample_timeout = milliseconds(m_rtr.GetDataPoint<uint32_t>(m_dp_sample_timeout));
        auto thread_policies = GetNumaPolicies(m_rtr, m_dp_thread_policies);
        size_t num_slopes = m_rtr.GetDataPoint<uint32_t>(m_dp_slopes);
        size_t num_modes = m_rtr.GetDataPoint<uint32_t>(m_dp_modes);

        m_computation = std::make_unique<Computation>(m_services,
                                                      *this,
                                                      shm_queue_name,
                                                      samples_to_read,
                                                      samples_to_skip,
                                                      sample_timeout,
                                                      thread_policies,
                                                      num_slopes,
                                                      num_modes);

        m_computation->SetSlopesToModesMatrix(m_rtr.GetDataPoint<MatrixBuffer<float>>(m_dp_s2m));
    }

    void ActivityEnabling(StopToken st) override {
        m_computation->Spawn();
    }

    void ActivityDisabling(StopToken st) override {
        m_computation->Join();
    }

    void ActivityGoingRunning(StopToken st) override {
        m_computation->Run();
    }

    void ActivityGoingIdle(StopToken st) override {
        m_computation->Idle();
    }

    void ActivityRunning(StopToken st) override {
        while (not st.StopRequested()) {
            m_computation->CheckErrors();
            std::this_thread::sleep_for(100ms);
        }
    }

    void ActivityRecovering(StopToken st) override {
        m_computation->Join();
        m_computation->Spawn();
    }

    void ActivityUpdating(StopToken st, JsonPayload const& args) override {
        m_computation->SetSlopesToModesMatrix(m_rtr.GetDataPoint<MatrixBuffer<float>>(m_dp_s2m));
    }

private:
    void OnResultAvailable(Computation::Result const& result) override {
        // write output back to runtime repo
        m_rtr.WriteDataPoint<std::vector<float>>(m_dp_avg_slopes, result.avg_slopes);
        m_rtr.WriteDataPoint<std::vector<float>>(m_dp_avg_modes, result.avg_modes);

        // we can notify multiple updated datapoints using a single event
        m_ev_pub_conf_updated->Publish(
            {m_name, {m_dp_avg_slopes.ToString(), m_dp_avg_modes.ToString()}});
    }

    std::string m_name;
    ServiceContainer& m_services;
    RuntimeRepoIf& m_rtr;
    TypedEventService m_esv;

    // event publishers
    std::unique_ptr<TypedEventPublisher<ConfigurationUpdatedEvent>> m_ev_pub_conf_updated;

    // rtr datapoints (inputs)
    DataPointPath m_dp_queue_name;
    DataPointPath m_dp_thread_policies;
    DataPointPath m_dp_samples_to_read;
    DataPointPath m_dp_samples_to_skip;
    DataPointPath m_dp_sample_timeout;
    DataPointPath m_dp_slopes;
    DataPointPath m_dp_modes;
    DataPointPath m_dp_s2m;

    // rtr datapoints (outputs)
    DataPointPath m_dp_avg_slopes;
    DataPointPath m_dp_avg_modes;

    std::unique_ptr<Computation> m_computation;
};

In ActivityInitialising, the Computation object is created by passing the required services and the static configuration parameters to its constructor.

In ActivityEnabling, the computation object spawns its worker thread. The worker connects to the shared memory queue and goes to the Idle state, where it continuously resets its shared memory read queue so that it is ready to process fresh data.

Note

The user-hook OnThreadStart will be invoked at this stage.

In ActivityDisabling, the computation object joins its thread and disconnects from the shared memory queue.

In ActivityGoingRunning, the computation object is commanded to enter state Running where it will begin to process data from the shared memory queue.

In ActivityGoingIdle, the computation object is commanded to enter state Idle, where the object will stay attached to the shared memory queue but will continuously reset its shared memory read queue to be ready to process fresh data.

In ActivityRunning, the Data Task performs computations. While the computation object worker thread performs its Read, Compute, Publish and Skip cycles, the BusinessLogic only observes the computation object by continuously checking for errors.

Note

In this state the Computation class user hooks OnCycleStart, CopyData, Compute and Publish are invoked continuously.

In case an error occurs, CheckErrors will throw an exception and the component will enter the state On::Operational::Error.

In ActivityRecovering, the Data Task performs error recovery actions. The safest way to recover from asynchronous errors is to re-spawn the worker thread. This will cause the shared memory reader to reconnect to the shared memory queue.

In ActivityUpdating, the dynamic configuration of the data task is updated. In addition to user-specific dynamic computation parameters, the samples_to_read and samples_to_skip parameters may be changed dynamically using the provided setters, if necessary.

Computation Class

The Computation class is where the largest implementation effort will be spent. In the exampleDataTask a very simple calculation is implemented to show how this can be done: a projection from slopes to modes, performed via a matrix-vector multiplication.

#include <rtctk/componentFramework/serviceContainer.hpp>
#include <rtctk/componentFramework/matrixBuffer.hpp>

#include <rtctk/exampleTopics/topics.hpp>
#include <rtctk/dataTask/computationBase.hpp>

#include <chrono>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

// the type of the shm super topic
using TopicType = rtctk::exampleTopic::ScaoLoopTopic;

// type of the user-owned sample buffer
using BufferType = std::vector<decltype(decltype(TopicType::wfs)::slopes)>;

class Computation : public dataTask::ComputationBase<TopicType> {
public:
    using ServiceContainer = rtctk::componentFramework::ServiceContainer;

    template <typename T>
    using MatrixBuffer = rtctk::componentFramework::MatrixBuffer<T>;

    struct Result {
        std::vector<float> avg_slopes;
        std::vector<float> avg_modes;
    };

    struct ResultIf {
        virtual ~ResultIf() = default;
        virtual void OnResultAvailable(Result const& result) = 0;
    };

    Computation(ServiceContainer& services,
                ResultIf& publisher,
                std::string const& shm_name,
                size_t to_read,
                size_t to_skip,
                std::chrono::milliseconds sample_timeout,
                std::optional<numapp::NumaPolicies> thread_policies,
                size_t n_slopes,
                size_t n_modes)
        : ComputationBase(services, shm_name, to_read, to_skip, sample_timeout, thread_policies)
        , m_publisher(publisher)
        , m_slopes(n_slopes)
        , m_modes(n_modes) {
        // set vectors and matrix to correct size.
        m_sample_buffer.resize(to_read);
        m_result.avg_slopes.resize(n_slopes, 0.0);
        m_result.avg_modes.resize(n_modes, 0.0);
        m_s2m_matrix.resize(m_modes, m_slopes);
    }

    ~Computation() {
        Join();  // this is important, otherwise we get problems during destruction
    }

    void SetSlopesToModesMatrix(MatrixBuffer<float>&& s2m_matrix) {
        std::scoped_lock lock{m_mutex};
        m_s2m_matrix = std::move(s2m_matrix);
    }

private:

    void OnThreadStart() override {
        // optional user hook
    }

    void OnCycleStart(size_t to_read) override {
        // optional user hook
    }

    void CopyData(size_t sample_idx, TopicType const& sample) noexcept override {
        m_sample_buffer[sample_idx] = sample.wfs.slopes;
    }

    void Compute() override {
        std::scoped_lock lock{m_mutex};
        m_result = UserDefinedAlgorithm(m_sample_buffer, m_s2m_matrix);
    }

    void Publish() override {
        m_publisher.OnResultAvailable(m_result);
    }

    ResultIf& m_publisher;
    size_t m_slopes;
    size_t m_modes;
    std::mutex m_mutex; // protects m_s2m_matrix from concurrent access
    MatrixBuffer<float> m_s2m_matrix;
    BufferType m_sample_buffer;
    Result m_result;
};

The static configuration data is set via the computation class constructor. This is only called during component initialisation.

The dynamic configuration data attribute s2m_matrix is set via a dedicated setter method. The actual data is retrieved from the runtime_repo during Initialising and Updating.

Note

We advise creating individual setters for dynamic configuration attributes; this enables selective parameter updates using the Update command.

The CopyData callback is used to copy a subset of incoming shared memory data of TopicType into the local sample_buffer of BufferType.

Note

When adjusting the sample buffer dynamically, make use of the OnCycleStart hook to resize the sample buffer to the updated number of samples to_read.
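
For instance, a minimal version of such a hook, using only the members shown in the listing above, could be:

    void OnCycleStart(size_t to_read) override {
        // Keep the local sample buffer in sync with a dynamically changed samples_to_read.
        if (m_sample_buffer.size() != to_read) {
            m_sample_buffer.resize(to_read);
        }
    }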

The bulk of the data task algorithm resides in the Compute method. This Data Task uses the worker thread of the ComputationBase class to perform the computation.
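
The UserDefinedAlgorithm function called from Compute is not prescribed by the toolkit; it stands for the user's own algorithm. The following is a minimal sketch of what such a function could compute, shown as a free function declared before the class (it could equally be a private member). It assumes, purely for illustration, that MatrixBuffer<float> stores its n_modes x n_slopes elements contiguously in row-major order and can be indexed like a std::vector.

// Sketch of the user-defined algorithm: average the collected slope vectors
// and project the result to modes. Assumption (not taken from the toolkit):
// MatrixBuffer<float> is indexable like a flat, row-major std::vector.
Computation::Result UserDefinedAlgorithm(BufferType const& samples,
                                         rtctk::componentFramework::MatrixBuffer<float> const& s2m) {
    Computation::Result result;
    const size_t n_samples = samples.size();
    const size_t n_slopes = samples.empty() ? 0 : samples.front().size();

    // Average the collected slope vectors element-wise.
    result.avg_slopes.assign(n_slopes, 0.0f);
    for (auto const& sample : samples) {
        for (size_t i = 0; i < n_slopes; ++i) {
            result.avg_slopes[i] += sample[i];
        }
    }
    for (auto& value : result.avg_slopes) {
        value /= static_cast<float>(n_samples);
    }

    // Project the average slopes to modes: avg_modes = slopes2modes * avg_slopes.
    const size_t n_modes = (n_slopes > 0) ? s2m.size() / n_slopes : 0;
    result.avg_modes.assign(n_modes, 0.0f);
    for (size_t m = 0; m < n_modes; ++m) {
        for (size_t i = 0; i < n_slopes; ++i) {
            result.avg_modes[m] += s2m[m * n_slopes + i] * result.avg_slopes[i];
        }
    }
    return result;
}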

Note

When using dynamically updated input data (such as m_s2m_matrix), consider introducing a mutex to protect the input data from concurrent access.

In the Publish method the result is written to the Runtime Configuration Repository.

Note

For the purpose of this example, it was decided not to implement the RTR access directly in the Computation class; it is delegated to the BusinessLogic via the ResultIf interface instead. Users are free to write to the RTR directly from the Computation class if they do not mind the stronger coupling to the RTR dependency.
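
As a sketch only, a Publish that writes directly to the Runtime Configuration Repository could look like the following. It assumes the Computation constructor additionally stores a RuntimeRepoIf reference (obtained via services.Get<RuntimeRepoIf>()) and the output datapoint paths as members; these members are not part of the listing above.

    void Publish() override {
        // Hypothetical variant: m_rtr and the datapoint paths are assumed to be
        // members of Computation, initialised in its constructor.
        m_rtr.WriteDataPoint<std::vector<float>>(m_dp_avg_slopes, m_result.avg_slopes);
        m_rtr.WriteDataPoint<std::vector<float>>(m_dp_avg_modes, m_result.avg_modes);
    }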

Configuration

The example data task has a series of runtime configuration datapoints:

Listing 1 data_task_1.yaml
 static:
     queue_name:
         type: RtcString
         value: exampleDataTaskQueue
     samples_to_read:
         type: RtcUInt32
         value: 5
     samples_to_skip:
         type: RtcUInt32
         value: 10
     sample_timeout:
         type: RtcUInt32
         value: 250
     slopes:
         type: RtcUInt32
         value: 9232
     modes:
         type: RtcUInt32
         value: 50
 dynamic:
     slopes2modes:
         type: RtcMatrixFloat
         value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.slopes2modes.fits
     avg_slopes:
         type: RtcVectorFloat
         value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.avg_slopes.fits
     avg_modes:
         type: RtcVectorFloat
         value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.avg_modes.fits

Note

Most of these datapoints are considered inputs and are therefore read-only from the Data Task’s perspective. Only the datapoints dynamic/avg_slopes and dynamic/avg_modes contain actual computation results that are written by the Data Task.

Measure Telemetry Data Task

The Measure Telemetry Data Task allows the user to trigger an on-demand computation based on telemetry. A typical use case is measuring the dark map of a wavefront sensor.

When the Measure command is triggered, the Data Task starts processing incoming telemetry and performs a computation on this data. A feature of the Measure interface is that it can return results directly to the user in a user defined JSON structure, but it can also put any results into the Runtime Configuration Repository.

Source Code Location

The example source code can be found in the following sub-directory of the rtctk project:

_examples/exampleDataTask/measureTel

Modules

The provided example is organised into the following waf modules:

  • app - The data task application including default configuration

  • scripts - Helper scripts for deployment and control

The example also uses common libraries that are shared with other examples.

Dependencies

The provided example code depends on the following modules:

  • Component Framework - for RTC Component functionality with Services

  • Reusable Components - for Data Task support libraries

  • Client Application - to steer the application

Running the Measure Telemetry Data Task Example

The Measure Telemetry Data Task can be run in isolation or as part of an SRTC system. As with the Periodic Telemetry Data Task, the only requirement for the Data Task to be brought to Operational is that a shared memory queue writer has been created; this writer can be provided either by the Telemetry Subscriber or by a Shared Memory Publisher. In this section a shmPublisher is used to bring the Data Task to Operational. By sending the Measure command with a specific JSON payload, the Data Task can provide multiple computations depending on the payload. Here a very simple payload of '{"algorithm":"mean"}' is used to specify that the mean should be calculated.

To make the example as simple as possible, a script rtctkExampleDataTaskMeasureTel.sh is provided to help bring the data task and supporting infrastructure online.

rtctkExampleDataTaskMeasureTel.sh

After installing the RTC Tk, run the example using the following sequence of commands:

# Deploy and start the example applications
rtctkExampleDataTaskMeasureTel.sh deploy
rtctkExampleDataTaskMeasureTel.sh start

# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskMeasureTel.sh send Init
rtctkExampleDataTaskMeasureTel.sh send Enable
rtctkExampleDataTaskMeasureTel.sh send Measure '{"algorithm":"mean"}'
rtctkExampleDataTaskMeasureTel.sh send Disable
rtctkExampleDataTaskMeasureTel.sh send Reset

# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskMeasureTel.sh stop
rtctkExampleDataTaskMeasureTel.sh undeploy

Measure Interface Development Guide

The process for configuring the Computation class (see Computation Class) is explained in the previous sections. In this section we focus on the Measure interface provided to the BusinessLogic.

Business Logic Class

The addition of the BusinessLogic class allows the states and state transitions to be customised (for more information see RTC Component).

To add ActivityMeasuring to the BusinessLogic the following include is required:

#include <rtctk/componentFramework/measurable.hpp>

It is then required to declare the life-cycle of the component to be Measurable with the following lines within the BusinessLogic:

using LifeCycle = Measurable<RtcComponent>;
using ComponentType = LifeCycle;

This declaration provides the BusinessLogic with an interface to the following:

JsonPayload ActivityMeasuring(StopToken st, JsonPayload const& arg) override

This can then be customised to suit the needs of the Data Task. As the command carries a JSON payload, the payload must be parsed by the user. The rest of this example is similar to that of the Periodic Telemetry Data Task example (see Periodic Telemetry Data Task for more details).
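
As an illustration only (this is not the code of the provided example), an override could parse the payload and return a result roughly as follows. It assumes that JsonPayload offers nlohmann::json-style access, and ComputeMean is a hypothetical helper on the user's Computation class:

    JsonPayload ActivityMeasuring(StopToken st, JsonPayload const& arg) override {
        // Assumption: JsonPayload offers nlohmann::json-style access.
        const auto algorithm = arg.at("algorithm").get<std::string>();
        if (algorithm != "mean") {
            throw std::invalid_argument("unsupported algorithm: " + algorithm);
        }

        // ComputeMean is a hypothetical helper that processes incoming telemetry
        // until enough samples have been read (or the stop token is triggered).
        const std::vector<float> mean = m_computation->ComputeMean(st);

        // Results can be returned directly to the caller and/or written to the
        // Runtime Configuration Repository, as in the periodic example.
        JsonPayload reply;
        reply["mean"] = mean;
        return reply;
    }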

See Client Application for more information about the Measurable interface.

Optimise Data Task

The Optimise Data Task offers the Optimise interface that is used to trigger an on-demand, one-shot computation. It is a Data Task that takes data from the Runtime Configuration Repository, performs a complex computation and then returns the result to the Runtime Configuration Repository.

Note

The Optimise Data Task example has been combined with the Python integration to reduce the number of examples providing similar functionality. As such, some of the information here is repeated in Python support. It is advised to read the Python section as well as this one, even when implementing a non-Python based Data Task, as useful information is found in both places.

Source Code Location

The example source code can be found in the following sub-directory of the rtctk project:

_examples/exampleDataTask/optimiser

Modules

The provided example is organised into the following waf modules:

  • app - The data task application including default configuration.

  • pyLib - Python library containing calculation.

  • scripts - Helper scripts for deployment and control.

In addition, the example uses the following component framework libraries:

  • rtctkRtcComponent - For RtcComponent functionality

  • rtctkServicesPython - Framework-provided Python bindings for C++

Running the Optimise Data Task Example

The optimiser example can be run in isolation or as part of an SRTC system. The Python computation can be invoked after bringing the component to state Operational. By sending the Optimise command with a specific JSON payload, the Python interpreter is started and the computation is carried out.

The script rtctkExampleDataTaskOptimiser.sh is provided to bring the example data task and the supporting infrastructure online in a simple way.

rtctkExampleDataTaskOptimiser.sh

After installing the RTC Tk, run the example using the following sequence of commands:

# Deploy and start the example applications
rtctkExampleDataTaskOptimiser.sh deploy
rtctkExampleDataTaskOptimiser.sh start

# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskOptimiser.sh send Init
rtctkExampleDataTaskOptimiser.sh send Enable
rtctkExampleDataTaskOptimiser.sh send Optimise '{"algorithm":"SimpleInversion"}'
rtctkExampleDataTaskOptimiser.sh send Disable
rtctkExampleDataTaskOptimiser.sh send Reset

# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskOptimiser.sh stop
rtctkExampleDataTaskOptimiser.sh undeploy

Optimise Interface Development Guide

Most of the required information, such as how to configure a component from the Runtime Configuration Repository, has already been covered. In this section we focus only on the Optimise command of the BusinessLogic.

Business Logic Class

The addition of the BusinessLogic class allows the states and state transitions to be customised (for more information see RTC Component).

To add ActivityOptimising to the BusinessLogic the following include is required:

#include <rtctk/componentFramework/optimisable.hpp>

It is then required to declare the LifeCycle of the component to be Optimisable with the following lines within the BusinessLogic:

using LifeCycle = Optimisable<RtcComponent>;
using ComponentType = LifeCycle;

This declaration provides the BusinessLogic with an interface to the following:

JsonPayload ActivityOptimising(StopToken st, JsonPayload const& arg) override

In this example, the JSON payload is parsed, the calculation is performed, and the result is written back to the runtime_repo.

JsonPayload ActivityOptimising(StopToken st, JsonPayload const& arg) override {
    // Parse the requested algorithm from the payload and run the computation.
    auto result = m_computation.Compute(ParseAlgorithm(arg));
    // Write the result matrix to the RTR and the elapsed time to the OLDB.
    m_rtr.WriteDataPoint<MatrixBuffer<float>>(m_dp_cm, result.cm);
    m_oldb.SetDataPoint<double>(m_dp_stats_time, result.stats.elapsed.count());
    // No result needs to be returned to the caller in this example.
    return {};
}
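
ParseAlgorithm is a user-defined helper that is not shown in the listing; a minimal sketch, again assuming (not confirmed by the toolkit) that JsonPayload offers nlohmann::json-style access, could be:

    // Hypothetical helper extracting the algorithm name from a payload such as
    // '{"algorithm":"SimpleInversion"}'.
    std::string ParseAlgorithm(JsonPayload const& arg) {
        return arg.at("algorithm").get<std::string>();
    }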

This can then be customised to suit the needs of the Data Task. As the command carries a JSON payload, the payload must be parsed by the user. The rest of this example is similar to the Data Tasks detailed previously, but without a ReaderThread, as no telemetry is processed.

See Client Application for more information about the Optimisable interface.

GPU and Python integration

GPU and Python integration are too large a topic to cover in this tutorial; separate pages are provided to cover the specific details.

GPU Support

See GPU support for information about GPU support.

Python Support

See Python support for more information about Python interfaces.