RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
shmSubscriber.hpp
Go to the documentation of this file.
1
12#ifndef RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
13#define RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
14
15#include <boost/io/ios_state.hpp>
16#include <cassert>
17#include <chrono>
18#include <deque>
19#include <iomanip>
20#include <iostream>
21#include <ipcq/adapter.hpp>
22#include <ipcq/reader.hpp>
23#include <limits>
24#include <memory>
25#include <string>
26
27namespace rtctk::standaloneTools {
28
36public:
/** Defaulted constructor. */
37 ShmSubscriberBase() = default;
/** Defaulted virtual destructor; this class is used as a polymorphic base. */
38 virtual ~ShmSubscriberBase() = default;
/**
 * Executes the shared memory subscriber program.
 *
 * @param argc Argument count (presumably forwarded from main() -- confirm in shmSubscriber.cpp).
 * @param argv Argument strings (presumably forwarded from main() -- confirm in shmSubscriber.cpp).
 * @return NOTE(review): meaning of the returned int is not visible in this header; presumably
 *         the process exit code -- confirm in shmSubscriber.cpp.
 */
39 int Run(int argc, char* argv[]);
40
41protected:
/** Should perform any needed initialisation steps for the program. */
49 virtual void Initialise() = 0;
50
/** Must cleanup any objects created in Initialise. */
56 virtual void Finalise() = 0;
57
/**
 * Should read a sample into internal buffers from the shared memory.
 *
 * @return NOTE(review): the ShmSubscriber implementation in this file returns true when a sample
 *         is available in its buffer and false otherwise -- confirm this is the general contract.
 */
71 virtual bool ReadSample() = 0;
72
/** Should print the contents of the read sample to console in a human readable format. */
79 virtual void PrintSample() = 0;
80
/**
 * @return Pointer to the data of the current sample. The ShmSubscriber implementation in this
 *         file returns the address of the first buffered sample.
 */
84 virtual const void* GetSampleData() const = 0;
85
/**
 * @return Size of the current sample. The ShmSubscriber implementation in this file returns
 *         sizeof(Topic), i.e. a size in bytes.
 */
89 virtual size_t GetSampleSize() const = 0;
90
94 inline const std::string& GetQueueName() const {
95 return m_queue_name;
96 }
97
102 inline const std::string& GetFilename() const {
103 return m_filename;
104 }
105
109 inline const int64_t GetSampleNumber() const {
110 return m_sample_counter;
111 }
112
117 inline const int64_t PrintWihtLongFormat() const {
118 return m_print_long;
119 }
120
121private:
122 // Do not allow copying or moving of this object.
123 ShmSubscriberBase(const ShmSubscriberBase& rhs) = delete;
124 ShmSubscriberBase& operator=(const ShmSubscriberBase& rhs) = delete;
125 ShmSubscriberBase(ShmSubscriberBase&& rhs) = default;
126 ShmSubscriberBase& operator=(ShmSubscriberBase&& rhs) = default;
127
// Private helpers; their definitions are not part of this header.
bool ParseArguments(int argc, char* argv[]);
void WriteBufferToFile(const void* buffer, size_t size);
bool TerminateProcess();

// The constructor is defaulted, so every scalar member gets an explicit initial value here.
// Without these the counters and flags would hold indeterminate values until assigned.
std::string m_queue_name;          // Returned by GetQueueName().
std::string m_filename;            // Returned by GetFilename().
int64_t m_max_samples = 0;         // NOTE(review): meaning inferred from the name only -- confirm.
int64_t m_skip_samples = 0;        // NOTE(review): meaning inferred from the name only -- confirm.
bool m_print_samples = false;      // NOTE(review): meaning inferred from the name only -- confirm.
bool m_print_long = false;         // Returned by PrintWihtLongFormat().
int64_t m_sample_counter = 0;      // Returned by GetSampleNumber().
139};
140
151template <typename Topic,
152 class ConditionPolicy = ipcq::BoostConditionPolicy,
153 class ShmTraits = ipcq::detail::BoostInterprocessTraits>
155public:
/** Defaulted constructor; the shared memory reader is only created later, in Initialise(). */
156 ShmSubscriber() = default;
/** Defaulted virtual destructor. */
157 virtual ~ShmSubscriber() = default;
158
159protected:
170 virtual void PrintSample(const Topic& sample) {
171 boost::io::ios_flags_saver saved_state(std::cout);
172 std::cout << "Sample " << GetSampleNumber() << ":\n";
173 auto buffer = reinterpret_cast<const uint8_t*>(&sample);
174 size_t max_bytes_to_print = 64;
175 if (PrintWihtLongFormat()) {
176 max_bytes_to_print = std::numeric_limits<size_t>::max();
177 }
178 bool last_was_endl = false;
179 for (size_t n = 0; n < sizeof(Topic) and n < max_bytes_to_print; ++n) {
180 std::cout << "0x" << std::setfill('0') << std::setw(2) << std::right << std::noshowbase
181 << std::hex << static_cast<unsigned int>(buffer[n]);
182 if ((n + 1) % 16 == 0) {
183 std::cout << "\n";
184 last_was_endl = true;
185 } else {
186 std::cout << " ";
187 last_was_endl = false;
188 }
189 }
190 if (not last_was_endl) {
191 std::cout << "\n";
192 }
193 if (sizeof(Topic) > max_bytes_to_print) {
194 std::cout << "... (data continues) ...\n";
195 }
196 }
197
198private:
199 using Reader = ipcq::BasicReader<Topic, ConditionPolicy, ShmTraits>;
200
201 // Do not allow copying or moving of this object.
202 ShmSubscriber(const ShmSubscriber& rhs) = delete;
203 ShmSubscriber& operator=(const ShmSubscriber& rhs) = delete;
204 ShmSubscriber(ShmSubscriber&& rhs) noexcept = default;
205 ShmSubscriber& operator=(ShmSubscriber&& rhs) noexcept = default;
206
210 void Initialise() override {
211 try {
212 m_reader = std::make_unique<Reader>(GetQueueName().c_str());
213 } catch (const std::exception& error) {
214 std::string msg = "Failed to create the shared memory reader for queue '" +
215 GetQueueName() + "': " + error.what();
216 throw std::runtime_error(msg);
217 }
218 }
219
223 void Finalise() override {
224 try {
225 m_reader.reset(nullptr);
226 } catch (const std::exception& error) {
227 std::string msg = "Failed to destroy the shared memory reader for queue '" +
228 GetQueueName() + "': " + error.what();
229 throw std::runtime_error(msg);
230 }
231 m_samples.clear();
232 }
233
241 bool ReadSample() override {
242 if (not m_samples.empty()) {
243 m_samples.pop_front();
244 }
245 if (not m_samples.empty()) {
246 return true;
247 }
248 using namespace std::chrono_literals;
249 auto count = m_reader->NumAvailable();
250 auto [error, num_elements] = m_reader->Read(ipcq::BackInserter(m_samples), count, 100ms);
251 if (error) {
252 if (error == ipcq::make_error_code(ipcq::Error::Timeout)) {
253 return false;
254 } else if (error == ipcq::make_error_code(ipcq::Error::InconsistentState)) {
255 // Use case for this tool is not conserned about missed samples. So when we get
256 // InconsistentState because of e.g. late joining we simply reset.
257 // Reset may fail if queue is Closed or if it is empty (nothing to reset to). We
258 // ignore that as well. Eventually there will be data in the queue and Reset() will
259 // succeed, or queue will be closed and Reset won't be attempted again.
260 if (!m_reader->Reset()) {
261 // Only log in the successful case as attempts to Reset will possibly otherwise
262 // flood the console with attempts if theres no data in the queue.
263 std::cerr << "Note: SHM reader state reset.\n";
264 }
265 return false;
266 } else {
267 std::string msg = "Failed to read from shared memory: " + error.message();
268 throw std::runtime_error(msg);
269 }
270 }
271 return num_elements > 0;
272 }
273
277 void PrintSample() override {
278 assert(not m_samples.empty());
279 PrintSample(m_samples.front());
280 }
281
285 const void* GetSampleData() const override {
286 assert(not m_samples.empty());
287 return reinterpret_cast<const void*>(&m_samples.front());
288 }
289
293 size_t GetSampleSize() const override {
294 return sizeof(Topic);
295 }
296
// Samples read from shared memory but not yet consumed; the front element is the current sample.
297 std::deque<Topic> m_samples;
// Shared memory queue reader; created in Initialise() and destroyed in Finalise().
298 std::unique_ptr<Reader> m_reader;
299};
300
301} // namespace rtctk::standaloneTools
302
303#endif // RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
Base class to implement all non-template methods that can be pre-compiled for the ShmSubscriber.
Definition shmSubscriber.hpp:35
const int64_t PrintWihtLongFormat() const
Definition shmSubscriber.hpp:117
virtual void Initialise()=0
Should perform any needed initialisation steps for the program.
int Run(int argc, char *argv[])
Executes the shared memory subscriber program.
Definition shmSubscriber.cpp:47
virtual bool ReadSample()=0
Should read a sample into internal buffers from the shared memory.
virtual void Finalise()=0
Must cleanup any objects created in Initialise.
const int64_t GetSampleNumber() const
Definition shmSubscriber.hpp:109
virtual void PrintSample()=0
Should print the contents of the read sample to console in a human readable format.
virtual const void * GetSampleData() const =0
virtual size_t GetSampleSize() const =0
const std::string & GetFilename() const
Definition shmSubscriber.hpp:102
const std::string & GetQueueName() const
Definition shmSubscriber.hpp:94
Implements basic features for a simple shared memory subscriber program.
Definition shmSubscriber.hpp:154
virtual void PrintSample(const Topic &sample)
Prints a hex dump of the sample.
Definition shmSubscriber.hpp:170
Definition genDdsPublisher.hpp:20