RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
ddtPublisherImage2d.hpp
Go to the documentation of this file.
1
13#ifndef DDT_PUBLISHER_IMAGE2D_HPP
14#define DDT_PUBLISHER_IMAGE2D_HPP
15
20
21#include <ddt/ddtDataPublisher.hpp>
22#include <ddt/ddtEncDecImage2D.hpp>
23#include <ddt/ddtLogger.hpp>
24
25#include <boost/endian.hpp>
26#include <fmt/format.h>
27
28namespace rtctk::ddtServer {
29
37template <typename SI>
39public:
40 using StreamInfo = SI;
41
43
44 static_assert(SI::ID.find_first_not_of("abcdefghijklmnopqrstuvwxyz_0123456789") ==
45 std::string_view::npos,
46 "DDT Publisher ID contains illegal characters!");
47
48 DdtPublisherImage2d(const std::string& db_prefix, ServiceContainer& services)
49 : m_logger(componentFramework::GetLogger("rtctk"))
50 , m_rtr(services.Get<RuntimeRepoIf>())
51 , m_rtr_prefix_static(GetStaticRtrPrefix(db_prefix))
52 , m_rtr_prefix_dynamic(GetDynamicRtrPrefix(db_prefix))
53 , m_enabled(true)
54 , m_broker_endpoint("zpb.rr://127.0.0.1:5001") {
55 LOG4CPLUS_INFO(m_logger, fmt::format("Creating DdtPulisherImage2d '{}'", SI::ID));
56
57 // publisher configuration
58
59 auto broker_endpoint_path = DataPointPath(m_rtr_prefix_static + "/broker_endpoint");
61 m_broker_endpoint = m_rtr.GetDataPoint<std::string>(broker_endpoint_path);
62 }
63
64 auto enabled_path = DataPointPath(m_rtr_prefix_dynamic + "/enabled");
65 if (m_rtr.DataPointExists(enabled_path)) {
66 m_enabled = m_rtr.GetDataPoint<bool>(enabled_path);
67 }
68
69 // metadata preparation
70
71 m_meta_data.meta_data_base.bytes_per_pixel = sizeof(typename SI::PixelType);
72 m_meta_data.meta_data_base.number_dimensions = 1;
73 m_meta_data.meta_data_base.complete_flag = true;
74 m_meta_data.meta_data_base.last_segment = true;
75 /*
76 * Important note: Endiannes of the source is assumed to be the same as the one of the
77 * host architecture. This has been set by design, that the low-level devices has to
78 * translate (if necessary) to the RTCTK's endiannes. RTCTK assumes the data comes in the
79 * same endiannes as the machine running this process.
80 */
81 m_meta_data.meta_data_base.byte_order_little_endian =
82 static_cast<bool>(boost::endian::order::native);
83
84 m_meta_data.meta_data_base.description = SI::ID;
85
86 if (typeid(typename SI::PixelType) == typeid(uint8_t)) {
87 m_meta_data.meta_data_base.data_type = ddt::UINT8;
88 } else if (typeid(typename SI::PixelType) == typeid(int16_t)) {
89 m_meta_data.meta_data_base.data_type = ddt::SINT16;
90 } else if (typeid(typename SI::PixelType) == typeid(int32_t)) {
91 m_meta_data.meta_data_base.data_type = ddt::SINT32;
92 } else if (typeid(typename SI::PixelType) == typeid(float)) {
93 m_meta_data.meta_data_base.data_type = ddt::FLOAT32;
94 } else if (typeid(typename SI::PixelType) == typeid(double)) {
95 m_meta_data.meta_data_base.data_type = ddt::FLOAT64;
96 } else if (typeid(typename SI::PixelType) == typeid(uint16_t)) {
97 m_meta_data.meta_data_base.data_type = ddt::UINT16;
98 } else if (typeid(typename SI::PixelType) == typeid(uint32_t)) {
99 m_meta_data.meta_data_base.data_type = ddt::UINT32;
100 } else {
101 CII_THROW(UnsupportedTypeException, typeid(typename SI::PixelType));
102 }
103
104 m_meta_data.binning_factor_x = 1;
105 m_meta_data.binning_factor_y = 1;
106 m_meta_data.number_pixels_x = SI::WIDTH;
107 m_meta_data.number_pixels_y = SI::HEIGHT;
108 m_meta_data.first_pixel_x = 0;
109 m_meta_data.first_pixel_y = 0;
110 m_meta_data.number_chunks_x = 1;
111 m_meta_data.number_chunks_y = 1;
112
113 // publisher instantiation and registration
114
115 m_ddt_publisher = std::make_unique<ddt::DdtDataPublisher>(m_logger);
116 m_ddt_publisher->set_topic_id(m_enc_dec.get_topic_id());
117 int bytes_per_sample = SI::HEIGHT * SI::WIDTH * sizeof(typename SI::PixelType);
118 int samples_in_buffer = 10;
119 m_ddt_publisher->SetBufferSize(bytes_per_sample, samples_in_buffer);
120
121 auto ret =
122 m_ddt_publisher->RegisterPublisher(m_broker_endpoint, std::string(SI::ID), false);
123 if (ret < 0) {
125 fmt::format("Failed to register DDT Publisher: {} against DDT Broker: {} "
126 "Check Broker URI, and ensure that Broker process is running.",
127 SI::ID,
128 m_broker_endpoint));
129 }
130 }
131
132 void Update() {
133 LOG4CPLUS_INFO(m_logger, fmt::format("Updating DdtPulisherImage2d::{}", SI::ID));
134
135 auto enabled_path = DataPointPath(m_rtr_prefix_dynamic + "/enabled");
136 if (m_rtr.DataPointExists(enabled_path)) {
137 m_enabled = m_rtr.GetDataPoint<bool>(enabled_path);
138 }
139 }
140
141 bool IsEnabled() {
142 return m_enabled;
143 }
144
146 m_meta_data.image_id = sample_id;
147 m_meta_data.meta_data_base.utc_timestamp = m_enc_dec.get_utc_timestamp();
148 m_enc_dec.Encode(m_meta_data);
149 std::vector<uint8_t> metadata = m_enc_dec.get_meta_data();
150
151 auto data_bytes = elements * sizeof(typename SI::PixelType);
152 m_ddt_publisher->WriteData(
153 sample_id, data_ptr, data_bytes, metadata.data(), metadata.size());
154 m_ddt_publisher->PublishData();
155 }
156
157private:
158 std::string GetStaticRtrPrefix(const std::string& db_prefix) {
159 std::string temp = db_prefix;
160 temp.insert(temp.find("/"), "/static");
161 return "/" + temp + "/" + std::string(SI::ID);
162 }
163
164 std::string GetDynamicRtrPrefix(const std::string& db_prefix) {
165 std::string temp = db_prefix;
166 temp.insert(temp.find("/"), "/dynamic");
167 return "/" + temp + "/" + std::string(SI::ID);
168 }
169
170 log4cplus::Logger& m_logger;
171
172 RuntimeRepoIf& m_rtr;
173
174 std::string m_rtr_prefix_static;
175 std::string m_rtr_prefix_dynamic;
176
180 bool m_enabled;
181
185 std::string m_broker_endpoint;
186
190 MetaDataElementsImage2D m_meta_data;
191
195 DdtEncDecImage2D m_enc_dec;
196
200 std::unique_ptr<ddt::DdtDataPublisher> m_ddt_publisher;
201};
202
203} // namespace rtctk::ddtServer
204
205#endif // DDT_PUBLISHER_IMAGE2D_HPP
This class provides a wrapper for a data point path.
Definition dataPointPath.hpp:74
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition repositoryIf.ipp:1711
bool DataPointExists(const DataPointPath &path) const
Checks for the existence of a datapoint in the repository.
Definition repositoryIf.cpp:461
The RtctkException class is the base class for all Rtctk exceptions.
Definition exceptions.hpp:211
Base interface for all Runtime Configuration Repository adapters.
Definition runtimeRepoIf.hpp:27
Container class that holds services of any type.
Definition serviceContainer.hpp:39
The UnsupportedTypeException is thrown whenever an attempt is made to use an unsupported type in the ...
Definition exceptions.hpp:239
DDT Publisher class used to publish two-dimensional DDT streams.
Definition ddtPublisherImage2d.hpp:38
SI StreamInfo
Definition ddtPublisherImage2d.hpp:40
componentFramework::DataPointPath DataPointPath
Definition ddtPublisherImage2d.hpp:42
bool IsEnabled()
Definition ddtPublisherImage2d.hpp:141
void Publish(uint32_t sample_id, const uint8_t *data_ptr, uint32_t elements)
Definition ddtPublisherImage2d.hpp:145
DdtPublisherImage2d(const std::string &db_prefix, ServiceContainer &services)
Definition ddtPublisherImage2d.hpp:48
void Update()
Definition ddtPublisherImage2d.hpp:132
Provides macros and utilities for exception handling.
Logging Support Library based on log4cplus.
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Definition businessLogic.cpp:24
Header file for RuntimeRepoIf, which defines the API for RuntimeRepoAdapters.
A container that can hold any type of service.