RTC Toolkit 5.0.0
Loading...
Searching...
No Matches
repositorySubscriberIfTestSuite.hpp
Go to the documentation of this file.
1
13#ifndef RTCTK_COMPONENTFRAMEWORK_TEST_REPOSITORYSUBSCRIBERIFTESTSUITE_HPP
14#define RTCTK_COMPONENTFRAMEWORK_TEST_REPOSITORYSUBSCRIBERIFTESTSUITE_HPP
15
16#include <memory>
18
19#include <gtest/gtest.h>
20
21#include <chrono>
22#include <stdexcept>
23#include <thread>
24
26
27static std::shared_ptr<RepositorySubscriberIf> MakeRepository();
28
29template <typename T>
31public:
32 void PushBack(const T& path) {
33 std::lock_guard lock{m_mutex};
34 m_data.push_back(path);
35 }
36
37 T operator[](size_t idx) {
38 std::lock_guard lock{m_mutex};
39 return m_data.at(idx);
40 }
41
42 size_t Size() {
43 std::lock_guard lock{m_mutex};
44 return m_data.size();
45 }
46
47 bool Contains(const T& val) {
48 std::lock_guard lock{m_mutex};
49 auto it = std::find(m_data.begin(), m_data.end(), val);
50 return it != m_data.end();
51 }
52
53 void AwaitSize(size_t target_size, std::chrono::seconds timeout = std::chrono::seconds(5)) {
54 auto t_start = std::chrono::steady_clock::now();
55 while (Size() < target_size) {
56 std::this_thread::sleep_for(std::chrono::microseconds(100));
57 if ((std::chrono::steady_clock::now() - t_start) > timeout) {
58 FAIL() << "ThreadSafeQ::AwaitSize: Timed out waiting for size "
59 << std::to_string(target_size);
60 }
61 }
62 }
63
64private:
65 std::mutex m_mutex;
66 std::vector<T> m_data;
67};
68
69std::chrono::milliseconds g_sleep_duration = std::chrono::milliseconds(0);
70
71void Sleep() {
72 if (g_sleep_duration > std::chrono::milliseconds(0)) {
73 std::this_thread::sleep_for(g_sleep_duration);
74 }
75}
76
77class Subscription : public testing::Test {
78public:
79 void SetUp() override {
80 repo = MakeRepository();
81 path1 = "/foo"_dppath;
82 path2 = "/bar"_dppath;
83 link = "/link"_dppath;
84 }
85
86 void TearDown() override {
87 if (repo == nullptr) {
88 // in case we could not connect in the first place
89 return;
90 }
91 if (repo->DataPointExists(link)) {
92 repo->DeleteDataPoint(link);
93 }
94 if (repo->DataPointExists(path1)) {
95 repo->DeleteDataPoint(path1);
96 }
97 if (repo->DataPointExists(path2)) {
98 repo->DeleteDataPoint(path2);
99 }
100 }
101
102protected:
103 std::shared_ptr<RepositorySubscriberIf> repo;
107};
108
109TEST_F(Subscription, ValueSubscription) {
110 {
111 int int_create = 0;
113 req.CreateDataPoint(path1, int_create);
114 req.CreateDataPoint(path2, int_create);
115 repo->SendRequest(req).Wait();
116 }
117
122 int num_cb_q_1 = 0;
123 int num_cb_q_2 = 0;
124
125 {
126 auto sub1 = repo->Subscribe<int>(
127 path1,
128 [&](const DataPointPath& path,
129 const int& data,
130 const RepositoryIf::MetaData& metadata) {
131 path_q_1.PushBack(path);
132 value_q_1.PushBack(data);
133 },
134 // NOLINTNEXTLINE(performance-unnecessary-value-param)
135 [&](std::exception_ptr error) { error_q_1.PushBack(error); });
136
137 auto sub2 = repo->Subscribe(
138 path2,
139 [&](const DataPointPath& path, const RepositoryIf::MetaData& metadata) {
140 path_q_2.PushBack(path);
141 },
142 nullptr);
143
144 // Wait a moment for the subscription to complete before updating the data point.
145 Sleep();
146
147 {
148 int int_write = 3;
150 req.WriteDataPoint(path1, int_write);
151 req.WriteDataPoint(path2, int_write);
152 repo->SendRequest(req).Wait();
153
154 path_q_1.AwaitSize(1);
155 value_q_1.AwaitSize(1);
156 path_q_2.AwaitSize(1);
157 EXPECT_TRUE(path_q_1.Contains(path1));
159 EXPECT_TRUE(path_q_2.Contains(path2));
160 EXPECT_EQ(error_q_1.Size(), 0);
161 }
162 } // here the subscription goes out of scope and we unsubscribe
163
164 num_cb_q_1 = path_q_1.Size();
165 num_cb_q_2 = path_q_2.Size();
166
167 {
168 int int_write = 4;
170 req.WriteDataPoint(path1, int_write);
171 req.WriteDataPoint(path2, int_write);
172 repo->SendRequest(req).Wait();
173
174 // best effort: sleep a short time and check that really no more callbacks were triggered
175 std::this_thread::sleep_for(std::chrono::milliseconds(200));
176
177 // we ensure that no new callbacks have arrived
178 EXPECT_EQ(error_q_1.Size(), 0);
182 }
183}
184
186 repo->CreateDataPoint(path1, 0);
187
190 int num_cb_q_1 = 0;
191 int num_cb_q_2 = 0;
192
193 auto sub1 = repo->Subscribe(
194 path1,
195 [&](const DataPointPath& path, const RepositoryIf::MetaData& metadata) {
196 path_q_1.PushBack(path);
197 },
198 nullptr);
199
200 {
201 auto sub2 = repo->Subscribe(
202 path1,
203 [&](const DataPointPath& path, const RepositoryIf::MetaData& metadata) {
204 path_q_2.PushBack(path);
205 },
206 nullptr);
207
208 // Wait a moment for the subscription to complete before updating the data point.
209 Sleep();
210
211 {
212 repo->WriteDataPoint(path1, 3);
213
214 path_q_1.AwaitSize(1);
215 path_q_2.AwaitSize(1);
216 EXPECT_TRUE(path_q_1.Contains(path1));
217 EXPECT_TRUE(path_q_2.Contains(path1));
218 }
219 } // here sub 2 goes out of scope and we unsubscribe
220
221 num_cb_q_1 = path_q_1.Size();
222 num_cb_q_2 = path_q_2.Size();
223
224 {
225 repo->WriteDataPoint(path1, 4);
226
227 path_q_1.AwaitSize(num_cb_q_1 + 1);
228
229 // best effort: sleep a short time and check that really no more callbacks were triggered
230 std::this_thread::sleep_for(std::chrono::milliseconds(200));
231
232 EXPECT_EQ(path_q_1.Size(), (num_cb_q_1 + 1));
234 }
235}
236
238 {
239 int int_create = 0;
241 req.CreateDataPoint(path1, int_create);
242 repo->SendRequest(req).Wait();
243 }
244
245 std::atomic_bool stop = false;
246
247 // this thread keeps writing the datapoint with high frequency
248 auto writer_func = [&] {
249 int value = 3;
250 while (stop == false) {
251 repo->WriteDataPoint(path1, value);
252 value++;
253 }
254 };
255
256 // this thread subscribes, waits for notification and unsubscribes with high frequency
257 auto subscriber_func = [&] {
258 for (unsigned i = 0; i < 1000; i++) {
260
261 auto sub = repo->Subscribe(
262 path1,
263 [&](const DataPointPath& path, const RepositoryIf::MetaData& metadata) {
264 path_q_1.PushBack(path);
265 },
266 nullptr);
267
269 } // here the subscription goes out of scope and we unsubscribe
270
271 stop = true;
272 };
273
274 // let the threads to their work
275 std::thread t1{writer_func};
276 std::thread t2{subscriber_func};
277 t1.join();
278 t2.join();
279}
280
282 {
283 int int_create = 0;
285 req.CreateDataPoint(path1, int_create);
286 req.CreateSymlink(path1, link);
287 repo->SendRequest(req).Wait();
288 }
289
293 int num_cb_q = 0;
294
295 {
296 auto sub = repo->Subscribe<int>(
297 link,
298 [&](const DataPointPath& path,
299 const int& data,
300 const RepositoryIf::MetaData& metadata) {
301 path_q.PushBack(path);
302 value_q.PushBack(data);
303 },
304 // NOLINTNEXTLINE(performance-unnecessary-value-param)
305 [&](std::exception_ptr error) { error_q.PushBack(error); });
306
307 // Wait a moment for the subscription to complete before updating the data point.
308 Sleep();
309
310 {
311 int int_write = 3;
312 repo->WriteDataPoint(path1, int_write);
313
314 path_q.AwaitSize(1);
315 value_q.AwaitSize(1);
316 EXPECT_TRUE(path_q.Contains(link));
317 EXPECT_TRUE(value_q.Contains(int_write));
318 EXPECT_EQ(error_q.Size(), 0);
319 }
320 } // here the subscription goes out of scope and we unsubscribe
321
322 num_cb_q = path_q.Size();
323
324 {
325 int int_write = 4;
326 repo->WriteDataPoint(path1, int_write);
327
328 // best effort: sleep a short time and check that really no more callbacks were triggered
329 std::this_thread::sleep_for(std::chrono::milliseconds(200));
330
331 EXPECT_EQ(error_q.Size(), 0);
332 EXPECT_EQ(path_q.Size(), num_cb_q);
333 EXPECT_EQ(value_q.Size(), num_cb_q);
334 }
335}
336
337TEST_F(Subscription, CreateDeleteSubscription) {
341
342 {
343 auto subscription =
344 repo->Subscribe([&](const DataPointPath& path) { create_q.PushBack(path); },
345 [&](const DataPointPath& path) { delete_q.PushBack(path); },
346 // NOLINTNEXTLINE(performance-unnecessary-value-param)
347 [&](std::exception_ptr error) { error_q.PushBack(error); });
348
349 {
350 int int_create = 0;
352 req.CreateDataPoint(path1, int_create);
353 req.CreateDataPoint(path2, int_create);
354 repo->SendRequest(req).Wait();
355
356 create_q.AwaitSize(2);
357 EXPECT_TRUE(create_q.Contains(path1));
358 EXPECT_TRUE(create_q.Contains(path2));
359 EXPECT_EQ(delete_q.Size(), 0);
360 EXPECT_EQ(error_q.Size(), 0);
361 }
362
363 {
365 req.DeleteDataPoint(path2);
366 req.DeleteDataPoint(path1);
367 repo->SendRequest(req).Wait();
368
369 delete_q.AwaitSize(2);
370 EXPECT_EQ(create_q.Size(), 2);
371 EXPECT_TRUE(delete_q.Contains(path1));
372 EXPECT_TRUE(delete_q.Contains(path2));
373 EXPECT_EQ(error_q.Size(), 0);
374 }
375 } // here the subscription goes out of scope and we unsubscribe
376
377 {
378 int int_create = 0;
380 req.CreateDataPoint(path1, int_create);
381 req.CreateDataPoint(path2, int_create);
382 repo->SendRequest(req).Wait();
383
384 // best effort: sleep a short time and check that really no more callbacks were triggered
385 std::this_thread::sleep_for(std::chrono::milliseconds(200));
386
387 EXPECT_EQ(create_q.Size(), 2);
388 EXPECT_EQ(delete_q.Size(), 2);
389 EXPECT_EQ(error_q.Size(), 0);
390 }
391}
392
397
398 {
399 auto subscription =
400 repo->Subscribe([&](const DataPointPath& path) { create_q.PushBack(path); },
401 [&](const DataPointPath& path) { delete_q.PushBack(path); },
402 // NOLINTNEXTLINE(performance-unnecessary-value-param)
403 [&](std::exception_ptr error) { error_q.PushBack(error); });
404
405 {
406 int int_create = 0;
408 req.CreateDataPoint(path1, int_create);
409 req.CreateSymlink(path1, path2);
410 repo->SendRequest(req).Wait();
411
412 create_q.AwaitSize(2);
413 EXPECT_TRUE(create_q.Contains(path1));
414 EXPECT_TRUE(create_q.Contains(path2));
415 EXPECT_EQ(delete_q.Size(), 0);
416 EXPECT_EQ(error_q.Size(), 0);
417 }
418
419 {
421 req.DeleteDataPoint(path2);
422 req.DeleteDataPoint(path1);
423 repo->SendRequest(req).Wait();
424
425 delete_q.AwaitSize(2);
426 EXPECT_EQ(create_q.Size(), 2);
427 EXPECT_TRUE(delete_q.Contains(path1));
428 EXPECT_TRUE(delete_q.Contains(path2));
429 EXPECT_EQ(error_q.Size(), 0);
430 }
431 } // here the subscription goes out of scope and we unsubscribe
432
433 {
434 int int_create = 0;
436 req.CreateDataPoint(path1, int_create);
437 req.CreateSymlink(path1, path2);
438 repo->SendRequest(req).Wait();
439
440 // best effort: sleep a short time and check that really no more callbacks were triggered
441 std::this_thread::sleep_for(std::chrono::milliseconds(200));
442
443 EXPECT_EQ(create_q.Size(), 2);
444 EXPECT_EQ(delete_q.Size(), 2);
445 EXPECT_EQ(error_q.Size(), 0);
446 }
447}
448
451 {
452 auto sub = repo->Subscribe<int>(
453 path1,
454 [&](const DataPointPath& path,
455 const int& data,
456 const RepositoryIf::MetaData& metadata) {},
457 // NOLINTNEXTLINE(performance-unnecessary-value-param)
458 [&](std::exception_ptr error) {});
459 },
461}
462
463} // namespace rtctk::componentFramework::test
464
465#endif // RTCTK_COMPONENTFRAMEWORK_TEST_REPOSITORYSUBSCRIBERIFTESTSUITE_HPP
This class provides a wrapper for a data point path.
Definition dataPointPath.hpp:74
An object representing one or more asynchronous I/O requests to a repository.
Definition repositoryIf.hpp:683
void DeleteDataPoint(const DataPointPath &path, const CallbackType &callback=nullptr)
Definition repositoryIf.cpp:295
void CreateSymlink(const DataPointPath &dp, const DataPointPath &link, const CallbackType &callback=nullptr)
Definition repositoryIf.cpp:326
void CreateDataPoint(const DataPointPath &path, const T &initial_value, std::optional< std::reference_wrapper< const MetaData > > metadata=std::nullopt, const CallbackType &callback=nullptr)
Add a request to create a new datapoint.
Definition repositoryIf.ipp:1338
void WriteDataPoint(const DataPointPath &path, const T &buffer, std::optional< std::reference_wrapper< const MetaData > > metadata=std::nullopt, const CallbackType &callback=nullptr)
Definition repositoryIf.ipp:1471
Class for passing/receiving metadata to/from the repository.
Definition repositoryIf.hpp:195
Definition repositorySubscriberIfTestSuite.hpp:77
void TearDown() override
Definition repositorySubscriberIfTestSuite.hpp:86
DataPointPath path2
Definition repositorySubscriberIfTestSuite.hpp:105
void SetUp() override
Definition repositorySubscriberIfTestSuite.hpp:79
std::shared_ptr< RepositorySubscriberIf > repo
Definition repositorySubscriberIfTestSuite.hpp:103
DataPointPath path1
Definition repositorySubscriberIfTestSuite.hpp:104
DataPointPath link
Definition repositorySubscriberIfTestSuite.hpp:106
Definition repositorySubscriberIfTestSuite.hpp:30
size_t Size()
Definition repositorySubscriberIfTestSuite.hpp:42
void AwaitSize(size_t target_size, std::chrono::seconds timeout=std::chrono::seconds(5))
Definition repositorySubscriberIfTestSuite.hpp:53
void PushBack(const T &path)
Definition repositorySubscriberIfTestSuite.hpp:32
T operator[](size_t idx)
Definition repositorySubscriberIfTestSuite.hpp:37
bool Contains(const T &val)
Definition repositorySubscriberIfTestSuite.hpp:47
Definition fakeClock.cpp:15
std::chrono::milliseconds g_sleep_duration
Definition repositorySubscriberIfTestSuite.hpp:69
TEST_F(Callbacks, CreateDataPointCallback)
Definition repositoryIfTestSuite.hpp:1223
void Sleep()
Definition repositorySubscriberIfTestSuite.hpp:71
elt::mal::future< std::string > InjectReqRepEvent(StateMachineEngine &engine)
Definition malEventInjector.hpp:23
Header file for RepositorySubscriberIf and related base classes.