13#ifndef IPCQ_DDT_FORWARDER_HPP
14#define IPCQ_DDT_FORWARDER_HPP
27#include <ipcq/adapter.hpp>
28#include <ipcq/reader.hpp>
30#include <numapp/numapolicies.hpp>
31#include <numapp/thread.hpp>
33#include <fmt/format.h>
38#include <shared_mutex>
47template <
typename Tuple>
51struct Wrapper<std::
tuple<PublisherType...>> {
69template <
typename FwdInfo,
typename ReaderType = ipcq::Reader<
typename FwdInfo::Topic>>
72 using Topic =
typename FwdInfo::Topic;
81 static_assert(FwdInfo::ID.find_first_not_of(
"abcdefghijklmnopqrstuvwxyz_0123456789") ==
82 std::string_view::npos,
83 "DDT Forwarder ID contains illegal characters!");
88 , m_oldb(services.Get<
OldbIf>())
91 , m_oldb_prefix(
fmt::format(
"/forwarders/{}",
GetId()))
92 , m_rtr_prefix_static(
fmt::format(
"/{}/static/forwarders/{}",
comp_id,
GetId()))
93 , m_rtr_prefix_dynamic(
fmt::format(
"/{}/dynamic/forwarders/{}",
comp_id,
GetId()))
95 fmt::format(
"{}/forwarders/{}/publishers",
comp_id,
GetId()), services))
101 m_samples_forwarded_reg = m_metrics.
AddCounter(
102 &m_samples_forwarded,
103 CounterMetricInfo(m_oldb_prefix +
"/samples_forwarded",
"Number of samples forwarded"));
105 m_last_sample_id_forwarded_reg =
106 m_metrics.
AddCounter(&m_last_sample_id_forwarded,
108 "Last sample id forwarded"));
110 m_freq_estimator = std::make_unique<FrequencyEstimator>(
111 m_metrics,
"Estimated frequency of the data forwarder", m_oldb_prefix);
113 m_dur_monitor = std::make_unique<DurationMonitor>(
114 m_metrics,
"Duration of publishing data to DDT", m_oldb_prefix);
117 std::make_unique<BufferMonitor>(m_metrics,
"SHM read buffer occupancy", m_oldb_prefix);
133 if (m_processing_thread.joinable()) {
134 m_processing_thread.join();
144 m_processing_thread = numapp::MakeThread(
GetId().
substr(0, 15),
145 m_thread_policies.value_or(numapp::NumaPolicies()),
146 [&]() { Process(); });
149 using namespace std::chrono_literals;
150 std::this_thread::sleep_for(10
ms);
172 if (m_processing_thread.joinable()) {
173 m_processing_thread.join();
182 std::scoped_lock
lock(m_exception_mutex);
183 m_exception = std::current_exception();
198 std::apply([&](
auto&&... pub) { ((pub.Update()), ...); }, m_publishers);
203 auto lock = std::shared_lock{m_exception_mutex};
205 std::rethrow_exception(m_exception);
211 using namespace std::chrono_literals;
215 const std::error_code
ok{};
216 std::pair<std::error_code, size_t> result;
221 m_samples_forwarded.Store(0);
222 m_last_sample_id_forwarded.Store(0);
243 std::this_thread::sleep_for(1
ms);
258 if (m_subsample_factor > 1) {
259 m_sampling_counter = m_sampling_counter % m_subsample_factor;
260 if (m_sampling_counter == 0) {
263 to_skip = std::min(
to_read, m_subsample_factor - m_sampling_counter);
271 m_last_sample_id_forwarded.Store(
sample.sample_id);
272 auto t1 = std::chrono::steady_clock::now();
273 ExtractAndPublish(
sample);
274 auto t2 = std::chrono::steady_clock::now();
275 m_dur_monitor->Tick(
t2 -
t1);
276 m_freq_estimator->Tick();
277 m_samples_forwarded++;
283 m_sampling_counter += result.second;
285 if (
not(result.first ==
ok or result.first == ipcq::Error::Timeout)) {
286 std::string error =
"Error reading from ipcq: " + result.first.message();
292 std::this_thread::sleep_for(1
ms);
300 std::scoped_lock
lock(m_exception_mutex);
301 m_exception = std::current_exception();
308 auto func = [&](
auto& pub) {
309 if (pub.IsEnabled()) {
311 auto e = std::remove_reference_t<
decltype(pub)>::StreamInfo::Extract(
sample);
313 pub.Publish(std::get<0>(
e),
314 reinterpret_cast<const uint8_t*
>(std::get<1>(
e).data()),
315 std::get<1>(
e).
size());
319 std::apply([&](
auto&&... pub) { (func(pub), ...); }, m_publishers);
326 std::string m_comp_id;
327 std::string m_oldb_prefix;
328 std::string m_rtr_prefix_static;
329 std::string m_rtr_prefix_dynamic;
331 typename FwdInfo::DdtPublisherTupleType m_publishers;
333 std::atomic<State> m_requested_state;
334 std::exception_ptr m_exception =
nullptr;
335 std::shared_mutex m_exception_mutex;
337 std::thread m_processing_thread;
342 std::string m_queue_name;
347 std::optional<numapp::NumaPolicies> m_thread_policies;
362 perfc::CounterI64 m_samples_forwarded;
363 perfc::ScopedRegistration m_samples_forwarded_reg;
368 perfc::CounterI64 m_last_sample_id_forwarded;
369 perfc::ScopedRegistration m_last_sample_id_forwarded_reg;
371 std::unique_ptr<FrequencyEstimator> m_freq_estimator;
372 std::unique_ptr<DurationMonitor> m_dur_monitor;
373 std::unique_ptr<BufferMonitor> m_buffer_monitor;
378 inline static constexpr size_t MAX_SAMPLES_READ = 16;
Header file for Buffer Monitor.
Monitors min, mean and max occupation of a buffer and publishes them to OLDB.
Definition bufferMonitor.hpp:37
Component metrics interface.
Definition componentMetricsIf.hpp:85
virtual perfc::ScopedRegistration AddCounter(CounterVariant counter, CounterMetricInfo info)=0
Add a counter to be included in component metrics, identified by its address, together with info to t...
Defines auxiliary information associated with each counter registered with ComponentMetricsIf.
Definition componentMetricsIf.hpp:46
This class provides a wrapper for a data point path.
Definition dataPointPath.hpp:74
Monitors min, mean and max duration and publishes them to OLDB.
Definition durationMonitor.hpp:37
Estimates the frequency in which Tick is called and publishes result to OLDB.
Definition frequencyEstimator.hpp:31
This Exception is raised when the ipc queue returns an error that cannot be handled by the Telemetry ...
Definition exceptions.hpp:382
Base interface for all OLDB adapters.
Definition oldbIf.hpp:25
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition repositoryIf.ipp:1711
Base interface for all Runtime Configuration Repository adapters.
Definition runtimeRepoIf.hpp:27
Container class that holds services of any type.
Definition serviceContainer.hpp:39
Base class defining common interface for all DDT forwarders.
Definition ddtForwarder.hpp:46
void AssertState(const std::set< State > &states)
Definition ddtForwarder.hpp:144
virtual void SetState(State state)
Definition ddtForwarder.hpp:131
State
States a forwarder unit can be in.
Definition ddtForwarder.hpp:54
log4cplus::Logger & m_logger
Definition ddtForwarder.hpp:167
State GetState()
Get the state of the forwarder unit.
Definition ddtForwarder.hpp:91
std::string GetId()
Get identifier of the forwarder unit.
Definition ddtForwarder.hpp:84
DDT Forwarder unit that ingests data from from SHM and republishes it on several DDT streams.
Definition ipcqDdtForwarder.hpp:70
void Idle() override
Stop publishing DDT streams.
Definition ipcqDdtForwarder.hpp:165
virtual ~IpcqDdtForwarder()
Definition ipcqDdtForwarder.hpp:131
void Recover() override
Stop the processing thread of the forwarder unit and clear errors.
Definition ipcqDdtForwarder.hpp:177
IpcqDdtForwarder(const std::string &comp_id, ServiceContainer &services)
Definition ipcqDdtForwarder.hpp:85
typename FwdInfo::Topic Topic
Definition ipcqDdtForwarder.hpp:72
void Start() override
Start the processing thread of the forwarder unit.
Definition ipcqDdtForwarder.hpp:138
void Stop() override
Stop the processing thread of the forwarder unit.
Definition ipcqDdtForwarder.hpp:170
void Update() override
Reload dynamic configuration of the forwarder unit.
Definition ipcqDdtForwarder.hpp:190
void CheckErrors() override
Check for Errors, will rethrow errors thrown in the forwarder.
Definition ipcqDdtForwarder.hpp:201
void Run() override
Start publishing DDT streams.
Definition ipcqDdtForwarder.hpp:160
Header file for ComponentMetricsIf.
Base class defining common interface for all DDT forwarders.
Header file for Duration Monitor.
Provides macros and utilities for exception handling.
Header file for Frequency Estimator.
Definition ddsSub.hpp:151
Definition commandReplier.cpp:22
std::optional< numapp::NumaPolicies > GetNumaPolicies(RepositoryIf &repo, const DataPointPath &path)
Constructs a NumaPolicies object from the configuration datapoints found under the given datapoint pa...
Definition repositoryIfUtils.cpp:33
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Definition businessLogic.cpp:24
Header file for OldbIf, which defines the API for OldbAdapters.
Provides utilities to simplify use of RepositoryIf.
Header file for RuntimeRepoIf, which defines the API for RuntimeRepoAdapters.