186 const std::string& shm_name,
189 std::chrono::milliseconds sample_timeout,
190 std::optional<numapp::NumaPolicies> thread_policies)
191 : m_logger(
rtctk::componentFramework::GetLogger(
"rtctk"))
192 , m_services(services)
193 , m_shm_name(shm_name)
196 , m_chunk_size(std::max(1l, (std::chrono::milliseconds(500) / sample_timeout)))
197 , m_sample_timeout(sample_timeout)
198 , m_thread_policies(std::move(thread_policies))
199 , m_command(Command::
IDLE)
// Accessor statements for the two sample counters. Only the statements are
// part of this excerpt; the enclosing function signatures are not visible.
// Both counters are std::atomic<size_t> and are written/read with relaxed
// ordering. The worker loads them as `to_read` / `to_skip` before calling
// Read() and Skip().
221 m_to_read.store(value, std::memory_order_relaxed);
228 return m_to_read.load(std::memory_order_relaxed);
238 m_to_skip.store(value, std::memory_order_relaxed);
245 return m_to_skip.load(std::memory_order_relaxed);
// Worker start-up fragment (enclosing function signature not visible).
// Clears any previously stored exception and resets the command to IDLE
// before the worker thread is created.
// NOTE(review): no acquisition of m_exception_mutex is visible before this
// write, whereas the other visible accesses to m_exception hold it -- confirm
// that no worker thread can be running at this point.
255 m_exception =
nullptr;
256 m_command = Command::IDLE;
// Create the thread that runs ComputationBase::Work, using the configured
// NUMA policies or a default-constructed numapp::NumaPolicies when none were
// given. "Computation" is presumably the thread name -- confirm against
// numapp::MakeThread.
257 m_thread = numapp::MakeThread(
"Computation",
258 m_thread_policies.value_or(numapp::NumaPolicies()),
259 &ComputationBase::Work,
// 10 ms sleep; presumably one step of a wait loop whose header is not
// visible in this excerpt.
262 using namespace std::chrono_literals;
263 std::this_thread::sleep_for(10ms);
// Shutdown fragment (enclosing function signature not visible).
// Requests the worker to leave: Work() stores State::OFF when it observes
// Command::EXIT. The thread is only touched if it is joinable; the body of
// the branch (presumably a join) is not part of this excerpt.
279 m_command = Command::EXIT;
280 if (m_thread.joinable()) {
/// Commands the worker to run.
///
/// @param cycles Cycle count stored into m_cycles_to_run. When empty, 0 is
///        stored. The worker's check `c2r != 0 and c2r == cycles` can never
///        be true for 0, so 0 disables that cycle-count check.
///
/// The cycle count is written before the command, both with the default
/// (sequentially consistent) atomic assignment.
294 void Run(std::optional<size_t> cycles = std::nullopt) {
295 m_cycles_to_run = cycles.value_or(0);
296 m_command = Command::RUN;
/// Synchronous single-run entry point; only its polling sleep is visible in
/// this excerpt, so the condition being polled cannot be documented here.
///
/// @param poll_interval Duration passed to sleep_for on each poll; defaults
///        to 10 ms when empty.
306 void RunOnceSync(std::optional<std::chrono::milliseconds> poll_interval = std::nullopt) {
310 using namespace std::chrono_literals;
311 std::this_thread::sleep_for(poll_interval.value_or(10ms));
// Commands the worker to go idle (enclosing function signature not visible).
// Work() reacts to Command::IDLE by storing State::IDLE and sleeping 10 ms.
327 m_command = Command::IDLE;
// State query fragment (enclosing function signature not visible).
// The state and the command are read by two independent relaxed loads, so
// together they are not an atomic snapshot.
338 auto state = m_state.load(std::memory_order_relaxed);
339 auto command = m_command.load(std::memory_order_relaxed);
// State still IDLE while RUN has been commanded: the worker only switches
// m_state to RUNNING once it has observed a command other than EXIT/IDLE,
// so this combination means the command has not been picked up yet.
// The branch body is not part of this excerpt.
341 if (state ==
State::IDLE and command == Command::RUN) {
// Returns the cycle counter last stored by Work() (relaxed load).
356 return m_cycles.load(std::memory_order_relaxed);
// Error propagation fragment (enclosing function signature not visible).
// Takes a shared lock on m_exception_mutex and rethrows the exception that
// the worker stored via std::current_exception().
// NOTE(review): std::rethrow_exception requires a non-null exception_ptr;
// the guard is presumably on the line between these two, which is not part
// of this excerpt -- confirm.
365 auto lock = std::shared_lock{m_exception_mutex};
367 std::rethrow_exception(m_exception);
// Fragment of the worker routine: the thread is created with
// &ComputationBase::Work and the log strings below are prefixed "Work()".
// The function header, loop headers, branch bodies and several statements
// are not part of this excerpt; comments describe only the visible lines.
412 using namespace std::chrono_literals;
414 const std::error_code ok{};
415 std::error_code ret = ok;
419 m_cycles.store(cycles, std::memory_order_relaxed);
421 auto t0 = std::chrono::steady_clock::now();
// Create the reader for the shared-memory queue named m_shm_name. The 30 s
// argument is presumably a timeout -- confirm against ReaderType::MakeReader.
428 auto reader = ReaderType::MakeReader(m_shm_name.c_str(), 30s);
// Dispatch on the current command (relaxed load).
435 Command command = m_command.load(std::memory_order_relaxed);
436 if (command == Command::EXIT) {
// EXIT: publish State::OFF.
438 m_state.store(
State::OFF, std::memory_order_relaxed);
440 }
else if (command == Command::IDLE) {
// IDLE: publish the local cycle counter and State::IDLE, then sleep 10 ms
// (presumably before checking the command again).
443 m_cycles.store(cycles, std::memory_order_relaxed);
447 m_state.store(
State::IDLE, std::memory_order_relaxed);
449 std::this_thread::sleep_for(10ms);
// Remaining case: mark the worker RUNNING and keep the previous state in
// prev_state. How prev_state is used is not visible; the "SHM Reset failed"
// message that follows suggests a Reset() on entering RUNNING -- confirm.
451 auto prev_state = m_state.exchange(
State::RUNNING, std::memory_order_relaxed);
457 fmt::format(
"SHM Reset failed: {}", ret.message()));
// Sample the queue occupancy (percent, see Occupancy()) and take this
// cycle's read/skip counts.
461 auto occupancy = Occupancy(reader);
463 size_t to_read = m_to_read.load(std::memory_order_relaxed);
464 size_t to_skip = m_to_skip.load(std::memory_order_relaxed);
468 size_t sample_idx = 0;
// t0 marks the start of the read phase; the lambda receives each sample as
// const TopicType& (its body is not part of this excerpt).
470 t0 = std::chrono::steady_clock::now();
471 ret = Read(reader, to_read, [&](
const TopicType& sample) {
481 "Work() Read: cycle {}, buffer occupancy {}", cycles, occupancy));
483 fmt::format(
"SHM Read failed: {}", ret.message()));
// Re-check the command after the read phase; the branch body is not visible.
486 if (m_command.load(std::memory_order_relaxed) != Command::RUN) {
// t1..t3 are steady_clock timestamps taken between steps that are not part
// of this excerpt.
492 t1 = std::chrono::steady_clock::now();
494 t2 = std::chrono::steady_clock::now();
496 t3 = std::chrono::steady_clock::now();
// Skip phase: discard to_skip samples.
498 ret = Skip(reader, to_skip);
503 "Work() Skip: cycle {}, buffer occupancy {}", cycles, occupancy));
505 fmt::format(
"SHM Skip failed: {}", ret.message()));
// t4 is taken after the skip phase and the cycle counter is published.
// The rate expression below divides (to_read + to_skip) by t4 - t0
// expressed in seconds (std::chrono::duration<float> has a period of one
// second), i.e. samples per second from read start to skip end.
508 t4 = std::chrono::steady_clock::now();
511 m_cycles.store(cycles, std::memory_order_relaxed);
517 (to_read + to_skip) / std::chrono::duration<float>(t4 - t0).count(),
// Requested cycle count reached? A stored value of 0 never matches, so 0
// disables this check. The branch body is not visible.
524 if (
size_t c2r = m_cycles_to_run.load(); c2r != 0 and c2r == cycles) {
// Store the in-flight exception under an exclusive lock on
// m_exception_mutex (presumably inside a catch handler that is not visible).
531 std::scoped_lock lock(m_exception_mutex);
532 m_exception = std::current_exception();
/// Reads `to_read` samples from `reader`, handing `op` to reader.Read().
///
/// Each reader.Read() call requests at most m_chunk_size samples (the rest of
/// `to_read` if smaller) with m_sample_timeout as timeout argument.
/// m_chunk_size is initialised in the constructor to
/// max(1, 500 ms / sample_timeout).
///
/// @param reader  Queue reader to read from.
/// @param to_read Total number of samples to read.
/// @param op      Callable forwarded to reader.Read().
/// @return std::errc::timed_out once the elapsed time exceeds
///         m_sample_timeout * to_read. The other return paths (command no
///         longer RUN, reader error, all samples read) are not part of this
///         excerpt.
///
/// NOTE(review): `op` is passed through std::forward on every reader.Read()
/// call. If that call sits in a loop (loop header not visible) and Operation
/// is deduced as a non-reference type, `op` is cast to an rvalue repeatedly
/// and could be used after being moved from -- confirm that reader.Read()
/// does not consume it.
536 template <
typename Operation>
537 std::error_code Read(ReaderType& reader,
size_t to_read, Operation&& op) {
538 using namespace std::chrono;
541 const std::error_code ok;
542 std::pair<std::error_code, size_t> ret;
543 milliseconds time_elapsed{0};
544 auto time_start = steady_clock::now();
// Stop condition: the command is no longer RUN (branch body not visible).
547 if (m_command.load(std::memory_order_relaxed) != Command::RUN) {
// Size of the next chunk: the remaining count, capped at m_chunk_size.
// `read` is declared on a line that is not part of this excerpt.
552 size_t to_read_now = std::min(m_chunk_size, to_read - read);
554 ret = reader.Read(std::forward<Operation>(op), to_read_now, m_sample_timeout);
// ret.first is the reader's error code; a value other than the
// default-constructed error_code is treated as failure.
555 if (ret.first != ok) {
559 if (read == to_read) {
// Overall deadline: m_sample_timeout per requested sample, measured from
// time_start on the steady clock.
563 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
564 if (time_elapsed > m_sample_timeout * to_read) {
565 return std::make_error_code(std::errc::timed_out);
/// Skips `to_skip` samples on `reader`.
///
/// Each reader.Skip() call requests at most m_chunk_size samples (the rest of
/// `to_skip` if smaller) with m_sample_timeout as timeout argument; the count
/// reported in ret.second is added to `skipped`.
///
/// @param reader  Queue reader to skip on.
/// @param to_skip Total number of samples to skip.
/// @return std::errc::timed_out once the elapsed time exceeds
///         m_sample_timeout * to_skip. The other return paths are not part
///         of this excerpt.
570 std::error_code Skip(ReaderType& reader,
size_t to_skip) {
571 using namespace std::chrono;
574 const std::error_code ok;
575 std::pair<std::error_code, size_t> ret;
576 milliseconds time_elapsed{0};
577 auto time_start = steady_clock::now();
// Nothing to skip, or the command is no longer RUN (branch body not visible).
580 if ((to_skip == 0) or (m_command.load(std::memory_order_relaxed) != Command::RUN)) {
// Size of the next chunk: the remaining count, capped at m_chunk_size.
// `skipped` is declared on a line that is not part of this excerpt.
585 size_t to_skip_now = std::min(m_chunk_size, to_skip - skipped);
587 ret = reader.Skip(to_skip_now, m_sample_timeout);
// ret.first is the reader's error code; a value other than the
// default-constructed error_code is treated as failure.
588 if (ret.first != ok) {
591 skipped += ret.second;
592 if (skipped == to_skip) {
// Overall deadline: m_sample_timeout per requested sample, measured from
// time_start on the steady clock.
596 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
597 if (time_elapsed > m_sample_timeout * to_skip) {
598 return std::make_error_code(std::errc::timed_out);
/// Calls reader.Reset() and gives ipcq::Error::WouldBlock special treatment.
/// The handling of that case and the returned value are not part of this
/// excerpt.
///
/// @param reader Queue reader to reset.
603 std::error_code Reset(ReaderType& reader) {
604 auto ret = reader.Reset();
605 if (ret == ipcq::Error::WouldBlock) {
/// Returns reader.NumAvailable() as a percentage of reader.Size().
///
/// The ratio is computed with NumAvailable() cast to float, multiplied by the
/// double literal 100.0, and converted to float on return.
/// NOTE(review): no guard for reader.Size() == 0 is visible; presumably the
/// queue size is never zero -- confirm.
611 float Occupancy(ReaderType& reader) {
612 return 100.0 * (
static_cast<float>(reader.NumAvailable()) / reader.Size());
// Command written to m_command and polled by the worker:
//   RUN  - Run() stores it; Read()/Skip() stop once the command is no longer RUN.
//   IDLE - the worker publishes State::IDLE and sleeps 10 ms.
//   EXIT - the worker publishes State::OFF.
616 enum class Command : uint8_t { RUN,
IDLE,
EXIT };
// Logger reference, bound in the constructor to
// rtctk::componentFramework::GetLogger("rtctk").
// NOTE(review): assumes GetLogger returns a reference to a logger that
// outlives this object -- confirm.
618 log4cplus::Logger& m_logger;
// Name passed to ReaderType::MakeReader() by the worker.
620 std::string m_shm_name;
// Sample counts the worker loads as to_read / to_skip (relaxed accesses).
621 std::atomic<size_t> m_to_read;
622 std::atomic<size_t> m_to_skip;
// Timeout argument for reader.Read()/Skip(); also scales the overall
// deadline in Read() and Skip() (m_sample_timeout * count).
624 std::chrono::milliseconds m_sample_timeout;
// Optional NUMA policies for the worker thread; a default-constructed
// numapp::NumaPolicies is used when empty.
625 std::optional<numapp::NumaPolicies> m_thread_policies;
// Latest command for the worker and the state the worker publishes.
626 std::atomic<Command> m_command;
627 std::atomic<State> m_state;
// Cycle count set by Run() (0 disables the worker's cycle-count check) and
// the cycle counter published by the worker.
628 std::atomic<size_t> m_cycles_to_run;
629 std::atomic<size_t> m_cycles;
// Exception stored by the worker; the visible worker write and the rethrow
// both hold m_exception_mutex (exclusive for the write, shared for the read).
630 std::exception_ptr m_exception =
nullptr;
631 std::shared_mutex m_exception_mutex;
// Worker thread running ComputationBase::Work.
632 std::thread m_thread;