RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
acquisitor.hpp
Go to the documentation of this file.
1
13#ifndef RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
14#define RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
15
16#include <Metadaqif.hpp>
17#include <Rtctkif.hpp>
18#include <rtctk/componentFramework/events.rad.hpp>
22#include <rtctk/metadataCollector/acquisitor.rad.hpp>
23
25
26using namespace rtctk::componentFramework;
27
28template <typename Super>
29struct Runnable;
30
31template <typename Super>
32struct Loopaware;
33
38class DaqRequestRejected : public metadaqif::DaqException {
39public:
40 DaqRequestRejected(const std::string& request_id,
41 const std::string& session_id,
42 const std::string& state_id);
43};
44
48class DaqReply : public metadaqif::DaqReply {
49public:
50 DaqReply() = default;
51
52 DaqReply(const std::string& id);
53
54 std::string getId() const override;
55
56 void setId(const std::string& id) override;
57
58 bool hasKey() const override;
59
60 bool keyEquals(const metadaqif::DaqReply& other) const override;
61
62 std::unique_ptr<metadaqif::DaqReply> clone() const;
63
64 std::unique_ptr<metadaqif::DaqReply> cloneKey() const;
65
66private:
67 std::string m_id;
68};
69
73class DaqStopReply : public metadaqif::DaqStopReply {
74public:
75 DaqStopReply() = default;
76
77 DaqStopReply(const std::string& id,
78 const std::vector<std::string>& files,
79 const std::string& kw);
80
81 std::string getId() const override;
82
83 void setId(const std::string& id) override;
84
85 std::string getKeywords() const override;
86
87 void setKeywords(const std::string& kw) override;
88
89 std::vector<std::string> getFiles() const override;
90
91 void setFiles(const std::vector<std::string>& fs) override;
92
93 bool hasKey() const override;
94
95 bool keyEquals(const metadaqif::DaqStopReply& other) const override;
96
97 std::unique_ptr<metadaqif::DaqStopReply> clone() const;
98
99 std::unique_ptr<metadaqif::DaqStopReply> cloneKey() const;
100
101private:
102 std::string m_id;
103 std::vector<std::string> m_files;
104 std::string m_kw;
105};
106
110class DaqStatus : public metadaqif::DaqStatus {
111public:
112 DaqStatus() = default;
113
114 std::string getId() const override;
115
116 void setId(const std::string& id) override;
117
118 std::string getKeywords() const override;
119
120 void setKeywords(const std::string& kw) override;
121
122 std::string getMessage() const override;
123
124 void setMessage(const std::string& msg) override;
125
126 double getTimestamp() const override;
127
128 void setTimestamp(double ts) override;
129
130 metadaqif::DaqState getState() const override;
131
132 void setState(metadaqif::DaqState st) override;
133
134 std::vector<std::string> getFiles() const override;
135
136 void setFiles(const std::vector<std::string>& fs) override;
137
138 bool hasKey() const override;
139
140 bool keyEquals(const metadaqif::DaqStatus& other) const override;
141
142 std::unique_ptr<metadaqif::DaqStatus> clone() const;
143
144 std::unique_ptr<metadaqif::DaqStatus> cloneKey() const;
145
146private:
147 std::string m_id;
148 std::string m_msg;
149 double m_ts;
150 metadaqif::DaqState m_st;
151 std::vector<std::string> m_files;
152 std::string m_kw;
153};
154
160class AcquisitorCmdsImpl : public metadaqif::AsyncMetaDaq {
161public:
163
165
166 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>>
167 StartDaq(const std::string& id) override;
168
169 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStopReply>>
170 StopDaq(const std::string& id) override;
171
172 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>>
173 AbortDaq(const std::string& id) override;
174
175 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStatus>>
176 GetDaqStatus(const std::string& id) override;
177
178private:
179 log4cplus::Logger& m_logger;
180 StateMachineEngine& m_engine;
181};
182
189template <typename Super>
190struct Acquisitor : Super {
191 static_assert(std::is_base_of_v<RtcComponent, Super>, "'Acquisitor' requires 'RtcComponent'");
192 static_assert(not is_base_of_template_v<Runnable, Super>, "'Acquisitor' excludes 'Runnable'");
193 static_assert(not is_base_of_template_v<Loopaware, Super>, "'Acquisitor' excludes 'Loopaware'");
194
195 class BizLogicIf : public Super::BizLogicIf {
196 public:
197 virtual void ActivityPreparing(StopToken st, const std::string& id) {};
200 return {};
201 };
202 virtual void ActivityAborting(StopToken st) {};
204 };
205
206 class InputStage : public Super::InputStage {
207 public:
208 using Super::InputStage::InputStage;
209
210 void Start() override {
211 Super::InputStage::Start();
212
213 AcquisitorCmdsImpl::Register(this->m_replier, this->m_engine);
214 }
215 };
216
217 class OutputStage : public Super::OutputStage {
218 public:
220 // Handlers ###################################################################
221
222 engine.RegisterRejectHandler(
223 events::StartDaq::ID, [](const rad::AnyEvent& event, const std::string& state) {
224 if (auto req = rad::GetPayloadNothrow<events::StartDaq>(event); req) {
225 const auto& id = req->GetRequestPayload();
226 req->SetException(DaqRequestRejected{events::StartDaq::ID, id, state});
227 }
228 });
229
230 engine.RegisterRejectHandler(
231 events::StopDaq::ID, [](const rad::AnyEvent& event, const std::string& state) {
232 if (auto req = rad::GetPayloadNothrow<events::StopDaq>(event); req) {
233 const auto& id = req->GetRequestPayload();
234 req->SetException(DaqRequestRejected{events::StopDaq::ID, id, state});
235 }
236 });
237
238 engine.RegisterRejectHandler(
239 events::AbortDaq::ID, [](const rad::AnyEvent& event, const std::string& state) {
240 if (auto req = rad::GetPayloadNothrow<events::AbortDaq>(event); req) {
241 const auto& id = req->GetRequestPayload();
242 req->SetException(DaqRequestRejected{events::AbortDaq::ID, id, state});
243 }
244 });
245
246 engine.RegisterRejectHandler(
247 events::GetDaqStatus::ID, [](const rad::AnyEvent& event, const std::string& state) {
248 if (auto req = rad::GetPayloadNothrow<events::GetDaqStatus>(event); req) {
249 const auto& id = req->GetRequestPayload();
250 req->SetException(DaqRequestRejected{events::GetDaqStatus::ID, id, state});
251 }
252 });
253
254 // No Disable in states ###############################################################
255
256 this->m_no_disable_in_states.push_back("On::Operational::Preparing");
257 this->m_no_disable_in_states.push_back("On::Operational::Acquiring");
258 this->m_no_disable_in_states.push_back("On::Operational::Finalising");
259 this->m_no_disable_in_states.push_back("On::Operational::Aborting");
260 this->m_no_disable_in_states.push_back("On::Operational::Recovering");
261 this->m_no_disable_in_states.push_back("On::Operational::Error");
262
263 // No Update in states ################################################################
264
265 this->m_no_update_in_states.push_back("On::Operational::Preparing");
266 this->m_no_update_in_states.push_back("On::Operational::Acquiring");
267 this->m_no_update_in_states.push_back("On::Operational::Finalising");
268 this->m_no_update_in_states.push_back("On::Operational::Aborting");
269 this->m_no_update_in_states.push_back("On::Operational::Recovering");
270 this->m_no_update_in_states.push_back("On::Operational::Error");
271
272 // ClearHistory #####################################################################
273
274 engine.RegisterAction("ActionClearHistory",
275 [this](auto c) { m_acquisition_history.clear(); });
276
277 // StartDaq #####################################################################
278
279 engine.RegisterGuard("GuardPreparingAllowed", [this](auto c) {
281 auto id = req->GetRequestPayload();
282// #define FEATURE_REQUIRE_UNIQUE_ID_FOR_NEW_SESSION
283#ifdef FEATURE_REQUIRE_UNIQUE_ID_FOR_NEW_SESSION
284 return m_acquisition_history.count(id) == 0;
285#else
286 return true;
287#endif
288 });
289
290 engine.RegisterAction("ActionPreparingEntry", [this](auto c) {
291 m_tmp_start_request = GetPayloadNothrow<events::StartDaq>(c);
292 m_active_id = m_tmp_start_request->GetRequestPayload();
293 std::string id = m_active_id.value();
294
295 DaqStatus status;
296 status.setId(id);
297 status.setState(metadaqif::Acquiring);
298 status.setTimestamp(GetCurrentTime());
299 m_acquisition_history[id] = status;
300 });
301
302 engine.RegisterAction("ActionPreparingDone", [this](auto c) {
303 if (m_tmp_start_request) {
304 std::string id = m_active_id.value();
305 m_tmp_start_request->SetReplyValue(std::make_shared<DaqReply>(id));
306 m_tmp_start_request = nullptr;
307 }
308 });
309
310 engine.RegisterAction("ActionPreparingFailed", [this](auto c) {
311 if (m_tmp_start_request) {
312 std::string id = m_active_id.value();
314 std::string msg = NestedExceptionPrinter(std::move(*eptr)).Str();
315 m_acquisition_history.at(id).setState(metadaqif::Failed);
316 m_acquisition_history.at(id).setTimestamp(GetCurrentTime());
317 m_acquisition_history.at(id).setMessage(msg);
318 m_tmp_start_request->SetException(metadaqif::DaqException(id, msg));
319 m_tmp_start_request = nullptr;
320 }
321 });
322
323 // StopDaq #####################################################################
324
325 engine.RegisterGuard("GuardFinalisingAllowed", [this](auto c) {
327 auto id = req->GetRequestPayload();
328 return m_active_id.value() == id;
329 });
330
331 engine.RegisterAction("ActionFinalisingEntry", [this](auto c) {
332 m_tmp_stop_request = GetPayloadNothrow<events::StopDaq>(c);
333 });
334
335 engine.RegisterAction("ActionFinalisingDone", [this](auto c) {
336 if (m_tmp_stop_request) {
337 std::string id = m_active_id.value();
338 m_acquisition_history.at(id).setState(metadaqif::Succeeded);
339 m_acquisition_history.at(id).setTimestamp(GetCurrentTime());
340
341 m_tmp_stop_request->SetReplyValue(
342 std::make_shared<DaqStopReply>(m_acquisition_history.at(id).getId(),
343 m_acquisition_history.at(id).getFiles(),
344 m_acquisition_history.at(id).getKeywords()));
345 m_tmp_stop_request = nullptr;
346 }
347 });
348
349 engine.RegisterAction("ActionFinalisingFailed", [this](auto c) {
350 if (m_tmp_stop_request) {
351 std::string id = m_active_id.value();
353 std::string msg = NestedExceptionPrinter(std::move(*eptr)).Str();
354 m_acquisition_history.at(id).setState(metadaqif::Failed);
355 m_acquisition_history.at(id).setTimestamp(GetCurrentTime());
356 m_acquisition_history.at(id).setMessage(msg);
357 m_tmp_stop_request->SetException(metadaqif::DaqException(id, msg));
358 m_tmp_stop_request = nullptr;
359 }
360 });
361
362 // AbortDaq #####################################################################
363
364 engine.RegisterGuard("GuardAbortingAllowed", [this](auto c) {
366 auto id = req->GetRequestPayload();
367 return m_active_id.value() == id;
368 });
369
370 engine.RegisterAction("ActionAbortingEntry", [this](auto c) {
371 m_tmp_abort_request = GetPayloadNothrow<events::AbortDaq>(c);
372 });
373
374 engine.RegisterAction("ActionAbortingDone", [this](auto c) {
375 if (m_tmp_abort_request) {
376 std::string id = this->m_active_id.value();
377 m_acquisition_history.at(id).setState(metadaqif::Aborted);
378 m_acquisition_history.at(id).setTimestamp(GetCurrentTime());
379 m_tmp_abort_request->SetReplyValue(std::make_shared<DaqReply>(id));
380 m_tmp_abort_request = nullptr;
381 }
382 });
383
384 // GetDaqStatus #####################################################################
385
386 engine.RegisterAction("ActionGetDaqStatus", [this](auto c) {
388 auto id = request->GetRequestPayload();
389
390 if (m_acquisition_history.count(id)) {
391 request->SetReplyValue(std::move(m_acquisition_history.at(id).clone()));
392 } else {
393 auto status = std::make_shared<DaqStatus>();
394 status->setId(id);
395 status->setState(metadaqif::NotStarted);
396 status->setTimestamp(0);
397 request->SetReplyValue(status);
398 }
399 });
400
401 // Recover #####################################################################
402
403 engine.RegisterAction("ActionRecoveringEntry", [this](auto c) {
404 std::string id = m_active_id.value();
406 std::string msg = NestedExceptionPrinter(std::move(*eptr)).Str();
407 m_acquisition_history.at(id).setState(metadaqif::Failed);
408 m_acquisition_history.at(id).setTimestamp(GetCurrentTime());
409 m_acquisition_history.at(id).setMessage(msg);
410 });
411
412 // Activities #####################################################################
413
414 engine.RegisterActivity(
415 "ActivityPreparing",
416 [this](StopToken stop_token) {
417 std::string id = m_active_id.value();
418 static_cast<BizLogicIf&>(this->m_logic).ActivityPreparing(stop_token, id);
419 },
420 this->m_success_handler,
421 this->m_error_handler);
422
423 engine.RegisterActivity(
424 "ActivityFinalising",
425 [this](StopToken stop_token) {
426 std::string id = m_active_id.value();
427 auto result =
428 static_cast<BizLogicIf&>(this->m_logic).ActivityFinalising(stop_token);
429 m_acquisition_history.at(id).setId(result.getId());
430 m_acquisition_history.at(id).setFiles(result.getFiles());
431 m_acquisition_history.at(id).setKeywords(result.getKeywords());
432 },
433 this->m_success_handler,
434 this->m_error_handler);
435
436 engine.RegisterActivity(
437 "ActivityAcquiring",
438 [this](StopToken stop_token) {
439 static_cast<BizLogicIf&>(this->m_logic).ActivityAcquiring(stop_token);
440 },
441 nullptr,
442 this->m_error_handler);
443
444 engine.RegisterActivity(
445 "ActivityAborting",
446 [this](StopToken stop_token) {
447 static_cast<BizLogicIf&>(this->m_logic).ActivityAborting(stop_token);
448 },
449 this->m_success_handler,
450 this->m_error_handler);
451
452 engine.RegisterActivity(
453 "ActivityRecovering",
454 [this](StopToken stop_token) {
455 static_cast<BizLogicIf&>(this->m_logic).ActivityRecovering(stop_token);
456 },
457 this->m_success_handler,
458 this->m_error_handler);
459 }
460
461 private:
462 double GetCurrentTime() {
463 using namespace std::chrono;
464 auto t = high_resolution_clock::now();
465 return duration_cast<nanoseconds>(t.time_since_epoch()).count();
466 }
467
468 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
469 m_tmp_start_request;
470 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqStopReply>, std::string>>
471 m_tmp_stop_request;
472 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
473 m_tmp_abort_request;
474 std::map<std::string, DaqStatus> m_acquisition_history;
475
476 std::optional<std::string> m_active_id;
477 };
478
479 struct ModelBuilder : public Super::ModelBuilder {
480 public:
482 // clang-format off
483 this->mm.ModStateType("On::Operational", Parallel);
484
485 const std::string parent_region = "On::Operational:";
486
487 this->mm.AddState(Composite, parent_region, "On::Operational");
488 this->mm.AddState(Initial, "On::Operational::Initial", parent_region);
489 this->mm.AddState(Simple, "On::Operational::Idle", parent_region);
490 this->mm.AddState(Simple, "On::Operational::Acquiring", parent_region, "ActivityAcquiring");
491 this->mm.AddState(Simple, "On::Operational::Recovering", parent_region, "ActivityRecovering", "ActionRecoveringEntry");
492 this->mm.AddState(Simple, "On::Operational::Preparing", parent_region, "ActivityPreparing", "ActionPreparingEntry");
493 this->mm.AddState(Simple, "On::Operational::Finalising", parent_region, "ActivityFinalising", "ActionFinalisingEntry");
494 this->mm.AddState(Simple, "On::Operational::Aborting", parent_region, "ActivityAborting", "ActionAbortingEntry");
495
496 this->mm.AddTrans("On::Operational::Initial", "On::Operational::Idle", "", "", "ActionClearHistory");
497 this->mm.AddTrans(parent_region, "", "events.GetDaqStatus", "", "ActionGetDaqStatus");
498 this->mm.AddTrans("On::Operational::Idle", "On::Operational::Preparing", "events.StartDaq", "GuardPreparingAllowed");
499 this->mm.AddTrans("On::Operational::Preparing", "On::Operational::Acquiring", "events.Done", "", "ActionPreparingDone");
500 this->mm.AddTrans("On::Operational::Preparing", "On::Operational::Idle", "events.Error", "", "ActionPreparingFailed");
501 this->mm.AddTrans("On::Operational::Acquiring", "On::Operational::Finalising", "events.StopDaq", "GuardFinalisingAllowed");
502 this->mm.AddTrans("On::Operational::Finalising", "On::Operational::Idle", "events.Done", "", "ActionFinalisingDone");
503 this->mm.AddTrans("On::Operational::Finalising", "On::Operational::Idle", "events.Error", "", "ActionFinalisingFailed");
504 this->mm.AddTrans("On::Operational::Acquiring", "On::Operational::Aborting", "events.AbortDaq", "GuardAbortingAllowed");
505 this->mm.AddTrans("On::Operational::Aborting", "On::Operational::Idle", "events.Done", "", "ActionAbortingDone");
506 this->mm.AddTrans("On::Operational::Acquiring", "On::Operational::Recovering", "events.Error");
507 this->mm.AddTrans("On::Operational::Recovering", "On::Operational::Idle", "events.Done");
508 // clang-format on
509 }
510 };
511};
512
513} // namespace rtctk::metadataCollector
514
515#endif // RTCTK_COMPONENTFRAMEWORK_ACQUISITOR_HPP
Class that handles reception of commands using MAL.
Definition commandReplier.hpp:30
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition exceptions.hpp:159
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition exceptions.hpp:177
Definition stateMachineEngine.hpp:35
Class that handles reception of metadaqif commands using MAL.
Definition acquisitor.hpp:160
::elt::mal::future< std::shared_ptr< metadaqif::DaqStopReply > > StopDaq(const std::string &id) override
Definition acquisitor.cpp:198
::elt::mal::future< std::shared_ptr< metadaqif::DaqReply > > StartDaq(const std::string &id) override
Definition acquisitor.cpp:189
::elt::mal::future< std::shared_ptr< metadaqif::DaqStatus > > GetDaqStatus(const std::string &id) override
Definition acquisitor.cpp:216
::elt::mal::future< std::shared_ptr< metadaqif::DaqReply > > AbortDaq(const std::string &id) override
Definition acquisitor.cpp:207
static void Register(CommandReplier &replier, StateMachineEngine &engine)
Definition acquisitor.cpp:178
AcquisitorCmdsImpl(StateMachineEngine &engine)
Definition acquisitor.cpp:184
virtual void ActivityAborting(StopToken st)
Definition acquisitor.hpp:202
virtual void ActivityRecovering(StopToken st)
Definition acquisitor.hpp:203
virtual DaqStopReply ActivityFinalising(StopToken st)
Definition acquisitor.hpp:199
virtual void ActivityPreparing(StopToken st, const std::string &id)
Definition acquisitor.hpp:197
virtual void ActivityAcquiring(StopToken st)
Definition acquisitor.hpp:198
void Start() override
Definition acquisitor.hpp:210
OutputStage(StateMachineEngine &engine, BizLogicIf &bl)
Definition acquisitor.hpp:219
Definition acquisitor.hpp:48
bool hasKey() const override
Definition acquisitor.cpp:39
bool keyEquals(const metadaqif::DaqReply &other) const override
Definition acquisitor.cpp:43
void setId(const std::string &id) override
Definition acquisitor.cpp:35
std::unique_ptr< metadaqif::DaqReply > cloneKey() const
Definition acquisitor.cpp:51
std::string getId() const override
Definition acquisitor.cpp:31
std::unique_ptr< metadaqif::DaqReply > clone() const
Definition acquisitor.cpp:47
Thrown if command is not allowed in current state or guard.
Definition acquisitor.hpp:38
DaqRequestRejected(const std::string &request_id, const std::string &session_id, const std::string &state_id)
Definition acquisitor.cpp:19
Definition acquisitor.hpp:110
bool hasKey() const override
Definition acquisitor.cpp:153
std::unique_ptr< metadaqif::DaqStatus > clone() const
Definition acquisitor.cpp:161
void setState(metadaqif::DaqState st) override
Definition acquisitor.cpp:141
void setMessage(const std::string &msg) override
Definition acquisitor.cpp:125
void setTimestamp(double ts) override
Definition acquisitor.cpp:133
std::string getMessage() const override
Definition acquisitor.cpp:121
void setId(const std::string &id) override
Definition acquisitor.cpp:109
bool keyEquals(const metadaqif::DaqStatus &other) const override
Definition acquisitor.cpp:157
void setFiles(const std::vector< std::string > &fs) override
Definition acquisitor.cpp:149
std::vector< std::string > getFiles() const override
Definition acquisitor.cpp:145
std::string getKeywords() const override
Definition acquisitor.cpp:113
void setKeywords(const std::string &kw) override
Definition acquisitor.cpp:117
std::unique_ptr< metadaqif::DaqStatus > cloneKey() const
Definition acquisitor.cpp:172
metadaqif::DaqState getState() const override
Definition acquisitor.cpp:137
double getTimestamp() const override
Definition acquisitor.cpp:129
std::string getId() const override
Definition acquisitor.cpp:105
Definition acquisitor.hpp:73
void setFiles(const std::vector< std::string > &fs) override
Definition acquisitor.cpp:83
bool keyEquals(const metadaqif::DaqStopReply &other) const override
Definition acquisitor.cpp:91
std::unique_ptr< metadaqif::DaqStopReply > clone() const
Definition acquisitor.cpp:95
std::string getKeywords() const override
Definition acquisitor.cpp:71
std::string getId() const override
Definition acquisitor.cpp:63
void setId(const std::string &id) override
Definition acquisitor.cpp:67
bool hasKey() const override
Definition acquisitor.cpp:87
std::unique_ptr< metadaqif::DaqStopReply > cloneKey() const
Definition acquisitor.cpp:99
std::vector< std::string > getFiles() const override
Definition acquisitor.cpp:79
void setKeywords(const std::string &kw) override
Definition acquisitor.cpp:75
Provides macros and utilities for exception handling.
Definition commandReplier.cpp:22
@ Simple
Definition model.hpp:23
@ Composite
Definition model.hpp:23
@ Parallel
Definition model.hpp:23
@ Initial
Definition model.hpp:23
rad::StopToken StopToken
Definition stopToken.hpp:20
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Definition acquisitor.cpp:15
Lifecycle of a basic 'RtcComponent'.
A simple Stop Token.
Life cycle extension to make RtcComponent Loopaware.
Definition loopaware.hpp:32
Life cycle extension to make RtcComponent Runnable.
Definition runnable.hpp:31
ModelBuilder(StateMachineEngine &engine)
Definition acquisitor.hpp:481
Life cycle extension for Acquisitor RtcComponent.
Definition acquisitor.hpp:190