1#ifndef HLCC_OLDBMUX_CIIPUBLISHER_HPP_
2#define HLCC_OLDBMUX_CIIPUBLISHER_HPP_
10#include <mal/utility/LoadMal.hpp>
11#include <mal/ps/qos/QoS.hpp>
12#include <mal/ps/Publisher.hpp>
13#include <mal/utility/Uri.hpp>
15#include <rad/oldbInterface.hpp>
16#include <ecos/taiClock.hpp>
18#include <ciiLogManager.hpp>
39using ObsCallback = std::function<void (
const T&)>;
40using ObsCallbackWithTimestamp = std::function<void (
const T&,
const ecos::TaiClock::time_point&)>;
41using ObsCallbackWithDpQuality = std::function<void (
const T&,
const elt::oldb::CiiOldbDpQuality&)>;
42using ObsCallbackWithTimestampAndDpQuality =
43 std::function<void (
const T&,
const ecos::TaiClock::time_point&,
const elt::oldb::CiiOldbDpQuality&)>;
44using ObsVariant = std::variant<ObsCallback, ObsCallbackWithTimestamp, ObsCallbackWithDpQuality, ObsCallbackWithTimestampAndDpQuality>;
54 explicit ObservablePublisher<T>(std::unique_ptr<::elt::mal::ps::Publisher<T>> malpub,
bool use_malpub =
true);
60 void AddObserver(ObsCallbackWithTimestamp observer_function);
63 void AddObserver(ObsCallbackWithDpQuality observer_function);
66 void AddObserver(ObsCallbackWithTimestampAndDpQuality observer_function);
71 const std::chrono::milliseconds& timeout,
72 const ecos::TaiClock::time_point& timestamp = ecos::TaiClock::now(),
73 const elt::oldb::CiiOldbDpQuality& dp_quality = elt::oldb::CiiOldbDpQuality::OK)
const;
76 const log4cplus::Logger logger = elt::log::CiiLogManager::GetLogger();
77 const std::unique_ptr<::elt::mal::ps::Publisher<T>> m_malpub;
78 const bool m_use_malpub;
81 std::vector<ObsVariant> m_observers;
86: m_malpub {std::move(malpub)}, m_use_malpub {use_malpub} {
91 m_observers.push_back(observer_function);
96 m_observers.push_back(observer_function);
101 m_observers.push_back(observer_function);
106 m_observers.push_back(observer_function);
111 return m_malpub->createDataEntity();
116 const std::chrono::milliseconds& timeout,
117 const ecos::TaiClock::time_point& timestamp,
118 const elt::oldb::CiiOldbDpQuality& dp_quality)
const {
119 auto t_0 = std::chrono::steady_clock::now();
120 if (m_malpub !=
nullptr && m_use_malpub) {
121 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"publishing over MAL");
123 m_malpub->publish(
data, timeout);
124 }
catch (std::exception& exc) {
125 std::string cause = exc.what();
126 logger.log(log4cplus::ERROR_LOG_LEVEL,
"Ignoring: publishing to MAL failed, due to: " + cause);
130 auto t_1 = std::chrono::steady_clock::now();
131 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"notifying observers");
132 for (ObsVariant obs_func : m_observers) {
135 if (std::holds_alternative<ObsCallback>(obs_func)) {
136 auto func = std::get<ObsCallback>(obs_func);
140 else if (std::holds_alternative<ObsCallbackWithTimestamp>(obs_func)) {
141 auto func = std::get<ObsCallbackWithTimestamp>(obs_func);
142 func(
data, timestamp);
145 else if (std::holds_alternative<ObsCallbackWithDpQuality>(obs_func)) {
146 auto func = std::get<ObsCallbackWithDpQuality>(obs_func);
147 func(
data, dp_quality);
150 else if (std::holds_alternative<ObsCallbackWithTimestampAndDpQuality>(obs_func)) {
151 auto func = std::get<ObsCallbackWithTimestampAndDpQuality>(obs_func);
152 func(
data, timestamp, dp_quality);
156 logger.log(log4cplus::ERROR_LOG_LEVEL,
"Observable function variant holds NO valid"
159 }
catch (std::exception& exc) {
160 std::string cause = exc.what();
161 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"Ignoring: callback to observer failed, due to: " + cause);
164 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"publish done");
Definition: observablePublisher.hpp:37
PublishingTimeElapsed Publish(const T &data, const std::chrono::milliseconds &timeout, const ecos::TaiClock::time_point ×tamp=ecos::TaiClock::now(), const elt::oldb::CiiOldbDpQuality &dp_quality=elt::oldb::CiiOldbDpQuality::OK) const
Definition: observablePublisher.hpp:115
ObservablePublisher(std::unique_ptr<::elt::mal::ps::Publisher< T > > malpub, bool use_malpub=true)
Definition: observablePublisher.hpp:85
void AddObserver(ObsCallback observer_function)
Definition: observablePublisher.hpp:90
std::shared_ptr< T > createDataEntity()
Definition: observablePublisher.hpp:110
Definition: ciiOldbDataPointAsync.hpp:30
ccsinsdetifllnetio::PointingKernelPositions data
Definition: pkp_llnetio_subscriber.cpp:29
Definition: observablePublisher.hpp:27
std::chrono::nanoseconds observers_elapsed
Definition: observablePublisher.hpp:29
std::chrono::nanoseconds malpub_elapsed
Definition: observablePublisher.hpp:28