RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
shmPub.hpp
Go to the documentation of this file.
1
12#ifndef RTCTK_STANDALONETOOLS_SHMPUB_HPP
13#define RTCTK_STANDALONETOOLS_SHMPUB_HPP
14
15// arg parsing
16#include <boost/program_options.hpp>
17
18// cfitsio
19#include <cfitsio/fitsio.h>
20
21// include the numapp for threading
22#include <numapp/mempolicy.hpp>
23#include <numapp/numapolicies.hpp>
24#include <numapp/thread.hpp>
25
26// include the ipcq for writer
27#include <ipcq/writer.hpp>
28
29#include <atomic>
30#include <chrono>
31#include <ctime>
32#include <functional>
33#include <iostream>
34#include <numeric>
35#include <vector>
36
37namespace rtctk::standaloneTools {
38
/// Flag raised by SignalHandler to request that the publishing loop stops.
///
/// It is a lock-free atomic so that it can be written from a signal handler and polled from
/// the publishing loop without a data race. `inline` gives the header one shared definition
/// across every translation unit that includes it.
inline std::atomic<bool> g_stop{false};
static_assert(std::atomic<bool>::is_always_lock_free,
              "g_stop is written from a signal handler and must be lock-free");

/**
 * Handles basic signals to allow simple exiting from a ShmPub process.
 *
 * Prints a short notice and raises g_stop.
 *
 * @param signal Number of the received signal; not used.
 */
inline void SignalHandler([[maybe_unused]] int signal) {
    // NOTE(review): stream output is not async-signal-safe. The message is kept so the
    // tool's visible behaviour is unchanged; consider moving it out of the handler.
    std::cout << "\nSignal to exit received\n";
    g_stop = true;
}
51
78template <class TopicType, class WriterType = ipcq::Writer<TopicType>>
79class ShmPub {
80public:
81 ShmPub(int argc, char* argv[])
82 : m_queue_size(0)
83 , m_sample_delay(0)
84 , m_numa(0)
85 , m_print_every(0)
86 , m_gen_frames(0)
87 , m_sample_id_increment(1)
88 , m_repeat_mode(false)
89 , m_help_only(false) {
90 using namespace boost::program_options;
91
92 try {
93 options_description desc("Allowed options");
94 // clang-format off
95 desc.add_options()
96 ("help,h", "produce help message")
97 ("fits-file,f",
98 value<std::string>(&m_filename)->default_value(""),
99 "fits input file: if not provided the app will generate data")
100 ("queue-name,q",
101 value<std::string>(&m_queue_name)->default_value("default_shm_queue"),
102 "shm queue name")
103 ("queue-size,s",
104 value<size_t>(&m_queue_size)->default_value(1000),
105 "size of the queue")
106 ("sample-delay,d",
107 value<int>(&m_sample_delay)->default_value(10),
108 "inter-sample delay in ms")
109 ("numa-node,n", value<int>(&m_numa), "numa node for shm queue")
110 ("print-every,p",
111 value<int>(&m_print_every)->default_value(0),
112 "when to print to screen the number of sample written")
113 ("gen-frames,g",
114 value<int>(&m_gen_frames)->default_value(100),
115 "Number of frames to generate")
116 ("sample-id-increment,i",
117 value<unsigned>(&m_sample_id_increment)->default_value(1),
118 "sample_id increment")
119 ("repeat-mode,r",
120 bool_switch(&m_repeat_mode),
121 "Repeat output when all samples are written");
122 // clang-format on
123
124 variables_map vm;
125 store(command_line_parser(argc, argv).options(desc).run(), vm);
126 notify(vm);
127
128 if (vm.count("help")) {
129 m_help_only = true;
130 std::cout << desc << "\n";
131 } else {
132 m_help_only = false;
133 std::cout << "fits-file: " << m_filename << "\n";
134 std::cout << "queue-name: " << m_queue_name << "\n";
135 std::cout << "queue-size: " << m_queue_size << "\n";
136 std::cout << "sample-delay: " << m_sample_delay << "\n";
137 if (vm.count("numa-node")) {
138 std::cout << "numa-node: " << m_numa << "\n";
139 }
140 std::cout << "print-every: " << m_print_every << "\n";
141 std::cout << "gen-frames: " << m_gen_frames << "\n";
142 std::cout << "sample-id-increment: " << m_sample_id_increment << "\n";
143 std::cout << "repeat-mode: " << m_repeat_mode << "\n";
144
145 if (vm.count("numa-node")) {
146 m_writer =
147 std::make_unique<WriterType>(m_queue_name.c_str(),
148 m_queue_size,
149 numapp::MemPolicy::MakeBindNode(m_numa));
150 } else {
151 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(), m_queue_size);
152 }
153 }
154 } catch (const std::exception& e) {
155 std::cerr << "Exception:" << e.what() << "\n";
156 }
157 }
158
159 virtual ~ShmPub() = default;
160
171 int Run() {
172 if (m_help_only) {
173 return 0;
174 }
175
176 int ret_val = 0;
177
178 try {
179 signal(SIGINT, SignalHandler);
180
181 std::vector<TopicType> data;
182
183 // checks if filename has been indicated if it has loads data by calling the user
184 // overloaded function ReadFits if not provided calls the user overloaded function
185 // GenData
186 if (not m_filename.empty()) {
187 std::cout << "Reading data from FITS file: " << m_filename << "\n";
188 data = ReadFits(m_filename);
189 } else {
190 std::cout << "Generating data\n";
191 data = GenData(m_gen_frames);
192 }
193
194 // check to make sure m_data is populated
195 if (data.empty()) {
196 throw std::runtime_error("Data vector is not populated so will exit");
197 }
198
199 // calls main loop
200 std::cout << "Writing data to shared memory queue\n";
201 WriteToShm(data);
202
203 } catch (const std::exception& e) {
204 std::cout << e.what() << "\n";
205 ret_val = -1;
206 }
207
208 // Close queue to signal and give readers time detach from queue
209 m_writer->Close();
210#ifndef UNIT_TEST
211 std::this_thread::sleep_for(std::chrono::seconds(2));
212#endif
213
214 return ret_val;
215 }
216
231 virtual std::vector<TopicType> ReadFits(std::string filename) = 0;
232
247 virtual std::vector<TopicType> GenData(int num_frames) = 0;
248
262 virtual void AdjustSample(TopicType& sample) const {};
263
264protected:
266 std::string GetQueueName() const {
267 return m_queue_name;
268 }
269
271 size_t GetQueueSize() const {
272 return m_queue_size;
273 }
274
276 int GetSampleDelay() const {
277 return m_sample_delay;
278 }
279
281 int GetNuma() const {
282 return m_numa;
283 }
284
286 bool GetRepeatMode() const {
287 return m_repeat_mode;
288 }
289
290private:
302 void WriteToShm(std::vector<TopicType>& data) {
303 using namespace std::chrono;
304
305 size_t n_written = 0;
306 auto t_sent = steady_clock::now();
307 auto t_last = t_sent;
308 do {
309 for (auto& sample : data) {
310 if (g_stop) {
311 return;
312 }
313 AdjustSample(sample);
314 if (m_repeat_mode) {
315 sample.sample_id = n_written * m_sample_id_increment;
316 }
317 t_sent = steady_clock::now();
318 std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
319 if (err) {
320 throw std::runtime_error("Error writing to shm: " + err.message());
321 }
322
323 n_written++;
324 if (n_written && m_print_every && (n_written % m_print_every == 0)) {
325 auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
326 std::cout << "Samples written: " << n_written << "\n";
327 std::cout << "Total time to write " << m_print_every << " : " << dur << " ms\n";
328 std::cout << "Average frame time: " << static_cast<float>(dur) / m_print_every
329 << " ms\n";
330 t_last = t_sent;
331 }
332 while (duration_cast<milliseconds>(steady_clock::now() - t_sent).count() <
333 m_sample_delay) {
334 }
335 }
336
337 } while (m_repeat_mode);
338 }
339
340 std::string m_queue_name; //< Queue name to be used by the writer
341 size_t m_queue_size; //< number of position in shm queue
342 std::string m_filename; //< path to fits file being read
343 int m_sample_delay; //< delay between samples being writter (ms)
344 int m_numa; //< which numa node to provide writer
345
346 int m_print_every; //< print status every N samples
347 int m_gen_frames; //< if generation data how many sample to be generated
348 unsigned m_sample_id_increment; //< sample id increment
349 bool m_repeat_mode; //< data will loop forever with an ever increasing sample_id
350 bool m_help_only; //< if help only will not enter writing loop
351
352 std::unique_ptr<WriterType> m_writer; //< the ipcq writer
353};
354
369template <class T>
370std::vector<T>
371ReadColumnFromFits(fitsfile* fptr, const std::string& name, long nrows, bool output = false) {
372 int status = 0;
373 int col, typecode, anynul;
374 long repeat, width;
375 float nulval;
376
377 // The const_cast is a workaround for a buggy cfitsio API. The argument is never actually
378 // modified and should have been declared const.
379 fits_get_colnum(fptr, CASESEN, const_cast<char*>(name.c_str()), &col, &status);
380 if (status) {
381 fits_report_error(stderr, status);
382 throw std::runtime_error("Error getting column: " + name);
383 }
384
385 fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
386 if (status) {
387 fits_report_error(stderr, status);
388 throw std::runtime_error("Error getting coltype of:" + name);
389 }
390
391 if (output) {
392 std::cout << "name: " << name << "\n";
393 std::cout << "col: " << col << "\n";
394 std::cout << "typecode: " << typecode << "\n";
395 std::cout << "repeat: " << repeat << "\n";
396 std::cout << "width: " << width << "\n";
397 }
398
399 // load in required data
400 std::vector<T> data;
401 data.resize(repeat * nrows); // we are assuming the vector to be matrix with row major.
402 T* d = data.data();
403 fits_read_col(fptr, typecode, col, 1, 1, repeat * nrows, &nulval, d, &anynul, &status);
404 if (status) {
405 fits_report_error(stderr, status);
406 throw std::runtime_error("Error reading column: " + name);
407 }
408 return data;
409}
410
411} // namespace rtctk::standaloneTools
412
413#endif // RTCTK_STANDALONETOOLS_SHMPUB_HPP
ShmPub parent class.
Definition shmPub.hpp:79
int GetSampleDelay() const
Returns the sample delay argument set on the command line.
Definition shmPub.hpp:276
int GetNuma() const
Returns the NUMA node argument set on the command line.
Definition shmPub.hpp:281
size_t GetQueueSize() const
Returns the shared memory queue size argument set on the command line.
Definition shmPub.hpp:271
int Run()
Entry point for running the ShmPub.
Definition shmPub.hpp:171
ShmPub(int argc, char *argv[])
Definition shmPub.hpp:81
virtual std::vector< TopicType > GenData(int num_frames)=0
Generates data to be circulated.
virtual void AdjustSample(TopicType &sample) const
Adjust the contents of a data sample just before publishing to shared memory.
Definition shmPub.hpp:262
virtual std::vector< TopicType > ReadFits(std::string filename)=0
Reads in data from a FITS file.
std::string GetQueueName() const
Returns the shared memory queue name argument set on the command line.
Definition shmPub.hpp:266
bool GetRepeatMode() const
Returns the repeat mode flag set on the command line.
Definition shmPub.hpp:286
Definition genDdsPublisher.hpp:20
std::vector< T > ReadColumnFromFits(fitsfile *fptr, const std::string &name, long nrows, bool output=false)
helper function for reading columns of fits table
Definition shmPub.hpp:371
void SignalHandler(int signal)
Handles basic signals to allow simple exiting from a ShmPub process.
Definition shmPub.hpp:47