13#ifndef RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
14#define RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
16#include <Metadaqif.hpp>
18#include <rtctk/componentFramework/events.rad.hpp>
22#include <rtctk/metadataCollector/acquisitor.rad.hpp>
28template <
typename Super>
31template <
typename Super>
41 std::string
const& session_id,
42 std::string
const& state_id);
54 std::string
getId()
const override;
56 void setId(std::string
const&
id)
override;
58 bool hasKey()
const override;
60 bool keyEquals(metadaqif::DaqReply
const& other)
const override;
62 std::unique_ptr<metadaqif::DaqReply>
clone()
const;
64 std::unique_ptr<metadaqif::DaqReply>
cloneKey()
const;
78 std::vector<std::string>
const& files,
79 std::string
const& kw);
81 std::string
getId()
const override;
83 void setId(std::string
const&
id)
override;
89 std::vector<std::string>
getFiles()
const override;
91 void setFiles(std::vector<std::string>
const& fs)
override;
93 bool hasKey()
const override;
95 bool keyEquals(metadaqif::DaqStopReply
const& other)
const override;
97 std::unique_ptr<metadaqif::DaqStopReply>
clone()
const;
99 std::unique_ptr<metadaqif::DaqStopReply>
cloneKey()
const;
103 std::vector<std::string> m_files;
114 std::string
getId()
const override;
116 void setId(std::string
const&
id)
override;
124 void setMessage(std::string
const& msg)
override;
130 metadaqif::DaqState
getState()
const override;
132 void setState(metadaqif::DaqState st)
override;
134 std::vector<std::string>
getFiles()
const override;
136 void setFiles(std::vector<std::string>
const& fs)
override;
138 bool hasKey()
const override;
140 bool keyEquals(metadaqif::DaqStatus
const& other)
const override;
142 std::unique_ptr<metadaqif::DaqStatus>
clone()
const;
144 std::unique_ptr<metadaqif::DaqStatus>
cloneKey()
const;
150 metadaqif::DaqState m_st;
151 std::vector<std::string> m_files;
166 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>>
167 StartDaq(std::string
const&
id)
override;
169 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStopReply>>
170 StopDaq(std::string
const&
id)
override;
172 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>>
173 AbortDaq(std::string
const&
id)
override;
175 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStatus>>
179 log4cplus::Logger& m_logger;
189template <
typename Super>
191 static_assert(std::is_base_of_v<RtcComponent, Super>,
"'Acquisitor' requires 'RtcComponent'");
192 static_assert(not is_base_of_template_v<Runnable, Super>,
"'Acquisitor' excludes 'Runnable'");
193 static_assert(not is_base_of_template_v<Loopaware, Super>,
"'Acquisitor' excludes 'Loopaware'");
208 using Super::InputStage::InputStage;
211 Super::InputStage::Start();
223 events::StartDaq::ID, [](
const rad::AnyEvent& event, std::string
const& state) {
224 if (
auto req = rad::GetPayloadNothrow<events::StartDaq>(event); req) {
225 const auto&
id = req->GetRequestPayload();
231 events::StopDaq::ID, [](
const rad::AnyEvent& event, std::string
const& state) {
232 if (
auto req = rad::GetPayloadNothrow<events::StopDaq>(event); req) {
233 const auto&
id = req->GetRequestPayload();
239 events::AbortDaq::ID, [](
const rad::AnyEvent& event, std::string
const& state) {
240 if (
auto req = rad::GetPayloadNothrow<events::AbortDaq>(event); req) {
241 const auto&
id = req->GetRequestPayload();
247 events::GetDaqStatus::ID, [](
const rad::AnyEvent& event, std::string
const& state) {
248 if (
auto req = rad::GetPayloadNothrow<events::GetDaqStatus>(event); req) {
249 const auto&
id = req->GetRequestPayload();
256 this->m_no_disable_in_states.push_back(
"On::Operational::Preparing");
257 this->m_no_disable_in_states.push_back(
"On::Operational::Acquiring");
258 this->m_no_disable_in_states.push_back(
"On::Operational::Finalising");
259 this->m_no_disable_in_states.push_back(
"On::Operational::Aborting");
260 this->m_no_disable_in_states.push_back(
"On::Operational::Recovering");
261 this->m_no_disable_in_states.push_back(
"On::Operational::Error");
265 this->m_no_update_in_states.push_back(
"On::Operational::Preparing");
266 this->m_no_update_in_states.push_back(
"On::Operational::Acquiring");
267 this->m_no_update_in_states.push_back(
"On::Operational::Finalising");
268 this->m_no_update_in_states.push_back(
"On::Operational::Aborting");
269 this->m_no_update_in_states.push_back(
"On::Operational::Recovering");
270 this->m_no_update_in_states.push_back(
"On::Operational::Error");
275 [
this](
auto c) { m_acquisition_history.clear(); });
279 engine.
RegisterGuard(
"GuardPreparingAllowed", [
this](
auto c) {
280 auto req = GetPayloadNothrow<events::StartDaq>(c);
281 auto id = req->GetRequestPayload();
283#ifdef FEATURE_REQUIRE_UNIQUE_ID_FOR_NEW_SESSION
284 return m_acquisition_history.count(
id) == 0;
291 m_tmp_start_request = GetPayloadNothrow<events::StartDaq>(c);
292 m_active_id = m_tmp_start_request->GetRequestPayload();
293 std::string
id = m_active_id.value();
297 status.
setState(metadaqif::Acquiring);
299 m_acquisition_history[id] = status;
303 if (m_tmp_start_request) {
304 std::string
id = m_active_id.value();
305 m_tmp_start_request->SetReplyValue(std::make_shared<DaqReply>(
id));
306 m_tmp_start_request =
nullptr;
311 if (m_tmp_start_request) {
312 std::string
id = m_active_id.value();
313 auto eptr = GetPayloadNothrow<events::Error>(c);
315 m_acquisition_history.at(
id).setState(metadaqif::Failed);
316 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
317 m_acquisition_history.at(
id).setMessage(msg);
318 m_tmp_start_request->SetException(metadaqif::DaqException(
id, msg));
319 m_tmp_start_request =
nullptr;
325 engine.
RegisterGuard(
"GuardFinalisingAllowed", [
this](
auto c) {
326 auto req = GetPayloadNothrow<events::StopDaq>(c);
327 auto id = req->GetRequestPayload();
328 return m_active_id.value() == id;
332 m_tmp_stop_request = GetPayloadNothrow<events::StopDaq>(c);
336 if (m_tmp_stop_request) {
337 std::string
id = m_active_id.value();
338 m_acquisition_history.at(
id).setState(metadaqif::Succeeded);
339 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
341 m_tmp_stop_request->SetReplyValue(
342 std::make_shared<DaqStopReply>(m_acquisition_history.at(
id).getId(),
343 m_acquisition_history.at(
id).getFiles(),
344 m_acquisition_history.at(
id).getKeywords()));
345 m_tmp_stop_request =
nullptr;
350 if (m_tmp_stop_request) {
351 std::string
id = m_active_id.value();
352 auto eptr = GetPayloadNothrow<events::Error>(c);
354 m_acquisition_history.at(
id).setState(metadaqif::Failed);
355 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
356 m_acquisition_history.at(
id).setMessage(msg);
357 m_tmp_stop_request->SetException(metadaqif::DaqException(
id, msg));
358 m_tmp_stop_request =
nullptr;
364 engine.
RegisterGuard(
"GuardAbortingAllowed", [
this](
auto c) {
365 auto req = GetPayloadNothrow<events::AbortDaq>(c);
366 auto id = req->GetRequestPayload();
367 return m_active_id.value() == id;
371 m_tmp_abort_request = GetPayloadNothrow<events::AbortDaq>(c);
375 if (m_tmp_abort_request) {
376 std::string
id = this->m_active_id.value();
377 m_acquisition_history.at(
id).setState(metadaqif::Aborted);
378 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
379 m_tmp_abort_request->SetReplyValue(std::make_shared<DaqReply>(
id));
380 m_tmp_abort_request =
nullptr;
387 auto request = GetPayloadNothrow<events::GetDaqStatus>(c);
388 auto id = request->GetRequestPayload();
390 if (m_acquisition_history.count(
id)) {
391 request->SetReplyValue(std::move(m_acquisition_history.at(
id).clone()));
393 auto status = std::make_shared<DaqStatus>();
395 status->setState(metadaqif::NotStarted);
396 status->setTimestamp(0);
397 request->SetReplyValue(status);
404 std::string
id = m_active_id.value();
405 auto eptr = GetPayloadNothrow<events::Error>(c);
407 m_acquisition_history.at(
id).setState(metadaqif::Failed);
408 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
409 m_acquisition_history.at(
id).setMessage(msg);
417 std::string
id = m_active_id.value();
418 static_cast<BizLogicIf&
>(this->m_logic).ActivityPreparing(stop_token,
id);
420 this->m_success_handler,
421 this->m_error_handler);
424 "ActivityFinalising",
426 std::string
id = m_active_id.value();
428 static_cast<BizLogicIf&
>(this->m_logic).ActivityFinalising(stop_token);
429 m_acquisition_history.at(
id).setId(result.getId());
430 m_acquisition_history.at(
id).setFiles(result.getFiles());
431 m_acquisition_history.at(
id).setKeywords(result.getKeywords());
433 this->m_success_handler,
434 this->m_error_handler);
439 static_cast<BizLogicIf&
>(this->m_logic).ActivityAcquiring(stop_token);
442 this->m_error_handler);
447 static_cast<BizLogicIf&
>(this->m_logic).ActivityAborting(stop_token);
449 this->m_success_handler,
450 this->m_error_handler);
453 "ActivityRecovering",
455 static_cast<BizLogicIf&
>(this->m_logic).ActivityRecovering(stop_token);
457 this->m_success_handler,
458 this->m_error_handler);
462 double GetCurrentTime() {
463 using namespace std::chrono;
464 auto t = high_resolution_clock::now();
465 return duration_cast<nanoseconds>(t.time_since_epoch()).count();
468 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
470 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqStopReply>, std::string>>
472 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
474 std::map<std::string, DaqStatus> m_acquisition_history;
476 std::optional<std::string> m_active_id;
483 this->mm.ModStateType(
"On::Operational",
Parallel);
485 const std::string parent_region =
"On::Operational:";
487 this->mm.AddState(
Composite, parent_region,
"On::Operational");
488 this->mm.AddState(
Initial,
"On::Operational::Initial", parent_region);
489 this->mm.AddState(
Simple,
"On::Operational::Idle", parent_region);
490 this->mm.AddState(
Simple,
"On::Operational::Acquiring", parent_region,
"ActivityAcquiring");
491 this->mm.AddState(
Simple,
"On::Operational::Recovering", parent_region,
"ActivityRecovering",
"ActionRecoveringEntry");
492 this->mm.AddState(
Simple,
"On::Operational::Preparing", parent_region,
"ActivityPreparing",
"ActionPreparingEntry");
493 this->mm.AddState(
Simple,
"On::Operational::Finalising", parent_region,
"ActivityFinalising",
"ActionFinalisingEntry");
494 this->mm.AddState(
Simple,
"On::Operational::Aborting", parent_region,
"ActivityAborting",
"ActionAbortingEntry");
496 this->mm.AddTrans(
"On::Operational::Initial",
"On::Operational::Idle",
"",
"",
"ActionClearHistory");
497 this->mm.AddTrans(parent_region,
"",
"events.GetDaqStatus",
"",
"ActionGetDaqStatus");
498 this->mm.AddTrans(
"On::Operational::Idle",
"On::Operational::Preparing",
"events.StartDaq",
"GuardPreparingAllowed");
499 this->mm.AddTrans(
"On::Operational::Preparing",
"On::Operational::Acquiring",
"events.Done",
"",
"ActionPreparingDone");
500 this->mm.AddTrans(
"On::Operational::Preparing",
"On::Operational::Idle",
"events.Error",
"",
"ActionPreparingFailed");
501 this->mm.AddTrans(
"On::Operational::Acquiring",
"On::Operational::Finalising",
"events.StopDaq",
"GuardFinalisingAllowed");
502 this->mm.AddTrans(
"On::Operational::Finalising",
"On::Operational::Idle",
"events.Done",
"",
"ActionFinalisingDone");
503 this->mm.AddTrans(
"On::Operational::Finalising",
"On::Operational::Idle",
"events.Error",
"",
"ActionFinalisingFailed");
504 this->mm.AddTrans(
"On::Operational::Acquiring",
"On::Operational::Aborting",
"events.AbortDaq",
"GuardAbortingAllowed");
505 this->mm.AddTrans(
"On::Operational::Aborting",
"On::Operational::Idle",
"events.Done",
"",
"ActionAbortingDone");
506 this->mm.AddTrans(
"On::Operational::Acquiring",
"On::Operational::Recovering",
"events.Error");
507 this->mm.AddTrans(
"On::Operational::Recovering",
"On::Operational::Idle",
"events.Done");
Class that handles reception of commands using MAL.
Definition: commandReplier.hpp:29
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: exceptions.hpp:185
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition: exceptions.hpp:203
Definition: stateMachineEngine.hpp:35
void RegisterRejectHandler(std::string const &id, RejectMethod reject)
Register reject handler.
Definition: stateMachineEngine.cpp:131
void RegisterActivity(std::string const &id, ActivityMethod activity, SuccessMethod on_success, FailureMethod on_failure)
Register activity.
Definition: stateMachineEngine.cpp:123
void RegisterGuard(std::string const &id, GuardMethod guard)
Register guard.
Definition: stateMachineEngine.cpp:110
void RegisterAction(std::string const &id, ActionMethod action)
Register action.
Definition: stateMachineEngine.cpp:86
Provides macros and utilities for exception handling.
Definition: commandReplier.cpp:22
rad::StopToken StopToken
Definition: stopToken.hpp:20
@ Simple
Definition: model.hpp:22
@ Composite
Definition: model.hpp:22
@ Parallel
Definition: model.hpp:22
@ Initial
Definition: model.hpp:22
Lifecycle of a basic 'RtcComponent'.
Life cycle extension to make RtcComponent Loopaware.
Definition: loopaware.hpp:32
Life cycle extension to make RtcComponent Runnable.
Definition: runnable.hpp:31