RTC Toolkit 4.0.2
Loading...
Searching...
No Matches
ddtPublisherImage2d.hpp
Go to the documentation of this file.
1
13#ifndef DDT_PUBLISHER_IMAGE2D_HPP
14#define DDT_PUBLISHER_IMAGE2D_HPP
15
20
21#include <ddt/ddtDataPublisher.hpp>
22#include <ddt/ddtEncDecImage2D.hpp>
23#include <ddt/ddtLogger.hpp>
24
25#include <boost/endian.hpp>
26#include <fmt/format.h>
27
28namespace rtctk::ddtServer {
29
30template <typename SI>
32public:
33 using StreamInfo = SI;
34
36
37 static_assert(SI::ID.find_first_not_of("abcdefghijklmnopqrstuvwxyz_0123456789") ==
38 std::string_view::npos,
39 "DDT Publisher ID contains illegal characters!");
40
41 DdtPublisherImage2d(std::string const& db_prefix, ServiceContainer& services)
42 : m_logger(componentFramework::GetLogger("rtctk"))
43 , m_rtr(services.Get<RuntimeRepoIf>())
44 , m_rtr_prefix_static(GetStaticRtrPrefix(db_prefix))
45 , m_rtr_prefix_dynamic(GetDynamicRtrPrefix(db_prefix))
46 , m_enabled(true)
47 , m_broker_endpoint("zpb.rr://127.0.0.1:5001") {
48 LOG4CPLUS_INFO(m_logger, fmt::format("Creating DdtPulisherImage2d '{}'", SI::ID));
49
50 // publisher configuration
51
52 auto broker_endpoint_path = DataPointPath(m_rtr_prefix_static + "/broker_endpoint");
53 if (m_rtr.DataPointExists(broker_endpoint_path)) {
54 m_broker_endpoint = m_rtr.GetDataPoint<std::string>(broker_endpoint_path);
55 }
56
57 auto enabled_path = DataPointPath(m_rtr_prefix_dynamic + "/enabled");
58 if (m_rtr.DataPointExists(enabled_path)) {
59 m_enabled = m_rtr.GetDataPoint<bool>(enabled_path);
60 }
61
62 // metadata preparation
63
64 m_meta_data.meta_data_base.bytes_per_pixel = sizeof(typename SI::PixelType);
65 m_meta_data.meta_data_base.number_dimensions = 1;
66 m_meta_data.meta_data_base.complete_flag = true;
67 m_meta_data.meta_data_base.last_segment = true;
68 /*
69 * Important note: Endiannes of the source is assumed to be the same as the one of the
70 * host architecture. This has been set by design, that the low-level devices has to
71 * translate (if necessary) to the RTCTK's endiannes. RTCTK assumes the data comes in the
72 * same endiannes as the machine running this process.
73 */
74 m_meta_data.meta_data_base.byte_order_little_endian =
75 static_cast<bool>(boost::endian::order::native);
76
77 m_meta_data.meta_data_base.description = SI::ID;
78
79 if (typeid(typename SI::PixelType) == typeid(uint8_t)) {
80 m_meta_data.meta_data_base.data_type = ddt::UINT8;
81 } else if (typeid(typename SI::PixelType) == typeid(int16_t)) {
82 m_meta_data.meta_data_base.data_type = ddt::SINT16;
83 } else if (typeid(typename SI::PixelType) == typeid(int32_t)) {
84 m_meta_data.meta_data_base.data_type = ddt::SINT32;
85 } else if (typeid(typename SI::PixelType) == typeid(float)) {
86 m_meta_data.meta_data_base.data_type = ddt::FLOAT32;
87 } else if (typeid(typename SI::PixelType) == typeid(double)) {
88 m_meta_data.meta_data_base.data_type = ddt::FLOAT64;
89 } else if (typeid(typename SI::PixelType) == typeid(uint16_t)) {
90 m_meta_data.meta_data_base.data_type = ddt::UINT16;
91 } else if (typeid(typename SI::PixelType) == typeid(uint32_t)) {
92 m_meta_data.meta_data_base.data_type = ddt::UINT32;
93 } else {
94 CII_THROW(UnsupportedTypeException, typeid(typename SI::PixelType));
95 }
96
97 m_meta_data.binning_factor_x = 1;
98 m_meta_data.binning_factor_y = 1;
99 m_meta_data.number_pixels_x = SI::WIDTH;
100 m_meta_data.number_pixels_y = SI::HEIGHT;
101 m_meta_data.first_pixel_x = 0;
102 m_meta_data.first_pixel_y = 0;
103 m_meta_data.number_chunks_x = 1;
104 m_meta_data.number_chunks_y = 1;
105
106 // publisher instantiation and registration
107
108 m_ddt_publisher = std::make_unique<ddt::DdtDataPublisher>(m_logger);
109 m_ddt_publisher->set_topic_id(m_enc_dec.get_topic_id());
110 int bytes_per_sample = SI::HEIGHT * SI::WIDTH * sizeof(typename SI::PixelType);
111 int samples_in_buffer = 10;
112 m_ddt_publisher->SetBufferSize(bytes_per_sample, samples_in_buffer);
113
114 auto ret =
115 m_ddt_publisher->RegisterPublisher(m_broker_endpoint, std::string(SI::ID), false);
116 if (ret < 0) {
117 CII_THROW(RtctkException,
118 fmt::format("Failed to register DDT Publisher: {} against DDT Broker: {} "
119 "Check Broker URI, and ensure that Broker process is running.",
120 SI::ID,
121 m_broker_endpoint));
122 }
123 }
124
125 void Update() {
126 LOG4CPLUS_INFO(m_logger, fmt::format("Updating DdtPulisherImage2d::{}", SI::ID));
127
128 auto enabled_path = DataPointPath(m_rtr_prefix_dynamic + "/enabled");
129 if (m_rtr.DataPointExists(enabled_path)) {
130 m_enabled = m_rtr.GetDataPoint<bool>(enabled_path);
131 }
132 }
133
134 bool IsEnabled() {
135 return m_enabled;
136 }
137
138 void Publish(uint32_t sample_id, const uint8_t* data_ptr, uint32_t elements) {
139 m_meta_data.image_id = sample_id;
140 m_meta_data.meta_data_base.utc_timestamp = m_enc_dec.get_utc_timestamp();
141 m_enc_dec.Encode(m_meta_data);
142 std::vector<uint8_t> metadata = m_enc_dec.get_meta_data();
143
144 auto data_bytes = elements * sizeof(typename SI::PixelType);
145 m_ddt_publisher->WriteData(
146 sample_id, data_ptr, data_bytes, metadata.data(), metadata.size());
147 m_ddt_publisher->PublishData();
148 }
149
150private:
151 std::string GetStaticRtrPrefix(std::string const& db_prefix) {
152 std::string temp = db_prefix;
153 temp.insert(temp.find("/"), "/static");
154 return temp + "/" + std::string(SI::ID);
155 }
156
157 std::string GetDynamicRtrPrefix(std::string const& db_prefix) {
158 std::string temp = db_prefix;
159 temp.insert(temp.find("/"), "/dynamic");
160 return temp + "/" + std::string(SI::ID);
161 }
162
163 log4cplus::Logger& m_logger;
164
165 RuntimeRepoIf& m_rtr;
166
167 std::string m_rtr_prefix_static;
168 std::string m_rtr_prefix_dynamic;
169
173 bool m_enabled;
174
178 std::string m_broker_endpoint;
179
183 MetaDataElementsImage2D m_meta_data;
184
188 DdtEncDecImage2D m_enc_dec;
189
193 std::unique_ptr<ddt::DdtDataPublisher> m_ddt_publisher;
194};
195
196} // namespace rtctk::ddtServer
197
198#endif // DDT_PUBLISHER_IMAGE2D_HPP
This class provides a wrapper for a data point path.
Definition: dataPointPath.hpp:73
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition: repositoryIf.hpp:574
virtual bool DataPointExists(const DataPointPath &path) const =0
Checks for the existence of a datapoint in the repository.
The RtctkException class is the base class for all Rtctk exceptions.
Definition: exceptions.hpp:237
Base interface for all Runtime Configuration Repository adapters.
Definition: runtimeRepoIf.hpp:28
Container class that holds services of any type.
Definition: serviceContainer.hpp:39
The UnsupportedTypeException is thrown whenever an attempt is made to use an unsupported type in the ...
Definition: exceptions.hpp:265
Definition: ddtPublisherImage2d.hpp:31
SI StreamInfo
Definition: ddtPublisherImage2d.hpp:33
componentFramework::DataPointPath DataPointPath
Definition: ddtPublisherImage2d.hpp:35
DdtPublisherImage2d(std::string const &db_prefix, ServiceContainer &services)
Definition: ddtPublisherImage2d.hpp:41
bool IsEnabled()
Definition: ddtPublisherImage2d.hpp:134
void Publish(uint32_t sample_id, const uint8_t *data_ptr, uint32_t elements)
Definition: ddtPublisherImage2d.hpp:138
void Update()
Definition: ddtPublisherImage2d.hpp:125
Provides macros and utilities for exception handling.
Logging Support Library based on log4cplus.
log4cplus::Logger & GetLogger(const std::string &name="app")
Get handle to a specific logger.
Definition: logger.cpp:180
Definition: businessLogic.cpp:24
Header file for RuntimeRepoIf, which defines the API for RuntimeRepoAdapters.
A container that can hold any type of service.