9 #ifndef RAD_MSG_REQUESTOR_HPP
10 #define RAD_MSG_REQUESTOR_HPP
15 #include <rad/Logger.hpp>
16 #include <rad/Assert.hpp>
17 #include <rad/Exceptions.hpp>
18 #include <rad/Errors.hpp>
19 #include <rad/MsgRequestorRaw.hpp>
21 #include <azmq/socket.hpp>
22 #include <azmq/message.hpp>
24 #include <boost/asio.hpp>
26 #include <google/protobuf/message.h>
33 template<
typename TYPEREQ,
typename TYPEREP>
38 const std::string& identity,
39 boost::asio::io_service& ios,
43 size_t Send(
const TYPEREQ& payload,
const long timeout = 0);
49 void Callback(
const std::error_code& errCode,
const std::string& msgTypeId,
const void* pData,
const size_t dataSize);
52 std::function<void(const std::error_code& , TYPEREP)> mReplyHandler;
61 template<
typename TYPEREQ,
typename TYPEREP>
63 const std::string& identity,
64 boost::asio::io_service& ios,
66 : mMsgRequestorRaw(endpoint, identity, ios, std::bind(&
MsgRequestor::Callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)),
67 mReplyHandler(replyHandler)
75 template<
typename TYPEREQ,
typename TYPEREP>
87 template<
typename TYPEREQ,
typename TYPEREP>
97 std::string payloadType = payload.GetDescriptor()->full_name();
100 if (payload.SerializeToString(&str) ==
false) {
101 RAD_LOG_ERROR() <<
"Failed serializing to string payload type <" << payloadType <<
">";
104 return mMsgRequestorRaw.Send(payloadType, str, timeout);
114 template<
typename TYPEREQ,
typename TYPEREP>
122 mReplyHandler(errCode, reply);
125 if (reply.ParseFromArray(pData, dataSize)) {
126 mReplyHandler({},
reply);
128 RAD_LOG_ERROR() <<
"Failed to parse reply type <" << msgTypeId <<
">";
141 template<
typename TREQ,
typename TREP>
145 using handler_t = std::function<void(const std::error_code&, reply_t)>;
155 const std::string& identity,
156 boost::asio::io_service& ios)
157 : m_raw_requestor(endpoint, identity, ios) {
178 std::chrono::milliseconds
const timeout = std::chrono::milliseconds(0)) {
181 std::string payload_type = payload.GetDescriptor()->full_name();
184 if (payload.SerializeToString(&str) ==
false) {
185 RAD_LOG_ERROR() <<
"Failed serializing to string payload type <" << payload_type
192 const void* p_data,
const size_t size) {
202 if (reply.ParseFromArray(p_data, size)) {
205 RAD_LOG_ERROR() <<
"Failed to parse reply type <" << msg_type_id <<
">";
MsgRequestor & operator=(const MsgRequestor &)=delete
Definition: MsgRequestor.hpp:142
#define RAD_ASSERTPTR(a)
Definition: Assert.hpp:16
#define RAD_LOG_ERROR()
Definition: Logger.hpp:266
MsgRequestor(const std::string &endpoint, const std::string &identity, boost::asio::io_service &ios, std::function< void(const std::error_code &, TYPEREP)> replyHandler)
Definition: MsgRequestor.hpp:62
size_t Send(const TYPEREQ &payload, const long timeout=0)
Definition: MsgRequestor.hpp:88
std::function< void(const std::error_code &, reply_t)> handler_t
Definition: MsgRequestor.hpp:145
MsgRequestor2 & operator=(const MsgRequestor2 &)=delete
TREP reply_t
Definition: MsgRequestor.hpp:144
TREQ request_t
Definition: MsgRequestor.hpp:143
Definition: MsgRequestorRaw.hpp:30
virtual ~MsgRequestor2()
Definition: MsgRequestor.hpp:159
optional bool timeout
Definition: requests.proto:13
MsgRequestor2(const std::string &endpoint, const std::string &identity, boost::asio::io_service &ios)
Definition: MsgRequestor.hpp:154
size_t AsyncSendReceive(std::string const &payloadType, std::string const &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition: MsgRequestorRaw.cpp:266
size_t AsyncSendReceive(const request_t &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition: MsgRequestor.hpp:176
optional string reply
Definition: requests.proto:26
Definition: MsgRequestorRaw.hpp:67
def handler
Definition: test_dispatcher.py:11
#define RAD_LOG_TRACE()
Definition: Logger.hpp:319
Definition: MsgRequestor.hpp:34
optional int32 error_code
Definition: topics.proto:14
virtual ~MsgRequestor()
Definition: MsgRequestor.hpp:76