RTC Toolkit 4.0.2
Loading...
Searching...
No Matches
ddsPubThread.hpp
Go to the documentation of this file.
1
12#ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP
13#define RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP
14
15#include "agnostictopicif.h"
16#include "mudpiProcessor.hpp"
17#include "queue.hpp"
18#include <cstdint>
19#include <fastdds/dds/publisher/DataWriter.hpp>
20#include <fmt/core.h>
21#include <fmt/printf.h>
22#include <memory>
29#include <tbb/concurrent_queue.h>
30
31#include <numapp/thread.hpp>
32
33#include <chrono>
34#include <iostream>
35#include <utility>
36
37namespace rtctk::telRepub {
38
39using namespace rtctk::componentFramework;
40
46class PubThread {
47public:
62 PubThread(std::uint16_t idx,
63 DataWriter &data_writer,
65 int32_t topic_id,
66 std::uint32_t queue_size,
67 numapp::NumaPolicies const &thread_policies,
68 std::uint16_t sim_freq,
69 std::uint32_t sim_payload_size,
70 std::uint32_t expected_sample_id_increment,
71 WranglerFunction &&wrangler);
72
76 ~PubThread();
77
81 void RunAsync();
82
86 void IdleAsync();
87
93 bool InSimMode();
94
99
100 std::string GetTopicName();
101
102 std::shared_ptr<Queue> GetQueue();
103
104private:
110 void ProcessingLoop();
111
117 void SimProcessingLoop();
118
119 std::string MakeThreadDpName(bool is_sim_thread = true);
120 std::string MakeThreadName(bool is_sim_thread = false);
121
122 enum class State { Idle, Run, Exit };
123
124 std::shared_ptr<Queue> m_frame_queue;
125 std::uint16_t m_idx;
126 std::string m_topic_name;
127
128 log4cplus::Logger m_logger;
129
130 std::atomic<State> m_requested_state;
131 std::thread m_thread;
132 std::string m_thread_name;
133
134 MudpiProcessor m_mudpi_processor;
135
136 AgnosticTopic m_sim_msg;
137 DataWriter &m_data_writer;
138
139 ComponentMetricsIf &m_metrics;
140
141 std::uint16_t m_sim_freq;
142 std::uint32_t m_sim_payload_size; // bytes
143 std::uint32_t m_expected_sample_id_increment; // used in sim mode only
148 // @note: OLDB does not support uint64_t which we really want, so we have to use signed 64.
149 perfc::CounterI64 m_pc_published_samples;
150 perfc::ScopedRegistration m_pc_published_samples_reg;
151
152 perfc::CounterI64 m_pc_publication_errors;
153 perfc::ScopedRegistration m_pc_publication_errors_reg;
154
155 perfc::CounterI64 m_pc_last_sample_id_published;
156 perfc::ScopedRegistration m_pc_last_sample_id_published_reg;
157
158 std::unique_ptr<DurationMonitor<>> m_dur_mon;
159
160 WranglerFunction m_wrangler;
161};
162
163} // namespace rtctk::telRepub
164
165#endif // RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP
UDP Buffer management.
Component metrics interface.
Definition: componentMetricsIf.hpp:184
Processing MUDPI data received by UDP receiver: rtctk::telRepub::UdpReceiver.
Definition: mudpiProcessor.hpp:44
DDS publisher thread class that supports production and simulation mode.
Definition: ddsPubThread.hpp:46
void IdleAsync()
Disable publishing data.
Definition: ddsPubThread.cpp:126
std::string GetTopicName()
Definition: ddsPubThread.cpp:138
void RunAsync()
Enable publishing data.
Definition: ddsPubThread.cpp:117
bool InSimMode()
Check whether simulation mode is active.
Definition: ddsPubThread.cpp:130
std::shared_ptr< Queue > GetQueue()
Definition: ddsPubThread.cpp:142
~PubThread()
Joins publisher thread and prints some statistics information.
Definition: ddsPubThread.cpp:93
MudpiProcessor & GetMudpiProcessor()
Get MudpiProcessor.
Definition: ddsPubThread.cpp:134
Header file for ComponentMetricsIf.
Declares some common DDS functionality.
Header file for Duration Monitor.
Logging Support Library based on log4cplus.
MUDPI processor: check and aggregate MUDPI payload to a single topic and put to the queue for publish...
Definition: commandReplier.cpp:22
Definition: ddsPubThread.cpp:17
std::function< std::error_code(const gsl::span< const gsl::span< const uint8_t > >, std::vector< uint8_t > &)> WranglerFunction
The wrangler function that is called with a span of spans containing the payload and an vector where ...
Definition: wrangler.hpp:29
UDP Buffer management.
Wrangler: User extension point.