RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
ddtPublisherImage1d.hpp
Go to the documentation of this file.
1
13#ifndef DDT_PUBLISHER_IMAGE1D_HPP
14#define DDT_PUBLISHER_IMAGE1D_HPP
15
20
21#include <ddt/ddtDataPublisher.hpp>
22#include <ddt/ddtEncDecBinaryxD.hpp>
23#include <ddt/ddtLogger.hpp>
24
25#include <boost/endian.hpp>
26#include <fmt/format.h>
27
28namespace rtctk::ddtServer {
29
37template <typename SI>
39public:
40 using StreamInfo = SI;
41
43
44 static_assert(SI::ID.find_first_not_of("abcdefghijklmnopqrstuvwxyz_0123456789") ==
45 std::string_view::npos,
46 "DDT Publisher ID contains illegal characters!");
47
48 DdtPublisherImage1d(const std::string& db_prefix, ServiceContainer& services)
49 : m_logger(componentFramework::GetLogger("rtctk"))
50 , m_rtr(services.Get<RuntimeRepoIf>())
51 , m_rtr_prefix_static(GetStaticRtrPrefix(db_prefix))
52 , m_rtr_prefix_dynamic(GetDynamicRtrPrefix(db_prefix))
53 , m_enabled(true)
54 , m_broker_endpoint("zpb.rr://127.0.0.1:5001") {
55 LOG4CPLUS_INFO(m_logger, fmt::format("Creating DdtPublisherImage1d '{}'", SI::ID));
56
57 // publisher configuration
58
59 auto broker_endpoint_path = DataPointPath(m_rtr_prefix_static + "/broker_endpoint");
61 m_broker_endpoint = m_rtr.GetDataPoint<std::string>(broker_endpoint_path);
62 }
63
64 auto enabled_path = DataPointPath(m_rtr_prefix_dynamic + "/enabled");
65 if (m_rtr.DataPointExists(enabled_path)) {
66 m_enabled = m_rtr.GetDataPoint<bool>(enabled_path);
67 }
68
69 // metadata preparation
70
71 m_meta_data.meta_data_base.bytes_per_pixel = sizeof(typename SI::PixelType);
72 m_meta_data.meta_data_base.number_dimensions = 1;
73 m_meta_data.meta_data_base.complete_flag = true;
74 m_meta_data.meta_data_base.last_segment = true;
75 /*
76 * Important note: Endiannes of the source is assumed to be the same as the one of the
77 * host architecture. This has been set by design, that the low-level devices has to
78 * translate (if necessary) to the RTCTK's endiannes. RTCTK assumes the data comes in the
79 * same endiannes as the machine running this process.
80 */
81 m_meta_data.meta_data_base.byte_order_little_endian =
82 static_cast<bool>(boost::endian::order::native);
83
84 m_meta_data.meta_data_base.description = SI::ID;
85
86 if (typeid(typename SI::PixelType) == typeid(uint8_t)) {
87 m_meta_data.meta_data_base.data_type = ddt::UINT8;
88 } else if (typeid(typename SI::PixelType) == typeid(int16_t)) {
89 m_meta_data.meta_data_base.data_type = ddt::SINT16;
90 } else if (typeid(typename SI::PixelType) == typeid(int32_t)) {
91 m_meta_data.meta_data_base.data_type = ddt::SINT32;
92 } else if (typeid(typename SI::PixelType) == typeid(float)) {
93 m_meta_data.meta_data_base.data_type = ddt::FLOAT32;
94 } else if (typeid(typename SI::PixelType) == typeid(double)) {
95 m_meta_data.meta_data_base.data_type = ddt::FLOAT64;
96 } else if (typeid(typename SI::PixelType) == typeid(uint16_t)) {
97 m_meta_data.meta_data_base.data_type = ddt::UINT16;
98 } else if (typeid(typename SI::PixelType) == typeid(uint32_t)) {
99 m_meta_data.meta_data_base.data_type = ddt::UINT32;
100 } else {
101 CII_THROW(UnsupportedTypeException, typeid(typename SI::PixelType));
102 }
103
104 m_meta_data.array_dimensions = std::to_string(SI::WIDTH);
105
106 // publisher instantiation and registration
107
108 m_ddt_publisher = std::make_unique<ddt::DdtDataPublisher>(m_logger);
109 m_ddt_publisher->set_topic_id(m_enc_dec.get_topic_id());
110 int bytes_per_sample = SI::WIDTH * sizeof(typename SI::PixelType);
111 int samples_in_buffer = 10;
112 m_ddt_publisher->SetBufferSize(bytes_per_sample, samples_in_buffer);
113
114 auto ret =
115 m_ddt_publisher->RegisterPublisher(m_broker_endpoint, std::string(SI::ID), false);
116 if (ret < 0) {
118 fmt::format("Failed to register DDT Publisher: {} against DDT Broker: {} "
119 "Check Broker URI, and ensure that Broker process is running.",
120 SI::ID,
121 m_broker_endpoint));
122 }
123 }
124
125 void Update() {
126 LOG4CPLUS_INFO(m_logger, fmt::format("Updating DdtPublisherImage1d::{}", SI::ID));
127
128 auto enabled_path = DataPointPath(m_rtr_prefix_dynamic + "/enabled");
129 if (m_rtr.DataPointExists(enabled_path)) {
130 m_enabled = m_rtr.GetDataPoint<bool>(enabled_path);
131 }
132 }
133
134 bool IsEnabled() {
135 return m_enabled;
136 }
137
139 m_meta_data.meta_data_base.utc_timestamp = m_enc_dec.get_utc_timestamp();
140 m_enc_dec.Encode(m_meta_data);
141 std::vector<uint8_t> metadata = m_enc_dec.get_meta_data();
142
143 auto data_bytes = elements * sizeof(typename SI::PixelType);
144 m_ddt_publisher->WriteData(
145 sample_id, data_ptr, data_bytes, metadata.data(), metadata.size());
146 m_ddt_publisher->PublishData();
147 }
148
149private:
150 std::string GetStaticRtrPrefix(const std::string& db_prefix) {
151 std::string temp = db_prefix;
152 temp.insert(temp.find("/"), "/static");
153 return "/" + temp + "/" + std::string(SI::ID);
154 }
155
156 std::string GetDynamicRtrPrefix(const std::string& db_prefix) {
157 std::string temp = db_prefix;
158 temp.insert(temp.find("/"), "/dynamic");
159 return "/" + temp + "/" + std::string(SI::ID);
160 }
161
162 log4cplus::Logger& m_logger;
163
164 RuntimeRepoIf& m_rtr;
165
166 std::string m_rtr_prefix_static;
167 std::string m_rtr_prefix_dynamic;
168
172 bool m_enabled;
173
177 std::string m_broker_endpoint;
178
182 MetaDataElementsBinaryxD m_meta_data;
183
187 DdtEncDecBinaryxD m_enc_dec;
188
192 std::unique_ptr<ddt::DdtDataPublisher> m_ddt_publisher;
193};
194
195} // namespace rtctk::ddtServer
196
197#endif // DDT_PUBLISHER_IMAGE1D_HPP
This class provides a wrapper for a data point path.
Definition dataPointPath.hpp:74
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition repositoryIf.ipp:1711
bool DataPointExists(const DataPointPath &path) const
Checks for the existence of a datapoint in the repository.
Definition repositoryIf.cpp:461
The RtctkException class is the base class for all Rtctk exceptions.
Definition exceptions.hpp:211
Base interface for all Runtime Configuration Repository adapters.
Definition runtimeRepoIf.hpp:27
Container class that holds services of any type.
Definition serviceContainer.hpp:39
The UnsupportedTypeException is thrown whenever an attempt is made to use an unsupported type in the ...
Definition exceptions.hpp:239
DDT Publisher class used to publish one-dimensional DDT streams.
Definition ddtPublisherImage1d.hpp:38
void Publish(uint32_t sample_id, const uint8_t *data_ptr, uint32_t elements)
Definition ddtPublisherImage1d.hpp:138
componentFramework::DataPointPath DataPointPath
Definition ddtPublisherImage1d.hpp:42
void Update()
Definition ddtPublisherImage1d.hpp:125
bool IsEnabled()
Definition ddtPublisherImage1d.hpp:134
SI StreamInfo
Definition ddtPublisherImage1d.hpp:40
DdtPublisherImage1d(const std::string &db_prefix, ServiceContainer &services)
Definition ddtPublisherImage1d.hpp:48
Provides macros and utilities for exception handling.
Logging Support Library based on log4cplus.
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Definition businessLogic.cpp:24
Header file for RuntimeRepoIf, which defines the API for RuntimeRepoAdapters.
A container that can hold any type of service.