Goby3 3.1.5a
2024.05.23
Loading...
Searching...
No Matches
interprocess.h
Go to the documentation of this file.
1// Copyright 2016-2023:
2// GobySoft, LLC (2013-)
3// Community contributors (see AUTHORS file)
4// File authors:
5// Toby Schneider <toby@gobysoft.org>
6//
7//
8// This file is part of the Goby Underwater Autonomy Project Libraries
9// ("The Goby Libraries").
10//
11// The Goby Libraries are free software: you can redistribute them and/or modify
12// them under the terms of the GNU Lesser General Public License as published by
13// the Free Software Foundation, either version 2.1 of the License, or
14// (at your option) any later version.
15//
16// The Goby Libraries are distributed in the hope that they will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public License
22// along with Goby. If not, see <http://www.gnu.org/licenses/>.
23
24#ifndef GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
25#define GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
26
28
29#include <atomic> // for atomic
30#include <chrono> // for mill...
31#include <condition_variable> // for cond...
32#include <deque> // for deque
33#include <functional> // for func...
34#include <iosfwd> // for size_t
35#include <memory> // for shar...
36#include <mutex> // for time...
37#include <set> // for set
38#include <string> // for string
39#include <thread> // for get_id
40#include <tuple> // for make...
41#include <unistd.h> // for getpid
42#include <unordered_map> // for unor...
43#include <utility> // for make...
44#include <vector> // for vector
45
46#include <zmq.h> // for ZMQ_...
47#include <zmq.hpp> // for sock...
48
49#include "goby/middleware/common.h" // for thre...
50#include "goby/middleware/group.h" // for Group
51#include "goby/middleware/marshalling/interface.h" // for Seri...
54#include "goby/middleware/transport/interface.h" // for Poll...
55#include "goby/middleware/transport/interprocess.h" // for Inte...
56#include "goby/middleware/transport/null.h" // for Null...
58#include "goby/middleware/transport/subscriber.h" // for Subs...
59#include "goby/time/system_clock.h" // for Syst...
60#include "goby/util/debug_logger/flex_ostream.h" // for Flex...
64
65#if ZMQ_VERSION <= ZMQ_MAKE_VERSION(4, 3, 1)
66#define USE_OLD_ZMQ_CPP_API
67#endif
68
69#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 7, 1)
70#define USE_OLD_CPPZMQ_SETSOCKOPT
71#endif
72
73#if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 8, 0)
74#define USE_OLD_CPPZMQ_POLL
75#endif
76
77namespace goby
78{
79namespace middleware
80{
81template <typename Data> class Publisher;
82} // namespace middleware
83
84namespace zeromq
85{
86namespace groups
87{
88constexpr goby::middleware::Group manager_request{"goby::zeromq::_internal_manager_request"};
89constexpr goby::middleware::Group manager_response{"goby::zeromq::_internal_manager_response"};
90} // namespace groups
91
92void setup_socket(zmq::socket_t& socket, const protobuf::Socket& cfg);
93
100
101// scheme
102inline std::string identifier_part_to_string(int i)
103{
105}
106inline std::string identifier_part_to_string(std::thread::id i)
107{
109}
110
112template <typename Key>
113const std::string& id_component(const Key& k, std::unordered_map<Key, std::string>& map)
114{
115 auto it = map.find(k);
116 if (it != map.end())
117 return it->second;
118
119 std::string v = identifier_part_to_string(k) + "/";
120 auto it_pair = map.insert(std::make_pair(k, v));
121 return it_pair.first->second;
122}
123
124inline std::string
125make_identifier(const std::string& type_name, int scheme, const std::string& group,
126 IdentifierWildcard wildcard, const std::string& process,
127 std::unordered_map<int, std::string>* schemes_buffer = nullptr,
128 std::unordered_map<std::thread::id, std::string>* threads_buffer = nullptr)
129{
130 switch (wildcard)
131 {
132 default:
134 {
135 auto thread = std::this_thread::get_id();
136 return ("/" + group + "/" +
137 (schemes_buffer ? id_component(scheme, *schemes_buffer)
138 : std::string(identifier_part_to_string(scheme) + "/")) +
139 type_name + "/" + process + "/" +
140 (threads_buffer ? id_component(thread, *threads_buffer)
141 : std::string(identifier_part_to_string(thread) + "/")));
142 }
144 {
145 return ("/" + group + "/" +
146 (schemes_buffer ? id_component(scheme, *schemes_buffer)
147 : std::string(identifier_part_to_string(scheme) + "/")) +
148 type_name + "/" + process + "/");
149 }
151 {
152 return ("/" + group + "/" +
153 (schemes_buffer ? id_component(scheme, *schemes_buffer)
154 : std::string(identifier_part_to_string(scheme) + "/")) +
155 type_name + "/");
156 }
157 }
158}
159
160#ifdef USE_OLD_ZMQ_CPP_API
163#else
164using zmq_recv_flags_type = zmq::recv_flags;
165using zmq_send_flags_type = zmq::send_flags;
166#endif
167
168// run in the same thread as InterProcessPortal
170{
171 public:
172 InterProcessPortalMainThread(zmq::context_t& context);
174 {
175#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
176 control_socket_.setsockopt(ZMQ_LINGER, 0);
177 publish_socket_.setsockopt(ZMQ_LINGER, 0);
178#else
179 control_socket_.set(zmq::sockopt::linger, 0);
180 publish_socket_.set(zmq::sockopt::linger, 0);
181#endif
182 }
183
184 bool publish_ready() { return !hold_; }
185 bool subscribe_ready() { return have_pubsub_sockets_; }
186
187 bool recv(protobuf::InprocControl* control_msg,
190
191 void set_hold_state(bool hold);
192 bool hold_state() { return hold_; }
193
194 void publish(const std::string& identifier, const char* bytes, int size,
195 bool ignore_buffer = false);
196 void subscribe(const std::string& identifier);
197 void unsubscribe(const std::string& identifier);
199
200 std::deque<protobuf::InprocControl>& control_buffer() { return control_buffer_; }
202
203 private:
204 private:
205 zmq::socket_t control_socket_;
206 zmq::socket_t publish_socket_;
207 bool hold_{true};
208 bool have_pubsub_sockets_{false};
209
210 std::deque<std::pair<std::string, std::vector<char>>>
211 publish_queue_; //used before hold == false
212
213 // buffer messages while waiting for (un)subscribe ack
214 std::deque<protobuf::InprocControl> control_buffer_;
215};
216
217// run in a separate thread to allow zmq_.poll() to block without interrupting the main thread
219{
220 public:
222 zmq::context_t& context, std::atomic<bool>& alive,
223 std::shared_ptr<std::condition_variable_any> poller_cv);
224 void run();
226 {
227#ifdef USE_OLD_CPPZMQ_SETSOCKOPT
228 control_socket_.setsockopt(ZMQ_LINGER, 0);
229 subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
230 manager_socket_.setsockopt(ZMQ_LINGER, 0);
231#else
232 control_socket_.set(zmq::sockopt::linger, 0);
233 subscribe_socket_.set(zmq::sockopt::linger, 0);
234 manager_socket_.set(zmq::sockopt::linger, 0);
235#endif
236 }
237
238 private:
239 void poll(long timeout_ms = -1);
240 void control_data(const zmq::message_t& zmq_msg);
241 void subscribe_data(const zmq::message_t& zmq_msg);
242 void manager_data(const zmq::message_t& zmq_msg);
243 void send_control_msg(const protobuf::InprocControl& control);
244 void send_manager_request(const protobuf::ManagerRequest& req);
245
246 private:
248 zmq::socket_t control_socket_;
249 zmq::socket_t subscribe_socket_;
250 zmq::socket_t manager_socket_;
251 std::atomic<bool>& alive_;
252 std::shared_ptr<std::condition_variable_any> poller_cv_;
253 std::vector<zmq::pollitem_t> poll_items_;
254 enum
255 {
256 SOCKET_CONTROL = 0,
257 SOCKET_MANAGER = 1,
258 SOCKET_SUBSCRIBE = 2
259 };
260 enum
261 {
262 NUMBER_SOCKETS = 3
263 };
264 bool have_pubsub_sockets_{false};
265 bool hold_{true};
266 bool manager_waiting_for_reply_{false};
267
268 goby::time::SystemClock::time_point next_hold_state_request_time_{
270 const goby::time::SystemClock::duration hold_state_request_period_{
271 std::chrono::milliseconds(100)};
272};
273
274template <typename InnerTransporter,
275 template <typename Derived, typename InnerTransporterType> class PortalBase>
277 : public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase>,
278 InnerTransporter>
279{
280 public:
282 InnerTransporter>;
283
285 : cfg_(cfg),
286 zmq_context_(cfg.zeromq_number_io_threads()),
287 zmq_main_(zmq_context_),
288 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
289 {
290 _init();
291 }
292
293 InterProcessPortalImplementation(InnerTransporter& inner,
295 : Base(inner),
296 cfg_(cfg),
297 zmq_context_(cfg.zeromq_number_io_threads()),
298 zmq_main_(zmq_context_),
299 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
300 {
301 _init();
302 }
303
305 {
306 if (zmq_thread_)
307 {
308 zmq_main_.reader_shutdown();
309 zmq_thread_->join();
310 }
311 }
312
314 void ready() { ready_ = true; }
315
317 bool hold_state() { return zmq_main_.hold_state(); }
318
319 friend Base;
320 friend typename Base::Base;
321
322 private:
323 void _init()
324 {
326
327 // start zmq read thread
328 zmq_thread_ = std::make_unique<std::thread>([this]() { zmq_read_thread_.run(); });
329
330 while (!zmq_main_.subscribe_ready())
331 {
332 protobuf::InprocControl control_msg;
333 if (zmq_main_.recv(&control_msg))
334 {
335 switch (control_msg.type())
336 {
338 zmq_main_.set_publish_cfg(control_msg.publish_socket());
339 break;
340 default: break;
341 }
342 }
343 }
344
345 //
346 // Handle hold state request/response using pub sub so that we ensure
347 // publishing and subscribe is completely functional before releasing the hold
348 //
349 _subscribe<protobuf::ManagerResponse, middleware::MarshallingScheme::PROTOBUF>(
350 [this](std::shared_ptr<const protobuf::ManagerResponse> response) {
351 goby::glog.is_debug3() && goby::glog << "Received ManagerResponse: "
352 << response->ShortDebugString() << std::endl;
353 if (response->request() == protobuf::PROVIDE_HOLD_STATE &&
354 response->client_pid() == getpid() &&
355 response->client_name() == cfg_.client_name())
356 {
357 zmq_main_.set_hold_state(response->hold());
358 }
359
360 // we're good to go now, so let's unsubscribe to this group
361 if (zmq_main_.publish_ready())
362 {
363 _unsubscribe<protobuf::ManagerResponse,
367 }
368 },
370 }
371
372 template <typename Data, int scheme>
373 void _publish(const Data& d, const goby::middleware::Group& group,
374 const middleware::Publisher<Data>& /*publisher*/, bool ignore_buffer = false)
375 {
378 _publish_serialized(type_name, scheme, bytes, group, ignore_buffer);
379 }
380
381 void _publish_serialized(std::string type_name, int scheme, const std::vector<char>& bytes,
382 const goby::middleware::Group& group, bool ignore_buffer = false)
383 {
384 std::string identifier = _make_fully_qualified_identifier(type_name, scheme, group) + '\0';
385 zmq_main_.publish(identifier, &bytes[0], bytes.size(), ignore_buffer);
386 }
387
388 template <typename Data, int scheme>
389 void _subscribe(std::function<void(std::shared_ptr<const Data> d)> f,
391 const middleware::Subscriber<Data>& /*subscriber*/)
392 {
393 std::string identifier =
394 _make_identifier<Data, scheme>(group, IdentifierWildcard::PROCESS_THREAD_WILDCARD);
395
396 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
397 f, group,
398 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
399 [=](const Data& /*d*/) { return group; }));
400
401 if (forwarder_subscriptions_.count(identifier) == 0 &&
402 portal_subscriptions_.count(identifier) == 0)
403 zmq_main_.subscribe(identifier);
404 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
405 }
406
407 std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
408 std::function<void(const std::vector<unsigned char>&, int scheme, const std::string& type,
410 f,
411 const std::set<int>& schemes, const std::string& type_regex, const std::string& group_regex)
412 {
413 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
414 f, schemes, type_regex, group_regex);
415 _subscribe_regex(new_sub);
416 return new_sub;
417 }
418
419 template <typename Data, int scheme>
420 void _unsubscribe(
422 const middleware::Subscriber<Data>& /*subscriber*/ = middleware::Subscriber<Data>())
423 {
424 std::string identifier =
425 _make_identifier<Data, scheme>(group, IdentifierWildcard::PROCESS_THREAD_WILDCARD);
426
427 portal_subscriptions_.erase(identifier);
428
429 // If no forwarded subscriptions, do the actual unsubscribe
430 if (forwarder_subscriptions_.count(identifier) == 0)
431 zmq_main_.unsubscribe(identifier);
432 }
433
434 void _unsubscribe_all(
435 const std::string& subscriber_id = identifier_part_to_string(std::this_thread::get_id()))
436 {
437 // portal unsubscribe
438 if (subscriber_id == identifier_part_to_string(std::this_thread::get_id()))
439 {
440 for (const auto& p : portal_subscriptions_)
441 {
442 const auto& identifier = p.first;
443 if (forwarder_subscriptions_.count(identifier) == 0)
444 zmq_main_.unsubscribe(identifier);
445 }
446 portal_subscriptions_.clear();
447 }
448 else // forwarder unsubscribe
449 {
450 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
451 _forwarder_unsubscribe(
452 subscriber_id,
453 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
454 }
455
456 // regex
457 if (regex_subscriptions_.size() > 0)
458 {
459 regex_subscriptions_.erase(subscriber_id);
460 if (regex_subscriptions_.empty())
461 zmq_main_.unsubscribe("/");
462 }
463 }
464
465 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
466 {
467 int items = 0;
468 protobuf::InprocControl new_control_msg;
469
470#ifdef USE_OLD_ZMQ_CPP_API
471 int flags = ZMQ_NOBLOCK;
472#else
473 auto flags = zmq::recv_flags::dontwait;
474#endif
475
476 while (zmq_main_.recv(&new_control_msg, flags))
477 zmq_main_.control_buffer().push_back(new_control_msg);
478
479 while (!zmq_main_.control_buffer().empty())
480 {
481 const auto& control_msg = zmq_main_.control_buffer().front();
482 switch (control_msg.type())
483 {
485 {
486 ++items;
487 if (lock)
488 lock.reset();
489
490 const auto& data = control_msg.received_data();
491
492 std::string group, type, thread;
493 int scheme, process;
494 std::tie(group, scheme, type, process, thread) = parse_identifier(data);
495 std::string identifier = _make_identifier(
497
498 // build a set so if any of the handlers unsubscribes, we still have a pointer to the middleware::SerializationHandlerBase<>
499 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>>
500 subs_to_post;
501 auto portal_range = portal_subscriptions_.equal_range(identifier);
502 for (auto it = portal_range.first; it != portal_range.second; ++it)
503 subs_to_post.push_back(it->second);
504 auto forwarder_it = forwarder_subscriptions_.find(identifier);
505 if (forwarder_it != forwarder_subscriptions_.end())
506 subs_to_post.push_back(forwarder_it->second);
507
508 // actually post the data
509 {
510 const auto& data = control_msg.received_data();
511 auto null_delim_it = std::find(std::begin(data), std::end(data), '\0');
512 for (auto& sub : subs_to_post)
513 {
514 if (auto sub_sp = sub.lock())
515 sub_sp->post(null_delim_it + 1, data.end());
516 }
517 }
518
519 if (!regex_subscriptions_.empty())
520 {
521 auto null_delim_it = std::find(std::begin(data), std::end(data), '\0');
522
523 bool forwarder_subscription_posted = false;
524 for (auto& sub : regex_subscriptions_)
525 {
526 // only post at most once for forwarders as the threads will filter
527 bool is_forwarded_sub =
528 sub.first != identifier_part_to_string(std::this_thread::get_id());
529 if (is_forwarded_sub && forwarder_subscription_posted)
530 continue;
531
532 if (sub.second->post(null_delim_it + 1, data.end(), scheme, type,
533 group) &&
534 is_forwarded_sub)
535 forwarder_subscription_posted = true;
536 }
537 }
538 }
539 break;
540
542 {
543 protobuf::ManagerRequest req;
544
545 req.set_ready(ready_);
546 req.set_request(protobuf::PROVIDE_HOLD_STATE);
547 req.set_client_name(cfg_.client_name());
548 req.set_client_pid(getpid());
549
550 goby::glog.is_debug3() && goby::glog << "Published ManagerRequest: "
551 << req.ShortDebugString() << std::endl;
552
553 _publish<protobuf::ManagerRequest, middleware::MarshallingScheme::PROTOBUF>(
555 middleware::Publisher<protobuf::ManagerRequest>(), true);
556 }
557 break;
558
559 default: break;
560 }
561 zmq_main_.control_buffer().pop_front();
562 }
563 return items;
564 }
565
566 void _receive_publication_forwarded(
568 {
569 std::string identifier =
570 _make_identifier(msg.key().type(), msg.key().marshalling_scheme(), msg.key().group(),
572 '\0';
573 auto& bytes = msg.data();
574 zmq_main_.publish(identifier, &bytes[0], bytes.size());
575 }
576
577 void _receive_subscription_forwarded(
578 const std::shared_ptr<const middleware::SerializationHandlerBase<>>& subscription)
579 {
580 std::string identifier = _make_identifier(subscription->type_name(), subscription->scheme(),
581 subscription->subscribed_group(),
583
585 goby::glog << "Received subscription forwarded for identifier [" << identifier
586 << "] from subscriber id: " << subscription->subscriber_id() << std::endl;
587
588 switch (subscription->action())
589 {
591 {
592 // insert if this thread hasn't already subscribed
593 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
594 identifier) == 0)
595 {
596 // first to subscribe from a Forwarder
597 if (forwarder_subscriptions_.count(identifier) == 0)
598 {
599 // first to subscribe (locally or forwarded)
600 if (portal_subscriptions_.count(identifier) == 0)
601 zmq_main_.subscribe(identifier);
602
603 // create Forwarder subscription
604 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
605 }
606 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
607 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
608 }
609 }
610 break;
611
613 {
614 _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
615 }
616 break;
617
618 default: break;
619 }
620 }
621
622 void _forwarder_unsubscribe(const std::string& subscriber_id, const std::string& identifier)
623 {
624 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
625 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
626 {
627 bool no_forwarder_subscribers = true;
628 for (const auto& p : forwarder_subscription_identifiers_)
629 {
630 if (p.second.count(identifier) != 0)
631 {
632 no_forwarder_subscribers = false;
633 break;
634 }
635 }
636
637 // if no Forwarder subscriptions left
638 if (no_forwarder_subscribers)
639 {
640 // erase the Forwarder subscription
641 forwarder_subscriptions_.erase(it->second);
642
643 // do the actual unsubscribe if we aren't subscribe locally as well
644 if (portal_subscriptions_.count(identifier) == 0)
645 zmq_main_.unsubscribe(identifier);
646 }
647
648 forwarder_subscription_identifiers_[subscriber_id].erase(it);
649 }
650 }
651
652 void _receive_regex_subscription_forwarded(
653 std::shared_ptr<const middleware::SerializationSubscriptionRegex> subscription)
654 {
655 _subscribe_regex(subscription);
656 }
657
658 void _subscribe_regex(
659 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
660 {
661 if (regex_subscriptions_.empty())
662 zmq_main_.subscribe("/");
663
664 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
665 }
666
667 template <typename Data, int scheme>
668 std::string _make_identifier(const goby::middleware::Group& group, IdentifierWildcard wildcard)
669 {
671 scheme, group, wildcard);
672 }
673
674 std::string _make_fully_qualified_identifier(const std::string& type_name, int scheme,
675 const std::string& group)
676 {
677 return _make_identifier(type_name, scheme, group, IdentifierWildcard::THREAD_WILDCARD) +
678 id_component(std::this_thread::get_id(), threads_);
679 }
680
681 template <typename Data, int scheme>
682 std::string _make_identifier(const Data& d, const goby::middleware::Group& group,
683 IdentifierWildcard wildcard)
684 {
686 scheme, group, wildcard);
687 }
688
689 std::string _make_identifier(const std::string& type_name, int scheme, const std::string& group,
690 IdentifierWildcard wildcard)
691 {
692 return make_identifier(type_name, scheme, group, wildcard, process_, &schemes_, &threads_);
693 }
694
695 // group, scheme, type, process, thread
696 std::tuple<std::string, int, std::string, int, std::size_t>
697 parse_identifier(const std::string& identifier)
698 {
699 const int number_elements = 5;
700 std::string::size_type previous_slash = 0;
701 std::vector<std::string> elem;
702 for (auto i = 0; i < number_elements; ++i)
703 {
704 auto slash_pos = identifier.find('/', previous_slash + 1);
705 elem.push_back(identifier.substr(previous_slash + 1, slash_pos - (previous_slash + 1)));
706 previous_slash = slash_pos;
707 }
708 return std::make_tuple(elem[0], middleware::MarshallingScheme::from_string(elem[1]),
709 elem[2], std::stoi(elem[3]), std::stoull(elem[4], nullptr, 16));
710 }
711
712 private:
713 const protobuf::InterProcessPortalConfig cfg_;
714
715 std::unique_ptr<std::thread> zmq_thread_;
716 std::atomic<bool> zmq_alive_{true};
717 zmq::context_t zmq_context_;
718 InterProcessPortalMainThread zmq_main_;
719 InterProcessPortalReadThread zmq_read_thread_;
720
721 // maps identifier to subscription
722 std::unordered_multimap<std::string,
723 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
724 portal_subscriptions_;
725 // only one subscription for each forwarded identifier
726 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
727 forwarder_subscriptions_;
728 std::unordered_map<
729 std::string, std::unordered_map<std::string, typename decltype(
730 forwarder_subscriptions_)::const_iterator>>
731 forwarder_subscription_identifiers_;
732
733 std::unordered_multimap<std::string,
734 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
735 regex_subscriptions_;
736 std::string process_{std::to_string(getpid())};
737 std::unordered_map<int, std::string> schemes_;
738 std::unordered_map<std::thread::id, std::string> threads_;
739
740 bool ready_{false};
741};
742
744{
745 public:
746 Router(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg)
747 : context_(context), cfg_(cfg)
748 {
749 }
750
751 void run();
752 unsigned last_port(zmq::socket_t& socket);
753
754 Router(Router&) = delete;
756
757 public:
758 std::atomic<unsigned> pub_port{0};
759 std::atomic<unsigned> sub_port{0};
760
761 private:
762 zmq::context_t& context_;
764};
765
767{
768 public:
769 Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
770 const Router& router);
771
772 Manager(zmq::context_t& context, const protobuf::InterProcessPortalConfig& cfg,
773 const Router& router, const protobuf::InterProcessManagerHold& hold)
774 : Manager(context, cfg, router)
775 {
776 for (const auto& req_c : hold.required_client()) required_clients_.insert(req_c);
777 }
778
779 void run();
780
784
786
787 private:
788 std::set<std::string> reported_clients_;
789 std::set<std::string> required_clients_;
790
791 zmq::context_t& context_;
793 const Router& router_;
794
795 std::vector<zmq::pollitem_t> poll_items_;
796 enum
797 {
798 SOCKET_MANAGER = 0,
799 SOCKET_SUBSCRIBE = 1,
800 };
801 enum
802 {
803 NUMBER_SOCKETS = 2
804 };
805
806 std::unique_ptr<zmq::socket_t> manager_socket_;
807 std::unique_ptr<zmq::socket_t> subscribe_socket_;
808 std::unique_ptr<zmq::socket_t> publish_socket_;
809
810 std::string zmq_filter_req_{make_identifier(
812 protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
813 middleware::scheme<protobuf::ManagerRequest>(), groups::manager_request,
814 IdentifierWildcard::PROCESS_THREAD_WILDCARD, std::to_string(getpid()))};
815
816 std::string zmq_filter_rep_{
817 make_identifier(middleware::SerializerParserHelper<
818 protobuf::ManagerResponse,
819 middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
820 middleware::scheme<protobuf::ManagerResponse>(), groups::manager_response,
821 IdentifierWildcard::NO_WILDCARDS, std::to_string(getpid())) +
822 std::string(1, '\0')};
823}; // namespace zeromq
824
825template <typename InnerTransporter = middleware::NullTransporter>
828
829} // namespace zeromq
830} // namespace goby
831
832#endif
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition group.h:60
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition publisher.h:40
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition subscriber.h:37
void set_lock_action(logger_lock::LockAction lock_action)
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
bool hold_state()
When using hold functionality, returns whether the system is holding (true) and thus waiting for all ...
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
void ready()
When using hold functionality, call when the process is ready to receive publications (typically done...
void set_publish_cfg(const protobuf::Socket &cfg)
void unsubscribe(const std::string &identifier)
std::deque< protobuf::InprocControl > & control_buffer()
InterProcessPortalMainThread(zmq::context_t &context)
void subscribe(const std::string &identifier)
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
void send_control_msg(const protobuf::InprocControl &control)
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())
InterProcessPortalReadThread(const protobuf::InterProcessPortalConfig &cfg, zmq::context_t &context, std::atomic< bool > &alive, std::shared_ptr< std::condition_variable_any > poller_cv)
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router, const protobuf::InterProcessManagerHold &hold)
protobuf::Socket subscribe_socket_cfg()
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router)
protobuf::Socket publish_socket_cfg()
protobuf::ManagerResponse handle_request(const protobuf::ManagerRequest &pb_request)
Router & operator=(Router &)=delete
Router(Router &)=delete
std::atomic< unsigned > sub_port
unsigned last_port(zmq::socket_t &socket)
Router(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg)
std::atomic< unsigned > pub_port
static constexpr InprocControlType REQUEST_HOLD_STATE
static constexpr InprocControlType RECEIVE
::goby::zeromq::protobuf::InprocControl_InprocControlType type() const
static constexpr InprocControlType PUB_CONFIGURATION
const ::goby::zeromq::protobuf::Socket & publish_socket() const
const std::string & required_client(int index) const
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition cstr.h:65
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition common.h:53
constexpr goby::middleware::Group manager_response
constexpr goby::middleware::Group manager_request
std::string identifier_part_to_string(int i)
int zmq_send_flags_type
int zmq_recv_flags_type
void setup_socket(zmq::socket_t &socket, const protobuf::Socket &cfg)
std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
const std::string & id_component(const Key &k, std::unordered_map< Key, std::string > &map)
Given key, find the string in the map, or create it (to_string) and store it, and return the string.
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::PROTOBUF_NAMESPACE_ID::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
util::FlexOstream glog
Access the Goby logger through this object.
type
Generic JSON types used in JWTs.
Definition jwt.h:2072
static int from_string(const std::string &s)
Convert from a string to a marshalling scheme id.
Definition interface.h:77
static std::string to_string(int e)
Convert a known marshalling scheme to a human-readable string or an unknown scheme to the string repr...
Definition interface.h:67
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition interface.h:98
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition interface.h:107
std::chrono::time_point< SystemClock > time_point
static time_point now() noexcept
Returns the current system time unless SimulatorSettings::using_sim_time is set to true,...
std::chrono::microseconds duration
Duration type.