RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
computationBase.hpp
Go to the documentation of this file.
1
13#ifndef RTCTK_DATATASK_COMPUTATIONBASE_HPP
14#define RTCTK_DATATASK_COMPUTATIONBASE_HPP
15
20
21#include <ipcq/reader.hpp>
22#include <numapp/numapolicies.hpp>
23#include <numapp/thread.hpp>
24
25#include <fmt/format.h>
26
27#include <atomic>
28#include <chrono>
29#include <optional>
30#include <shared_mutex>
31#include <thread>
32
33namespace rtctk::dataTask {
34
35namespace {
36
37class ComputationMonitor {
38public:
39 struct Stats {
40 size_t num_cycles;
45 std::chrono::duration<double, std::micro> dur_read;
46 std::chrono::duration<double, std::micro> dur_compute;
47 std::chrono::duration<double, std::micro> dur_publish;
48 };
49
50 using ComponentMetricsIf = rtctk::componentFramework::ComponentMetricsIf;
51
52 explicit ComputationMonitor(ComponentMetricsIf& metrics)
53 : m_start_time(std::chrono::steady_clock::now()) {
54 using namespace rtctk::componentFramework;
55
56 m_pc_cycles_reg = metrics.AddCounter(
57 &m_pc_cycles, CounterMetricInfo("num_cycles", "cycles since running"));
58
59 m_pc_samples_reg = metrics.AddCounter(
60 &m_pc_samples, CounterMetricInfo("num_samples", "samples read since running"));
61
62 m_pc_sample_id_reg = metrics.AddCounter(
63 &m_pc_sample_id, CounterMetricInfo("last_sample_id", "last observed sample id"));
64
65 m_pc_freq_estimate_reg =
66 metrics.AddCounter(&m_pc_freq_estimate,
67 CounterMetricInfo("frequency_estimate", "frequency estimate [Hz]"));
68
69 m_pc_occupancy_reg = metrics.AddCounter(
70 &m_pc_occupancy, CounterMetricInfo("buffer_occupancy", "buffer occupancy [%]"));
71
72 m_pc_dur_read_reg = metrics.AddCounter(
73 &m_pc_dur_read, CounterMetricInfo("duration_read", "read duration [us]"));
74
75 m_pc_dur_compute_reg = metrics.AddCounter(
76 &m_pc_dur_compute, CounterMetricInfo("duration_compute", "compute duration [us]"));
77
78 m_pc_dur_publish_reg = metrics.AddCounter(
79 &m_pc_dur_publish, CounterMetricInfo("duration_publish", "publish duration [us]"));
80 }
81
82 void Tick(const Stats& stats) noexcept {
83 using namespace std::chrono_literals;
84
85 auto now_time = std::chrono::steady_clock::now();
86 auto elapsed_time = now_time - m_start_time;
87 if (elapsed_time > 2s) {
88 m_start_time = now_time;
89
90 m_pc_cycles.Store(stats.num_cycles);
91 m_pc_samples.Store(stats.num_samples);
92 m_pc_sample_id.Store(stats.last_sample_id);
93 m_pc_freq_estimate.Store(stats.freq_estimate);
94 m_pc_occupancy.Store(stats.buffer_occupancy);
95 m_pc_dur_read.Store(stats.dur_read.count());
96 m_pc_dur_compute.Store(stats.dur_compute.count());
97 m_pc_dur_publish.Store(stats.dur_publish.count());
98 }
99 }
100
101 void Reset() noexcept {
102 m_start_time = std::chrono::steady_clock::now();
103 m_pc_cycles.Store(0);
104 m_pc_samples.Store(0);
105 m_pc_sample_id.Store(0);
106 m_pc_freq_estimate.Store(0);
107 m_pc_occupancy.Store(0.0);
108 m_pc_dur_read.Store(0);
109 m_pc_dur_compute.Store(0);
110 m_pc_dur_publish.Store(0);
111 }
112
113private:
114 std::chrono::steady_clock::time_point m_start_time;
115
116 perfc::CounterI64 m_pc_cycles;
117 perfc::ScopedRegistration m_pc_cycles_reg;
118
119 perfc::CounterI64 m_pc_samples;
120 perfc::ScopedRegistration m_pc_samples_reg;
121
122 perfc::CounterI64 m_pc_sample_id;
123 perfc::ScopedRegistration m_pc_sample_id_reg;
124
125 perfc::CounterDouble m_pc_freq_estimate;
126 perfc::ScopedRegistration m_pc_freq_estimate_reg;
127
128 perfc::CounterDouble m_pc_occupancy;
129 perfc::ScopedRegistration m_pc_occupancy_reg;
130
131 perfc::CounterI64 m_pc_dur_read;
132 perfc::ScopedRegistration m_pc_dur_read_reg;
133
134 perfc::CounterI64 m_pc_dur_compute;
135 perfc::ScopedRegistration m_pc_dur_compute_reg;
136
137 perfc::CounterI64 m_pc_dur_publish;
138 perfc::ScopedRegistration m_pc_dur_publish_reg;
139};
140
141} // namespace
142
157template <typename TopicTypeX, typename ReaderType = ipcq::Reader<TopicTypeX>>
159public:
160 using TopicType = TopicTypeX;
161
// Current state of the worker thread.
// Note: GetState() reports RUNNING instead of IDLE while a RUN command is
// pending but has not yet been picked up by the worker.
165 enum class State : uint8_t {
166 RUNNING, // thread is reading, skipping and performing computations
167 IDLE, // thread is continuously resetting the queue, no computations
168 ERROR, // thread terminated automatically due to an error
169 OFF // thread was not yet started or terminated manually
170 };
171
174
186 const std::string& shm_name,
187 size_t to_read,
188 size_t to_skip,
189 std::chrono::milliseconds sample_timeout,
190 std::optional<numapp::NumaPolicies> thread_policies)
191 : m_logger(rtctk::componentFramework::GetLogger("rtctk"))
192 , m_services(services)
193 , m_shm_name(shm_name)
194 , m_to_read(to_read)
195 , m_to_skip(to_skip)
196 , m_chunk_size(std::max(1l, (std::chrono::milliseconds(500) / sample_timeout)))
197 , m_sample_timeout(sample_timeout)
198 , m_thread_policies(std::move(thread_policies))
199 , m_command(Command::IDLE)
200 , m_state(State::OFF)
201 , m_cycles_to_run(0)
202 , m_cycles(0) {
203 }
204
212 virtual ~ComputationBase() = default;
213
220 void SetSamplesToRead(size_t value) {
221 m_to_read.store(value, std::memory_order_relaxed);
222 }
223
227 size_t GetSamplesToRead() const {
228 return m_to_read.load(std::memory_order_relaxed);
229 }
230
237 void SetSamplesToSkip(size_t value) {
238 m_to_skip.store(value, std::memory_order_relaxed);
239 }
240
244 size_t GetSamplesToSkip() const {
245 return m_to_skip.load(std::memory_order_relaxed);
246 }
247
253 void Spawn() {
254 // clear the previous exception to be able to recover
255 m_exception = nullptr;
256 m_command = Command::IDLE;
257 m_thread = numapp::MakeThread("Computation",
258 m_thread_policies.value_or(numapp::NumaPolicies()),
259 &ComputationBase::Work,
260 this);
261 while (1) {
262 using namespace std::chrono_literals;
263 std::this_thread::sleep_for(10ms);
264
265 CheckErrors();
266
267 if (GetState() != State::OFF) {
268 break;
269 }
270 }
271 }
272
278 void Join() {
279 m_command = Command::EXIT;
280 if (m_thread.joinable()) {
281 m_thread.join();
282 }
283 }
284
294 void Run(std::optional<size_t> cycles = std::nullopt) {
295 m_cycles_to_run = cycles.value_or(0);
296 m_command = Command::RUN;
297 }
298
306 void RunOnceSync(std::optional<std::chrono::milliseconds> poll_interval = std::nullopt) {
307 Run(1);
308
309 while (1) {
310 using namespace std::chrono_literals;
311 std::this_thread::sleep_for(poll_interval.value_or(10ms));
312
313 CheckErrors();
314
315 if (GetState() == State::IDLE) {
316 break;
317 }
318 }
319 }
320
326 void Idle() {
327 m_command = Command::IDLE;
328 }
329
337 State GetState() const {
338 auto state = m_state.load(std::memory_order_relaxed);
339 auto command = m_command.load(std::memory_order_relaxed);
340
341 if (state == State::IDLE and command == Command::RUN) {
342 return State::RUNNING;
343 } else {
344 return state;
345 }
346 }
347
355 size_t GetCycles() const {
356 return m_cycles.load(std::memory_order_relaxed);
357 }
358
364 void CheckErrors() {
365 auto lock = std::shared_lock{m_exception_mutex};
366 if (m_exception) {
367 std::rethrow_exception(m_exception);
368 }
369 }
370
371protected:
377 virtual void OnThreadStart() {
378 }
379
385 virtual void OnCycleStart(size_t to_read) {
386 }
387
// User-hook used to copy data of a single computation cycle into a
// user-owned sample buffer. The worker invokes it once per sample read,
// passing a running index that starts at 0 for every cycle.
// Must not throw (declared noexcept).
393 virtual void CopyData(size_t sample_idx, const TopicType& sample) noexcept = 0;
394
// User-hook to perform a computation cycle. Called by the worker after all
// samples of the cycle have been read.
400 virtual void Compute() = 0;
401
// User-hook to publish a computation result. Called by the worker directly
// after Compute().
407 virtual void Publish() = 0;
408
409private:
410 void Work() {
411 using namespace rtctk::componentFramework;
412 using namespace std::chrono_literals;
413
414 const std::error_code ok{};
415 std::error_code ret = ok;
416
417 size_t cycles = 0; // since State::RUNNING
418 size_t samples = 0; // since State::RUNNING
419 m_cycles.store(cycles, std::memory_order_relaxed);
420
421 auto t0 = std::chrono::steady_clock::now();
422 auto t1 = t0;
423 auto t2 = t0;
424 auto t3 = t0;
425 auto t4 = t0;
426
427 try {
428 auto reader = ReaderType::MakeReader(m_shm_name.c_str(), 30s);
429
430 ComputationMonitor monitor{m_services.Get<ComponentMetricsIf>()};
431
433
434 while (1) {
435 Command command = m_command.load(std::memory_order_relaxed);
436 if (command == Command::EXIT) {
437 monitor.Reset();
438 m_state.store(State::OFF, std::memory_order_relaxed);
439 return;
440 } else if (command == Command::IDLE) {
441 cycles = 0;
442 samples = 0;
443 m_cycles.store(cycles, std::memory_order_relaxed);
444
445 // as long as we publish metrics to CII OLDB, we wont reset the monitor here
446 // monitor.Reset();
447 m_state.store(State::IDLE, std::memory_order_relaxed);
448
449 std::this_thread::sleep_for(10ms);
450 } else { // RUN
451 auto prev_state = m_state.exchange(State::RUNNING, std::memory_order_relaxed);
452 if (prev_state != State::RUNNING) {
453 // we are entering State::RUNNING for first time
454 ret = Reset(reader);
455 if (ret != ok) {
456 CII_THROW(RtctkException,
457 fmt::format("SHM Reset failed: {}", ret.message()));
458 }
459 }
460
461 auto occupancy = Occupancy(reader);
462
463 size_t to_read = m_to_read.load(std::memory_order_relaxed);
464 size_t to_skip = m_to_skip.load(std::memory_order_relaxed);
465
466 OnCycleStart(to_read);
467
468 size_t sample_idx = 0;
469 uint32_t last_sample_id = 0;
470 t0 = std::chrono::steady_clock::now();
471 ret = Read(reader, to_read, [&](const TopicType& sample) {
472 CopyData(sample_idx, sample);
473 last_sample_id = sample.sample_id;
474 sample_idx++;
475 });
476
477 if (ret != ok) {
478 LOG4CPLUS_ERROR(
479 m_logger,
480 fmt::format(
481 "Work() Read: cycle {}, buffer occupancy {}", cycles, occupancy));
482 CII_THROW(RtctkException,
483 fmt::format("SHM Read failed: {}", ret.message()));
484 }
485
486 if (m_command.load(std::memory_order_relaxed) != Command::RUN) {
487 // premature exit if command changed while reading
488 continue;
489 }
490
491 // all data has arrived for this cycle
492 t1 = std::chrono::steady_clock::now();
493 Compute();
494 t2 = std::chrono::steady_clock::now();
495 Publish();
496 t3 = std::chrono::steady_clock::now();
497
498 ret = Skip(reader, to_skip);
499 if (ret != ok) {
500 LOG4CPLUS_ERROR(
501 m_logger,
502 fmt::format(
503 "Work() Skip: cycle {}, buffer occupancy {}", cycles, occupancy));
504 CII_THROW(RtctkException,
505 fmt::format("SHM Skip failed: {}", ret.message()));
506 }
507
508 t4 = std::chrono::steady_clock::now();
509 cycles++;
510 samples += to_read;
511 m_cycles.store(cycles, std::memory_order_relaxed);
512
513 monitor.Tick(
514 {cycles,
515 samples,
517 (to_read + to_skip) / std::chrono::duration<float>(t4 - t0).count(),
518 occupancy,
519 t1 - t0,
520 t2 - t1,
521 t3 - t2});
522
523 // after having computed requested number of cycles we Idle
524 if (size_t c2r = m_cycles_to_run.load(); c2r != 0 and c2r == cycles) {
525 Idle();
526 }
527 }
528 }
529 } catch (...) {
530 m_state.store(State::ERROR, std::memory_order_relaxed);
531 std::scoped_lock lock(m_exception_mutex);
532 m_exception = std::current_exception();
533 }
534 }
535
536 template <typename Operation>
537 std::error_code Read(ReaderType& reader, size_t to_read, Operation&& op) {
538 using namespace std::chrono;
539
540 size_t read = 0;
541 const std::error_code ok;
542 std::pair<std::error_code, size_t> ret;
543 milliseconds time_elapsed{0};
544 auto time_start = steady_clock::now();
545
546 while (1) {
547 if (m_command.load(std::memory_order_relaxed) != Command::RUN) {
548 // premature exit, no error
549 return {};
550 }
551
552 size_t to_read_now = std::min(m_chunk_size, to_read - read);
553
554 ret = reader.Read(std::forward<Operation>(op), to_read_now, m_sample_timeout);
555 if (ret.first != ok) {
556 return ret.first;
557 }
558 read += ret.second;
559 if (read == to_read) {
560 return {};
561 }
562
563 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
564 if (time_elapsed > m_sample_timeout * to_read) {
565 return std::make_error_code(std::errc::timed_out);
566 }
567 }
568 }
569
570 std::error_code Skip(ReaderType& reader, size_t to_skip) {
571 using namespace std::chrono;
572
573 size_t skipped = 0;
574 const std::error_code ok;
575 std::pair<std::error_code, size_t> ret;
576 milliseconds time_elapsed{0};
577 auto time_start = steady_clock::now();
578
579 while (1) {
580 if ((to_skip == 0) or (m_command.load(std::memory_order_relaxed) != Command::RUN)) {
581 // premature exit, no error
582 return {};
583 }
584
585 size_t to_skip_now = std::min(m_chunk_size, to_skip - skipped);
586
587 ret = reader.Skip(to_skip_now, m_sample_timeout);
588 if (ret.first != ok) {
589 return ret.first;
590 }
591 skipped += ret.second;
592 if (skipped == to_skip) {
593 return {};
594 }
595
596 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
597 if (time_elapsed > m_sample_timeout * to_skip) {
598 return std::make_error_code(std::errc::timed_out);
599 }
600 }
601 }
602
603 std::error_code Reset(ReaderType& reader) {
604 auto ret = reader.Reset();
605 if (ret == ipcq::Error::WouldBlock) {
606 return {};
607 }
608 return ret;
609 }
610
611 float Occupancy(ReaderType& reader) {
612 return 100.0 * (static_cast<float>(reader.NumAvailable()) / reader.Size());
613 }
614
615private:
616 enum class Command : uint8_t { RUN, IDLE, EXIT };
617
618 log4cplus::Logger& m_logger;
619 ServiceContainer& m_services;
620 std::string m_shm_name;
621 std::atomic<size_t> m_to_read;
622 std::atomic<size_t> m_to_skip;
623 size_t m_chunk_size;
624 std::chrono::milliseconds m_sample_timeout;
625 std::optional<numapp::NumaPolicies> m_thread_policies;
626 std::atomic<Command> m_command;
627 std::atomic<State> m_state;
628 std::atomic<size_t> m_cycles_to_run;
629 std::atomic<size_t> m_cycles;
630 std::exception_ptr m_exception = nullptr;
631 std::shared_mutex m_exception_mutex;
632 std::thread m_thread;
633};
634
635} // namespace rtctk::dataTask
636
637#endif // RTCTK_DATATASK_COMPUTATIONBASE_HPP
Component metrics interface.
Definition componentMetricsIf.hpp:85
Defines auxiliary information associated with each counter registered with ComponentMetricsIf.
Definition componentMetricsIf.hpp:46
The RtctkException class is the base class for all Rtctk exceptions.
Definition exceptions.hpp:211
Container class that holds services of any type.
Definition serviceContainer.hpp:39
Service & Get(const std::string &name="")
Get a handle to a service.
Definition serviceContainer.hpp:100
Base class for Data Task computations.
Definition computationBase.hpp:158
void Run(std::optional< size_t > cycles=std::nullopt)
Commands the worker thread to perform number of computation cycles (async method).
Definition computationBase.hpp:294
void Spawn()
Spawns the worker thread (sync method).
Definition computationBase.hpp:253
virtual void OnThreadStart()
Optional user-hook called after worker thread started and reader was created.
Definition computationBase.hpp:377
virtual void OnCycleStart(size_t to_read)
Optional user-hook called immediately before a read-cycle from SHM starts.
Definition computationBase.hpp:385
void Idle()
Commands the worker thread to stop performing computation cycles (async method).
Definition computationBase.hpp:326
void SetSamplesToSkip(size_t value)
Sets the number of samples to skip.
Definition computationBase.hpp:237
size_t GetSamplesToSkip() const
Gets the number of samples to skip.
Definition computationBase.hpp:244
void CheckErrors()
Checks for errors in the worker thread and rethrows them to the caller.
Definition computationBase.hpp:364
void RunOnceSync(std::optional< std::chrono::milliseconds > poll_interval=std::nullopt)
Commands the worker thread to perform a single computation cycle (sync method).
Definition computationBase.hpp:306
virtual ~ComputationBase()=default
Destructor.
size_t GetCycles() const
Retrieves number of computation cycles performed since running.
Definition computationBase.hpp:355
void SetSamplesToRead(size_t value)
Sets the number of samples to read.
Definition computationBase.hpp:220
void Join()
Terminates the worker thread (sync method).
Definition computationBase.hpp:278
State
Current state of the worker thread.
Definition computationBase.hpp:165
virtual void Compute()=0
User-hook to perform a computation cycle.
virtual void CopyData(size_t sample_idx, const TopicType &sample) noexcept=0
User-hook used to copy data of a single computation cycle into user-owned sample buffer.
TopicTypeX TopicType
Definition computationBase.hpp:160
size_t GetSamplesToRead() const
Gets the number of samples to read.
Definition computationBase.hpp:227
virtual void Publish()=0
User-hook to publish a computation result.
State GetState() const
Returns the current state of the worker thread.
Definition computationBase.hpp:337
ComputationBase(ServiceContainer &services, const std::string &shm_name, size_t to_read, size_t to_skip, std::chrono::milliseconds sample_timeout, std::optional< numapp::NumaPolicies > thread_policies)
Constructor.
Definition computationBase.hpp:185
Header file for ComponentMetricsIf.
float freq_estimate
Definition computationBase.hpp:43
std::chrono::duration< double, std::micro > dur_compute
Definition computationBase.hpp:46
std::chrono::duration< double, std::micro > dur_read
Definition computationBase.hpp:45
uint32_t last_sample_id
Definition computationBase.hpp:42
float buffer_occupancy
Definition computationBase.hpp:44
size_t num_samples
Definition computationBase.hpp:41
size_t num_cycles
Definition computationBase.hpp:40
std::chrono::duration< double, std::micro > dur_publish
Definition computationBase.hpp:47
Provides macros and utilities for exception handling.
Logging Support Library based on log4cplus.
Definition commandReplier.cpp:22
Definition computationBase.hpp:33
Definition commandReplier.cpp:22
A container that can hold any type of service.