13#ifndef RTCTK_DATATASK_COMPUTATIONBASE_HPP
14#define RTCTK_DATATASK_COMPUTATIONBASE_HPP
21#include <ipcq/reader.hpp>
22#include <numapp/numapolicies.hpp>
23#include <numapp/thread.hpp>
25#include <fmt/format.h>
30#include <shared_mutex>
37class ComputationMonitor {
45 std::chrono::duration<double, std::micro>
dur_read;
// Constructor. Records the construction time as the start of the first reporting
// interval and registers the performance counters with `metrics`; the value returned
// by each AddCounter() call is kept in the matching `*_reg` member.
// NOTE(review): this listing omits several source lines — most AddCounter() argument
// lists and the closing brace of the constructor are not visible here.
52 explicit ComputationMonitor(ComponentMetricsIf& metrics)
53 : m_start_time(std::chrono::steady_clock::now()) {
56 m_pc_cycles_reg = metrics.AddCounter(
59 m_pc_samples_reg = metrics.AddCounter(
62 m_pc_sample_id_reg = metrics.AddCounter(
65 m_pc_freq_estimate_reg =
66 metrics.AddCounter(&m_pc_freq_estimate,
69 m_pc_occupancy_reg = metrics.AddCounter(
72 m_pc_dur_read_reg = metrics.AddCounter(
// Compute and publish durations are exposed as "duration_compute" / "duration_publish";
// their descriptions state the unit as microseconds.
75 m_pc_dur_compute_reg = metrics.AddCounter(
76 &m_pc_dur_compute,
CounterMetricInfo(
"duration_compute",
"compute duration [us]"));
78 m_pc_dur_publish_reg = metrics.AddCounter(
79 &m_pc_dur_publish,
CounterMetricInfo(
"duration_publish",
"publish duration [us]"));
// Publishes a snapshot of `stats` to the performance counters, rate-limited by time:
// when more than 2 s have elapsed since m_start_time, m_start_time is moved to the
// current time.
// NOTE(review): the closing braces are not visible in this listing; judging by the
// original line numbering the Store() calls sit inside the `if` — confirm.
// NOTE(review): the dur_* fields are std::chrono::duration<double, std::micro> while
// the m_pc_dur_* counters are perfc::CounterI64, so sub-microsecond fractions are
// presumably dropped on Store() — confirm against the perfc API.
82 void Tick(Stats
const& stats)
noexcept {
83 using namespace std::chrono_literals;
85 auto now_time = std::chrono::steady_clock::now();
86 auto elapsed_time = now_time - m_start_time;
87 if (elapsed_time > 2s) {
// Start a new reporting interval.
88 m_start_time = now_time;
// Copy each field of the snapshot into its counter.
90 m_pc_cycles.Store(stats.num_cycles);
91 m_pc_samples.Store(stats.num_samples);
92 m_pc_sample_id.Store(stats.last_sample_id);
93 m_pc_freq_estimate.Store(stats.freq_estimate);
94 m_pc_occupancy.Store(stats.buffer_occupancy);
95 m_pc_dur_read.Store(stats.dur_read.count());
96 m_pc_dur_compute.Store(stats.dur_compute.count());
97 m_pc_dur_publish.Store(stats.dur_publish.count());
// Restarts the reporting interval at the current time and stores zero into every
// performance counter.
// NOTE(review): the closing brace of this function is not visible in this listing.
101 void Reset() noexcept {
102 m_start_time = std::chrono::steady_clock::now();
103 m_pc_cycles.Store(0);
104 m_pc_samples.Store(0);
105 m_pc_sample_id.Store(0);
// m_pc_freq_estimate is a CounterDouble like m_pc_occupancy, but is reset with an
// integer literal here while the occupancy counter below uses 0.0.
106 m_pc_freq_estimate.Store(0);
107 m_pc_occupancy.Store(0.0);
108 m_pc_dur_read.Store(0);
109 m_pc_dur_compute.Store(0);
110 m_pc_dur_publish.Store(0);
// Start of the current reporting interval; compared against the 2 s threshold in Tick().
114 std::chrono::steady_clock::time_point m_start_time;
// Each counter below is paired with the registration object obtained from
// ComponentMetricsIf::AddCounter() in the constructor.
// Counter fed from Stats::num_cycles.
116 perfc::CounterI64 m_pc_cycles;
117 perfc::ScopedRegistration m_pc_cycles_reg;
// Counter fed from Stats::num_samples.
119 perfc::CounterI64 m_pc_samples;
120 perfc::ScopedRegistration m_pc_samples_reg;
// Counter fed from Stats::last_sample_id.
122 perfc::CounterI64 m_pc_sample_id;
123 perfc::ScopedRegistration m_pc_sample_id_reg;
// Counter fed from Stats::freq_estimate.
125 perfc::CounterDouble m_pc_freq_estimate;
126 perfc::ScopedRegistration m_pc_freq_estimate_reg;
// Counter fed from Stats::buffer_occupancy.
128 perfc::CounterDouble m_pc_occupancy;
129 perfc::ScopedRegistration m_pc_occupancy_reg;
// Counters fed from the Stats::dur_read / dur_compute / dur_publish tick counts.
131 perfc::CounterI64 m_pc_dur_read;
132 perfc::ScopedRegistration m_pc_dur_read_reg;
134 perfc::CounterI64 m_pc_dur_compute;
135 perfc::ScopedRegistration m_pc_dur_compute_reg;
137 perfc::CounterI64 m_pc_dur_publish;
138 perfc::ScopedRegistration m_pc_dur_publish_reg;
157template <
typename TopicTypeX,
typename ReaderType = ipcq::Reader<TopicTypeX>>
// Constructor (fragment): parameter list tail and member-initializer list.
// NOTE(review): the first line of the signature, some parameters and some
// initializers are not visible in this listing.
186 std::string
const& shm_name,
189 std::chrono::milliseconds sample_timeout,
190 std::optional<numapp::NumaPolicies> thread_policies)
191 : m_logger(
rtctk::componentFramework::GetLogger(
"rtctk"))
192 , m_services(services)
193 , m_shm_name(shm_name)
// Chunk size = number of sample timeouts that fit into 500 ms, but at least 1.
// It caps how many samples Read()/Skip() request from the reader per call.
// NOTE(review): a zero `sample_timeout` makes this an integer division by zero, and
// the `1l` literal only matches std::max when milliseconds::rep is `long` — confirm
// both are acceptable for the supported platforms and callers.
196 , m_chunk_size(std::max(1l, (std::chrono::milliseconds(500) / sample_timeout)))
197 , m_sample_timeout(sample_timeout)
198 , m_thread_policies(std::move(thread_policies))
// The command starts out as IDLE.
199 , m_command(Command::
IDLE)
// Bodies of the four sample-count accessors (their signatures are not visible in this
// listing). All four use relaxed atomic accesses on m_to_read / m_to_skip.
// Body of SetSamplesToRead(value).
221 m_to_read.store(value, std::memory_order_relaxed);
// Body of GetSamplesToRead().
228 return m_to_read.load(std::memory_order_relaxed);
// Body of SetSamplesToSkip(value).
238 m_to_skip.store(value, std::memory_order_relaxed);
// Body of GetSamplesToSkip().
245 return m_to_skip.load(std::memory_order_relaxed);
// Body of Spawn() (signature and closing lines are not visible in this listing).
// Clears any stored worker exception and sets the command to IDLE before the
// worker thread is created.
255 m_exception =
nullptr;
256 m_command = Command::IDLE;
// The thread is created through numapp::MakeThread with the name "Computation", using
// the configured NUMA policies or default-constructed ones, and ComputationBase::Work
// as entry point.
257 m_thread = numapp::MakeThread(
"Computation",
258 m_thread_policies.value_or(numapp::NumaPolicies()),
259 &ComputationBase::Work,
// NOTE(review): the construct enclosing this 10 ms sleep is not visible; presumably a
// wait loop until the worker has started — confirm.
262 using namespace std::chrono_literals;
263 std::this_thread::sleep_for(10ms);
// Body of Join() (signature not visible in this listing): issues the EXIT command and
// then checks whether the worker thread is joinable.
// NOTE(review): the statement inside the `if` is not visible; presumably m_thread.join().
279 m_command = Command::EXIT;
280 if (m_thread.joinable()) {
// Stores the requested number of cycles (0 when `cycles` is empty) in m_cycles_to_run
// and then sets the command to RUN. The function returns without waiting here.
// NOTE(review): the worker only compares m_cycles_to_run with its cycle counter when the
// value is non-zero, so 0 presumably means "no cycle limit" — confirm.
294 void Run(std::optional<size_t> cycles = std::nullopt) {
295 m_cycles_to_run = cycles.value_or(0);
296 m_command = Command::RUN;
// Fragment of RunOnceSync(); most of the body is not visible in this listing.
// `poll_interval` is the sleep duration used while waiting; it defaults to 10 ms when
// no value is supplied.
// NOTE(review): the condition being polled is not visible here.
306 void RunOnceSync(std::optional<std::chrono::milliseconds> poll_interval = std::nullopt) {
310 using namespace std::chrono_literals;
311 std::this_thread::sleep_for(poll_interval.value_or(10ms));
// Body of Idle() (signature not visible in this listing): sets the command to IDLE.
327 m_command = Command::IDLE;
// Body fragment of GetState() (signature and return statements are not visible).
// Takes relaxed snapshots of the worker state and of the pending command.
338 auto state = m_state.load(std::memory_order_relaxed);
339 auto command = m_command.load(std::memory_order_relaxed);
// Special case: the worker still reports IDLE although RUN has already been commanded.
// NOTE(review): the value returned for this case is not visible in this listing.
341 if (state ==
State::IDLE and command == Command::RUN) {
// Body of GetCycles() (signature not visible in this listing): relaxed read of m_cycles.
356 return m_cycles.load(std::memory_order_relaxed);
// Body fragment of CheckErrors() (signature not visible in this listing).
// Takes a shared lock on m_exception_mutex while m_exception is inspected.
365 auto lock = std::shared_lock{m_exception_mutex};
// NOTE(review): the line between the lock and the rethrow is not visible; it presumably
// checks that m_exception is non-null, which std::rethrow_exception requires — confirm.
367 std::rethrow_exception(m_exception);
// Body fragment of the worker thread function Work() (the thread entry point passed to
// numapp::MakeThread). Many lines — the signature, loop headers, user-hook calls, log
// calls and closing braces — are not visible in this listing, so the comments below
// describe only what the visible statements do.
412 using namespace std::chrono_literals;
414 std::error_code
const ok{};
415 std::error_code ret = ok;
419 m_cycles.store(cycles, std::memory_order_relaxed);
421 auto t0 = std::chrono::steady_clock::now();
// Create the reader for the configured shared-memory name; 30s is passed as the second
// argument (NOTE(review): presumably a creation timeout — confirm against ipcq).
428 auto reader = ReaderType::MakeReader(m_shm_name.c_str(), 30s);
// Command dispatch: EXIT publishes State::OFF; IDLE republishes the cycle counter,
// publishes State::IDLE and sleeps 10 ms; otherwise the state is switched to RUNNING.
435 Command command = m_command.load(std::memory_order_relaxed);
436 if (command == Command::EXIT) {
438 m_state.store(
State::OFF, std::memory_order_relaxed);
440 }
else if (command == Command::IDLE) {
443 m_cycles.store(cycles, std::memory_order_relaxed);
447 m_state.store(
State::IDLE, std::memory_order_relaxed);
449 std::this_thread::sleep_for(10ms);
// exchange() returns the previous state so that a transition into RUNNING can be detected.
451 auto prev_state = m_state.exchange(
State::RUNNING, std::memory_order_relaxed);
457 fmt::format(
"SHM Reset failed: {}", ret.message()));
461 auto occupancy = Occupancy(reader);
// Snapshot the sample counts once per cycle so that both phases use consistent values.
463 size_t to_read = m_to_read.load(std::memory_order_relaxed);
464 size_t to_skip = m_to_skip.load(std::memory_order_relaxed);
468 size_t sample_idx = 0;
// Timestamps t0..t4 bracket the phases of one cycle; t0 marks the start of the read.
470 t0 = std::chrono::steady_clock::now();
471 ret = Read(reader, to_read, [&](
TopicType const& sample) {
481 "Work() Read: cycle {}, buffer occupancy {}", cycles, occupancy));
483 fmt::format(
"SHM Read failed: {}", ret.message()));
// Re-check the command after reading; something other than RUN interrupts the cycle.
486 if (m_command.load(std::memory_order_relaxed) != Command::RUN) {
492 t1 = std::chrono::steady_clock::now();
494 t2 = std::chrono::steady_clock::now();
496 t3 = std::chrono::steady_clock::now();
498 ret = Skip(reader, to_skip);
503 "Work() Skip: cycle {}, buffer occupancy {}", cycles, occupancy));
505 fmt::format(
"SHM Skip failed: {}", ret.message()));
508 t4 = std::chrono::steady_clock::now();
511 m_cycles.store(cycles, std::memory_order_relaxed);
// Samples handled in this cycle divided by the seconds elapsed between t0 and t4.
517 (to_read + to_skip) / std::chrono::duration<float>(t4 - t0).count(),
// A non-zero m_cycles_to_run acts as a cycle limit that is compared with `cycles`.
524 if (
size_t c2r = m_cycles_to_run.load(); c2r != 0 and c2r == cycles) {
// The in-flight exception is stored under m_exception_mutex.
531 std::scoped_lock lock(m_exception_mutex);
532 m_exception = std::current_exception();
// Helper that reads `to_read` samples from `reader`, passing `op` to reader.Read().
// Samples are requested in chunks of at most m_chunk_size, each request using
// m_sample_timeout. Returns std::errc::timed_out once the total elapsed time exceeds
// m_sample_timeout * to_read.
// NOTE(review): the loop header, the declaration/update of `read`, the other return
// statements and the closing braces are not visible in this listing.
536 template <
typename Operation>
537 std::error_code Read(ReaderType& reader,
size_t to_read, Operation&& op) {
538 using namespace std::chrono;
541 std::error_code
const ok;
542 std::pair<std::error_code, size_t> ret;
543 milliseconds time_elapsed{0};
544 auto time_start = steady_clock::now();
// Checked before requesting a chunk: any command other than RUN is treated specially.
547 if (m_command.load(std::memory_order_relaxed) != Command::RUN) {
552 size_t to_read_now = std::min(m_chunk_size, to_read - read);
// NOTE(review): if this call sits in a loop, `op` is forwarded on every iteration; an
// rvalue callable could be moved-from after the first chunk — confirm this is intended.
554 ret = reader.Read(std::forward<Operation>(op), to_read_now, m_sample_timeout);
555 if (ret.first != ok) {
559 if (read == to_read) {
// Overall deadline: one sample timeout per requested sample.
563 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
564 if (time_elapsed > m_sample_timeout * to_read) {
565 return std::make_error_code(std::errc::timed_out);
// Helper that skips `to_skip` samples on `reader`. Samples are skipped in chunks of at
// most m_chunk_size, each request using m_sample_timeout; the count reported by
// reader.Skip() is accumulated in `skipped`. Returns std::errc::timed_out once the total
// elapsed time exceeds m_sample_timeout * to_skip.
// NOTE(review): the loop header, the declaration of `skipped`, the other return
// statements and the closing braces are not visible in this listing.
570 std::error_code Skip(ReaderType& reader,
size_t to_skip) {
571 using namespace std::chrono;
574 std::error_code
const ok;
575 std::pair<std::error_code, size_t> ret;
576 milliseconds time_elapsed{0};
577 auto time_start = steady_clock::now();
// Nothing to skip, or a command other than RUN, is handled before requesting a chunk.
580 if ((to_skip == 0) or (m_command.load(std::memory_order_relaxed) != Command::RUN)) {
585 size_t to_skip_now = std::min(m_chunk_size, to_skip - skipped);
587 ret = reader.Skip(to_skip_now, m_sample_timeout);
588 if (ret.first != ok) {
591 skipped += ret.second;
592 if (skipped == to_skip) {
// Overall deadline: one sample timeout per requested sample.
596 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
597 if (time_elapsed > m_sample_timeout * to_skip) {
598 return std::make_error_code(std::errc::timed_out);
// Helper that calls reader.Reset() and special-cases the ipcq::Error::WouldBlock result.
// NOTE(review): how WouldBlock is mapped and what is returned are not visible in this
// listing.
603 std::error_code Reset(ReaderType& reader) {
604 auto ret = reader.Reset();
605 if (ret == ipcq::Error::WouldBlock) {
// Returns the ratio reader.NumAvailable() / reader.Size() as a percentage.
// The ratio is computed in float, scaled by the double literal 100.0 and converted back
// to float on return.
// NOTE(review): a reader reporting Size() == 0 would make this a floating-point division
// by zero (inf/NaN result) — confirm that cannot happen.
611 float Occupancy(ReaderType& reader) {
612 return 100.0 * (
static_cast<float>(reader.NumAvailable()) / reader.Size());
// Commands stored in m_command and read by the worker code: RUN, IDLE and EXIT.
616 enum class Command { RUN,
IDLE,
EXIT };
// Logger obtained via GetLogger("rtctk") in the constructor.
618 log4cplus::Logger& m_logger;
// Shared-memory name handed to ReaderType::MakeReader().
620 std::string m_shm_name;
// Number of samples to read / to skip; accessed with relaxed atomics.
621 std::atomic<size_t> m_to_read;
622 std::atomic<size_t> m_to_skip;
// Timeout passed to each reader.Read()/reader.Skip() call; also scales the overall
// deadline in the Read()/Skip() helpers.
624 std::chrono::milliseconds m_sample_timeout;
// Optional NUMA policies for the worker thread; default-constructed policies are used
// when empty.
625 std::optional<numapp::NumaPolicies> m_thread_policies;
// Pending command and the worker's published state.
626 std::atomic<Command> m_command;
627 std::atomic<State> m_state;
// Requested cycle limit (set by Run(); 0 when no count was given) and the published
// cycle counter.
628 std::atomic<size_t> m_cycles_to_run;
629 std::atomic<size_t> m_cycles;
// Exception captured via std::current_exception(); m_exception_mutex is held when it is
// stored and when it is rethrown.
630 std::exception_ptr m_exception =
nullptr;
631 std::shared_mutex m_exception_mutex;
// Worker thread handle, created via numapp::MakeThread.
632 std::thread m_thread;
Component metrics interface.
Definition: componentMetricsIf.hpp:184
Defines auxiliary information associated with each counter registered with ComponentMetricsIf.
Definition: componentMetricsIf.hpp:46
The RtctkException class is the base class for all Rtctk exceptions.
Definition: exceptions.hpp:237
Container class that holds services of any type.
Definition: serviceContainer.hpp:39
Service & Get(const std::string &name="")
Get a handle to a service.
Definition: serviceContainer.hpp:100
Base class for Data Task computations.
Definition: computationBase.hpp:158
void Run(std::optional< size_t > cycles=std::nullopt)
Commands the worker thread to perform number of computation cycles (async method).
Definition: computationBase.hpp:294
void Spawn()
Spawns the worker thread (sync method).
Definition: computationBase.hpp:253
virtual void OnThreadStart()
Optional user-hook called after worker thread started and reader was created.
Definition: computationBase.hpp:377
virtual void CopyData(size_t sample_idx, TopicType const &sample) noexcept=0
User-hook used to copy data of a single computation cycle into user-owned sample buffer.
virtual void OnCycleStart(size_t to_read)
Optional user-hook called immediately before a read-cycle from SHM starts.
Definition: computationBase.hpp:385
void Idle()
Commands the worker thread to stop performing computation cycles (async method).
Definition: computationBase.hpp:326
void SetSamplesToSkip(size_t value)
Sets the number of samples to skip.
Definition: computationBase.hpp:237
size_t GetSamplesToSkip() const
Gets the number of samples to skip.
Definition: computationBase.hpp:244
void CheckErrors()
Checks for errors in the worker thread and rethrows them to the caller.
Definition: computationBase.hpp:364
void RunOnceSync(std::optional< std::chrono::milliseconds > poll_interval=std::nullopt)
Commands the worker thread to perform a single computation cycle (sync method).
Definition: computationBase.hpp:306
virtual ~ComputationBase()=default
Destructor.
size_t GetCycles() const
Retrieves number of computation cycles performed since running.
Definition: computationBase.hpp:355
void SetSamplesToRead(size_t value)
Sets the number of samples to read.
Definition: computationBase.hpp:220
void Join()
Terminates the worker thread (sync method).
Definition: computationBase.hpp:278
ComputationBase(ServiceContainer &services, std::string const &shm_name, size_t to_read, size_t to_skip, std::chrono::milliseconds sample_timeout, std::optional< numapp::NumaPolicies > thread_policies)
Constructor.
Definition: computationBase.hpp:185
virtual void Compute()=0
User-hook to perform a computation cycle.
TopicTypeX TopicType
Definition: computationBase.hpp:160
size_t GetSamplesToRead() const
Gets the number of samples to read.
Definition: computationBase.hpp:227
virtual void Publish()=0
User-hook to publish a computation result.
State GetState() const
Returns the current state of the worker thread.
Definition: computationBase.hpp:337
State
Current state of the worker thread.
Definition: computationBase.hpp:165
Header file for ComponentMetricsIf.
float freq_estimate
Definition: computationBase.hpp:43
std::chrono::duration< double, std::micro > dur_compute
Definition: computationBase.hpp:46
std::chrono::duration< double, std::micro > dur_read
Definition: computationBase.hpp:45
uint32_t last_sample_id
Definition: computationBase.hpp:42
float buffer_occupancy
Definition: computationBase.hpp:44
size_t num_samples
Definition: computationBase.hpp:41
size_t num_cycles
Definition: computationBase.hpp:40
std::chrono::duration< double, std::micro > dur_publish
Definition: computationBase.hpp:47
Provides macros and utilities for exception handling.
Logging Support Library based on log4cplus.
Definition: commandReplier.cpp:22
std::error_code Reset(ReaderType &reader)
Helper function to reset the ipcq.reader to latest sample.
Definition: readerHelpers.hpp:160
Definition: computationBase.hpp:33
Definition: commandReplier.cpp:22
A container that can hold any type of service.