RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
mudpiProcessor.hpp
Go to the documentation of this file.
1
13#ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_MUDPIPROCESSOR_HPP
14#define RTCTK_REUSABLECOMPONENT_TELREPUB_MUDPIPROCESSOR_HPP
15
16#include <boost/asio.hpp>
17#include <cstdint>
18#include <fmt/format.h>
19
20#include "agnostictopicif.hpp"
21#include "llnetio/mudpi/mudpi.hpp"
23#include "queue.hpp"
26#include <gsl/span>
27#include <limits>
32#include <taiclock/taiClock.hpp>
33#include <vector>
34
35namespace rtctk::telRepub {
36
41 std::string dds_topic_name;
42 llnetio::mudpi::TopicId mudpi_topic_id;
43 std::uint32_t expected_sample_id_increment = 1u;
44 std::chrono::milliseconds receive_sample_timeout_ms =
45 std::chrono::milliseconds(20000); // default 20s
46};
47
57 using AgnosticTopic = rtctk::componentFramework::AgnosticTopic;
63 struct PacketData {
64 std::vector<QueueElement> queue_elements{};
65 std::vector<gsl::span<const uint8_t>> payloads{};
66 taiclock::TaiClock::time_point time_stamp{};
67 llnetio::mudpi::SampleId sample_id{0};
68
69 void Reset() {
70 queue_elements.clear();
71 payloads.clear();
72 time_stamp = {};
73 sample_id = 0;
74 }
75 };
76
77 QueuePtr m_queue;
78 PacketData m_packet_data{};
79
80 ComponentMetricsIf& m_metrics;
81 AlertServiceIf& m_alerts;
82
83 llnetio::mudpi::NumFrames m_num_frames = std::numeric_limits<llnetio::mudpi::NumFrames>::max();
84 llnetio::mudpi::TopicId m_topic_id = 0;
85
86 std::string m_topic_name;
87 std::string m_metric_path_prefix;
88
89 llnetio::mudpi::SampleId m_expected_sample_id = 0;
90 llnetio::mudpi::FrameId m_expected_frame_id = 0;
91
92 llnetio::mudpi::SampleId m_expected_sample_id_increment;
93
94 bool m_sample_finished = false;
95
96 enum ProcessorState : uint8_t { Synchronising, Processing };
97
98 ProcessorState m_state = Synchronising;
99
104 log4cplus::Logger m_logger;
105
107
112 // @note: OLDB does not support uint64_t which we really want, so we have to use signed 64.
113 perfc::CounterI64 m_pc_frames_recvd;
114 perfc::ScopedRegistration m_pc_frames_recvd_reg;
115 perfc::CounterI64 m_pc_samples_recvd;
116 perfc::ScopedRegistration m_pc_samples_recvd_reg;
117 perfc::CounterI64 m_pc_frame_errors;
118 perfc::ScopedRegistration m_pc_frame_errors_reg;
119 perfc::CounterI64 m_pc_sample_errors;
120 perfc::ScopedRegistration m_pc_sample_errors_reg;
121 perfc::CounterI64 m_pc_last_sample_id_recvd;
122 perfc::ScopedRegistration m_pc_last_sample_id_recvd_reg;
123
125 componentFramework::DurationMonitor<> m_wrangler_duration;
126
127 std::atomic<std::chrono::steady_clock::time_point> m_last_sample_received;
128
129 // receive timeout alert
130 componentFramework::AlertSource m_receive_timeout_alert;
131 std::chrono::milliseconds m_receive_sample_timeout_ms =
132 std::chrono::milliseconds(0); // default - we do not check for timeout
133
134public:
145
149 uint16_t GetTopicId() const;
150
159 ProcessMudpi(AgnosticTopic& topic, const WranglerFunction& wrangler);
160
164 void ResetCounters();
165
172 bool Monitor();
173
174}; // MudpiProcessor
175
176std::error_code
177DefaultWrangler(gsl::span<const gsl::span<const uint8_t>> input, std::vector<uint8_t>& output);
178
179} // namespace rtctk::telRepub
180
181#endif // RTCTK_REUSABLECOMPONENT_TELREPUB_MUDPIPROCESSOR_HPP
Declares AlertService.
Alert Service interface.
Definition alertServiceIf.hpp:128
Models a single alert source that can be set or cleared.
Definition alertServiceIf.hpp:47
Component metrics interface.
Definition componentMetricsIf.hpp:85
Monitors min, mean and max duration and publishes them to OLDB.
Definition durationMonitor.hpp:37
Estimates the frequency in which Tick is called and publishes result to OLDB.
Definition frequencyEstimator.hpp:31
Container class that holds services of any type.
Definition serviceContainer.hpp:39
Processing MUDPI data received by UDP receiver: rtctk::telRepub::UdpReceiver.
Definition mudpiProcessor.hpp:56
void ResetCounters()
Reset the counters.
Definition mudpiProcessor.cpp:288
bool Monitor()
Monitoring MUDPI processor for different problems (alerts): like timeouts ....
Definition mudpiProcessor.cpp:300
ErrorCode< MudpiProcessorError > ProcessMudpi(AgnosticTopic &topic, const WranglerFunction &wrangler)
Here the actual processing is done (frame by frame (MUDPI datagram)).
Definition mudpiProcessor.cpp:128
uint16_t GetTopicId() const
To query the topic id of the current mudpi processor object.
Definition mudpiProcessor.cpp:123
MudpiProcessor(QueuePtr queue, CfgMudpiProc &cfg, componentFramework::ServiceContainer &service)
MUDPI Processor constructor.
Definition mudpiProcessor.cpp:46
Header file for ComponentMetricsIf.
Header file for Duration Monitor.
Header file for Frequency Estimator.
MUDPI processor error codes.
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Definition ddsPubThread.cpp:17
std::shared_ptr< Queue > QueuePtr
Definition queue.hpp:39
std::error_code DefaultWrangler(const gsl::span< const gsl::span< const uint8_t > > input, std::vector< uint8_t > &output)
Definition mudpiProcessor.cpp:312
std::function< std::error_code( const gsl::span< const gsl::span< const uint8_t > >, std::vector< uint8_t > &)> WranglerFunction
The wrangler function that is called with a span of spans containing the payload and an vector where ...
Definition wrangler.hpp:28
UDP Buffer management.
Wrangler: User extension point.
A container that can hold any type of service.
Structure to hold MudpiProcessor's configuration.
Definition mudpiProcessor.hpp:40
std::uint32_t expected_sample_id_increment
Definition mudpiProcessor.hpp:43
llnetio::mudpi::TopicId mudpi_topic_id
Definition mudpiProcessor.hpp:42
std::string dds_topic_name
Definition mudpiProcessor.hpp:41
std::chrono::milliseconds receive_sample_timeout_ms
Definition mudpiProcessor.hpp:44
Definition mudpiProcessingError.hpp:28
Definition queue.hpp:31