RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
rtctkGenRtrWriter.h
Go to the documentation of this file.
1
12#ifndef RTCTK_GEN_RTR_WRITER_
13#define RTCTK_GEN_RTR_WRITER_
14
18
19#include <boost/accumulators/accumulators.hpp>
20#include <boost/accumulators/statistics/count.hpp>
21#include <boost/accumulators/statistics/max.hpp>
22#include <boost/accumulators/statistics/mean.hpp>
23#include <boost/accumulators/statistics/min.hpp>
24#include <boost/accumulators/statistics/stats.hpp>
25#include <boost/accumulators/statistics/variance.hpp>
26
27namespace rtctk::standaloneTools {
28
30using namespace std::chrono;
31namespace ba = boost::accumulators;
32
37protected:
// Logger reference; the constructor binds it to rtctk_cfw::GetLogger("rtctk").
38 log4cplus::Logger& m_logger;
// Per-instance index.
// NOTE(review): no assignment to m_idx is visible in this listing - confirm where it is set.
39 uint16_t m_idx;
// Runtime-repository handle used to create, write and delete the data point.
40 std::shared_ptr<rtctk_cfw::RuntimeRepoIf> m_rtr;
// Server alias forwarded to CreateDataPoint(); the constructor defaults it to "".
42 std::string m_server_alias;
45
// Pause applied by the writer loop after every write attempt.
46 std::chrono::microseconds m_sleep_period;
48
// Writer thread: started by Initialize(), joined by Finalize() and Join().
49 std::thread m_thread;
// Flag polled by the writer loop; once it is false the loop ends.
50 std::atomic<bool> m_thread_running = true;
51
52 ba::accumulator_set<
53 unsigned,
54 ba::stats<ba::tag::mean, ba::tag::variance, ba::tag::min, ba::tag::max, ba::tag::count>>
56
57public:
58 explicit GenRtrWriterBase(std::shared_ptr<rtctk_cfw::RuntimeRepoIf>& rtr,
60 const std::string& server_alias = "")
61 : m_logger(rtctk_cfw::GetLogger("rtctk"))
62 , m_rtr(rtr)
63 , m_dp(dp)
64 , m_server_alias(server_alias) {
65 s_idx++;
66 }
67
68 virtual ~GenRtrWriterBase() noexcept {
69 --s_idx;
70 }
71
72 GenRtrWriterBase() = delete;
75
/**
 * Prepares the writer; implemented by GenRtrWriter<T>, which allocates the
 * matrix, creates the data point if needed and starts the writer thread.
 *
 * @param dim_x      First matrix dimension (passed as first argument to resize()).
 * @param dim_y      Second matrix dimension (passed as second argument to resize()).
 * @param period     Pause applied after every write attempt.
 * @param iterations Number of writes to perform; 0 means write until the
 *                   running flag is cleared.
 */
76 virtual void Initialize(std::size_t dim_x,
77 std::size_t dim_y,
78 std::chrono::microseconds period,
79 uint32_t iterations) = 0;
80
82 m_thread_running = false;
83 } // SignalThreadExit
84
88 void Finalize() {
89 LOG4CPLUS_INFO_FMT(m_logger, "[%s] Finalize", m_dp.ToString().c_str());
90
92
93 if (m_thread.joinable()) {
94 m_thread.join();
95 }
96 //? should we check if dp exists?
97 if (m_dp_created) {
98 LOG4CPLUS_DEBUG_FMT(m_logger, "[%s] Going to delete DP.", m_dp.ToString().c_str());
99 m_rtr->DeleteDataPoint(m_dp);
100 }
101 } // Finalize
102
103 void Join() {
104 m_thread.join();
105 } // Join
106
107 // index of Generic RTR Writer
108 static std::atomic<uint16_t> s_idx;
109}; // GenRtrWriterBase
110
119template <typename T>
121 std::unique_ptr<rtctk_cfw::MatrixBuffer<T>> m_value;
122
126 void WriteDataPoint() {
127 uint16_t error_counter = 0;
128 if (m_iteration_num > 0) {
129 LOG4CPLUS_INFO_FMT(m_logger,
130 "[%s] Going to write value of size: %ld x %ld (%d bytes) %d times.",
131 m_dp.ToString().c_str(),
132 m_value->GetNrows(),
133 m_value->GetNcols(),
136 } else {
137 LOG4CPLUS_INFO_FMT(
138 m_logger,
139 "[%s] Going to write value of size: %ld x %ld (%d bytes) until Ctrl-C.",
140 m_dp.ToString().c_str(),
141 m_value->GetNrows(),
142 m_value->GetNcols(),
144 }
145
146 // Enter the work loop: incrementing iteration counter
147 // and write value to RTR's DP. We continue until the thread is running.
148 uint32_t iteration_counter = 0;
149 while (m_thread_running && (m_iteration_num == 0 ||
150 (m_iteration_num > 0 && iteration_counter < m_iteration_num))) {
151 try {
152 if (iteration_counter % 100 == 0) {
153 LOG4CPLUS_DEBUG_FMT(m_logger,
154 "[%s] Going to write DP. Iteration: %d.",
155 m_dp.ToString().c_str(),
156 iteration_counter);
157 }
158 auto timestamp = high_resolution_clock::now(); // timestamp
159 //? set timestamp to value to be written to RTR
160 auto t1 = std::chrono::steady_clock::now();
161 // write DP
162 m_rtr->WriteDataPoint(m_dp, *m_value);
163 //? EH
164
165 auto t2 = std::chrono::steady_clock::now();
166 unsigned t_diff =
167 std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
168 m_accumulator(t_diff);
169 error_counter = 0;
170 if (iteration_counter % 100 == 0) {
171 auto timestamp_nanos =
172 duration_cast<nanoseconds>(timestamp.time_since_epoch()).count();
173 double timestamp_s = static_cast<double>(timestamp_nanos) / 1e9;
174 LOG4CPLUS_DEBUG_FMT(
175 m_logger,
176 "[%s]\tWrote to DP at time (epoch): %f. Iteration: %d.\tWrite "
177 "time [us]: min: %u max: %u mean: %.2f variance: %.2f",
178 m_dp.ToString().c_str(),
179 timestamp_s,
180 iteration_counter,
181 ba::min(m_accumulator),
182 ba::max(m_accumulator),
183 ba::mean(m_accumulator),
184 ba::variance(m_accumulator));
185 m_accumulator = {};
186 }
187 iteration_counter++;
188 } catch (const std::exception& ex) {
189 LOG4CPLUS_WARN_FMT(m_logger,
190 "[%s ] Iteration: %d exception caught: %s!",
191 m_dp.ToString().c_str(),
192 iteration_counter,
193 ex.what());
194 if (error_counter++ > 5) {
195 LOG4CPLUS_ERROR_FMT(
196 m_logger,
197 "[%s] More than 5 consecutive exception caught. Writing to RTR stoped!",
198 m_dp.ToString().c_str());
199 break;
200 } // if
201 } catch (...) {
202 LOG4CPLUS_WARN_FMT(m_logger,
203 "[%s ] Iteration: %d unknown exception caught!",
204 m_dp.ToString().c_str(),
205 iteration_counter);
206 if (error_counter++ > 5) {
207 LOG4CPLUS_ERROR_FMT(
208 m_logger,
209 "[%s] More than 5 consecutive exception caught. Writing to RTR stoped!",
210 m_dp.ToString().c_str());
211 break;
212 } // if
213 }
214
215 std::this_thread::sleep_for(m_sleep_period);
216 } // while
217
218 LOG4CPLUS_INFO_FMT(m_logger,
219 "[%s] %u / %u values have been written. DONE!",
220 m_dp.ToString().c_str(),
221 iteration_counter,
223 LOG4CPLUS_DEBUG_FMT(m_logger,
224 "[%s]\tWrite "
225 "time [us]: min: %u max: %u med: %.2f variance: %.2f",
226 m_dp.ToString().c_str(),
227 ba::min(m_accumulator),
228 ba::max(m_accumulator),
229 ba::mean(m_accumulator),
230 ba::variance(m_accumulator));
231 } // WriteDataPoint
232
233public:
234 explicit GenRtrWriter(std::shared_ptr<rtctk_cfw::RuntimeRepoIf>& rtr,
235 const rtctk_cfw::DataPointPath& dp,
236 const std::string& server_alias = "")
237 : GenRtrWriterBase(rtr, dp, server_alias) {
238 }
239
240 virtual ~GenRtrWriter() noexcept {
241 }
242
243 GenRtrWriter() = delete;
244 GenRtrWriter(const GenRtrWriter&) = delete;
246
255 void Initialize(std::size_t dim_x,
256 std::size_t dim_y,
257 std::chrono::microseconds period,
258 uint32_t iterations) {
259 m_sleep_period = period;
260 m_iteration_num = iterations;
261 m_value = std::make_unique<rtctk_cfw::MatrixBuffer<T>>();
262 m_value->resize(dim_x, dim_y);
263 LOG4CPLUS_INFO_FMT(m_logger,
264 "[%s] Created matrix of size: %ld x %ld",
265 m_dp.ToString().c_str(),
266 dim_x,
267 dim_y);
268 m_payload_bytes = m_value->GetNrows() * m_value->GetNcols() * sizeof(T);
269
270 if (!m_rtr->DataPointExists(m_dp)) {
271 try {
272 LOG4CPLUS_DEBUG_FMT(m_logger,
273 "[%s] DP does not exist going to create one (server alias=%s).",
274 m_dp.ToString().c_str(),
275 m_server_alias.c_str());
276 // if server_alias is not provided it means it is "" what means to write to the
277 // central OLDB
278 m_rtr->CreateDataPoint<rtctk_cfw::MatrixBuffer<T>>(m_dp, *m_value, m_server_alias);
279 m_dp_created = true;
280 } catch (const std::exception& ex) {
281 // we need to check if the DP has been created, and thus the problem probably just
282 // to write a value ..
283 // ... in such a case we need to delete it
284 if (m_rtr->DataPointExists(m_dp)) {
285 LOG4CPLUS_WARN_FMT(m_logger,
286 "[%s] Going to delete DP after an error.",
287 m_dp.ToString().c_str());
288 m_rtr->DeleteDataPoint(m_dp);
289 }
290 CII_THROW_WITH_NESTED(
292 ex,
293 "[" + m_dp.ToString() + "] Problem creating DP and setting inital value");
294 } // try-catch
295 } else {
296 LOG4CPLUS_DEBUG_FMT(m_logger, "[%s] DP exists.", m_dp.ToString().c_str());
297 m_dp_created = false;
298 } // if-else
299
300 m_thread = std::thread(&GenRtrWriter::WriteDataPoint, this);
301 } // Initialize
302
303}; // GenRtrWriter
304
305} // namespace rtctk::standaloneTools
306
307#endif // RTCTK_GEN_RTR_WRITER_
This class provides a wrapper for a data point path.
Definition dataPointPath.hpp:74
const std::string & ToString() const noexcept
Get string representing the DataPointPath.
Definition dataPointPath.hpp:428
A buffer class representing 2D matrix data.
Definition matrixBuffer.hpp:28
The RtctkException class is the base class for all Rtctk exceptions.
Definition exceptions.hpp:211
Base class for Generic RTR Writer.
Definition rtctkGenRtrWriter.h:36
uint32_t m_payload_bytes
Definition rtctkGenRtrWriter.h:44
std::chrono::microseconds m_sleep_period
Definition rtctkGenRtrWriter.h:46
std::thread m_thread
Definition rtctkGenRtrWriter.h:49
static std::atomic< uint16_t > s_idx
Definition rtctkGenRtrWriter.h:108
rtctk_cfw::DataPointPath m_dp
Definition rtctkGenRtrWriter.h:41
virtual ~GenRtrWriterBase() noexcept
Definition rtctkGenRtrWriter.h:68
GenRtrWriterBase(GenRtrWriterBase &&)=delete
GenRtrWriterBase(const GenRtrWriterBase &)=delete
void Finalize()
Stops the publishing thread, joins it, and deletes the data point (DP) if this writer created it.
Definition rtctkGenRtrWriter.h:88
uint16_t m_idx
Definition rtctkGenRtrWriter.h:39
uint32_t m_iteration_num
Definition rtctkGenRtrWriter.h:47
bool m_dp_created
Definition rtctkGenRtrWriter.h:43
log4cplus::Logger & m_logger
Definition rtctkGenRtrWriter.h:38
std::string m_server_alias
Definition rtctkGenRtrWriter.h:42
virtual void Initialize(std::size_t dim_x, std::size_t dim_y, std::chrono::microseconds period, uint32_t iterations)=0
std::atomic< bool > m_thread_running
Definition rtctkGenRtrWriter.h:50
void SignalThreadExit()
Definition rtctkGenRtrWriter.h:81
GenRtrWriterBase(std::shared_ptr< rtctk_cfw::RuntimeRepoIf > &rtr, const rtctk_cfw::DataPointPath &dp, const std::string &server_alias="")
Definition rtctkGenRtrWriter.h:58
void Join()
Definition rtctkGenRtrWriter.h:103
ba::accumulator_set< unsigned, ba::stats< ba::tag::mean, ba::tag::variance, ba::tag::min, ba::tag::max, ba::tag::count > > m_accumulator
Definition rtctkGenRtrWriter.h:55
std::shared_ptr< rtctk_cfw::RuntimeRepoIf > m_rtr
Definition rtctkGenRtrWriter.h:40
Generic RTR Writer Class provides functionality to create one RTR's DP DDS, and one thread that writes...
Definition rtctkGenRtrWriter.h:120
void Initialize(std::size_t dim_x, std::size_t dim_y, std::chrono::microseconds period, uint32_t iterations)
Creates Data Point (DP) and publishing thread.
Definition rtctkGenRtrWriter.h:255
virtual ~GenRtrWriter() noexcept
Definition rtctkGenRtrWriter.h:240
GenRtrWriter(GenRtrWriter &&)=delete
GenRtrWriter(std::shared_ptr< rtctk_cfw::RuntimeRepoIf > &rtr, const rtctk_cfw::DataPointPath &dp, const std::string &server_alias="")
Definition rtctkGenRtrWriter.h:234
GenRtrWriter(const GenRtrWriter &)=delete
Logging Support Library based on log4cplus.
Declaration of the MatrixSpan template class used in APIs.
Definition commandReplier.cpp:22
Definition genDdsPublisher.hpp:20
Header file for RuntimeRepoIf, which defines the API for RuntimeRepoAdapters.