ifw  0.0.1-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MsgRequestor.hpp
Go to the documentation of this file.
1 
9 #ifndef RAD_MSG_REQUESTOR_HPP
10 #define RAD_MSG_REQUESTOR_HPP
11 
12 #include <chrono>
13 #include <memory>
14 
15 #include <rad/Logger.hpp>
16 #include <rad/Assert.hpp>
17 #include <rad/Exceptions.hpp>
18 #include <rad/Errors.hpp>
19 #include <rad/MsgRequestorRaw.hpp>
20 
21 #include <azmq/socket.hpp>
22 #include <azmq/message.hpp>
23 
24 #include <boost/asio.hpp>
25 
26 #include <google/protobuf/message.h>
27 
28 namespace rad {
29 
33 template<typename TYPEREQ, typename TYPEREP>
34 class MsgRequestor {
35 
36 public:
37  MsgRequestor(const std::string& endpoint,
38  const std::string& identity,
39  boost::asio::io_service& ios,
40  std::function<void(const std::error_code&, TYPEREP)> replyHandler);
41  virtual ~MsgRequestor();
42 
43  size_t Send(const TYPEREQ& payload, const long timeout = 0);
44 
45  MsgRequestor(const MsgRequestor&) = delete;
46  MsgRequestor& operator= (const MsgRequestor&) = delete;
47 
48 private:
49  void Callback(const std::error_code& errCode, const std::string& msgTypeId, const void* pData, const size_t dataSize);
50 
51  MsgRequestorRaw mMsgRequestorRaw;
52  std::function<void(const std::error_code& , TYPEREP)> mReplyHandler;
53 };
54 
61 template<typename TYPEREQ, typename TYPEREP>
63  const std::string& identity,
64  boost::asio::io_service& ios,
65  std::function<void(const std::error_code& , TYPEREP)> replyHandler)
66 : mMsgRequestorRaw(endpoint, identity, ios, std::bind(&MsgRequestor::Callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)),
67  mReplyHandler(replyHandler)
68 {
69  RAD_LOG_TRACE();
70 }
71 
75 template<typename TYPEREQ, typename TYPEREP>
77 {
78  RAD_LOG_TRACE();
79 }
80 
87 template<typename TYPEREQ, typename TYPEREP>
88 size_t MsgRequestor<TYPEREQ, TYPEREP>::Send(const TYPEREQ& payload, const long timeout)
89 {
90  RAD_LOG_TRACE();
91 
92  /*
93  * Important: never call google::protobuf::ShutdownProtobufLibrary()
94  * before accessing the message descriptor!
95  */
96  RAD_ASSERTPTR(payload.GetDescriptor());
97  std::string payloadType = payload.GetDescriptor()->full_name();
98 
99  std::string str;
100  if (payload.SerializeToString(&str) == false) {
101  RAD_LOG_ERROR() << "Failed serializing to string payload type <" << payloadType << ">";
102  return 0;
103  }
104  return mMsgRequestorRaw.Send(payloadType, str, timeout);
105 }
106 
114 template<typename TYPEREQ, typename TYPEREP>
115 void MsgRequestor<TYPEREQ, TYPEREP>::Callback(const std::error_code& errCode, const std::string& msgTypeId, const void* pData, const size_t dataSize)
116 {
117  RAD_LOG_TRACE();
118 
119  TYPEREP reply;
120 
121  if (errCode) {
122  mReplyHandler(errCode, reply);
123  }
124 
125  if (reply.ParseFromArray(pData, dataSize)) {
126  mReplyHandler({}, reply);
127  } else {
128  RAD_LOG_ERROR() << "Failed to parse reply type <" << msgTypeId << ">";
129  mReplyHandler(rad::ErrorCodes::DESERIALIZATION_ERR, reply);
130  // @todo throw exception?
131  }
132 }
133 
134 
141 template<typename TREQ, typename TREP>
143  using request_t = TREQ;
144  using reply_t = TREP;
145  using handler_t = std::function<void(const std::error_code&, reply_t)>;
146 
154  MsgRequestor2(const std::string& endpoint,
155  const std::string& identity,
156  boost::asio::io_service& ios)
157  : m_raw_requestor(endpoint, identity, ios) {
158  }
159  virtual ~MsgRequestor2(){};
160 
161  MsgRequestor2(const MsgRequestor2&) = delete;
162  MsgRequestor2& operator=(const MsgRequestor2&) = delete;
163  MsgRequestor2(MsgRequestor2&&) = default;
164  MsgRequestor2& operator=(MsgRequestor2&&) = default;
165 
175  size_t
176  AsyncSendReceive(const request_t& payload,
178  std::chrono::milliseconds const timeout = std::chrono::milliseconds(0)) {
179  RAD_LOG_TRACE();
180  RAD_ASSERTPTR(payload.GetDescriptor());
181  std::string payload_type = payload.GetDescriptor()->full_name();
182 
183  std::string str;
184  if (payload.SerializeToString(&str) == false) {
185  RAD_LOG_ERROR() << "Failed serializing to string payload type <" << payload_type
186  << ">";
187  return 0;
188  }
189  return m_raw_requestor.AsyncSendReceive(
190  payload_type, str,
191  [handler](std::error_code const& ec, std::string const& msg_type_id,
192  const void* p_data, const size_t size) {
193  RAD_LOG_TRACE();
194  reply_t reply;
195  if (ec) {
196  handler(ec, reply);
197  return;
198  }
199 
200 
201  // Assert p_data != nullptr
202  if (reply.ParseFromArray(p_data, size)) {
203  handler({}, reply);
204  } else {
205  RAD_LOG_ERROR() << "Failed to parse reply type <" << msg_type_id << ">";
207  }
208  },
209  timeout);
210  }
211 
212  private:
213  MsgRequestorRaw2 m_raw_requestor;
214  };
215 
216 } // namespace rad
217 
218 #endif
MsgRequestor & operator=(const MsgRequestor &)=delete
Definition: MsgRequestor.hpp:142
#define RAD_ASSERTPTR(a)
Definition: Assert.hpp:16
#define RAD_LOG_ERROR()
Definition: Logger.hpp:266
MsgRequestor(const std::string &endpoint, const std::string &identity, boost::asio::io_service &ios, std::function< void(const std::error_code &, TYPEREP)> replyHandler)
Definition: MsgRequestor.hpp:62
size_t Send(const TYPEREQ &payload, const long timeout=0)
Definition: MsgRequestor.hpp:88
std::function< void(const std::error_code &, reply_t)> handler_t
Definition: MsgRequestor.hpp:145
MsgRequestor2 & operator=(const MsgRequestor2 &)=delete
TREP reply_t
Definition: MsgRequestor.hpp:144
TREQ request_t
Definition: MsgRequestor.hpp:143
Definition: MsgRequestorRaw.hpp:30
virtual ~MsgRequestor2()
Definition: MsgRequestor.hpp:159
optional bool timeout
Definition: requests.proto:13
MsgRequestor2(const std::string &endpoint, const std::string &identity, boost::asio::io_service &ios)
Definition: MsgRequestor.hpp:154
size_t AsyncSendReceive(std::string const &payloadType, std::string const &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition: MsgRequestorRaw.cpp:266
size_t AsyncSendReceive(const request_t &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition: MsgRequestor.hpp:176
optional string reply
Definition: requests.proto:26
Definition: MsgRequestorRaw.hpp:67
def handler
Definition: test_dispatcher.py:11
#define RAD_LOG_TRACE()
Definition: Logger.hpp:319
Definition: MsgRequestor.hpp:34
optional int32 error_code
Definition: topics.proto:14
virtual ~MsgRequestor()
Definition: MsgRequestor.hpp:76