13#ifndef IPCQ_DDT_FORWARDER_HPP
14#define IPCQ_DDT_FORWARDER_HPP
27#include <ipcq/adapter.hpp>
28#include <ipcq/reader.hpp>
30#include <numapp/numapolicies.hpp>
31#include <numapp/thread.hpp>
33#include <fmt/format.h>
38#include <shared_mutex>
// Helper template used to build the tuple of DDT publishers. The specialization
// below matches std::tuple<PublisherType...> so the individual publisher types
// can be expanded as a parameter pack.
47template <
typename Tuple>
50template <
typename... PublisherType>
51struct Wrapper<std::tuple<PublisherType...>> {
// Constructs one publisher of every PublisherType, passing the same
// (db_prefix, services) arguments to each constructor, and returns them
// packed in a std::tuple in the order of the pack.
54 static std::tuple<PublisherType...>
55 MakePublishers(std::string
const& db_prefix, ServiceContainer& services) {
56 return std::make_tuple(PublisherType(db_prefix, services)...);
// Template parameters of the forwarder: FwdInfo supplies the Topic type and the
// ID constant used below; ReaderType defaults to ipcq::Reader over that Topic.
69template <
typename FwdInfo,
typename ReaderType = ipcq::Reader<
typename FwdInfo::Topic>>
// Sample type handled by this forwarder, taken from FwdInfo.
72 using Topic =
typename FwdInfo::Topic;
// Compile-time check: FwdInfo::ID may only contain lowercase ASCII letters,
// digits and underscores; anything else fails the build with the message below.
81 static_assert(FwdInfo::ID.find_first_not_of(
"abcdefghijklmnopqrstuvwxyz_0123456789") ==
82 std::string_view::npos,
83 "DDT Forwarder ID contains illegal characters!");
// Constructor initializer list and body (the listing is fragmentary; the
// constructor header and some statements are not visible here).
// The DdtForwarder base is initialized with the fixed type name
// "IpcqDdtForwarder" and FwdInfo::ID as the forwarder identifier.
86 :
DdtForwarder(comp_id,
"IpcqDdtForwarder", std::string(FwdInfo::ID), services)
88 , m_oldb(services.Get<
OldbIf>())
// Path prefixes are derived from the forwarder id; the two runtime-repository
// prefixes additionally embed comp_id and differ only in static/dynamic.
91 , m_oldb_prefix(fmt::format(
"/forwarders/{}",
GetId()))
92 , m_rtr_prefix_static(fmt::format(
"/{}/static/forwarders/{}", comp_id,
GetId()))
93 , m_rtr_prefix_dynamic(fmt::format(
"/{}/dynamic/forwarders/{}", comp_id,
GetId()))
// One publisher per type in FwdInfo::DdtPublisherTupleType, all created with
// the same per-forwarder "publishers" prefix.
94 , m_publishers(Wrapper<typename FwdInfo::DdtPublisherTupleType>::MakePublishers(
95 fmt::format(
"{}/forwarders/{}/publishers", comp_id,
GetId()), services))
// Register the forwarded-samples counter with the component metrics; the
// returned registration object is kept in a member.
101 m_samples_forwarded_reg = m_metrics.
AddCounter(
102 &m_samples_forwarded,
103 CounterMetricInfo(m_oldb_prefix +
"/samples_forwarded",
"Number of samples forwarded"));
// Same for the last-forwarded-sample-id counter (part of this statement is
// missing from the listing).
105 m_last_sample_id_forwarded_reg =
106 m_metrics.
AddCounter(&m_last_sample_id_forwarded,
108 "Last sample id forwarded"));
// Helper monitors, each constructed with the metrics service, a description
// string and m_oldb_prefix.
110 m_freq_estimator = std::make_unique<FrequencyEstimator>(
111 m_metrics,
"Estimated frequency of the data forwarder", m_oldb_prefix);
113 m_dur_monitor = std::make_unique<DurationMonitor>(
114 m_metrics,
"Duration of publishing data to DDT", m_oldb_prefix);
117 std::make_unique<BufferMonitor>(m_metrics,
"SHM read buffer occupancy", m_oldb_prefix);
// Static configuration: the queue name is fetched from
// <static prefix>/shm_queue_name.
121 auto queue_name_path =
DataPointPath{m_rtr_prefix_static +
"/shm_queue_name"};
122 m_queue_name = m_rtr.
GetDataPoint<std::string>(queue_name_path);
// Path of the thread policies; the statement consuming it is not visible in
// this listing.
124 auto thread_policies_path =
DataPointPath{m_rtr_prefix_static +
"/thread_policies"};
// Dynamic configuration: initial subsample factor from
// <dynamic prefix>/subsample_factor.
127 auto subsample_factor_path =
DataPointPath{m_rtr_prefix_dynamic +
"/subsample_factor"};
128 m_subsample_factor = m_rtr.
GetDataPoint<int64_t>(subsample_factor_path);
// Destructor fragment: wait for the processing thread to finish if it is
// still joinable, so the std::thread member is not destroyed while joinable.
133 if (m_processing_thread.joinable()) {
134 m_processing_thread.join();
// Start() fragment: launch the processing thread through numapp::MakeThread.
// The first argument is the forwarder id truncated to 15 characters
// (presumably the thread name, kept within an OS name-length limit — TODO confirm).
// The configured NUMA policies are used when present, otherwise
// default-constructed ones. The lambda captures by reference and runs Process().
144 m_processing_thread = numapp::MakeThread(
GetId().substr(0, 15),
145 m_thread_policies.value_or(numapp::NumaPolicies()),
146 [&]() { Process(); });
// Fixed 10 ms sleep after launching the thread; the reason is not visible in
// this listing.
149 using namespace std::chrono_literals;
150 std::this_thread::sleep_for(10ms);
// Stop() fragment: block until the processing thread has terminated, if one is
// currently joinable. How the thread is asked to exit is not visible here.
172 if (m_processing_thread.joinable()) {
173 m_processing_thread.join();
// Recover() fragment: take the exclusive lock on m_exception_mutex and
// overwrite the stored exception with std::current_exception().
// NOTE(review): outside of a catch handler std::current_exception() returns a
// null exception_ptr, which would clear the stored error — confirm against the
// surrounding lines, which are missing from this listing.
182 std::scoped_lock lock(m_exception_mutex);
183 m_exception = std::current_exception();
// Update() fragment: log the update, re-read the subsample factor from the
// dynamic runtime-repository path, then forward Update() to every publisher.
193 LOG4CPLUS_INFO(
m_logger, fmt::format(
"Updating IpcqDdtForwarder '{}'",
GetId()));
// NOTE(review): m_subsample_factor is also read inside Process(); no
// synchronization between the two is visible in this listing — confirm.
195 auto subsample_factor_path =
DataPointPath{m_rtr_prefix_dynamic +
"/subsample_factor"};
196 m_subsample_factor = m_rtr.
GetDataPoint<int64_t>(subsample_factor_path);
// Fold expression: calls Update() on each element of the publisher tuple, in order.
198 std::apply([&](
auto&&... pub) { ((pub.Update()), ...); }, m_publishers);
// CheckErrors() fragment: take a shared (reader) lock on m_exception_mutex and
// rethrow the stored exception. The condition guarding the rethrow is not
// visible in this listing.
203 auto lock = std::shared_lock{m_exception_mutex};
205 std::rethrow_exception(m_exception);
// Process() fragment: body executed by the processing thread (invoked from the
// lambda passed to numapp::MakeThread). Loop headers, several branch headers
// and closing braces are missing from this listing.
211 using namespace std::chrono_literals;
// Default-constructed error_code used as the "no error" value for comparisons.
215 std::error_code
const ok{};
216 std::pair<std::error_code, size_t> result;
// Batch buffer, pre-sized to the maximum number of samples read per iteration.
218 std::vector<Topic> read_buffer;
219 read_buffer.reserve(MAX_SAMPLES_READ);
// Both counters restart from zero each time processing starts.
221 m_samples_forwarded.Store(0);
222 m_last_sample_id_forwarded.Store(0);
224 ReaderType reader(m_queue_name.c_str());
226 size_t reader_capacity = reader.Capacity();
// Reset the reader and compare the result with `ok`; the IpcqError throw below
// presumably belongs to the failure branch, whose header is not visible.
236 if (
auto ret = reader.Reset(); ret == ok) {
239 CII_THROW(
IpcqError,
"Error resetting ipcq Reader");
243 std::this_thread::sleep_for(1ms);
// Number of samples handled per iteration is capped at MAX_SAMPLES_READ.
252 to_read = reader.NumAvailable();
256 to_read = std::min(to_read, MAX_SAMPLES_READ);
// Subsampling is only active for factors above 1: the counter is kept modulo
// the factor; when it is non-zero, up to (factor - counter) samples are marked
// to be skipped. The body of the counter == 0 branch is not visible here.
258 if (m_subsample_factor > 1) {
259 m_sampling_counter = m_sampling_counter % m_subsample_factor;
260 if (m_sampling_counter == 0) {
263 to_skip = std::min(to_read, m_subsample_factor - m_sampling_counter);
// Report current queue fill level against capacity, then read up to to_read
// samples into read_buffer with a 100 ms timeout.
268 m_buffer_monitor->Tick(reader.NumAvailable(), reader_capacity);
269 result = reader.Read(ipcq::BackInserter(read_buffer), to_read, 100ms);
// Per sample: record its id, publish it while timing the call, feed the
// duration and frequency monitors, and bump the forwarded counter.
270 for (
const auto& sample : read_buffer) {
271 m_last_sample_id_forwarded.Store(sample.sample_id);
272 auto t1 = std::chrono::steady_clock::now();
273 ExtractAndPublish(sample);
274 auto t2 = std::chrono::steady_clock::now();
275 m_dur_monitor->Tick(t2 - t1);
276 m_freq_estimator->Tick();
277 m_samples_forwarded++;
// Skip path: discard to_skip samples with a 100 ms timeout.
281 result = reader.Skip(to_skip, 100ms);
// result.second is presumably the number of samples actually read/skipped — TODO confirm.
283 m_sampling_counter += result.second;
// A timeout is tolerated; any other non-ok error code is turned into an error message.
285 if (not(result.first == ok or result.first == ipcq::Error::Timeout)) {
286 std::string error =
"Error reading from ipcq: " + result.first.message();
292 std::this_thread::sleep_for(1ms);
// Store the in-flight exception under the exclusive lock so that
// CheckErrors() can rethrow it (the enclosing catch header is not visible).
300 std::scoped_lock lock(m_exception_mutex);
301 m_exception = std::current_exception();
// Publishes one sample on every enabled publisher of m_publishers.
// @param sample  The topic sample handed to each publisher's StreamInfo::Extract.
307 void ExtractAndPublish(
Topic const& sample) {
// Generic lambda applied to each publisher in turn.
308 auto func = [&](
auto& pub) {
309 if (pub.IsEnabled()) {
// Extract is a static member of the publisher type's StreamInfo; its result is
// accessed with std::get, so it is tuple-like.
311 auto e = std::remove_reference_t<
decltype(pub)>::StreamInfo::Extract(sample);
// Element 0 is passed through as-is; element 1 is handed over as a raw byte
// pointer plus its size() (assumes size() is the length Publish expects — TODO confirm).
313 pub.Publish(std::get<0>(e),
314 reinterpret_cast<const uint8_t*
>(std::get<1>(e).data()),
315 std::get<1>(e).size());
// Fold expression: invokes func on each publisher in tuple order.
319 std::apply([&](
auto&&... pub) { (func(pub), ...); }, m_publishers);
// Data members of the forwarder (listing is fragmentary).
326 std::string m_comp_id;
// Path prefixes built in the constructor's initializer list.
327 std::string m_oldb_prefix;
328 std::string m_rtr_prefix_static;
329 std::string m_rtr_prefix_dynamic;
// Tuple holding one publisher per type listed in FwdInfo::DdtPublisherTupleType.
331 typename FwdInfo::DdtPublisherTupleType m_publishers;
333 std::atomic<State> m_requested_state;
// Exception captured by the processing code; written under an exclusive lock
// and rethrown under a shared lock on m_exception_mutex.
334 std::exception_ptr m_exception =
nullptr;
335 std::shared_mutex m_exception_mutex;
// Thread running Process(); joined wherever joinable() is checked.
337 std::thread m_processing_thread;
// Queue name read from the static "/shm_queue_name" datapoint.
342 std::string m_queue_name;
// Optional NUMA policies for the processing thread; defaults are used when empty.
347 std::optional<numapp::NumaPolicies> m_thread_policies;
// Subsample factor read from the dynamic "/subsample_factor" datapoint;
// subsampling logic only applies when it is greater than 1.
352 int64_t m_subsample_factor;
// Running count used together with m_subsample_factor to decide how many
// samples to skip. No initializer is visible in this listing.
357 uint64_t m_sampling_counter;
// Counters registered with the component metrics, each paired with the
// registration object returned by AddCounter.
362 perfc::CounterI64 m_samples_forwarded;
363 perfc::ScopedRegistration m_samples_forwarded_reg;
368 perfc::CounterI64 m_last_sample_id_forwarded;
369 perfc::ScopedRegistration m_last_sample_id_forwarded_reg;
// Monitors created in the constructor and ticked from the processing code.
371 std::unique_ptr<FrequencyEstimator> m_freq_estimator;
372 std::unique_ptr<DurationMonitor> m_dur_monitor;
373 std::unique_ptr<BufferMonitor> m_buffer_monitor;
// Upper bound on samples read per iteration; also the reserved size of the read buffer.
378 inline static constexpr size_t MAX_SAMPLES_READ = 16;
Header file for Buffer Monitor.
Monitors min, mean and max occupation of a buffer and publishes them to OLDB.
Definition: bufferMonitor.hpp:37
Component metrics interface.
Definition: componentMetricsIf.hpp:184
virtual perfc::ScopedRegistration AddCounter(CounterVariant counter, CounterMetricInfo info)=0
Add a counter to be included in component metrics, identified by its address, together with info to t...
Defines auxiliary information associated with each counter registered with ComponentMetricsIf.
Definition: componentMetricsIf.hpp:46
This class provides a wrapper for a data point path.
Definition: dataPointPath.hpp:73
Monitors min, mean and max duration and publishes them to OLDB.
Definition: durationMonitor.hpp:37
Estimates the frequency in which Tick is called and publishes result to OLDB.
Definition: frequencyEstimator.hpp:31
This Exception is raised when the ipc queue returns an error that cannot be handled by the Telemetry ...
Definition: exceptions.hpp:392
Base interface for all OLDB adapters.
Definition: oldbIf.hpp:26
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition: repositoryIf.hpp:574
Base interface for all Runtime Configuration Repository adapters.
Definition: runtimeRepoIf.hpp:28
Container class that holds services of any type.
Definition: serviceContainer.hpp:39
Base class defining common interface for all DDT forwarders.
Definition: ddtForwarder.hpp:46
void AssertState(std::set< State > const &states)
Definition: ddtForwarder.hpp:144
State
States a forwarder unit can be in.
Definition: ddtForwarder.hpp:54
virtual void SetState(State state)
Definition: ddtForwarder.hpp:131
log4cplus::Logger & m_logger
Definition: ddtForwarder.hpp:167
State GetState()
Get the state of the forwarder unit.
Definition: ddtForwarder.hpp:91
std::string GetId()
Get identifier of the forwarder unit.
Definition: ddtForwarder.hpp:84
DDT Forwarder unit that ingests data from SHM and republishes it on several DDT streams.
Definition: ipcqDdtForwarder.hpp:70
void Idle() override
Stop publishing DDT streams.
Definition: ipcqDdtForwarder.hpp:165
virtual ~IpcqDdtForwarder()
Definition: ipcqDdtForwarder.hpp:131
void Recover() override
Stop the processing thread of the forwarder unit and clear errors.
Definition: ipcqDdtForwarder.hpp:177
typename FwdInfo::Topic Topic
Definition: ipcqDdtForwarder.hpp:72
void Start() override
Start the processing thread of the forwarder unit.
Definition: ipcqDdtForwarder.hpp:138
IpcqDdtForwarder(std::string const &comp_id, ServiceContainer &services)
Definition: ipcqDdtForwarder.hpp:85
void Stop() override
Stop the processing thread of the forwarder unit.
Definition: ipcqDdtForwarder.hpp:170
void Update() override
Reload dynamic configuration of the forwarder unit.
Definition: ipcqDdtForwarder.hpp:190
void CheckErrors() override
Check for Errors, will rethrow errors thrown in the forwarder.
Definition: ipcqDdtForwarder.hpp:201
void Run() override
Start publishing DDT streams.
Definition: ipcqDdtForwarder.hpp:160
Header file for ComponentMetricsIf.
Base class defining common interface for all DDT forwarders.
Header file for Duration Monitor.
Provides macros and utilities for exception handling.
Header file for Frequency Estimator.
Definition: commandReplier.cpp:22
std::optional< numapp::NumaPolicies > GetNumaPolicies(RepositoryIf &repo, const DataPointPath &path)
Constructs a NumaPolicies object from the configuration datapoints found under the given datapoint pa...
Definition: repositoryIfUtils.cpp:33
Definition: businessLogic.cpp:24
Header file for OldbIf, which defines the API for OldbAdapters.
Provides utilities to simplify use of RepositoryIf.
Header file for RuntimeRepoIf, which defines the API for RuntimeRepoAdapters.