13#ifndef RTCTK_DATATASK_READERTHREAD_HPP
14#define RTCTK_DATATASK_READERTHREAD_HPP
25#include <ipcq/reader.hpp>
26#include <numapp/numapolicies.hpp>
27#include <numapp/thread.hpp>
46 :
rtctk::componentFramework::RtctkException(
"Request '" + req_name +
"' timed out!") {
56 :
rtctk::componentFramework::RtctkException(
"An asynchronous error occured!") {
60 :
rtctk::componentFramework::RtctkException(
"An asynchronous error occured: '" + text +
78template <
typename TopicType,
template <
typename>
typename ReaderType = ipcq::Reader>
82 : m_logger(
rtctk::componentFramework::GetLogger(
"rtctk"))
84 , m_thread_name(
"thread_name",
"readerThread")
85 , m_queue_name(
"queue_name")
86 , m_thread_affinity(
"thread_affinity")
87 , m_samples_to_read(
"samples_to_read")
88 , m_samples_to_skip(
"samples_to_skip", 0)
89 , m_loop_frequency(
"loop_frequency")
90 , m_error_margin(
"error_margin")
92 , m_callback_on_data()
93 , m_callback_init_thread() {
97 if (m_thread.joinable()) {
114 using namespace std::chrono_literals;
116 LOG4CPLUS_TRACE(m_logger,
"ReaderThread::Spawn()");
119 assert(m_callback_on_data);
122 m_queue_name.CheckSet();
123 m_thread_name.CheckSet();
124 m_samples_to_read.CheckSet();
125 m_samples_to_skip.CheckSet();
126 m_loop_frequency.CheckSet();
130 m_thread_name.Lock();
131 m_thread_affinity.Lock();
132 m_loop_frequency.Lock();
134 if (!m_error_margin.IsSet()) {
135 m_error_margin.Set(1.5);
137 m_error_margin.Lock();
139 auto policies = numapp::NumaPolicies();
140 if (m_thread_affinity.IsSet()) {
141 auto cpu_mask = numapp::Cpumask::MakeFromCpuStringAll(
142 std::to_string(m_thread_affinity.Get()).c_str());
143 policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
146 auto f = SendRequestAsync(Command::Idle);
148 m_thread = numapp::MakeThread(m_thread_name.Get(), policies, &ReaderThread::Work,
this);
151 auto status = f.wait_for(1200ms);
152 if (status != std::future_status::ready) {
167 using namespace std::chrono_literals;
169 LOG4CPLUS_TRACE(m_logger,
"ReaderThread::Join()");
171 auto f = SendRequestAsync(Command::Exit);
175 std::max(1200ms, detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
176 auto status = f.wait_for(timeout);
180 m_queue_name.Unlock();
181 m_thread_name.Unlock();
182 m_thread_affinity.Unlock();
183 m_loop_frequency.Unlock();
185 if (status != std::future_status::ready) {
194 using namespace std::chrono_literals;
196 LOG4CPLUS_TRACE(m_logger,
"ReaderThread::Run()");
199 SendRequestSync(Command::Run, 1200ms);
206 using namespace std::chrono_literals;
208 LOG4CPLUS_TRACE(m_logger,
"ReaderThread::Idle()");
212 std::max(1200ms, detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
213 SendRequestSync(Command::Idle, timeout);
223 using namespace std::chrono_literals;
224 std::error_code
const ok{};
225 auto timeout = detail::CalcTimeout(m_samples_to_read.Get() + m_samples_to_skip.Get(),
226 m_loop_frequency.Get(),
227 m_error_margin.Get());
229 auto ret = m_comp_allowed.Pend(timeout);
236 using namespace std::chrono_literals;
237 using namespace std::chrono;
238 auto timeout = detail::CalcTimeout(m_samples_to_read.Get() + m_samples_to_skip.Get(),
239 m_loop_frequency.Get(),
240 m_error_margin.Get());
242 auto time_start = steady_clock::now();
244 std::this_thread::sleep_for(5ms);
246 if (st.StopRequested()) {
248 m_logger,
"ReaderThread::WaitUntilComputationAllowed() aborted via StopToken!");
252 auto ret = m_comp_allowed.TryPend();
254 std::error_code
const ok{};
255 if (ret.value() != ok) {
261 if (duration_cast<milliseconds>(steady_clock::now() - time_start) > timeout) {
262 CII_THROW(
AsynchronousError,
"ReaderThread::WaitUntilComputationAllowed() timeout");
281 m_queue_name.Set(name);
298 m_thread_name.Set(name);
306 m_thread_affinity.Set(affinity);
314 m_samples_to_read.Set(value);
322 m_samples_to_skip.Set(value);
331 m_loop_frequency.Set(value);
340 m_error_margin.Set(value);
352 m_callback_on_data = callback;
363 m_callback_init_thread = std::move(callback);
367 enum class State :
unsigned {
380 const std::map<State, std::string> m_state_text = {
381 {State::Error,
"Error"},
383 {State::Starting,
"Starting"},
384 {State::Terminating,
"Terminating"},
385 {State::Idle,
"Idle"},
386 {State::Reading,
"Reading"},
387 {State::Skipping,
"Skipping"},
388 {State::Waiting,
"Waiting"},
389 {State::Dropping,
"Dropping"},
393 enum class Command :
unsigned {
401 const std::map<Command, std::string> m_command_text = {
402 {Command::None,
"-"},
403 {Command::Run,
"Run"},
404 {Command::Idle,
"Idle"},
405 {Command::Exit,
"Exit"},
413 std::future<void> SendRequestAsync(Command cmd) {
414 Request<Command> req(cmd);
415 auto f = req.GetReplyFuture();
416 m_request_q.Post(std::move(req));
426 template <
class Rep,
class Period>
427 void SendRequestSync(Command cmd, std::chrono::duration<Rep, Period> timeout) {
428 auto f = SendRequestAsync(cmd);
429 auto status = f.wait_for(timeout);
430 if (status != std::future_status::ready) {
431 CII_THROW(RequestTimedOut, m_command_text.at(cmd));
435 Request<Command> GetRequest() {
436 return m_request_q.TryPend().value_or(Request(Command::None));
444 using namespace std::chrono;
445 using namespace std::chrono_literals;
447 auto req = GetRequest();
449 m_state = State::Off;
450 State next_state = State::Starting;
451 State prev_state = State::Off;
453 auto time_start = steady_clock::now();
454 milliseconds time_elapsed{0};
455 auto duration_skip =
CalcTimeout(m_samples_to_skip.Get(), m_loop_frequency.Get(), 1.0);
456 auto timeout_wait{duration_skip / 2};
457 auto timeout_drop{duration_skip / 2};
462 std::error_code
const ok{};
463 std::error_code ret = ok;
465 auto reader = ReaderType<TopicType>::MakeReader(m_queue_name.Get().c_str(), 30s);
468 prev_state = m_state;
469 if (m_state != next_state) {
470 m_state = next_state;
473 "ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) <<
"'");
477 case State::Starting: {
479 if (m_callback_init_thread) {
480 m_callback_init_thread();
482 next_state = State::Idle;
484 ret = std::make_error_code(std::errc::timed_out);
485 next_state = State::Error;
491 if (m_state != prev_state) {
497 if (req.GetPayload() == Command::Exit) {
498 next_state = State::Terminating;
500 }
else if (req.GetPayload() == Command::Run) {
503 next_state = ret == ok ? State::Reading : State::Error;
507 std::this_thread::sleep_for(10ms);
508 next_state = m_state;
512 case State::Reading: {
513 if (m_state != prev_state) {
514 if (prev_state == State::Idle) {
515 m_comp_allowed.Clear();
518 to_read = m_samples_to_read.Get();
519 to_skip = m_samples_to_skip.Get();
523 if (req.GetPayload() == Command::Exit) {
524 next_state = State::Terminating;
526 }
else if (req.GetPayload() == Command::Idle) {
527 next_state = State::Idle;
532 size_t to_read_now = m_loop_frequency.Get();
534 if (to_read_now == 0 and to_read > 0) {
538 if (to_read_now > to_read) {
539 to_read_now = to_read;
545 m_loop_frequency.Get(),
546 m_error_margin.Get());
548 to_read -= to_read_now;
551 next_state = State::Error;
552 }
else if (to_read == 0) {
553 if (m_mode.Get() == ReaderMode::Continuous) {
554 next_state = State::Skipping;
556 next_state = State::Idle;
558 m_comp_allowed.Post(ok);
561 next_state = m_state;
566 case State::Skipping: {
568 if (req.GetPayload() == Command::Exit) {
569 next_state = State::Terminating;
571 }
else if (req.GetPayload() == Command::Idle) {
572 next_state = State::Idle;
577 size_t to_skip_now = m_loop_frequency.Get();
579 if (to_skip_now == 0 and to_skip > 0) {
583 if (to_skip_now > to_skip) {
584 to_skip_now = to_skip;
587 ret =
Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
589 to_skip -= to_skip_now;
592 next_state = State::Error;
593 }
else if (to_skip == 0) {
594 if (m_comp_done.TryWait()) {
595 next_state = State::Reading;
597 next_state = State::Waiting;
600 next_state = m_state;
605 case State::Waiting: {
606 if (m_state != prev_state) {
607 time_start = steady_clock::now();
611 if (req.GetPayload() == Command::Exit) {
612 next_state = State::Terminating;
614 }
else if (req.GetPayload() == Command::Idle) {
615 next_state = State::Idle;
619 if (m_comp_done.TryWait()) {
620 next_state = State::Reading;
624 std::this_thread::sleep_for(1ms);
626 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
627 if ((time_elapsed > timeout_wait) or (
NumFree(reader) == 0)) {
628 next_state = State::Dropping;
630 next_state = m_state;
635 case State::Dropping: {
636 if (m_state != prev_state) {
637 time_start = steady_clock::now();
641 if (req.GetPayload() == Command::Exit) {
642 next_state = State::Terminating;
644 }
else if (req.GetPayload() == Command::Idle) {
645 next_state = State::Idle;
649 if (m_comp_done.TryWait()) {
651 next_state = ret == ok ? State::Reading : State::Error;
655 std::this_thread::sleep_for(1ms);
657 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
658 if (time_elapsed > timeout_drop) {
659 ret = std::make_error_code(std::errc::timed_out);
660 next_state = State::Error;
662 next_state = m_state;
668 if (m_state != prev_state) {
669 m_comp_allowed.Post(ret);
670 LOG4CPLUS_ERROR(m_logger,
"ReaderThread::Work() - " << ret.message());
674 if (req.GetPayload() == Command::Exit) {
675 next_state = State::Terminating;
676 }
else if (req.GetPayload() == Command::Idle) {
677 next_state = State::Idle;
679 std::this_thread::sleep_for(100ms);
680 next_state = m_state;
685 case State::Terminating: {
686 next_state = State::Off;
698 log4cplus::Logger& m_logger;
700 std::thread m_thread;
718 std::function<void(
const TopicType& sample)> m_callback_on_data;
719 std::function<void()> m_callback_init_thread;
The RtctkException class is the base class for all Rtctk exceptions.
Definition: exceptions.hpp:237
Definition: readerThread.hpp:53
AsynchronousError(const std::string &text)
Definition: readerThread.hpp:59
AsynchronousError()
Definition: readerThread.hpp:55
Definition: messageQueue.hpp:29
Class for basic handling of parameters in read thread.
Definition: parameter.hpp:52
ReaderThread for the Data Class.
Definition: readerThread.hpp:79
void Spawn()
Spawn the reader thread.
Definition: readerThread.hpp:113
void Run()
send synchronous run request to readerThread.
Definition: readerThread.hpp:193
void RegisterOnDataCallback(std::function< void(const TopicType &sample)> callback)
Required callback.
Definition: readerThread.hpp:351
void SetQueueName(std::string const &name)
Required setter.
Definition: readerThread.hpp:279
void Join()
waits for thread to complete and rejoins
Definition: readerThread.hpp:166
void SetThreadName(std::string const &name)
Optional setter.
Definition: readerThread.hpp:296
void SetMode(ReaderMode mode)
Optional setter.
Definition: readerThread.hpp:288
void WaitUntilComputationAllowed()
wait until the computation can be launched.
Definition: readerThread.hpp:222
void Idle()
send Idle request to readerThread.
Definition: readerThread.hpp:205
void SetCpuAffinity(int affinity)
Optional setter.
Definition: readerThread.hpp:305
void SetLoopFrequency(float value)
Required setter.
Definition: readerThread.hpp:330
void SetErrorMargin(float value)
Optional setter.
Definition: readerThread.hpp:339
void RegisterInitThreadCallback(std::function< void()> callback)
Optional callback.
Definition: readerThread.hpp:362
void SetSamplesToRead(size_t value)
Required setter.
Definition: readerThread.hpp:313
~ReaderThread()
Definition: readerThread.hpp:96
ReaderThread()
Definition: readerThread.hpp:81
void WaitUntilComputationAllowed(rtctk::componentFramework::StopToken st)
Definition: readerThread.hpp:235
void SetSamplesToSkip(size_t value)
Required setter.
Definition: readerThread.hpp:321
void SignalComputationDone()
returns from the business logic that the calculation has completed so that the readerThread is safe t...
Definition: readerThread.hpp:271
Definition: readerThread.hpp:43
RequestTimedOut(const std::string &req_name)
Definition: readerThread.hpp:45
Definition: semaphore.hpp:26
Provides macros and utilities for exception handling.
Logging Support Library based on log4cplus.
A simple message queue implementation.
rad::StopToken StopToken
Definition: stopToken.hpp:20
Definition: readerHelpers.hpp:24
std::chrono::milliseconds CalcTimeout(size_t count, float loop_frequency, float error_margin)
Helper function to calculate the estimated time to read the a number of samples at a given frequency.
Definition: readerHelpers.hpp:36
std::error_code Read(ReaderType &reader, Operation &&op, size_t count, float loop_frequency, float error_margin)
Helper function to wrap the ipcq.read with handling of timeouts and count values.
Definition: readerHelpers.hpp:56
std::error_code Skip(ReaderType &reader, size_t count, float loop_frequency, float error_margin)
Helper function to wrap the ipcq.skip with handling of timeouts and count values.
Definition: readerHelpers.hpp:110
std::error_code Reset(ReaderType &reader)
Helper function to reset the ipcq.reader to latest sample.
Definition: readerHelpers.hpp:160
size_t NumFree(ReaderType &reader)
Helper function to get the free space in the shm.
Definition: readerHelpers.hpp:176
Definition: computationBase.hpp:33
ReaderMode
Definition: readerThread.hpp:65
Definition: commandReplier.cpp:22
A Parameter class used for Data Tasks.
Helper methods to read data from shared memory queue.
A Request class used in Data Tasks.
A simple semaphore implementation.
void Error(const char *msg)
Definition: main.cpp:43