RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
ipcqDdtForwarder.hpp
Go to the documentation of this file.
1
13#ifndef IPCQ_DDT_FORWARDER_HPP
14#define IPCQ_DDT_FORWARDER_HPP
15
17
26
27#include <ipcq/adapter.hpp>
28#include <ipcq/reader.hpp>
29
30#include <numapp/numapolicies.hpp>
31#include <numapp/thread.hpp>
32
33#include <fmt/format.h>
34#include <gsl/span>
35
36#include <atomic>
37#include <memory>
38#include <shared_mutex>
39#include <string_view>
40
41namespace rtctk::ddtServer {
42
43namespace {
44
45// Helper type used to construct the tuple of DDT Publishers
46
47template <typename Tuple>
48struct Wrapper;
49
50template <typename... PublisherType>
51struct Wrapper<std::tuple<PublisherType...>> {
52 using ServiceContainer = rtctk::componentFramework::ServiceContainer;
53
54 static std::tuple<PublisherType...>
55 MakePublishers(const std::string& db_prefix, ServiceContainer& services) {
56 return std::make_tuple(PublisherType(db_prefix, services)...);
57 }
58};
59
60} // namespace
61
69template <typename FwdInfo, typename ReaderType = ipcq::Reader<typename FwdInfo::Topic>>
71public:
72 using Topic = typename FwdInfo::Topic;
80
81 static_assert(FwdInfo::ID.find_first_not_of("abcdefghijklmnopqrstuvwxyz_0123456789") ==
82 std::string_view::npos,
83 "DDT Forwarder ID contains illegal characters!");
84
85 IpcqDdtForwarder(const std::string& comp_id, ServiceContainer& services)
86 : DdtForwarder(comp_id, "IpcqDdtForwarder", std::string(FwdInfo::ID), services)
87 , m_rtr(services.Get<RuntimeRepoIf>())
88 , m_oldb(services.Get<OldbIf>())
89 , m_metrics(services.Get<ComponentMetricsIf>())
90 , m_comp_id(comp_id)
91 , m_oldb_prefix(fmt::format("/forwarders/{}", GetId())) // counters prepend comp name
92 , m_rtr_prefix_static(fmt::format("/{}/static/forwarders/{}", comp_id, GetId()))
93 , m_rtr_prefix_dynamic(fmt::format("/{}/dynamic/forwarders/{}", comp_id, GetId()))
94 , m_publishers(Wrapper<typename FwdInfo::DdtPublisherTupleType>::MakePublishers(
95 fmt::format("{}/forwarders/{}/publishers", comp_id, GetId()), services))
96 , m_requested_state(State::STOPPED) {
97 using namespace rtctk::componentFramework;
98
99 // reader monitoring
100
101 m_samples_forwarded_reg = m_metrics.AddCounter(
102 &m_samples_forwarded,
103 CounterMetricInfo(m_oldb_prefix + "/samples_forwarded", "Number of samples forwarded"));
104
105 m_last_sample_id_forwarded_reg =
106 m_metrics.AddCounter(&m_last_sample_id_forwarded,
107 CounterMetricInfo(m_oldb_prefix + "/last_sample_id_forwarded",
108 "Last sample id forwarded"));
109
110 m_freq_estimator = std::make_unique<FrequencyEstimator>(
111 m_metrics, "Estimated frequency of the data forwarder", m_oldb_prefix);
112
113 m_dur_monitor = std::make_unique<DurationMonitor>(
114 m_metrics, "Duration of publishing data to DDT", m_oldb_prefix);
115
116 m_buffer_monitor =
117 std::make_unique<BufferMonitor>(m_metrics, "SHM read buffer occupancy", m_oldb_prefix);
118
119 // reader configuration
120
121 auto queue_name_path = DataPointPath{m_rtr_prefix_static + "/shm_queue_name"};
122 m_queue_name = m_rtr.GetDataPoint<std::string>(queue_name_path);
123
124 auto thread_policies_path = DataPointPath{m_rtr_prefix_static + "/thread_policies"};
125 m_thread_policies = GetNumaPolicies(m_rtr, thread_policies_path);
126
127 auto subsample_factor_path = DataPointPath{m_rtr_prefix_dynamic + "/subsample_factor"};
128 m_subsample_factor = m_rtr.GetDataPoint<int64_t>(subsample_factor_path);
129 }
130
132 m_requested_state = State::STOPPED;
133 if (m_processing_thread.joinable()) {
134 m_processing_thread.join();
135 }
136 }
137
138 void Start() override {
140 m_requested_state = State::STARTING;
142
143 // we trim the thread name to not cause an exception from numapp
144 m_processing_thread = numapp::MakeThread(GetId().substr(0, 15),
145 m_thread_policies.value_or(numapp::NumaPolicies()),
146 [&]() { Process(); });
147
148 while (1) {
149 using namespace std::chrono_literals;
150 std::this_thread::sleep_for(10ms);
151
152 CheckErrors();
153
154 if (GetState() != State::STARTING) {
155 break;
156 }
157 }
158 }
159
160 void Run() override {
162 m_requested_state = State::RUNNING;
163 }
164
165 void Idle() override {
167 m_requested_state = State::IDLE;
168 }
169
170 void Stop() override {
171 m_requested_state = State::STOPPED;
172 if (m_processing_thread.joinable()) {
173 m_processing_thread.join();
174 }
175 }
176
177 void Recover() override {
178 Stop();
179
180 // clear stored exception
181 {
182 std::scoped_lock lock(m_exception_mutex);
183 m_exception = std::current_exception();
184 }
185
186 // make sure that we get out of error state
188 }
189
190 void Update() override {
191 using namespace rtctk::componentFramework;
192
193 LOG4CPLUS_INFO(m_logger, fmt::format("Updating IpcqDdtForwarder '{}'", GetId()));
194
195 auto subsample_factor_path = DataPointPath{m_rtr_prefix_dynamic + "/subsample_factor"};
196 m_subsample_factor = m_rtr.GetDataPoint<int64_t>(subsample_factor_path);
197
198 std::apply([&](auto&&... pub) { ((pub.Update()), ...); }, m_publishers);
199 }
200
201 void CheckErrors() override {
202 // cppcheck-suppress unreadVariable
203 auto lock = std::shared_lock{m_exception_mutex};
204 if (m_exception) {
205 std::rethrow_exception(m_exception);
206 }
207 }
208
209private:
210 void Process() {
211 using namespace std::chrono_literals;
212 using namespace rtctk::componentFramework;
213
214 try {
215 const std::error_code ok{};
216 std::pair<std::error_code, size_t> result;
217
218 std::vector<Topic> read_buffer;
219 read_buffer.reserve(MAX_SAMPLES_READ);
220
221 m_samples_forwarded.Store(0);
222 m_last_sample_id_forwarded.Store(0);
223
224 ReaderType reader(m_queue_name.c_str());
225
226 size_t reader_capacity = reader.Capacity();
227 size_t to_skip = 0;
228 size_t to_read = 0;
229
231
232 while (m_requested_state != State::STOPPED) {
233 switch (GetState()) {
234 case State::IDLE:
235 if (m_requested_state == State::RUNNING) {
236 if (auto ret = reader.Reset(); ret == ok) {
238 } else {
239 CII_THROW(IpcqError, "Error resetting ipcq Reader");
240 }
241 break;
242 }
243 std::this_thread::sleep_for(1ms);
244 break;
245
246 case State::RUNNING:
247 if (m_requested_state == State::IDLE) {
249 break;
250 }
251 to_skip = 0;
252 to_read = reader.NumAvailable();
253 if (to_read == 0) { // try to always read at least one
254 to_read = 1;
255 }
256 to_read = std::min(to_read, MAX_SAMPLES_READ);
257
258 if (m_subsample_factor > 1) {
259 m_sampling_counter = m_sampling_counter % m_subsample_factor;
260 if (m_sampling_counter == 0) {
261 to_read = 1;
262 } else {
263 to_skip = std::min(to_read, m_subsample_factor - m_sampling_counter);
264 }
265 }
266
267 if (to_skip == 0) {
268 m_buffer_monitor->Tick(reader.NumAvailable(), reader_capacity);
269 result = reader.Read(ipcq::BackInserter(read_buffer), to_read, 100ms);
270 for (const auto& sample : read_buffer) {
271 m_last_sample_id_forwarded.Store(sample.sample_id);
272 auto t1 = std::chrono::steady_clock::now();
273 ExtractAndPublish(sample);
274 auto t2 = std::chrono::steady_clock::now();
275 m_dur_monitor->Tick(t2 - t1);
276 m_freq_estimator->Tick();
277 m_samples_forwarded++;
278 }
279 read_buffer.clear();
280 } else {
281 result = reader.Skip(to_skip, 100ms);
282 }
283 m_sampling_counter += result.second;
284 // check for error
285 if (not(result.first == ok or result.first == ipcq::Error::Timeout)) {
286 std::string error = "Error reading from ipcq: " + result.first.message();
287 CII_THROW(IpcqError, error);
288 }
289 break;
290
291 default:
292 std::this_thread::sleep_for(1ms);
293 }
294 }
295
297
298 } catch (...) {
299 {
300 std::scoped_lock lock(m_exception_mutex);
301 m_exception = std::current_exception();
302 }
304 }
305 }
306
307 void ExtractAndPublish(const Topic& sample) {
308 auto func = [&](auto& pub) {
309 if (pub.IsEnabled()) {
310 // invoke user function that extracts data from SHM Topic sample
311 auto e = std::remove_reference_t<decltype(pub)>::StreamInfo::Extract(sample);
312 // publish extracted data to DDT
313 pub.Publish(std::get<0>(e),
314 reinterpret_cast<const uint8_t*>(std::get<1>(e).data()),
315 std::get<1>(e).size());
316 }
317 };
318
319 std::apply([&](auto&&... pub) { (func(pub), ...); }, m_publishers);
320 }
321
322 RuntimeRepoIf& m_rtr;
323 OldbIf& m_oldb;
324 ComponentMetricsIf& m_metrics;
325
326 std::string m_comp_id;
327 std::string m_oldb_prefix;
328 std::string m_rtr_prefix_static;
329 std::string m_rtr_prefix_dynamic;
330
331 typename FwdInfo::DdtPublisherTupleType m_publishers;
332
333 std::atomic<State> m_requested_state;
334 std::exception_ptr m_exception = nullptr;
335 std::shared_mutex m_exception_mutex;
336
337 std::thread m_processing_thread;
338
342 std::string m_queue_name;
343
347 std::optional<numapp::NumaPolicies> m_thread_policies;
348
352 int64_t m_subsample_factor;
353
357 uint64_t m_sampling_counter;
358
362 perfc::CounterI64 m_samples_forwarded;
363 perfc::ScopedRegistration m_samples_forwarded_reg;
364
368 perfc::CounterI64 m_last_sample_id_forwarded;
369 perfc::ScopedRegistration m_last_sample_id_forwarded_reg;
370
371 std::unique_ptr<FrequencyEstimator> m_freq_estimator;
372 std::unique_ptr<DurationMonitor> m_dur_monitor;
373 std::unique_ptr<BufferMonitor> m_buffer_monitor;
374
378 inline static constexpr size_t MAX_SAMPLES_READ = 16;
379};
380
381} // namespace rtctk::ddtServer
382
383#endif // IPCQ_DDT_FORWARDER
Header file for Buffer Monitor.
Monitors min, mean and max occupation of a buffer and publishes them to OLDB.
Definition bufferMonitor.hpp:37
Component metrics interface.
Definition componentMetricsIf.hpp:85
virtual perfc::ScopedRegistration AddCounter(CounterVariant counter, CounterMetricInfo info)=0
Add a counter to be included in component metrics, identified by its address, together with info to t...
Defines auxiliary information associated with each counter registered with ComponentMetricsIf.
Definition componentMetricsIf.hpp:46
This class provides a wrapper for a data point path.
Definition dataPointPath.hpp:74
Monitors min, mean and max duration and publishes them to OLDB.
Definition durationMonitor.hpp:37
Estimates the frequency in which Tick is called and publishes result to OLDB.
Definition frequencyEstimator.hpp:31
This Exception is raised when the ipc queue returns an error that cannot be handled by the Telemetry ...
Definition exceptions.hpp:382
Base interface for all OLDB adapters.
Definition oldbIf.hpp:25
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition repositoryIf.ipp:1711
Base interface for all Runtime Configuration Repository adapters.
Definition runtimeRepoIf.hpp:27
Container class that holds services of any type.
Definition serviceContainer.hpp:39
Base class defining common interface for all DDT forwarders.
Definition ddtForwarder.hpp:46
void AssertState(const std::set< State > &states)
Definition ddtForwarder.hpp:144
virtual void SetState(State state)
Definition ddtForwarder.hpp:131
State
States a forwarder unit can be in.
Definition ddtForwarder.hpp:54
log4cplus::Logger & m_logger
Definition ddtForwarder.hpp:167
State GetState()
Get the state of the forwarder unit.
Definition ddtForwarder.hpp:91
std::string GetId()
Get identifier of the forwarder unit.
Definition ddtForwarder.hpp:84
DDT Forwarder unit that ingests data from from SHM and republishes it on several DDT streams.
Definition ipcqDdtForwarder.hpp:70
void Idle() override
Stop publishing DDT streams.
Definition ipcqDdtForwarder.hpp:165
virtual ~IpcqDdtForwarder()
Definition ipcqDdtForwarder.hpp:131
void Recover() override
Stop the processing thread of the forwarder unit and clear errors.
Definition ipcqDdtForwarder.hpp:177
IpcqDdtForwarder(const std::string &comp_id, ServiceContainer &services)
Definition ipcqDdtForwarder.hpp:85
typename FwdInfo::Topic Topic
Definition ipcqDdtForwarder.hpp:72
void Start() override
Start the processing thread of the forwarder unit.
Definition ipcqDdtForwarder.hpp:138
void Stop() override
Stop the processing thread of the forwarder unit.
Definition ipcqDdtForwarder.hpp:170
void Update() override
Reload dynamic configuration of the forwarder unit.
Definition ipcqDdtForwarder.hpp:190
void CheckErrors() override
Check for Errors, will rethrow errors thrown in the forwarder.
Definition ipcqDdtForwarder.hpp:201
void Run() override
Start publishing DDT streams.
Definition ipcqDdtForwarder.hpp:160
Header file for ComponentMetricsIf.
Base class defining common interface for all DDT forwarders.
Header file for Duration Monitor.
Provides macros and utilities for exception handling.
Header file for Frequency Estimator.
Definition ddsSub.hpp:151
Definition commandReplier.cpp:22
std::optional< numapp::NumaPolicies > GetNumaPolicies(RepositoryIf &repo, const DataPointPath &path)
Constructs a NumaPolicies object from the configuration datapoints found under the given datapoint pa...
Definition repositoryIfUtils.cpp:33
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Definition businessLogic.cpp:24
Header file for OldbIf, which defines the API for OldbAdapters.
Provides utilities to simplify use of RepositoryIf.
Header file for RuntimeRepoIf, which defines the API for RuntimeRepoAdapters.