hlcc 2.0.0-beta2+pre1
Loading...
Searching...
No Matches
observablePublisher.hpp
Go to the documentation of this file.
1#ifndef HLCC_OLDBMUX_CIIPUBLISHER_HPP_
2#define HLCC_OLDBMUX_CIIPUBLISHER_HPP_
3
4#include <string>
5#include <functional>
6#include <memory>
7#include <variant>
8
9#include <mal/Cii.hpp>
10#include <mal/utility/LoadMal.hpp>
11#include <mal/ps/qos/QoS.hpp>
12#include <mal/ps/Publisher.hpp>
13#include <mal/utility/Uri.hpp>
14
15#include <rad/oldbInterface.hpp>
16#include <ecos/taiClock.hpp>
17
18#include <ciiLogManager.hpp>
19
20namespace hlcc::oldbmux {
21
28 std::chrono::nanoseconds malpub_elapsed;
29 std::chrono::nanoseconds observers_elapsed;
30};
31
36template<typename T>
38
39using ObsCallback = std::function<void (const T&)>;
40using ObsCallbackWithTimestamp = std::function<void (const T&, const ecos::TaiClock::time_point&)>;
41using ObsCallbackWithDpQuality = std::function<void (const T&, const elt::oldb::CiiOldbDpQuality&)>;
42using ObsCallbackWithTimestampAndDpQuality =
43 std::function<void (const T&, const ecos::TaiClock::time_point&, const elt::oldb::CiiOldbDpQuality&)>;
44using ObsVariant = std::variant<ObsCallback, ObsCallbackWithTimestamp, ObsCallbackWithDpQuality, ObsCallbackWithTimestampAndDpQuality>;
45
46public:
47
54 explicit ObservablePublisher<T>(std::unique_ptr<::elt::mal::ps::Publisher<T>> malpub, bool use_malpub = true);
55
56 // Accepts the callback functions without user timestamp (internal timestamping)
57 void AddObserver(ObsCallback observer_function);
58
59 // Accepts the callback functions with user-provided timestamp parameter
60 void AddObserver(ObsCallbackWithTimestamp observer_function);
61
62 // Accepts the callback functions with Oldb datapoint quality
63 void AddObserver(ObsCallbackWithDpQuality observer_function);
64
65 // Accepts the callback functions with user-provided timestamp parameter and Oldb datapoint quality
66 void AddObserver(ObsCallbackWithTimestampAndDpQuality observer_function);
67
68 std::shared_ptr<T> createDataEntity();
69
71 const std::chrono::milliseconds& timeout,
72 const ecos::TaiClock::time_point& timestamp = ecos::TaiClock::now(),
73 const elt::oldb::CiiOldbDpQuality& dp_quality = elt::oldb::CiiOldbDpQuality::OK) const;
74
75private:
76 const log4cplus::Logger logger = elt::log::CiiLogManager::GetLogger();
77 const std::unique_ptr<::elt::mal::ps::Publisher<T>> m_malpub;
78 const bool m_use_malpub;
79 // Vector that will hold the observer functions in a variant due to both possibilities
80 // with and without timestamp parameter
81 std::vector<ObsVariant> m_observers;
82};
83
84template<typename T>
85ObservablePublisher<T>::ObservablePublisher(std::unique_ptr<::elt::mal::ps::Publisher<T>> malpub, bool use_malpub)
86: m_malpub {std::move(malpub)}, m_use_malpub {use_malpub} {
87}
88
89template<typename T>
90void ObservablePublisher<T>::AddObserver(ObsCallback observer_function) {
91 m_observers.push_back(observer_function);
92}
93
94template<typename T>
95void ObservablePublisher<T>::AddObserver(ObsCallbackWithTimestamp observer_function) {
96 m_observers.push_back(observer_function);
97}
98
99template<typename T>
100void ObservablePublisher<T>::AddObserver(ObsCallbackWithDpQuality observer_function) {
101 m_observers.push_back(observer_function);
102}
103
104template<typename T>
105void ObservablePublisher<T>::AddObserver(ObsCallbackWithTimestampAndDpQuality observer_function) {
106 m_observers.push_back(observer_function);
107}
108
109template<typename T>
111 return m_malpub->createDataEntity();
112}
113
114template<typename T>
116 const std::chrono::milliseconds& timeout,
117 const ecos::TaiClock::time_point& timestamp,
118 const elt::oldb::CiiOldbDpQuality& dp_quality) const {
119 auto t_0 = std::chrono::steady_clock::now();
120 if (m_malpub != nullptr && m_use_malpub) {
121 logger.log(log4cplus::DEBUG_LOG_LEVEL, "publishing over MAL");
122 try {
123 m_malpub->publish(data, timeout);
124 } catch (std::exception& exc) {
125 std::string cause = exc.what();
126 logger.log(log4cplus::ERROR_LOG_LEVEL, "Ignoring: publishing to MAL failed, due to: " + cause);
127 }
128 }
129
130 auto t_1 = std::chrono::steady_clock::now();
131 logger.log(log4cplus::DEBUG_LOG_LEVEL, "notifying observers");
132 for (ObsVariant obs_func : m_observers) {
133 try {
134 // Check if variant contains callback function without timestamp
135 if (std::holds_alternative<ObsCallback>(obs_func)) {
136 auto func = std::get<ObsCallback>(obs_func);
137 func(data);
138 }
139 // Check if variant contains callback function with timestamp
140 else if (std::holds_alternative<ObsCallbackWithTimestamp>(obs_func)) {
141 auto func = std::get<ObsCallbackWithTimestamp>(obs_func);
142 func(data, timestamp);
143 }
144 // Check if variant contains callback function with Oldb dp quality
145 else if (std::holds_alternative<ObsCallbackWithDpQuality>(obs_func)) {
146 auto func = std::get<ObsCallbackWithDpQuality>(obs_func);
147 func(data, dp_quality);
148 }
149 // Check if variant contains callback function with timestamp & Oldb dp quality
150 else if (std::holds_alternative<ObsCallbackWithTimestampAndDpQuality>(obs_func)) {
151 auto func = std::get<ObsCallbackWithTimestampAndDpQuality>(obs_func);
152 func(data, timestamp, dp_quality);
153 }
154 // If variant is empty (should not be possible) log an error
155 else {
156 logger.log(log4cplus::ERROR_LOG_LEVEL, "Observable function variant holds NO valid"
157 "alternative.");
158 }
159 } catch (std::exception& exc) {
160 std::string cause = exc.what();
161 logger.log(log4cplus::DEBUG_LOG_LEVEL, "Ignoring: callback to observer failed, due to: " + cause);
162 }
163 }
164 logger.log(log4cplus::DEBUG_LOG_LEVEL, "publish done");
165
166 return PublishingTimeElapsed {(t_1 - t_0), (std::chrono::steady_clock::now() - t_1)};
167}
168
169}
170// namespace
171
172#endif
Definition: observablePublisher.hpp:37
PublishingTimeElapsed Publish(const T &data, const std::chrono::milliseconds &timeout, const ecos::TaiClock::time_point &timestamp=ecos::TaiClock::now(), const elt::oldb::CiiOldbDpQuality &dp_quality=elt::oldb::CiiOldbDpQuality::OK) const
Definition: observablePublisher.hpp:115
ObservablePublisher(std::unique_ptr<::elt::mal::ps::Publisher< T > > malpub, bool use_malpub=true)
Definition: observablePublisher.hpp:85
void AddObserver(ObsCallback observer_function)
Definition: observablePublisher.hpp:90
std::shared_ptr< T > createDataEntity()
Definition: observablePublisher.hpp:110
Definition: ciiOldbDataPointAsync.hpp:30
ccsinsdetifllnetio::PointingKernelPositions data
Definition: pkp_llnetio_subscriber.cpp:29
Definition: observablePublisher.hpp:27
std::chrono::nanoseconds observers_elapsed
Definition: observablePublisher.hpp:29
std::chrono::nanoseconds malpub_elapsed
Definition: observablePublisher.hpp:28