12#ifndef RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
13#define RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
15#include <boost/io/ios_state.hpp>
21#include <ipcq/adapter.hpp>
22#include <ipcq/reader.hpp>
/// Entry point of the tool, taking main()-style command line arguments.
///
/// @param argc Argument count.
/// @param argv Argument strings.
/// @return An integer status.
///         NOTE(review): presumably the process exit code; the definition is not visible
///         in this excerpt - confirm.
39 int Run(
int argc,
char* argv[]);
110 return m_sample_counter;
/// Parses main()-style command line arguments.
///
/// @param argc Argument count.
/// @param argv Argument strings.
/// @return NOTE(review): presumably true when parsing succeeded and the tool should carry
///         on; the definition is not visible in this excerpt - confirm.
128 bool ParseArguments(
int argc,
char* argv[]);
/// Writes a raw memory buffer to a file.
///
/// @param buffer Start of the data to write.
/// @param size   Size of the data.
///               NOTE(review): presumably in bytes, and presumably written to the file
///               named by m_filename - confirm against the definition.
129 void WriteBufferToFile(
const void* buffer,
size_t size);
/// NOTE(review): presumably reports whether the process has been asked to terminate; the
/// definition is not visible in this excerpt - confirm before relying on the return value.
130 bool TerminateProcess();
// Name of the queue.
// NOTE(review): presumably the value handed out by GetQueueName() - confirm.
132 std::string m_queue_name;
// Name of a file.
// NOTE(review): presumably the target of WriteBufferToFile() - confirm.
133 std::string m_filename;
// NOTE(review): presumably the upper limit on the number of samples to process - confirm.
134 int64_t m_max_samples;
// NOTE(review): presumably the number of samples to skip before processing - confirm.
135 int64_t m_skip_samples;
// NOTE(review): presumably enables printing of each sample to the console - confirm.
136 bool m_print_samples;
// Running sample count; this is the value returned by the counter accessor.
138 int64_t m_sample_counter;
/// Template parameters of the declaration that follows.
///
/// @tparam Topic           Type of one sample. Samples are buffered as Topic objects and
///                         sizeof(Topic) is reported as the sample size.
/// @tparam ConditionPolicy Forwarded unchanged to ipcq::BasicReader; defaults to
///                         ipcq::BoostConditionPolicy.
/// @tparam ShmTraits       Forwarded unchanged to ipcq::BasicReader; defaults to
///                         ipcq::detail::BoostInterprocessTraits.
151template <
typename Topic,
152 class ConditionPolicy = ipcq::BoostConditionPolicy,
153 class ShmTraits = ipcq::detail::BoostInterprocessTraits>
// Hex dump of the raw bytes of `sample` to std::cout.
//
// Save the complete formatting state of std::cout (flags, precision, width and fill
// character); it is restored when saved_state goes out of scope. A flags-only saver is not
// enough here: std::setfill('0') below changes the fill character, which is not part of the
// format flags and would otherwise stay active on std::cout after this function returns.
171 boost::io::ios_all_saver saved_state(std::cout);
// View the sample as a plain byte sequence.
173 auto buffer =
reinterpret_cast<const uint8_t*
>(&sample);
// By default at most the first 64 bytes are printed.
174 size_t max_bytes_to_print = 64;
// Lifts the limit so the whole sample is printed.
// NOTE(review): the condition guarding this assignment is not visible in this excerpt.
176 max_bytes_to_print = std::numeric_limits<size_t>::max();
// Tracks whether the most recent output ended a row, so a final line break is only added
// when needed.
178 bool last_was_endl =
false;
179 for (
size_t n = 0; n <
sizeof(Topic) and n < max_bytes_to_print; ++n) {
// Each byte is printed as "0x" followed by two zero-padded hex digits; the cast to
// unsigned int prevents the byte from being streamed as a character.
180 std::cout <<
"0x" << std::setfill(
'0') << std::setw(2) << std::right << std::noshowbase
181 << std::hex << static_cast<unsigned int>(buffer[n]);
// A row is complete after every 16th byte.
182 if ((n + 1) % 16 == 0) {
184 last_was_endl =
true;
187 last_was_endl =
false;
190 if (not last_was_endl) {
// Tell the reader when the dump was truncated by max_bytes_to_print.
193 if (
sizeof(Topic) > max_bytes_to_print) {
194 std::cout <<
"... (data continues) ...\n";
// Concrete ipcq reader type for this Topic / ConditionPolicy / ShmTraits combination.
199 using Reader = ipcq::BasicReader<Topic, ConditionPolicy, ShmTraits>;
/// Creates the shared memory reader (m_reader) for the queue named by GetQueueName().
///
/// @throws std::runtime_error if a std::exception is raised while the reader is constructed.
210 void Initialise()
override {
212 m_reader = std::make_unique<Reader>(
GetQueueName().c_str());
213 }
catch (
const std::exception& error) {
// Translate the failure into a std::runtime_error carrying a descriptive message.
// NOTE(review): the tail of the message expression is not visible in this excerpt.
214 std::string msg =
"Failed to create the shared memory reader for queue '" +
216 throw std::runtime_error(msg);
/// Destroys the shared memory reader by resetting m_reader to null.
///
/// @throws std::runtime_error if a std::exception is raised while the reader is destroyed.
223 void Finalise()
override {
225 m_reader.reset(
nullptr);
226 }
catch (
const std::exception& error) {
// Translate the failure into a std::runtime_error carrying a descriptive message.
// NOTE(review): the tail of the message expression is not visible in this excerpt.
227 std::string msg =
"Failed to destroy the shared memory reader for queue '" +
229 throw std::runtime_error(msg);
/// Advances to the next sample, fetching a new batch from the shared memory queue when the
/// local buffer (m_samples) has run out.
///
/// @return On the path that reads from the queue: true if at least one element was read.
///         NOTE(review): the early-return lines of the other paths are not visible in this
///         excerpt - confirm their return values.
/// @throws std::runtime_error when the read reports an error that is not handled below.
241 bool ReadSample()
override {
// Drop the sample that was at the front of the buffer until now.
242 if (not m_samples.empty()) {
243 m_samples.pop_front();
// Samples from an earlier batch are still buffered.
// NOTE(review): presumably returns without touching the queue - body not visible.
245 if (not m_samples.empty()) {
248 using namespace std::chrono_literals;
// Request as many elements as the reader reports available and append them to m_samples;
// 100ms is passed as the timeout argument of Read().
249 auto count = m_reader->NumAvailable();
250 auto [error, num_elements] = m_reader->Read(ipcq::BackInserter(m_samples), count, 100ms);
// Timeout and InconsistentState are handled separately from all other errors.
252 if (error == ipcq::make_error_code(ipcq::Error::Timeout)) {
254 }
else if (error == ipcq::make_error_code(ipcq::Error::InconsistentState)) {
// Try to recover by resetting the reader.
// NOTE(review): handling of a failed Reset() is not visible in this excerpt.
260 if (!m_reader->Reset()) {
263 std::cerr <<
"Note: SHM reader state reset.\n";
// Errors that are not handled above are reported to the caller as std::runtime_error.
267 std::string msg =
"Failed to read from shared memory: " + error.message();
268 throw std::runtime_error(msg);
271 return num_elements > 0;
278 assert(not m_samples.empty());
/// Returns a pointer to the sample at the front of the local buffer.
///
/// Precondition (checked with assert): m_samples is not empty.
/// The pointer refers to the element stored inside m_samples.
285 const void* GetSampleData()
const override {
286 assert(not m_samples.empty());
287 return reinterpret_cast<const void*
>(&m_samples.front());
/// Returns the size of one sample, which is always sizeof(Topic).
293 size_t GetSampleSize()
const override {
294 return sizeof(Topic);
// Local buffer of samples read from the queue; the front element is the current sample.
297 std::deque<Topic> m_samples;
// Shared memory reader; created in Initialise() and destroyed in Finalise().
298 std::unique_ptr<Reader> m_reader;