RTC Toolkit 4.0.2
Loading...
Searching...
No Matches
readerThread.hpp
Go to the documentation of this file.
1
13#ifndef RTCTK_DATATASK_READERTHREAD_HPP
14#define RTCTK_DATATASK_READERTHREAD_HPP
15
24
25#include <ipcq/reader.hpp>
26#include <numapp/numapolicies.hpp>
27#include <numapp/thread.hpp>
28
29#include <chrono>
30#include <cmath>
31#include <ctime>
32#include <functional>
33#include <map>
34#include <stdexcept>
35#include <thread>
36
37namespace rtctk::dataTask {
38
39// TODO: this one is more high level, different file
44public:
45 RequestTimedOut(const std::string& req_name)
46 : rtctk::componentFramework::RtctkException("Request '" + req_name + "' timed out!") {
47 }
48};
49
54public:
56 : rtctk::componentFramework::RtctkException("An asynchronous error occured!") {
57 }
58
59 AsynchronousError(const std::string& text)
60 : rtctk::componentFramework::RtctkException("An asynchronous error occured: '" + text +
61 "'!") {
62 }
63};
64
65enum class [[deprecated]] ReaderMode : unsigned { Single, Continuous };
66
78template <typename TopicType, template <typename> typename ReaderType = ipcq::Reader>
79class [[deprecated]] ReaderThread {
80public:
82 : m_logger(rtctk::componentFramework::GetLogger("rtctk"))
83 , m_mode("reader_mode", ReaderMode::Continuous)
84 , m_thread_name("thread_name", "readerThread")
85 , m_queue_name("queue_name")
86 , m_thread_affinity("thread_affinity")
87 , m_samples_to_read("samples_to_read")
88 , m_samples_to_skip("samples_to_skip", 0)
89 , m_loop_frequency("loop_frequency")
90 , m_error_margin("error_margin")
91 , m_state(State::Off)
92 , m_callback_on_data()
93 , m_callback_init_thread() {
94 }
95
97 if (m_thread.joinable()) {
98 Join();
99 }
100 }
101
113 void Spawn() {
114 using namespace std::chrono_literals;
115
116 LOG4CPLUS_TRACE(m_logger, "ReaderThread::Spawn()");
117
118 // check if callback to OnDataAvailable has been registered
119 assert(m_callback_on_data);
120
121 m_mode.CheckSet();
122 m_queue_name.CheckSet();
123 m_thread_name.CheckSet();
124 m_samples_to_read.CheckSet();
125 m_samples_to_skip.CheckSet();
126 m_loop_frequency.CheckSet();
127
128 m_mode.Lock();
129 m_queue_name.Lock();
130 m_thread_name.Lock();
131 m_thread_affinity.Lock();
132 m_loop_frequency.Lock();
133
134 if (!m_error_margin.IsSet()) {
135 m_error_margin.Set(1.5);
136 }
137 m_error_margin.Lock();
138
139 auto policies = numapp::NumaPolicies();
140 if (m_thread_affinity.IsSet()) {
141 auto cpu_mask = numapp::Cpumask::MakeFromCpuStringAll(
142 std::to_string(m_thread_affinity.Get()).c_str());
143 policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
144 }
145
146 auto f = SendRequestAsync(Command::Idle);
147
148 m_thread = numapp::MakeThread(m_thread_name.Get(), policies, &ReaderThread::Work, this);
149
150 // SM is starting quickly, so hard-coded timeout will do
151 auto status = f.wait_for(1200ms);
152 if (status != std::future_status::ready) {
153 CII_THROW(RequestTimedOut, m_command_text.at(Command::Idle));
154 }
155 }
156
166 void Join() {
167 using namespace std::chrono_literals;
168
169 LOG4CPLUS_TRACE(m_logger, "ReaderThread::Join()");
170
171 auto f = SendRequestAsync(Command::Exit);
172
173 // here timeout depends on loop_frequency and sample_to_read/skip
174 auto timeout =
175 std::max(1200ms, detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
176 auto status = f.wait_for(timeout);
177 m_thread.join();
178
179 m_mode.Unlock();
180 m_queue_name.Unlock();
181 m_thread_name.Unlock();
182 m_thread_affinity.Unlock();
183 m_loop_frequency.Unlock();
184
185 if (status != std::future_status::ready) {
186 CII_THROW(RequestTimedOut, m_command_text.at(Command::Exit));
187 }
188 }
189
193 void Run() {
194 using namespace std::chrono_literals;
195
196 LOG4CPLUS_TRACE(m_logger, "ReaderThread::Run()");
197
198 // SM is going to state running quickly, so hard-coded timeout will do
199 SendRequestSync(Command::Run, 1200ms);
200 }
201
205 void Idle() {
206 using namespace std::chrono_literals;
207
208 LOG4CPLUS_TRACE(m_logger, "ReaderThread::Idle()");
209
210 // here timeout depends on loop_frequency and sample_to_read/skip
211 auto timeout =
212 std::max(1200ms, detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
213 SendRequestSync(Command::Idle, timeout);
214 }
215
223 using namespace std::chrono_literals;
224 std::error_code const ok{};
225 auto timeout = detail::CalcTimeout(m_samples_to_read.Get() + m_samples_to_skip.Get(),
226 m_loop_frequency.Get(),
227 m_error_margin.Get());
228
229 auto ret = m_comp_allowed.Pend(timeout);
230 if (ret != ok) {
231 CII_THROW(AsynchronousError, ret.message());
232 }
233 }
234
236 using namespace std::chrono_literals;
237 using namespace std::chrono;
238 auto timeout = detail::CalcTimeout(m_samples_to_read.Get() + m_samples_to_skip.Get(),
239 m_loop_frequency.Get(),
240 m_error_margin.Get());
241
242 auto time_start = steady_clock::now();
243 while (true) {
244 std::this_thread::sleep_for(5ms);
245
246 if (st.StopRequested()) {
247 LOG4CPLUS_TRACE(
248 m_logger, "ReaderThread::WaitUntilComputationAllowed() aborted via StopToken!");
249 break;
250 }
251
252 auto ret = m_comp_allowed.TryPend();
253 if (ret) {
254 std::error_code const ok{};
255 if (ret.value() != ok) {
256 CII_THROW(AsynchronousError, ret.value().message());
257 }
258 break;
259 }
260
261 if (duration_cast<milliseconds>(steady_clock::now() - time_start) > timeout) {
262 CII_THROW(AsynchronousError, "ReaderThread::WaitUntilComputationAllowed() timeout");
263 }
264 }
265 }
266
272 m_comp_done.Post();
273 }
274
279 void SetQueueName(std::string const& name) {
280 // TODO is there a max lenght of the name?
281 m_queue_name.Set(name);
282 }
283
288 void SetMode(ReaderMode mode) {
289 m_mode.Set(mode);
290 }
291
296 void SetThreadName(std::string const& name) {
297 // TODO check for max 16 characters length
298 m_thread_name.Set(name);
299 }
300
305 void SetCpuAffinity(int affinity) {
306 m_thread_affinity.Set(affinity);
307 }
308
313 void SetSamplesToRead(size_t value) {
314 m_samples_to_read.Set(value);
315 }
316
321 void SetSamplesToSkip(size_t value) {
322 m_samples_to_skip.Set(value);
323 }
324
330 void SetLoopFrequency(float value) {
331 m_loop_frequency.Set(value);
332 }
333
339 void SetErrorMargin(float value) {
340 m_error_margin.Set(value);
341 }
342
343 // call back registration
344 // TODO: add checks
351 void RegisterOnDataCallback(std::function<void(const TopicType& sample)> callback) {
352 m_callback_on_data = callback;
353 }
354
362 void RegisterInitThreadCallback(std::function<void()> callback) {
363 m_callback_init_thread = std::move(callback);
364 }
365
366private:
367 enum class State : unsigned {
368 Error,
369 Off,
370 Starting,
371 Terminating,
372 Idle,
373 Reading,
374 Skipping,
375 Waiting,
376 Dropping,
377 }; //< known states of the readerThread
378
379 // clang-format off
380 const std::map<State, std::string> m_state_text = {
381 {State::Error, "Error"},
382 {State::Off, "Off"},
383 {State::Starting, "Starting"},
384 {State::Terminating, "Terminating"},
385 {State::Idle, "Idle"},
386 {State::Reading, "Reading"},
387 {State::Skipping, "Skipping"},
388 {State::Waiting, "Waiting"},
389 {State::Dropping, "Dropping"},
390 }; //< States names
391 // clang-format on
392
393 enum class Command : unsigned {
394 None,
395 Run,
396 Idle,
397 Exit,
398 }; //< expected commands
399
400 // clang-format off
401 const std::map<Command, std::string> m_command_text = {
402 {Command::None, "-"},
403 {Command::Run, "Run"},
404 {Command::Idle, "Idle"},
405 {Command::Exit, "Exit"},
406 }; //< expected commands names
407 // clang-format on
408
413 std::future<void> SendRequestAsync(Command cmd) {
414 Request<Command> req(cmd);
415 auto f = req.GetReplyFuture();
416 m_request_q.Post(std::move(req));
417 return f;
418 }
419
426 template <class Rep, class Period>
427 void SendRequestSync(Command cmd, std::chrono::duration<Rep, Period> timeout) {
428 auto f = SendRequestAsync(cmd);
429 auto status = f.wait_for(timeout);
430 if (status != std::future_status::ready) {
431 CII_THROW(RequestTimedOut, m_command_text.at(cmd));
432 }
433 }
434
435 Request<Command> GetRequest() {
436 return m_request_q.TryPend().value_or(Request(Command::None));
437 }
438
442 void Work() {
443 using namespace rtctk::dataTask::detail;
444 using namespace std::chrono;
445 using namespace std::chrono_literals;
446
447 auto req = GetRequest();
448
449 m_state = State::Off;
450 State next_state = State::Starting;
451 State prev_state = State::Off;
452
453 auto time_start = steady_clock::now();
454 milliseconds time_elapsed{0};
455 auto duration_skip = CalcTimeout(m_samples_to_skip.Get(), m_loop_frequency.Get(), 1.0);
456 auto timeout_wait{duration_skip / 2};
457 auto timeout_drop{duration_skip / 2};
458
459 size_t to_read = 0;
460 size_t to_skip = 0;
461
462 std::error_code const ok{};
463 std::error_code ret = ok;
464
465 auto reader = ReaderType<TopicType>::MakeReader(m_queue_name.Get().c_str(), 30s);
466
467 while (1) {
468 prev_state = m_state;
469 if (m_state != next_state) {
470 m_state = next_state;
471 LOG4CPLUS_TRACE(
472 m_logger,
473 "ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) << "'");
474 }
475
476 switch (m_state) {
477 case State::Starting: {
478 try {
479 if (m_callback_init_thread) {
480 m_callback_init_thread();
481 }
482 next_state = State::Idle;
483 } catch (...) {
484 ret = std::make_error_code(std::errc::timed_out); // TODO: other code
485 next_state = State::Error;
486 }
487 break;
488 }
489
490 case State::Idle: {
491 if (m_state != prev_state) {
492 req.SetReply();
493 }
494
495 req = GetRequest();
496
497 if (req.GetPayload() == Command::Exit) {
498 next_state = State::Terminating;
499 break;
500 } else if (req.GetPayload() == Command::Run) {
501 m_comp_done.Clear();
502 ret = Reset(reader);
503 next_state = ret == ok ? State::Reading : State::Error;
504 break;
505 }
506
507 std::this_thread::sleep_for(10ms);
508 next_state = m_state;
509 break;
510 }
511
512 case State::Reading: {
513 if (m_state != prev_state) {
514 if (prev_state == State::Idle) {
515 m_comp_allowed.Clear();
516 req.SetReply();
517 }
518 to_read = m_samples_to_read.Get();
519 to_skip = m_samples_to_skip.Get();
520 }
521
522 req = GetRequest();
523 if (req.GetPayload() == Command::Exit) {
524 next_state = State::Terminating;
525 break;
526 } else if (req.GetPayload() == Command::Idle) {
527 next_state = State::Idle;
528 break;
529 }
530
531 // how many can we read in 1s?
532 size_t to_read_now = m_loop_frequency.Get();
533 // if less than 1 then read at least 1
534 if (to_read_now == 0 and to_read > 0) {
535 to_read_now = 1;
536 }
537 // but never read more than to_read
538 if (to_read_now > to_read) {
539 to_read_now = to_read;
540 }
541
542 ret = Read(reader,
543 m_callback_on_data,
544 to_read_now,
545 m_loop_frequency.Get(),
546 m_error_margin.Get());
547
548 to_read -= to_read_now;
549
550 if (ret != ok) {
551 next_state = State::Error;
552 } else if (to_read == 0) {
553 if (m_mode.Get() == ReaderMode::Continuous) {
554 next_state = State::Skipping;
555 } else {
556 next_state = State::Idle;
557 }
558 m_comp_allowed.Post(ok);
559
560 } else {
561 next_state = m_state;
562 }
563 break;
564 }
565
566 case State::Skipping: {
567 req = GetRequest();
568 if (req.GetPayload() == Command::Exit) {
569 next_state = State::Terminating;
570 break;
571 } else if (req.GetPayload() == Command::Idle) {
572 next_state = State::Idle;
573 break;
574 }
575
576 // how many can we skip in 1s?
577 size_t to_skip_now = m_loop_frequency.Get();
578 // if less than 1 then read at least 1
579 if (to_skip_now == 0 and to_skip > 0) {
580 to_skip_now = 1;
581 }
582 // but never skip more than to_skip
583 if (to_skip_now > to_skip) {
584 to_skip_now = to_skip;
585 }
586
587 ret = Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
588
589 to_skip -= to_skip_now;
590
591 if (ret != ok) {
592 next_state = State::Error;
593 } else if (to_skip == 0) {
594 if (m_comp_done.TryWait()) {
595 next_state = State::Reading;
596 } else {
597 next_state = State::Waiting;
598 }
599 } else {
600 next_state = m_state;
601 }
602 break;
603 }
604
605 case State::Waiting: {
606 if (m_state != prev_state) {
607 time_start = steady_clock::now();
608 }
609
610 req = GetRequest();
611 if (req.GetPayload() == Command::Exit) {
612 next_state = State::Terminating;
613 break;
614 } else if (req.GetPayload() == Command::Idle) {
615 next_state = State::Idle;
616 break;
617 }
618
619 if (m_comp_done.TryWait()) {
620 next_state = State::Reading;
621 break;
622 }
623
624 std::this_thread::sleep_for(1ms);
625
626 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
627 if ((time_elapsed > timeout_wait) or (NumFree(reader) == 0)) { // TODO: revisit
628 next_state = State::Dropping;
629 } else {
630 next_state = m_state;
631 }
632 break;
633 }
634
635 case State::Dropping: {
636 if (m_state != prev_state) {
637 time_start = steady_clock::now();
638 }
639
640 req = GetRequest();
641 if (req.GetPayload() == Command::Exit) {
642 next_state = State::Terminating;
643 break;
644 } else if (req.GetPayload() == Command::Idle) {
645 next_state = State::Idle;
646 break;
647 }
648
649 if (m_comp_done.TryWait()) {
650 ret = Reset(reader);
651 next_state = ret == ok ? State::Reading : State::Error;
652 break;
653 }
654
655 std::this_thread::sleep_for(1ms);
656
657 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
658 if (time_elapsed > timeout_drop) {
659 ret = std::make_error_code(std::errc::timed_out);
660 next_state = State::Error;
661 } else {
662 next_state = m_state;
663 }
664 break;
665 }
666
667 case State::Error: {
668 if (m_state != prev_state) {
669 m_comp_allowed.Post(ret);
670 LOG4CPLUS_ERROR(m_logger, "ReaderThread::Work() - " << ret.message());
671 }
672
673 req = GetRequest();
674 if (req.GetPayload() == Command::Exit) {
675 next_state = State::Terminating;
676 } else if (req.GetPayload() == Command::Idle) {
677 next_state = State::Idle;
678 } else {
679 std::this_thread::sleep_for(100ms);
680 next_state = m_state;
681 }
682 break;
683 }
684
685 case State::Terminating: {
686 next_state = State::Off;
687 break;
688 }
689
690 case State::Off: {
691 req.SetReply();
692 return;
693 }
694 }
695 }
696 }
697
698 log4cplus::Logger& m_logger;
699
700 std::thread m_thread; //< the readerThead member
701
702 Parameter<ReaderMode> m_mode; //< parameters mode
703 Parameter<std::string> m_thread_name; //< parameters thread name
704 Parameter<std::string> m_queue_name; //< parameter queue name
705 Parameter<size_t> m_thread_affinity; //< parameter cpu affinity
706 Parameter<size_t> m_samples_to_read; //< parameter samples to be read
707 Parameter<size_t> m_samples_to_skip; //< parameter samples to skip
708 Parameter<float> m_loop_frequency; //< parameter loop frequency
709 Parameter<float> m_error_margin; //< parameter timeout tolerance
710
712 MessageQueue<std::error_code> m_comp_allowed;
713
714 Semaphore m_comp_done;
715
716 State m_state;
717
718 std::function<void(const TopicType& sample)> m_callback_on_data;
719 std::function<void()> m_callback_init_thread;
720};
721
722} // namespace rtctk::dataTask
723
724#endif // RTCTK_DATATASK_READERTHREAD_HPP
The RtctkException class is the base class for all Rtctk exceptions.
Definition: exceptions.hpp:237
Definition: readerThread.hpp:53
AsynchronousError(const std::string &text)
Definition: readerThread.hpp:59
AsynchronousError()
Definition: readerThread.hpp:55
Definition: messageQueue.hpp:29
Class for basic handling of parameters in read thread.
Definition: parameter.hpp:52
ReaderThread for the Data Class.
Definition: readerThread.hpp:79
void Spawn()
Spawn the reader thread.
Definition: readerThread.hpp:113
void Run()
send synchronous run request to readerThread.
Definition: readerThread.hpp:193
void RegisterOnDataCallback(std::function< void(const TopicType &sample)> callback)
Required callback.
Definition: readerThread.hpp:351
void SetQueueName(std::string const &name)
Required setter.
Definition: readerThread.hpp:279
void Join()
waits for thread to complete and rejoins
Definition: readerThread.hpp:166
void SetThreadName(std::string const &name)
Optional setter.
Definition: readerThread.hpp:296
void SetMode(ReaderMode mode)
Optional setter.
Definition: readerThread.hpp:288
void WaitUntilComputationAllowed()
wait until the computation can be launched.
Definition: readerThread.hpp:222
void Idle()
send Idle request to readerThread.
Definition: readerThread.hpp:205
void SetCpuAffinity(int affinity)
Optional setter.
Definition: readerThread.hpp:305
void SetLoopFrequency(float value)
Required setter.
Definition: readerThread.hpp:330
void SetErrorMargin(float value)
Optional setter.
Definition: readerThread.hpp:339
void RegisterInitThreadCallback(std::function< void()> callback)
Optional callback.
Definition: readerThread.hpp:362
void SetSamplesToRead(size_t value)
Required setter.
Definition: readerThread.hpp:313
~ReaderThread()
Definition: readerThread.hpp:96
ReaderThread()
Definition: readerThread.hpp:81
void WaitUntilComputationAllowed(rtctk::componentFramework::StopToken st)
Definition: readerThread.hpp:235
void SetSamplesToSkip(size_t value)
Required setter.
Definition: readerThread.hpp:321
void SignalComputationDone()
returns from the business logic that the calculation has completed so that the readerThread is safe t...
Definition: readerThread.hpp:271
Definition: readerThread.hpp:43
RequestTimedOut(const std::string &req_name)
Definition: readerThread.hpp:45
Definition: semaphore.hpp:26
Provides macros and utilities for exception handling.
Logging Support Library based on log4cplus.
A simple message queue implementation.
rad::StopToken StopToken
Definition: stopToken.hpp:20
Definition: readerHelpers.hpp:24
std::chrono::milliseconds CalcTimeout(size_t count, float loop_frequency, float error_margin)
Helper function to calculate the estimated time to read the a number of samples at a given frequency.
Definition: readerHelpers.hpp:36
std::error_code Read(ReaderType &reader, Operation &&op, size_t count, float loop_frequency, float error_margin)
Helper function to wrap the ipcq.read with handling of timeouts and count values.
Definition: readerHelpers.hpp:56
std::error_code Skip(ReaderType &reader, size_t count, float loop_frequency, float error_margin)
Helper function to wrap the ipcq.skip with handling of timeouts and count values.
Definition: readerHelpers.hpp:110
std::error_code Reset(ReaderType &reader)
Helper function to reset the ipcq.reader to latest sample.
Definition: readerHelpers.hpp:160
size_t NumFree(ReaderType &reader)
Helper function to get the free space in the shm.
Definition: readerHelpers.hpp:176
Definition: computationBase.hpp:33
ReaderMode
Definition: readerThread.hpp:65
Definition: commandReplier.cpp:22
A Parameter class used for Data Tasks.
Helper methods to read data from shared memory queue.
A Request class used in Data Tasks.
A simple semaphore implementation.
void Error(const char *msg)
Definition: main.cpp:43
A simple Stop Token.