12#ifndef RTCTK_STANDALONETOOLS_SHMPUB_HPP
13#define RTCTK_STANDALONETOOLS_SHMPUB_HPP
16#include <boost/program_options.hpp>
19#include <cfitsio/fitsio.h>
22#include <numapp/mempolicy.hpp>
23#include <numapp/numapolicies.hpp>
24#include <numapp/thread.hpp>
27#include <ipcq/writer.hpp>
40static bool g_stop =
false;
48 std::cout <<
"\nSignal to exit received\n";
78template <
class TopicType,
class WriterType = ipcq::Writer<TopicType>>
87 , m_sample_id_increment(1)
88 , m_repeat_mode(false)
89 , m_help_only(false) {
90 using namespace boost::program_options;
93 options_description desc(
"Allowed options");
96 (
"help,h",
"produce help message")
98 value<std::string>(&m_filename)->default_value(
""),
99 "fits input file: if not provided the app will generate data")
101 value<std::string>(&m_queue_name)->default_value(
"default_shm_queue"),
104 value<size_t>(&m_queue_size)->default_value(1000),
107 value<int>(&m_sample_delay)->default_value(10),
108 "inter-sample delay in ms")
109 (
"numa-node,n", value<int>(&m_numa),
"numa node for shm queue")
111 value<int>(&m_print_every)->default_value(0),
112 "when to print to screen the number of sample written")
114 value<int>(&m_gen_frames)->default_value(100),
115 "Number of frames to generate")
116 (
"sample-id-increment,i",
117 value<unsigned>(&m_sample_id_increment)->default_value(1),
118 "sample_id increment")
120 bool_switch(&m_repeat_mode),
121 "Repeat output when all samples are written");
125 store(command_line_parser(argc, argv).options(desc).run(), vm);
128 if (vm.count(
"help")) {
130 std::cout << desc <<
"\n";
133 std::cout <<
"fits-file: " << m_filename <<
"\n";
134 std::cout <<
"queue-name: " << m_queue_name <<
"\n";
135 std::cout <<
"queue-size: " << m_queue_size <<
"\n";
136 std::cout <<
"sample-delay: " << m_sample_delay <<
"\n";
137 if (vm.count(
"numa-node")) {
138 std::cout <<
"numa-node: " << m_numa <<
"\n";
140 std::cout <<
"print-every: " << m_print_every <<
"\n";
141 std::cout <<
"gen-frames: " << m_gen_frames <<
"\n";
142 std::cout <<
"sample-id-increment: " << m_sample_id_increment <<
"\n";
143 std::cout <<
"repeat-mode: " << m_repeat_mode <<
"\n";
145 if (vm.count(
"numa-node")) {
147 std::make_unique<WriterType>(m_queue_name.c_str(),
149 numapp::MemPolicy::MakeBindNode(m_numa));
151 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(), m_queue_size);
154 }
catch (
const std::exception& e) {
155 std::cerr <<
"Exception:" << e.what() <<
"\n";
181 std::vector<TopicType> data;
186 if (not m_filename.empty()) {
187 std::cout <<
"Reading data from FITS file: " << m_filename <<
"\n";
190 std::cout <<
"Generating data\n";
196 throw std::runtime_error(
"Data vector is not populated so will exit");
200 std::cout <<
"Writing data to shared memory queue\n";
203 }
catch (
const std::exception& e) {
204 std::cout << e.what() <<
"\n";
211 std::this_thread::sleep_for(std::chrono::seconds(2));
231 virtual std::vector<TopicType>
ReadFits(std::string filename) = 0;
247 virtual std::vector<TopicType>
GenData(
int num_frames) = 0;
277 return m_sample_delay;
287 return m_repeat_mode;
302 void WriteToShm(std::vector<TopicType>& data) {
303 using namespace std::chrono;
305 size_t n_written = 0;
306 auto t_sent = steady_clock::now();
307 auto t_last = t_sent;
309 for (
auto& sample : data) {
315 sample.sample_id = n_written * m_sample_id_increment;
317 t_sent = steady_clock::now();
318 std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
320 throw std::runtime_error(
"Error writing to shm: " + err.message());
324 if (n_written && m_print_every && (n_written % m_print_every == 0)) {
325 auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
326 std::cout <<
"Samples written: " << n_written <<
"\n";
327 std::cout <<
"Total time to write " << m_print_every <<
" : " << dur <<
" ms\n";
328 std::cout <<
"Average frame time: " <<
static_cast<float>(dur) / m_print_every
332 while (duration_cast<milliseconds>(steady_clock::now() - t_sent).count() <
337 }
while (m_repeat_mode);
340 std::string m_queue_name;
342 std::string m_filename;
348 unsigned m_sample_id_increment;
352 std::unique_ptr<WriterType> m_writer;
373 int col, typecode, anynul;
379 fits_get_colnum(fptr, CASESEN,
const_cast<char*
>(name.c_str()), &col, &status);
381 fits_report_error(stderr, status);
382 throw std::runtime_error(
"Error getting column: " + name);
385 fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
387 fits_report_error(stderr, status);
388 throw std::runtime_error(
"Error getting coltype of:" + name);
392 std::cout <<
"name: " << name <<
"\n";
393 std::cout <<
"col: " << col <<
"\n";
394 std::cout <<
"typecode: " << typecode <<
"\n";
395 std::cout <<
"repeat: " << repeat <<
"\n";
396 std::cout <<
"width: " << width <<
"\n";
401 data.resize(repeat * nrows);
403 fits_read_col(fptr, typecode, col, 1, 1, repeat * nrows, &nulval, d, &anynul, &status);
405 fits_report_error(stderr, status);
406 throw std::runtime_error(
"Error reading column: " + name);