Creating a Custom Data Task¶
This tutorial shows how a Data Task component can be created, configured and built. The goal of this tutorial is to give developers a starting point for creating their own Data Task.
The RTC Tk does not provide ready-made Data Task applications that can be used out of the box in a real-world RTC. Instead, the toolkit provides a reference design and Data Task support libraries that instrument RTC developers can use to create custom Data Task applications.
The idea is to have users implement a custom Business Logic class and a Computation class. The former defines the behaviour for the different stages of the component's life-cycle (i.e. it implements the activity methods) and the latter provides and confines the data task's specific computation algorithm.
Note
For simplicity, the example uses the file-based implementations of the OLDB and of the Persistent and Runtime Configuration Repositories, as well as file-based service discovery. As a consequence, the underlying format of configuration and data points differs from the one used when these services run on their standard back-ends, such as the CII configuration service or the CII OLDB.
Provided Examples¶
The customisation of the Data Task is discussed with reference to the provided examples. Their main aim is not to provide a fully fledged Data Task that can be used as-is, but to show how the pieces fit together. The following example Data Tasks are provided:
Periodic Telemetry Data Task;
Measure Telemetry Data Task;
Optimise Data Task (with Python support);
GPU and Python integration.
This tutorial will focus on the Periodic Telemetry Data Task. Other Data Task flavours will also be mentioned but with less detail and with focus on important differences only.
Periodic Telemetry Data Task. This section shows how the BusinessLogic can deploy and configure a Computation class to process incoming telemetry data. The example demonstrates how to configure a Data Task that collects telemetry and performs a computation after N frames of data have been collected. Once these N frames are available, a simple computation is triggered in the Computation class, emulating the function of a loop monitor. (See section Periodic Telemetry Data Task)
Measure Telemetry Data Task. In this section, it is shown how the Measure interface can be used with the Computation class to perform on-demand calculations from telemetry sources and return a result. This example is similar to measuring a dark map of a wavefront camera. (See section Measure Telemetry Data Task)
Optimise Data Task. This section shows how the Optimise command can be used to perform on-demand calculations on data held in the Runtime Configuration Repository. The example also shows how the JSON payload can be parsed to select between multiple algorithms. (See section Optimise Data Task)
GPU and Python integration. See section GPU and Python integration for more information.
Periodic Telemetry Data Task¶
The example shows how to configure the BusinessLogic and interface with the Computation class to process the telemetry data and perform a very simple calculation.
The Computation class implements a callback to copy the wavefront sensor slope vector to a local sample buffer. This buffer is then averaged to calculate an average slope vector which is then projected to an average modes vector, using a matrix vector multiplication against a slopes2modes matrix.
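The averaging and projection step can be sketched in plain C++ as follows. This is not the toolkit's code. It stands in for the UserDefinedAlgorithm call used by the Computation class later in this tutorial. As a hypothetical simplification, it assumes the slopes2modes matrix is stored row-major, with num_modes rows and num_slopes columns, in a std::vector<float> rather than the toolkit's MatrixBuffer.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Mirrors the shape of Computation::Result used in this tutorial.
struct Result {
    std::vector<float> avg_slopes;
    std::vector<float> avg_modes;
};

// Averages the buffered slope vectors and projects the average onto modes.
// Assumption (hypothetical layout): s2m is row-major, n_modes rows x n_slopes columns.
Result AverageAndProject(std::vector<std::vector<float>> const& samples,
                         std::vector<float> const& s2m, std::size_t n_modes) {
    if (samples.empty()) {
        throw std::invalid_argument("sample buffer is empty");
    }
    std::size_t const n_slopes = samples.front().size();
    if (s2m.size() != n_modes * n_slopes) {
        throw std::invalid_argument("slopes2modes matrix has the wrong size");
    }
    Result r{std::vector<float>(n_slopes, 0.0f), std::vector<float>(n_modes, 0.0f)};
    // 1) element-wise sum over all buffered samples
    for (auto const& s : samples) {
        if (s.size() != n_slopes) {
            throw std::invalid_argument("inconsistent slope vector length");
        }
        for (std::size_t j = 0; j < n_slopes; ++j) {
            r.avg_slopes[j] += s[j];
        }
    }
    // 2) divide by the number of samples to obtain the average slope vector
    for (auto& v : r.avg_slopes) {
        v /= static_cast<float>(samples.size());
    }
    // 3) matrix-vector multiplication: avg_modes = s2m * avg_slopes
    for (std::size_t m = 0; m < n_modes; ++m) {
        float acc = 0.0f;
        for (std::size_t j = 0; j < n_slopes; ++j) {
            acc += s2m[m * n_slopes + j] * r.avg_slopes[j];
        }
        r.avg_modes[m] = acc;
    }
    return r;
}
```

Averaging first and projecting once costs a single matrix-vector product per cycle. Projecting every sample would cost one product per sample. Because the projection is linear, both approaches give the same result.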
Source Code Location¶
The example source code can be found in the following sub-directory of the rtctk project:
_examples/exampleDataTask/telemetry
Modules¶
The provided example consists of the following waf modules:
app - The data task application including default configuration
scripts - Helper scripts for deployment and control
The example also uses common libraries that are shared with other examples:
shmPub - Implements an example shmPublisher. (See Shared Memory Publisher)
Dependencies¶
The provided example code depends on the following modules:
Component Framework - for RTC Component functionality with Services
Reusable Components - for Data Task support libraries
Client Application - to steer the application
Running the Periodic Telemetry Data Task Example¶
The exampleDataTask can be run in isolation or as part of an SRTC system. The only requirement for bringing the Data Task to Operational is that a shared memory queue writer has been created. The writer can be provided either by the Telemetry Subscriber or by a Shared Memory Publisher. In this section, a shmPublisher is used to bring the Data Task to Operational.
To make the example as simple as possible, a script rtctkExampleDataTaskTelemetry.sh is provided to help bring the data task and supporting infrastructure online.
rtctkExampleDataTaskTelemetry.sh¶
After installing the RTC Tk, run the example using the following sequence of commands:
# Deploy and start the example applications
rtctkExampleDataTaskTelemetry.sh deploy
rtctkExampleDataTaskTelemetry.sh start
# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskTelemetry.sh send Init
rtctkExampleDataTaskTelemetry.sh send Enable
rtctkExampleDataTaskTelemetry.sh send Run
rtctkExampleDataTaskTelemetry.sh send Idle
rtctkExampleDataTaskTelemetry.sh send Disable
rtctkExampleDataTaskTelemetry.sh send Reset
# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskTelemetry.sh stop
rtctkExampleDataTaskTelemetry.sh undeploy
During the deploy action, a Python script is invoked to produce the required FITS input files (see section Generating Input Data).
Generating Input Data¶
To avoid storing large FITS files in the git repository, a Python script is provided that generates the required data. The script that generates the data required by the exampleDataTask is located at:
_examples/exampleDataTask/common/genFitsData
It provides a script that takes three arguments:
$ rtctkExampleDataTaskGenFitsData <num_slopes> <num_modes> <filepath>
This creates the identity slopes2modes matrix required by the data task. By default, the script is called by rtctkExampleDataTaskTelemetry.sh during the deploy action.
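It helps to picture what the generated matrix contains, because that makes the example's output easy to check. The sketch below builds a rectangular "identity" slopes2modes matrix in C++. It assumes, without this being confirmed by the script's source, that the matrix has num_modes rows and num_slopes columns and is stored row-major. That shape matches the m_s2m_matrix.resize(m_modes, m_slopes) call in the Computation class. MakeIdentityS2M is a hypothetical name.

```cpp
#include <cstddef>
#include <vector>

// Builds a row-major n_modes x n_slopes matrix with ones on the main
// diagonal and zeros elsewhere (a rectangular "identity").
std::vector<float> MakeIdentityS2M(std::size_t n_slopes, std::size_t n_modes) {
    std::vector<float> s2m(n_modes * n_slopes, 0.0f);
    for (std::size_t i = 0; i < n_modes && i < n_slopes; ++i) {
        s2m[i * n_slopes + i] = 1.0f;  // element (row i, column i)
    }
    return s2m;
}
```

With such a matrix, mode m equals averaged slope m. With the default configuration of 9232 slopes and 50 modes, avg_modes would therefore simply be the first 50 entries of avg_slopes.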
Data Task Development Guide¶
This section explains how to create a simple telemetry-based Data Task from scratch, using the exampleDataTask as the reference. For more details on the Data Task building blocks themselves, see Data Task.
Business Logic Class¶
#include "computation.hpp"
#include <rtctk/componentFramework/rtcComponent.hpp>
#include <rtctk/componentFramework/runnable.hpp>
#include <rtctk/componentFramework/runtimeRepoIf.hpp>
#include <rtctk/componentFramework/typedEventService.hpp>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <vector>
using namespace rtctk::componentFramework;
using namespace std::chrono_literals;  // enables the 100ms literal used below
using std::chrono::milliseconds;
using LifeCycle = Runnable<RtcComponent>;
class BusinessLogic : public LifeCycle::BizLogicIf, public Computation::ResultIf {
public:
    using ComponentType = LifeCycle;
    BusinessLogic(const std::string& name, ServiceContainer& services)
        : m_name(name)
        , m_services(services)
        , m_rtr(services.Get<RuntimeRepoIf>())
        , m_esv(services.Get<EventServiceIf>())
        , m_ev_pub_conf_updated(m_esv.MakePublisher<ConfigurationUpdatedEvent>())
        , m_dp_queue_name("/" + name + "/static/queue_name")
        , m_dp_thread_policies("/" + name + "/static/thread_policies")
        , m_dp_samples_to_read("/" + name + "/static/samples_to_read")
        , m_dp_samples_to_skip("/" + name + "/static/samples_to_skip")
        , m_dp_sample_timeout("/" + name + "/static/sample_timeout")
        , m_dp_slopes("/" + name + "/static/slopes")
        , m_dp_modes("/" + name + "/static/modes")
        , m_dp_s2m("/" + name + "/dynamic/slopes2modes")
        , m_dp_avg_slopes("/" + name + "/dynamic/avg_slopes")
        , m_dp_avg_modes("/" + name + "/dynamic/avg_modes") {
    }
    // activities
    void ActivityStarting(StopToken st) override {
        m_computation.reset();
    }
    void ActivityInitialising(StopToken st) override {
        // drop any previous instance, then read the static configuration
        m_computation.reset();
        auto shm_queue_name = m_rtr.GetDataPoint<std::string>(m_dp_queue_name);
        size_t samples_to_read = m_rtr.GetDataPoint<uint32_t>(m_dp_samples_to_read);
        size_t samples_to_skip = m_rtr.GetDataPoint<uint32_t>(m_dp_samples_to_skip);
        auto sample_timeout = milliseconds(m_rtr.GetDataPoint<uint32_t>(m_dp_sample_timeout));
        auto thread_policies = GetNumaPolicies(m_rtr, m_dp_thread_policies);
        size_t num_slopes = m_rtr.GetDataPoint<uint32_t>(m_dp_slopes);
        size_t num_modes = m_rtr.GetDataPoint<uint32_t>(m_dp_modes);
        m_computation = std::make_unique<Computation>(m_services,
                                                      *this,
                                                      shm_queue_name,
                                                      samples_to_read,
                                                      samples_to_skip,
                                                      sample_timeout,
                                                      thread_policies,
                                                      num_slopes,
                                                      num_modes);
        // initial value of the dynamic configuration
        m_computation->SetSlopesToModesMatrix(m_rtr.GetDataPoint<MatrixBuffer<float>>(m_dp_s2m));
    }
    void ActivityEnabling(StopToken st) override {
        m_computation->Spawn();
    }
    void ActivityDisabling(StopToken st) override {
        m_computation->Join();
    }
    void ActivityGoingRunning(StopToken st) override {
        m_computation->Run();
    }
    void ActivityGoingIdle(StopToken st) override {
        m_computation->Idle();
    }
    void ActivityRunning(StopToken st) override {
        // poll the worker thread for asynchronous errors until asked to stop
        while (not st.StopRequested()) {
            m_computation->CheckErrors();
            std::this_thread::sleep_for(100ms);
        }
    }
    void ActivityRecovering(StopToken st) override {
        m_computation->Join();
        m_computation->Spawn();
    }
    void ActivityUpdating(StopToken st, JsonPayload const& args) override {
        m_computation->SetSlopesToModesMatrix(m_rtr.GetDataPoint<MatrixBuffer<float>>(m_dp_s2m));
    }
private:
    void OnResultAvailable(Computation::Result const& result) override {
        // write output back to runtime repo
        m_rtr.WriteDataPoint<std::vector<float>>(m_dp_avg_slopes, result.avg_slopes);
        m_rtr.WriteDataPoint<std::vector<float>>(m_dp_avg_modes, result.avg_modes);
        // we can notify multiple updated datapoints using a single event
        m_ev_pub_conf_updated->Publish(
            {m_name, {m_dp_avg_slopes.ToString(), m_dp_avg_modes.ToString()}});
    }
    std::string m_name;
    ServiceContainer& m_services;
    RuntimeRepoIf& m_rtr;
    TypedEventService m_esv;
    // event publishers
    std::unique_ptr<TypedEventPublisher<ConfigurationUpdatedEvent>> m_ev_pub_conf_updated;
    // rtr datapoints (inputs)
    DataPointPath m_dp_queue_name;
    DataPointPath m_dp_thread_policies;
    DataPointPath m_dp_samples_to_read;
    DataPointPath m_dp_samples_to_skip;
    DataPointPath m_dp_sample_timeout;
    DataPointPath m_dp_slopes;
    DataPointPath m_dp_modes;
    DataPointPath m_dp_s2m;
    // rtr datapoints (outputs)
    DataPointPath m_dp_avg_slopes;
    DataPointPath m_dp_avg_modes;
    std::unique_ptr<Computation> m_computation;
};
In ActivityInitialising, the Computation object is created by passing the required services and the static configuration parameters to its constructor.
In ActivityEnabling, the computation object spawns its worker thread. The worker connects to the shared memory queue and enters the Idle state, in which it continuously resets its shared memory read queue so that it is ready to process fresh data.
Note
The user hook OnThreadStart is invoked at this stage.
In ActivityDisabling, the computation object joins its thread and disconnects from the shared memory queue.
In ActivityGoingRunning, the computation object is commanded to enter state Running, where it begins to process data from the shared memory queue.
In ActivityGoingIdle, the computation object is commanded to enter state Idle, where it stays attached to the shared memory queue but continuously resets its shared memory read queue to be ready to process fresh data.
In ActivityRunning, the Data Task performs computations. While the computation object's worker thread performs its Read, Compute, Publish and Skip cycles, the BusinessLogic only observes the computation object by checking for errors (every 100 ms in this example).
Note
In this state, the computation class user hooks OnCycleStart, CopyData, Compute and Publish are invoked continuously.
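The order in which these hooks are driven can be modelled with a small, single-threaded simulation. This is a hypothetical sketch, not the ComputationBase implementation. Hooks, RunCycles and Recorder are illustrative names, the samples are plain int values, and the worker thread, timeouts and shared memory reader are all left out. The sketch also assumes that the skip happens after each Publish, following the Read, Compute, Publish and Skip order described above.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the user hooks of a computation class.
struct Hooks {
    virtual ~Hooks() = default;
    virtual void OnCycleStart(std::size_t to_read) = 0;
    virtual void CopyData(std::size_t sample_idx, int const& sample) = 0;
    virtual void Compute() = 0;
    virtual void Publish() = 0;
};

// Drives Read, Compute, Publish and Skip cycles over a finite sample stream
// and returns the number of completed cycles.
std::size_t RunCycles(std::vector<int> const& stream, std::size_t to_read,
                      std::size_t to_skip, Hooks& hooks) {
    assert(to_read > 0);  // a cycle must read at least one sample
    std::size_t pos = 0;
    std::size_t cycles = 0;
    while (pos + to_read <= stream.size()) {
        hooks.OnCycleStart(to_read);
        for (std::size_t i = 0; i < to_read; ++i) {  // Read
            hooks.CopyData(i, stream[pos + i]);
        }
        pos += to_read;
        hooks.Compute();  // Compute
        hooks.Publish();  // Publish
        pos += to_skip;   // Skip
        ++cycles;
    }
    return cycles;
}

// Example hooks that record what they receive.
struct Recorder : Hooks {
    std::vector<int> copied;
    std::size_t computes = 0;
    std::size_t publishes = 0;
    void OnCycleStart(std::size_t) override {}
    void CopyData(std::size_t, int const& sample) override { copied.push_back(sample); }
    void Compute() override { ++computes; }
    void Publish() override { ++publishes; }
};
```

Under this model, the example configuration (samples_to_read 5, samples_to_skip 10) uses 5 out of every 15 samples for each result.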
If an error occurs, CheckErrors throws an exception and the component enters the state On::Operational::Error.
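In C++, a common way to move an exception from a worker thread to a supervising thread is std::exception_ptr. The sketch below assumes that CheckErrors works along these lines. The Worker class is hypothetical and only illustrates the pattern. The worker stores the exception that ended its thread, and CheckErrors rethrows it in the caller's thread, which here would be the thread running ActivityRunning.

```cpp
#include <cassert>
#include <exception>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical worker that hands exceptions over to a supervising thread.
class Worker {
public:
    template <typename F>
    void Spawn(F&& body) {
        assert(!m_thread.joinable() && "call Join() before re-spawning");
        m_thread = std::thread([this, body = std::forward<F>(body)]() mutable {
            try {
                body();
            } catch (...) {
                std::scoped_lock lock{m_mutex};
                m_error = std::current_exception();  // keep it for CheckErrors()
            }
        });
    }
    void Join() {
        if (m_thread.joinable()) {
            m_thread.join();
        }
    }
    // Rethrows (and clears) the error captured by the worker thread, if any.
    void CheckErrors() {
        std::exception_ptr err;
        {
            std::scoped_lock lock{m_mutex};
            std::swap(err, m_error);
        }
        if (err) {
            std::rethrow_exception(err);
        }
    }
    ~Worker() { Join(); }

private:
    std::thread m_thread;
    std::mutex m_mutex;
    std::exception_ptr m_error;
};
```

A std::thread cannot be assigned to while it is still joinable; doing so calls std::terminate. Join therefore has to be called before Spawn, which matches the Join then Spawn sequence in ActivityRecovering.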
In ActivityRecovering, the Data Task performs error recovery actions. The safest way to recover from asynchronous errors is to re-spawn the worker thread, which causes the shared memory reader to reconnect to the shared memory queue.
In ActivityUpdating, the dynamic configuration of the data task is updated. In addition to user-specific dynamic computation parameters, the samples_to_read and samples_to_skip parameters can be changed dynamically using the provided setters if necessary.
Computation Class¶
The Computation class is where most of the implementation effort goes. The exampleDataTask implements a very simple calculation to show how this can be done: a projection from slopes to modes, performed as a matrix-vector multiplication.
#include <rtctk/componentFramework/serviceContainer.hpp>
#include <rtctk/componentFramework/matrixBuffer.hpp>
#include <rtctk/exampleTopics/topics.hpp>
#include <rtctk/dataTask/computationBase.hpp>
#include <chrono>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>
// the type of the shm super topic
using TopicType = rtctk::exampleTopic::ScaoLoopTopic;
// type of the user-owned sample buffer: one slope vector per sample
using BufferType = std::vector<decltype(decltype(TopicType::wfs)::slopes)>;
class Computation : public rtctk::dataTask::ComputationBase<TopicType> {
public:
    using ServiceContainer = rtctk::componentFramework::ServiceContainer;
    template <typename T>
    using MatrixBuffer = rtctk::componentFramework::MatrixBuffer<T>;
    struct Result {
        std::vector<float> avg_slopes;
        std::vector<float> avg_modes;
    };
    struct ResultIf {
        virtual ~ResultIf() = default;
        virtual void OnResultAvailable(Result const& result) = 0;
    };
    Computation(ServiceContainer& services,
                ResultIf& publisher,
                std::string const& shm_name,
                size_t to_read,
                size_t to_skip,
                std::chrono::milliseconds sample_timeout,
                std::optional<numapp::NumaPolicies> thread_policies,
                size_t n_slopes,
                size_t n_modes)
        : ComputationBase(services, shm_name, to_read, to_skip, sample_timeout, thread_policies)
        , m_publisher(publisher)
        , m_slopes(n_slopes)
        , m_modes(n_modes) {
        // set vectors and matrix to the correct size
        m_sample_buffer.resize(to_read);
        m_result.avg_slopes.resize(n_slopes, 0.0f);
        m_result.avg_modes.resize(n_modes, 0.0f);
        m_s2m_matrix.resize(m_modes, m_slopes);
    }
    ~Computation() {
        // stop the worker thread before the members used by the hooks are destroyed
        Join();
    }
    void SetSlopesToModesMatrix(MatrixBuffer<float>&& s2m_matrix) {
        std::scoped_lock lock{m_mutex};
        m_s2m_matrix = std::move(s2m_matrix);
    }
private:
    void OnThreadStart() override {
        // optional user hook
    }
    void OnCycleStart(size_t to_read) override {
        // optional user hook
    }
    void CopyData(size_t sample_idx, TopicType const& sample) noexcept override {
        m_sample_buffer[sample_idx] = sample.wfs.slopes;
    }
    void Compute() override {
        std::scoped_lock lock{m_mutex};
        // UserDefinedAlgorithm stands for the user's own averaging/projection code (not shown)
        m_result = UserDefinedAlgorithm(m_sample_buffer, m_s2m_matrix);
    }
    void Publish() override {
        m_publisher.OnResultAvailable(m_result);
    }
    ResultIf& m_publisher;
    size_t m_slopes;
    size_t m_modes;
    std::mutex m_mutex;  // protects m_s2m_matrix from concurrent access
    MatrixBuffer<float> m_s2m_matrix;
    BufferType m_sample_buffer;
    Result m_result;
};
The static configuration data is set via the computation class constructor. This is only called during component initialisation.
The dynamic configuration attribute s2m_matrix is set via a dedicated setter method, SetSlopesToModesMatrix. The actual data is retrieved from the Runtime Configuration Repository in ActivityInitialising and ActivityUpdating.
Note
We advise creating individual setters for dynamic configuration attributes; this enables selective parameter updates using the Update command.
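One way to act on this advice is to keep a table that maps dynamic datapoint paths to their setters, so that an Update only re-reads what changed. This is a hypothetical sketch. SelectiveUpdater is an illustrative name, and the sketch assumes that the list of updated datapoint paths has already been extracted from the Update payload, whose format is not described here.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Maps a dynamic datapoint path to the setter that re-reads and applies it.
class SelectiveUpdater {
public:
    void Register(std::string path, std::function<void()> apply) {
        m_setters[std::move(path)] = std::move(apply);
    }
    // Invokes only the setters whose paths are listed; unknown paths are ignored.
    // Returns the number of setters invoked.
    std::size_t Apply(std::vector<std::string> const& updated_paths) const {
        std::size_t applied = 0;
        for (auto const& path : updated_paths) {
            auto it = m_setters.find(path);
            if (it != m_setters.end()) {
                it->second();
                ++applied;
            }
        }
        return applied;
    }
    // Invokes every registered setter, e.g. when no selection is given.
    void ApplyAll() const {
        for (auto const& entry : m_setters) {
            entry.second();
        }
    }

private:
    std::map<std::string, std::function<void()>> m_setters;
};
```

In the BusinessLogic above, the slopes2modes entry would wrap the existing m_computation->SetSlopesToModesMatrix(...) call.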
The CopyData callback is used to copy a subset of the incoming shared memory data of TopicType into the local sample buffer m_sample_buffer of BufferType.
Note
When adjusting the sample buffer dynamically, use the OnCycleStart hook to resize the sample buffer to the updated number of samples to_read.
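A minimal sketch of this pattern follows. It uses a hypothetical SampleBuffer type with plain std::vector<float> samples in place of the topic's slope type. Because OnCycleStart receives the to_read value of the cycle that is about to start, resizing there ensures that the CopyData indices of that same cycle are always in range.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical buffer mirroring the roles of the OnCycleStart and CopyData hooks.
struct SampleBuffer {
    std::vector<std::vector<float>> samples;
    // Called once per cycle with the number of samples to read in that cycle.
    void OnCycleStart(std::size_t to_read) {
        samples.resize(to_read);  // no-op when the size is unchanged
    }
    // Called for each sample index 0..to_read-1 of the cycle.
    void CopyData(std::size_t sample_idx, std::vector<float> const& slopes) {
        samples[sample_idx] = slopes;
    }
};
```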
The bulk of the data task algorithm resides in the Compute method, which runs in the worker thread of the ComputationBase class.
Note
When using dynamically updated input data (such as m_s2m_matrix), consider protecting it from concurrent access with a mutex: the setter is called from the BusinessLogic while Compute runs in the worker thread.
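In the Publish method, the result is handed to the ResultIf publisher, which in this example is the BusinessLogic. The BusinessLogic writes the result to the Runtime Configuration Repository.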
Note
In this example, the RTR access is not implemented directly in the computation class. It is delegated to the BusinessLogic via the ResultIf interface instead.
Users are free to write to the RTR directly if they do not mind the Computation class being tightly coupled to the RTR.
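A side benefit of this design is that the result path can be exercised without any RTR, by passing the computation a stub ResultIf. For self-containment, the sketch below re-declares Result and ResultIf as free types with the same members as in the Computation class above. CapturingPublisher is a hypothetical test helper.

```cpp
#include <vector>

struct Result {
    std::vector<float> avg_slopes;
    std::vector<float> avg_modes;
};

struct ResultIf {
    virtual ~ResultIf() = default;
    virtual void OnResultAvailable(Result const& result) = 0;
};

// Test double: keeps a copy of every published result instead of writing to the RTR.
struct CapturingPublisher : ResultIf {
    std::vector<Result> received;
    void OnResultAvailable(Result const& result) override { received.push_back(result); }
};
```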
Configuration¶
The example data task has a series of runtime configuration datapoints:
static:
    queue_name:
        type: RtcString
        value: exampleDataTaskQueue
    samples_to_read:
        type: RtcUInt32
        value: 5
    samples_to_skip:
        type: RtcUInt32
        value: 10
    sample_timeout:
        type: RtcUInt32
        value: 250
    slopes:
        type: RtcUInt32
        value: 9232
    modes:
        type: RtcUInt32
        value: 50
dynamic:
    slopes2modes:
        type: RtcMatrixFloat
        value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.slopes2modes.fits
    avg_slopes:
        type: RtcVectorFloat
        value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.avg_slopes.fits
    avg_modes:
        type: RtcVectorFloat
        value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.avg_modes.fits
Note
Most of these datapoints are inputs and are therefore read-only from the Data Task's perspective. Only the datapoints dynamic/avg_slopes and dynamic/avg_modes contain computation results written by the Data Task.
Measure Telemetry Data Task¶
The Measure Telemetry Data Task allows the user to trigger an on-demand computation based on telemetry. A typical use case is measuring the dark map of a wavefront sensor.
When the Measure command is triggered, the Data Task starts processing incoming telemetry and performs a computation on this data. The Measure interface can return results directly to the caller in a user-defined JSON structure, and it can also write results to the Runtime Configuration Repository.
Source Code Location¶
The example source code can be found in the following sub-directory of the rtctk project:
_examples/exampleDataTask/measureTel
Modules¶
The provided example consists of the following waf modules:
app - The data task application including default configuration
scripts - Helper scripts for deployment and control
The example also uses common libraries that are shared with other examples:
shmPub - Implements an example shmPublisher (See Shared Memory Publisher)
Dependencies¶
The provided example code depends on the following modules:
Component Framework - for RTC Component functionality with Services
Reusable Components - for Data Task support libraries
Client Application - to steer the application
Running the Measure Telemetry Data Task Example¶
The Measure Telemetry Data Task can be run in isolation or as part of an SRTC system. As with the Periodic Telemetry Data Task, the only requirement for bringing the data task to Operational is that a shared memory queue writer has been created. The writer can be provided either by the Telemetry Subscriber or by a Shared Memory Publisher. In this section, a shmPublisher is used to bring the Data Task to Operational. The Measure command takes a JSON payload that selects which computation the Data Task performs. Here, a very simple payload, '{"algorithm":"mean"}', requests that the mean be calculated.
To make the example as simple as possible, a script rtctkExampleDataTaskMeasureTel.sh is provided to help bring the data task and supporting infrastructure online.
rtctkExampleDataTaskMeasureTel.sh¶
After installing the RTC Tk, run the example using the following sequence of commands:
# Deploy and start the example applications
rtctkExampleDataTaskMeasureTel.sh deploy
rtctkExampleDataTaskMeasureTel.sh start
# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskMeasureTel.sh send Init
rtctkExampleDataTaskMeasureTel.sh send Enable
rtctkExampleDataTaskMeasureTel.sh send Measure '{"algorithm":"mean"}'
rtctkExampleDataTaskMeasureTel.sh send Disable
rtctkExampleDataTaskMeasureTel.sh send Reset
# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskMeasureTel.sh stop
rtctkExampleDataTaskMeasureTel.sh undeploy
Measure Interface Development Guide¶
The configuration of the Computation class is explained in the previous sections (see Computation Class). This section focuses on the Measure interface provided to the BusinessLogic.
Business Logic Class¶
The BusinessLogic class is where the states and state transitions are customised (for more information see RTC Component).
To add ActivityMeasuring to the BusinessLogic, the following include is required:
#include <rtctk/componentFramework/measurable.hpp>
The life-cycle of the component must then be declared as Measurable with the following lines within the BusinessLogic:
using LifeCycle = Measurable<RtcComponent>;
using ComponentType = LifeCycle;
This declaration provides the BusinessLogic with the following activity, which can be overridden:
JsonPayload ActivityMeasuring(StopToken st, JsonPayload const& arg) override
This activity can then be customised to suit the needs of the Data Task. Because it receives the JSON payload of the command, the user is responsible for parsing it. The rest of this example is similar to the Periodic Telemetry Data Task example (see Periodic Telemetry Data Task for more details).
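As an illustration of this parsing step, the sketch below dispatches on the value of the "algorithm" field, for example "mean" from the payload used above. It is hypothetical in two ways. It assumes the field has already been extracted from the JsonPayload as a std::string, and it represents the collected telemetry as plain std::vector<float> frames. Unknown algorithm names are rejected with an exception rather than silently ignored.

```cpp
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

using Frames = std::vector<std::vector<float>>;
using Algorithm = std::vector<float> (*)(Frames const&);

// Per-element mean over all collected frames (e.g. a dark map estimate).
std::vector<float> Mean(Frames const& frames) {
    if (frames.empty()) {
        throw std::invalid_argument("no frames collected");
    }
    std::vector<float> out(frames.front().size(), 0.0f);
    for (auto const& f : frames) {
        if (f.size() != out.size()) {
            throw std::invalid_argument("inconsistent frame size");
        }
        for (std::size_t i = 0; i < out.size(); ++i) {
            out[i] += f[i];
        }
    }
    for (auto& v : out) {
        v /= static_cast<float>(frames.size());
    }
    return out;
}

// Looks up the algorithm named in the Measure payload and runs it.
std::vector<float> RunMeasurement(std::string const& algorithm, Frames const& frames) {
    static const std::map<std::string, Algorithm> table{{"mean", &Mean}};
    auto it = table.find(algorithm);
    if (it == table.end()) {
        throw std::invalid_argument("unknown algorithm: " + algorithm);
    }
    return it->second(frames);
}
```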
See Client Application for more information about the Measurable interface.
Optimise Data Task¶
The Optimise Data Task offers the Optimise interface, which is used to trigger an on-demand, one-shot computation. The Data Task takes data from the Runtime Configuration Repository, performs a complex computation and writes the result back to the Runtime Configuration Repository.
Note
The Optimise Data Task has been combined with the Python integration to reduce the number of examples that provide similar functionality. As a result, some of the information here is repeated in Python support. We advise reading the Python section as well, even when implementing a non-Python Data Task, because useful information is found in both places.
Source Code Location¶
The example source code can be found in the following sub-directory of the rtctk project:
_examples/exampleDataTask/optimiser
Modules¶
The provided example consists of the following waf modules:
app - The data task application including default configuration.
pyLib - Python library containing the calculation.
scripts - Helper scripts for deployment and control.
In addition the example uses the following component framework libraries:
rtctkRtcComponent - For RtcComponent functionality
rtctkServicesPython - Framework-provided Python bindings for C++
Running the Optimise Data Task Example¶
The optimiser example can be run in isolation or as part of an SRTC system. The Python computation can be invoked after bringing the component to state Operational. By sending the Optimise command with a specific JSON payload, the Python interpreter is started and the computation is carried out.
The script rtctkExampleDataTaskOptimiser.sh is provided to bring the example data task and the supporting infrastructure online in a simple way.
rtctkExampleDataTaskOptimiser.sh¶
After installing the RTC Tk, run the example using the following sequence of commands:
# Deploy and start the example applications
rtctkExampleDataTaskOptimiser.sh deploy
rtctkExampleDataTaskOptimiser.sh start
# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskOptimiser.sh send Init
rtctkExampleDataTaskOptimiser.sh send Enable
rtctkExampleDataTaskOptimiser.sh send Optimise '{"algorithm":"SimpleInversion"}'
rtctkExampleDataTaskOptimiser.sh send Disable
rtctkExampleDataTaskOptimiser.sh send Reset
# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskOptimiser.sh stop
rtctkExampleDataTaskOptimiser.sh undeploy
Optimise Interface Development Guide¶
Most of the information on how to configure a component from the Runtime Configuration Repository has already been covered in the previous sections. This section focuses only on the Optimise command of the BusinessLogic.
Business Logic Class¶
The BusinessLogic class is where the states and state transitions are customised (for more information see RTC Component).
To add ActivityOptimising to the BusinessLogic, the following include is required:
#include <rtctk/componentFramework/optimisable.hpp>
The life-cycle of the component must then be declared as Optimisable with the following lines within the BusinessLogic:
using LifeCycle = Optimisable<RtcComponent>;
using ComponentType = LifeCycle;
This declaration provides the BusinessLogic with the following activity, which can be overridden:
JsonPayload ActivityOptimising(StopToken st, JsonPayload const& arg) override
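In this example, the JSON payload is parsed to select the algorithm, and the calculation is performed. The result is then written to the Runtime Configuration Repository, and the elapsed computation time is written to the OLDB.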
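JsonPayload ActivityOptimising(StopToken st, JsonPayload const& arg) override {
    // ParseAlgorithm is a user-provided helper that selects the algorithm from the payload
    auto result = m_computation.Compute(ParseAlgorithm(arg));
    // write the result matrix to the runtime repo and the elapsed time to the OLDB
    m_rtr.WriteDataPoint<MatrixBuffer<float>>(m_dp_cm, result.cm);
    m_oldb.SetDataPoint<double>(m_dp_stats_time, result.stats.elapsed.count());
    return {};  // empty reply; the results are available in the RTR and OLDB
}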
This activity can then be customised to suit the needs of the Data Task. Because it receives the JSON payload of the command, the user is responsible for parsing it. The rest of this example is similar to the Data Tasks described above, except that it has no ReaderThread, since no telemetry is processed.
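The ParseAlgorithm call in the snippet above is left to the user. The following is a minimal sketch of such a helper. It assumes the algorithm name has already been read from the payload as a std::string, whereas the example's ParseAlgorithm takes the JsonPayload itself, which is not reproduced here. The Algorithm enum is hypothetical. Converting the name once, before the computation starts, makes an invalid command fail immediately rather than partway through a long computation.

```cpp
#include <stdexcept>
#include <string>

// Algorithms supported by this hypothetical optimiser.
enum class Algorithm { SimpleInversion };

// Converts the "algorithm" field of the Optimise payload into an enum,
// rejecting names the Data Task does not implement.
Algorithm ParseAlgorithm(std::string const& name) {
    if (name == "SimpleInversion") {
        return Algorithm::SimpleInversion;
    }
    throw std::invalid_argument("unsupported algorithm: " + name);
}
```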
See Client Application for more information about the Optimisable interface.
GPU and Python integration¶
GPU and Python integration is too large a topic to cover here; separate pages focus on the specific details.
GPU Support¶
See GPU support for information about GPU support.
Python Support¶
See Python support for more information about Python interfaces.