13#ifndef RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
14#define RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
16#include <Metadaqif.hpp>
18#include <rtctk/componentFramework/events.rad.hpp>
22#include <rtctk/metadataCollector/acquisitor.rad.hpp>
28template <
typename Super>
31template <
typename Super>
// Member declarations of a reply type; the enclosing class header is not part of
// this listing. NOTE(review): presumably the DaqReply implementation of
// metadaqif::DaqReply, judging by the clone() return type - confirm.
// Getter/setter pair for the id string.
54 std::string
getId()
const override;
56 void setId(
const std::string&
id)
override;
58 bool hasKey()
const override;
// clone()/cloneKey() hand back a newly owned metadaqif::DaqReply; unlike the
// members above they are declared without 'override'.
62 std::unique_ptr<metadaqif::DaqReply>
clone()
const;
64 std::unique_ptr<metadaqif::DaqReply>
cloneKey()
const;
78 const std::vector<std::string>&
files,
79 const std::string&
kw);
// Member declarations of a stop-reply type; the enclosing class header is not
// part of this listing. NOTE(review): presumably the DaqStopReply implementation
// of metadaqif::DaqStopReply, judging by keyEquals()/clone() - confirm.
// Getter/setter pair for the id string.
81 std::string
getId()
const override;
83 void setId(
const std::string&
id)
override;
// Getter/setter pair for the list of file names (returned by value).
89 std::vector<std::string>
getFiles()
const override;
91 void setFiles(
const std::vector<std::string>&
fs)
override;
93 bool hasKey()
const override;
95 bool keyEquals(
const metadaqif::DaqStopReply&
other)
const override;
// clone()/cloneKey() hand back a newly owned metadaqif::DaqStopReply; they are
// declared without 'override'.
97 std::unique_ptr<metadaqif::DaqStopReply>
clone()
const;
99 std::unique_ptr<metadaqif::DaqStopReply>
cloneKey()
const;
// NOTE(review): presumably the storage behind getFiles()/setFiles() - the
// definitions are not visible here.
103 std::vector<std::string> m_files;
// Member declarations of a status type; the enclosing class header is not part
// of this listing. NOTE(review): presumably the DaqStatus implementation of
// metadaqif::DaqStatus, judging by the clone() return type - confirm.
// Getter/setter pair for the id string.
114 std::string
getId()
const override;
116 void setId(
const std::string&
id)
override;
// Getter/setter pair for the acquisition state (metadaqif::DaqState).
130 metadaqif::DaqState
getState()
const override;
132 void setState(metadaqif::DaqState
st)
override;
// Getter/setter pair for the list of file names (returned by value).
134 std::vector<std::string>
getFiles()
const override;
136 void setFiles(
const std::vector<std::string>&
fs)
override;
138 bool hasKey()
const override;
// clone()/cloneKey() hand back a newly owned metadaqif::DaqStatus; they are
// declared without 'override'.
142 std::unique_ptr<metadaqif::DaqStatus>
clone()
const;
144 std::unique_ptr<metadaqif::DaqStatus>
cloneKey()
const;
// NOTE(review): presumably the storage behind getState()/setState() and
// getFiles()/setFiles() - the definitions are not visible here.
150 metadaqif::DaqState m_st;
151 std::vector<std::string> m_files;
// Command entry points overriding an interface declared elsewhere (the enclosing
// class header is not part of this listing). Each takes the acquisition id and
// returns an ::elt::mal::future holding a shared pointer to the reply object.
// StartDaq: replies with a metadaqif::DaqReply.
166 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>>
167 StartDaq(
const std::string&
id)
override;
// StopDaq: replies with a metadaqif::DaqStopReply.
169 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStopReply>>
170 StopDaq(
const std::string&
id)
override;
// AbortDaq: replies with a metadaqif::DaqReply.
172 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>>
173 AbortDaq(
const std::string&
id)
override;
175 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStatus>>
179 log4cplus::Logger& m_logger;
189template <
typename Super>
191 static_assert(std::is_base_of_v<RtcComponent, Super>,
"'Acquisitor' requires 'RtcComponent'");
208 using Super::InputStage::InputStage;
211 Super::InputStage::Start();
222 engine.RegisterRejectHandler(
223 events::StartDaq::ID, [](
const rad::AnyEvent&
event,
const std::string& state) {
224 if (
auto req = rad::GetPayloadNothrow<events::StartDaq>(
event); req) {
225 const auto&
id = req->GetRequestPayload();
230 engine.RegisterRejectHandler(
231 events::StopDaq::ID, [](
const rad::AnyEvent&
event,
const std::string& state) {
232 if (
auto req = rad::GetPayloadNothrow<events::StopDaq>(
event); req) {
233 const auto&
id = req->GetRequestPayload();
238 engine.RegisterRejectHandler(
239 events::AbortDaq::ID, [](
const rad::AnyEvent&
event,
const std::string& state) {
240 if (
auto req = rad::GetPayloadNothrow<events::AbortDaq>(
event); req) {
241 const auto&
id = req->GetRequestPayload();
246 engine.RegisterRejectHandler(
247 events::GetDaqStatus::ID, [](
const rad::AnyEvent&
event,
const std::string& state) {
248 if (
auto req = rad::GetPayloadNothrow<events::GetDaqStatus>(
event); req) {
249 const auto&
id = req->GetRequestPayload();
// Append six On::Operational sub-states (Preparing, Acquiring, Finalising,
// Aborting, Recovering, Error) to m_no_disable_in_states.
// NOTE(review): presumably the base life cycle refuses a Disable request while
// the component is in any listed state - confirm against Super.
256 this->m_no_disable_in_states.push_back(
"On::Operational::Preparing");
257 this->m_no_disable_in_states.push_back(
"On::Operational::Acquiring");
258 this->m_no_disable_in_states.push_back(
"On::Operational::Finalising");
259 this->m_no_disable_in_states.push_back(
"On::Operational::Aborting");
260 this->m_no_disable_in_states.push_back(
"On::Operational::Recovering");
261 this->m_no_disable_in_states.push_back(
"On::Operational::Error");
// Append the same six On::Operational sub-states (Preparing, Acquiring,
// Finalising, Aborting, Recovering, Error) to m_no_update_in_states.
// NOTE(review): presumably the base life cycle refuses an Update request while
// the component is in any listed state - confirm against Super.
265 this->m_no_update_in_states.push_back(
"On::Operational::Preparing");
266 this->m_no_update_in_states.push_back(
"On::Operational::Acquiring");
267 this->m_no_update_in_states.push_back(
"On::Operational::Finalising");
268 this->m_no_update_in_states.push_back(
"On::Operational::Aborting");
269 this->m_no_update_in_states.push_back(
"On::Operational::Recovering");
270 this->m_no_update_in_states.push_back(
"On::Operational::Error");
// Register "ActionClearHistory": empties m_acquisition_history. The action id is
// attached to the On::Operational::Initial -> On::Operational::Idle transition.
// The lambda's argument 'c' is not used.
274 engine.RegisterAction(
"ActionClearHistory",
275 [
this](
auto c) { m_acquisition_history.clear(); });
279 engine.RegisterGuard(
"GuardPreparingAllowed", [
this](
auto c) {
281 auto id = req->GetRequestPayload();
283#ifdef FEATURE_REQUIRE_UNIQUE_ID_FOR_NEW_SESSION
284 return m_acquisition_history.count(
id) == 0;
290 engine.RegisterAction(
"ActionPreparingEntry", [
this](
auto c) {
292 m_active_id = m_tmp_start_request->GetRequestPayload();
293 std::string
id = m_active_id.value();
297 status.
setState(metadaqif::Acquiring);
299 m_acquisition_history[id] = status;
302 engine.RegisterAction(
"ActionPreparingDone", [
this](
auto c) {
303 if (m_tmp_start_request) {
304 std::string
id = m_active_id.value();
305 m_tmp_start_request->SetReplyValue(std::make_shared<DaqReply>(
id));
306 m_tmp_start_request =
nullptr;
310 engine.RegisterAction(
"ActionPreparingFailed", [
this](
auto c) {
311 if (m_tmp_start_request) {
312 std::string
id = m_active_id.value();
315 m_acquisition_history.at(
id).setState(metadaqif::Failed);
316 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
317 m_acquisition_history.at(
id).setMessage(
msg);
318 m_tmp_start_request->SetException(metadaqif::DaqException(
id,
msg));
319 m_tmp_start_request =
nullptr;
325 engine.RegisterGuard(
"GuardFinalisingAllowed", [
this](
auto c) {
327 auto id = req->GetRequestPayload();
328 return m_active_id.value() == id;
331 engine.RegisterAction(
"ActionFinalisingEntry", [
this](
auto c) {
335 engine.RegisterAction(
"ActionFinalisingDone", [
this](
auto c) {
336 if (m_tmp_stop_request) {
337 std::string
id = m_active_id.value();
338 m_acquisition_history.at(
id).setState(metadaqif::Succeeded);
339 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
341 m_tmp_stop_request->SetReplyValue(
342 std::make_shared<DaqStopReply>(m_acquisition_history.at(
id).getId(),
343 m_acquisition_history.at(
id).getFiles(),
344 m_acquisition_history.at(
id).getKeywords()));
345 m_tmp_stop_request =
nullptr;
349 engine.RegisterAction(
"ActionFinalisingFailed", [
this](
auto c) {
350 if (m_tmp_stop_request) {
351 std::string
id = m_active_id.value();
354 m_acquisition_history.at(
id).setState(metadaqif::Failed);
355 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
356 m_acquisition_history.at(
id).setMessage(
msg);
357 m_tmp_stop_request->SetException(metadaqif::DaqException(
id,
msg));
358 m_tmp_stop_request =
nullptr;
364 engine.RegisterGuard(
"GuardAbortingAllowed", [
this](
auto c) {
366 auto id = req->GetRequestPayload();
367 return m_active_id.value() == id;
370 engine.RegisterAction(
"ActionAbortingEntry", [
this](
auto c) {
374 engine.RegisterAction(
"ActionAbortingDone", [
this](
auto c) {
375 if (m_tmp_abort_request) {
376 std::string
id = this->m_active_id.value();
377 m_acquisition_history.at(
id).setState(metadaqif::Aborted);
378 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
379 m_tmp_abort_request->SetReplyValue(std::make_shared<DaqReply>(
id));
380 m_tmp_abort_request =
nullptr;
386 engine.RegisterAction(
"ActionGetDaqStatus", [
this](
auto c) {
388 auto id =
request->GetRequestPayload();
390 if (m_acquisition_history.count(
id)) {
391 request->SetReplyValue(std::move(m_acquisition_history.at(
id).clone()));
393 auto status = std::make_shared<DaqStatus>();
395 status->setState(metadaqif::NotStarted);
396 status->setTimestamp(0);
397 request->SetReplyValue(status);
403 engine.RegisterAction(
"ActionRecoveringEntry", [
this](
auto c) {
404 std::string
id = m_active_id.value();
407 m_acquisition_history.at(
id).setState(metadaqif::Failed);
408 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
409 m_acquisition_history.at(
id).setMessage(
msg);
417 std::string
id = m_active_id.value();
420 this->m_success_handler,
421 this->m_error_handler);
424 "ActivityFinalising",
426 std::string
id = m_active_id.value();
429 m_acquisition_history.at(
id).setId(result.getId());
430 m_acquisition_history.at(
id).setFiles(result.getFiles());
431 m_acquisition_history.at(
id).setKeywords(result.getKeywords());
433 this->m_success_handler,
434 this->m_error_handler);
442 this->m_error_handler);
449 this->m_success_handler,
450 this->m_error_handler);
453 "ActivityRecovering",
457 this->m_success_handler,
458 this->m_error_handler);
462 double GetCurrentTime() {
463 using namespace std::chrono;
464 auto t = high_resolution_clock::now();
468 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
470 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqStopReply>, std::string>>
472 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
// Status records keyed by acquisition id. An entry is written in
// "ActionPreparingEntry", updated by the Done/Failed/Recovering actions, and the
// whole map is emptied by "ActionClearHistory".
474 std::map<std::string, DaqStatus> m_acquisition_history;
// Id of the acquisition being handled; assigned from the payload of
// m_tmp_start_request in "ActionPreparingEntry" and read via value() elsewhere.
// NOTE(review): no reset to std::nullopt is visible in this listing - confirm
// whether it is ever cleared.
476 std::optional<std::string> m_active_id;
// Switch the type of the existing state "On::Operational" to Parallel.
// NOTE(review): presumably so the acquisition states added below can live in
// their own region next to the base life cycle - confirm.
483 this->mm.ModStateType(
"On::Operational",
Parallel);
// Add the five acquisition states as Simple states under parent_region.
// NOTE(review): the arguments appear to be (state type, state id, parent,
// activity id, entry-action id) - confirm against the model-builder API.
//   Acquiring  : ActivityAcquiring,  no entry action
//   Recovering : ActivityRecovering, ActionRecoveringEntry
//   Preparing  : ActivityPreparing,  ActionPreparingEntry
//   Finalising : ActivityFinalising, ActionFinalisingEntry
//   Aborting   : ActivityAborting,   ActionAbortingEntry
490 this->mm.AddState(
Simple,
"On::Operational::Acquiring",
parent_region,
"ActivityAcquiring");
491 this->mm.AddState(
Simple,
"On::Operational::Recovering",
parent_region,
"ActivityRecovering",
"ActionRecoveringEntry");
492 this->mm.AddState(
Simple,
"On::Operational::Preparing",
parent_region,
"ActivityPreparing",
"ActionPreparingEntry");
493 this->mm.AddState(
Simple,
"On::Operational::Finalising",
parent_region,
"ActivityFinalising",
"ActionFinalisingEntry");
494 this->mm.AddState(
Simple,
"On::Operational::Aborting",
parent_region,
"ActivityAborting",
"ActionAbortingEntry");
// Wire up the acquisition transitions.
// NOTE(review): the arguments appear to be (source, target, event, guard,
// action), with "" meaning "none" - confirm against the model-builder API.
//
//   Initial    -> Idle                                    / ActionClearHistory
//   parent_region (empty target)  on GetDaqStatus         / ActionGetDaqStatus
//   Idle       -> Preparing   on StartDaq  [GuardPreparingAllowed]
//   Preparing  -> Acquiring   on Done                     / ActionPreparingDone
//   Preparing  -> Idle        on Error                    / ActionPreparingFailed
//   Acquiring  -> Finalising  on StopDaq   [GuardFinalisingAllowed]
//   Finalising -> Idle        on Done                     / ActionFinalisingDone
//   Finalising -> Idle        on Error                    / ActionFinalisingFailed
//   Acquiring  -> Aborting    on AbortDaq  [GuardAbortingAllowed]
//   Aborting   -> Idle        on Done                     / ActionAbortingDone
//   Acquiring  -> Recovering  on Error
//   Recovering -> Idle        on Done
//
// NOTE(review): no events.Error transition out of Aborting or Recovering is
// declared here - confirm this is intentional.
496 this->mm.AddTrans(
"On::Operational::Initial",
"On::Operational::Idle",
"",
"",
"ActionClearHistory");
497 this->mm.AddTrans(
parent_region,
"",
"events.GetDaqStatus",
"",
"ActionGetDaqStatus");
498 this->mm.AddTrans(
"On::Operational::Idle",
"On::Operational::Preparing",
"events.StartDaq",
"GuardPreparingAllowed");
499 this->mm.AddTrans(
"On::Operational::Preparing",
"On::Operational::Acquiring",
"events.Done",
"",
"ActionPreparingDone");
500 this->mm.AddTrans(
"On::Operational::Preparing",
"On::Operational::Idle",
"events.Error",
"",
"ActionPreparingFailed");
501 this->mm.AddTrans(
"On::Operational::Acquiring",
"On::Operational::Finalising",
"events.StopDaq",
"GuardFinalisingAllowed");
502 this->mm.AddTrans(
"On::Operational::Finalising",
"On::Operational::Idle",
"events.Done",
"",
"ActionFinalisingDone");
503 this->mm.AddTrans(
"On::Operational::Finalising",
"On::Operational::Idle",
"events.Error",
"",
"ActionFinalisingFailed");
504 this->mm.AddTrans(
"On::Operational::Acquiring",
"On::Operational::Aborting",
"events.AbortDaq",
"GuardAbortingAllowed");
505 this->mm.AddTrans(
"On::Operational::Aborting",
"On::Operational::Idle",
"events.Done",
"",
"ActionAbortingDone");
506 this->mm.AddTrans(
"On::Operational::Acquiring",
"On::Operational::Recovering",
"events.Error");
507 this->mm.AddTrans(
"On::Operational::Recovering",
"On::Operational::Idle",
"events.Done");
Class that handles reception of commands using MAL.
Definition commandReplier.hpp:30
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition exceptions.hpp:159
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition exceptions.hpp:177
Definition stateMachineEngine.hpp:35
Provides macros and utilities for exception handling.
Definition commandReplier.cpp:22
@ Simple
Definition model.hpp:23
@ Composite
Definition model.hpp:23
@ Parallel
Definition model.hpp:23
@ Initial
Definition model.hpp:23
rad::StopToken StopToken
Definition stopToken.hpp:20
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Lifecycle of a basic 'RtcComponent'.
Life cycle extension to make RtcComponent Loopaware.
Definition loopaware.hpp:32
Life cycle extension to make RtcComponent Runnable.
Definition runnable.hpp:31