24 #ifndef GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
25 #define GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
31 #include <condition_variable>
42 #include <unordered_map>
65 #if ZMQ_VERSION <= ZMQ_MAKE_VERSION(4, 3, 1)
66 #define USE_OLD_ZMQ_CPP_API
69 #if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 7, 1)
70 #define USE_OLD_CPPZMQ_SETSOCKOPT
73 #if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 8, 0)
74 #define USE_OLD_CPPZMQ_POLL
81 template <
typename Data>
class Publisher;
112 template <
typename Key>
113 const std::string&
id_component(
const Key& k, std::unordered_map<Key, std::string>& map)
115 auto it = map.find(k);
120 auto it_pair = map.insert(std::make_pair(k, v));
121 return it_pair.first->second;
127 std::unordered_map<int, std::string>* schemes_buffer =
nullptr,
128 std::unordered_map<std::thread::id, std::string>* threads_buffer =
nullptr)
135 auto thread = std::this_thread::get_id();
136 return (
"/" +
group +
"/" +
139 type_name +
"/" + process +
"/" +
145 return (
"/" +
group +
"/" +
148 type_name +
"/" + process +
"/");
152 return (
"/" +
group +
"/" +
160 #ifdef USE_OLD_ZMQ_CPP_API
175 #ifdef USE_OLD_CPPZMQ_SETSOCKOPT
176 control_socket_.setsockopt(ZMQ_LINGER, 0);
177 publish_socket_.setsockopt(ZMQ_LINGER, 0);
179 control_socket_.set(zmq::sockopt::linger, 0);
180 publish_socket_.set(zmq::sockopt::linger, 0);
187 bool recv(protobuf::InprocControl* control_msg,
// Declaration only; the definition is not visible in this listing.
// Called by the portal through zmq_main_ to hand off serialized data for
// publication.
//   identifier    - string key for the publication; callers in this file
//                   build it with _make_fully_qualified_identifier(...) + '\0'
//                   or with _make_identifier(...).
//   bytes, size   - start and length of the payload; callers in this file
//                   pass &bytes[0] and bytes.size().
//                   NOTE(review): size is an int while callers pass a
//                   container size(), so the value is narrowed at the call.
//   ignore_buffer - defaults to false.
//                   NOTE(review): presumably bypasses queuing of publications
//                   until the sockets are ready - confirm in the definition.
194 void publish(
const std::string& identifier,
const char* bytes,
int size,
195 bool ignore_buffer =
false);
// Accessor that returns a non-const reference to the control_buffer_ member,
// so the caller can inspect and modify the deque of InprocControl messages
// in place (no copy is made).
200 std::deque<protobuf::InprocControl>&
control_buffer() {
return control_buffer_; }
208 bool have_pubsub_sockets_{
false};
210 std::deque<std::pair<std::string, std::vector<char>>>
214 std::deque<protobuf::InprocControl> control_buffer_;
222 zmq::context_t& context, std::atomic<bool>& alive,
223 std::shared_ptr<std::condition_variable_any> poller_cv);
227 #ifdef USE_OLD_CPPZMQ_SETSOCKOPT
228 control_socket_.setsockopt(ZMQ_LINGER, 0);
229 subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
230 manager_socket_.setsockopt(ZMQ_LINGER, 0);
232 control_socket_.set(zmq::sockopt::linger, 0);
233 subscribe_socket_.set(zmq::sockopt::linger, 0);
234 manager_socket_.set(zmq::sockopt::linger, 0);
// Declarations only; the definitions are not visible in this listing.

// Polls with a timeout given in milliseconds; the default argument is -1.
// NOTE(review): presumably forwarded to the ZeroMQ poll call, where a
// negative timeout blocks indefinitely - confirm in the definition.
239 void poll(
long timeout_ms = -1);
// The next three functions each take one received ZeroMQ message.
// NOTE(review): presumably the handlers for data arriving on control_socket_,
// subscribe_socket_ and manager_socket_ respectively - confirm.
240 void control_data(
const zmq::message_t& zmq_msg);
241 void subscribe_data(
const zmq::message_t& zmq_msg);
242 void manager_data(
const zmq::message_t& zmq_msg);
// Sends the given InprocControl message.
// NOTE(review): presumably over control_socket_ - confirm.
243 void send_control_msg(
const protobuf::InprocControl& control);
// Sends the given ManagerRequest.
// NOTE(review): presumably over manager_socket_ - confirm.
244 void send_manager_request(
const protobuf::ManagerRequest& req);
247 const protobuf::InterProcessPortalConfig& cfg_;
251 std::atomic<bool>& alive_;
252 std::shared_ptr<std::condition_variable_any> poller_cv_;
253 std::vector<zmq::pollitem_t> poll_items_;
264 bool have_pubsub_sockets_{
false};
266 bool manager_waiting_for_reply_{
false};
271 std::chrono::milliseconds(100)};
274 template <
typename InnerTransporter,
275 template <
typename Derived,
typename InnerTransporterType>
class PortalBase>
277 :
public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase>,
286 zmq_context_(cfg.zeromq_number_io_threads()),
287 zmq_main_(zmq_context_),
288 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
294 const protobuf::InterProcessPortalConfig& cfg)
297 zmq_context_(cfg.zeromq_number_io_threads()),
298 zmq_main_(zmq_context_),
299 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
320 friend typename Base::Base;
328 zmq_thread_ = std::make_unique<std::thread>([
this]() { zmq_read_thread_.
run(); });
332 protobuf::InprocControl control_msg;
333 if (zmq_main_.
recv(&control_msg))
335 switch (control_msg.type())
337 case protobuf::InprocControl::PUB_CONFIGURATION:
349 _subscribe<protobuf::ManagerResponse, middleware::MarshallingScheme::PROTOBUF>(
350 [
this](std::shared_ptr<const protobuf::ManagerResponse> response) {
352 << response->ShortDebugString() << std::endl;
354 response->client_pid() == getpid() &&
355 response->client_name() == cfg_.client_name())
357 zmq_main_.set_hold_state(response->hold());
363 _unsubscribe<protobuf::ManagerResponse,
364 middleware::MarshallingScheme::PROTOBUF>(
365 groups::manager_response,
366 middleware::Subscriber<protobuf::ManagerResponse>());
372 template <
typename Data,
int scheme>
378 _publish_serialized(type_name,
scheme, bytes,
group, ignore_buffer);
381 void _publish_serialized(std::string type_name,
int scheme,
const std::vector<char>& bytes,
384 std::string identifier = _make_fully_qualified_identifier(type_name,
scheme,
group) +
'\0';
385 zmq_main_.
publish(identifier, &bytes[0], bytes.size(), ignore_buffer);
388 template <
typename Data,
int scheme>
389 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
391 const middleware::Subscriber<Data>& )
393 std::string identifier =
396 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
398 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
399 [=](
const Data& ) {
return group; }));
401 if (forwarder_subscriptions_.count(identifier) == 0 &&
402 portal_subscriptions_.count(identifier) == 0)
404 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
407 std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
408 std::function<
void(
const std::vector<unsigned char>&,
int scheme,
const std::string&
type,
411 const std::set<int>& schemes,
const std::string& type_regex,
const std::string& group_regex)
413 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
414 f, schemes, type_regex, group_regex);
415 _subscribe_regex(new_sub);
419 template <
typename Data,
int scheme>
422 const middleware::Subscriber<Data>& = middleware::Subscriber<Data>())
424 std::string identifier =
427 portal_subscriptions_.erase(identifier);
430 if (forwarder_subscriptions_.count(identifier) == 0)
434 void _unsubscribe_all(
440 for (
const auto& p : portal_subscriptions_)
442 const auto& identifier = p.first;
443 if (forwarder_subscriptions_.count(identifier) == 0)
446 portal_subscriptions_.clear();
450 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
451 _forwarder_unsubscribe(
453 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
457 if (regex_subscriptions_.size() > 0)
459 regex_subscriptions_.erase(subscriber_id);
460 if (regex_subscriptions_.empty())
465 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
468 protobuf::InprocControl new_control_msg;
470 #ifdef USE_OLD_ZMQ_CPP_API
471 int flags = ZMQ_NOBLOCK;
473 auto flags = zmq::recv_flags::dontwait;
476 while (zmq_main_.
recv(&new_control_msg, flags))
482 switch (control_msg.type())
484 case protobuf::InprocControl::RECEIVE:
490 const auto& data = control_msg.received_data();
494 std::tie(
group,
scheme,
type, process, thread) = parse_identifier(data);
495 std::string identifier = _make_identifier(
499 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>>
501 auto portal_range = portal_subscriptions_.equal_range(identifier);
502 for (
auto it = portal_range.first; it != portal_range.second; ++it)
503 subs_to_post.push_back(it->second);
504 auto forwarder_it = forwarder_subscriptions_.find(identifier);
505 if (forwarder_it != forwarder_subscriptions_.end())
506 subs_to_post.push_back(forwarder_it->second);
510 const auto& data = control_msg.received_data();
511 auto null_delim_it = std::find(std::begin(data), std::end(data),
'\0');
512 for (
auto& sub : subs_to_post)
514 if (
auto sub_sp = sub.lock())
515 sub_sp->post(null_delim_it + 1, data.end());
519 if (!regex_subscriptions_.empty())
521 auto null_delim_it = std::find(std::begin(data), std::end(data),
'\0');
523 bool forwarder_subscription_posted =
false;
524 for (
auto& sub : regex_subscriptions_)
527 bool is_forwarded_sub =
529 if (is_forwarded_sub && forwarder_subscription_posted)
532 if (sub.second->post(null_delim_it + 1, data.end(),
scheme,
type,
535 forwarder_subscription_posted =
true;
541 case protobuf::InprocControl::REQUEST_HOLD_STATE:
543 protobuf::ManagerRequest req;
545 req.set_ready(ready_);
547 req.set_client_name(cfg_.client_name());
548 req.set_client_pid(getpid());
551 << req.ShortDebugString() << std::endl;
553 _publish<protobuf::ManagerRequest, middleware::MarshallingScheme::PROTOBUF>(
555 middleware::Publisher<protobuf::ManagerRequest>(),
true);
566 void _receive_publication_forwarded(
567 const goby::middleware::protobuf::SerializerTransporterMessage&
msg)
569 std::string identifier =
570 _make_identifier(
msg.key().type(),
msg.key().marshalling_scheme(),
msg.key().group(),
573 auto& bytes =
msg.data();
574 zmq_main_.
publish(identifier, &bytes[0], bytes.size());
577 void _receive_subscription_forwarded(
578 const std::shared_ptr<
const middleware::SerializationHandlerBase<>>& subscription)
580 std::string identifier = _make_identifier(subscription->type_name(), subscription->scheme(),
581 subscription->subscribed_group(),
585 goby::glog <<
"Received subscription forwarded for identifier [" << identifier
586 <<
"] from subscriber id: " << subscription->subscriber_id() << std::endl;
588 switch (subscription->action())
593 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
597 if (forwarder_subscriptions_.count(identifier) == 0)
600 if (portal_subscriptions_.count(identifier) == 0)
604 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
606 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
607 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
614 _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
622 void _forwarder_unsubscribe(
const std::string& subscriber_id,
const std::string& identifier)
624 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
625 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
627 bool no_forwarder_subscribers =
true;
628 for (
const auto& p : forwarder_subscription_identifiers_)
630 if (p.second.count(identifier) != 0)
632 no_forwarder_subscribers =
false;
638 if (no_forwarder_subscribers)
641 forwarder_subscriptions_.erase(it->second);
644 if (portal_subscriptions_.count(identifier) == 0)
648 forwarder_subscription_identifiers_[subscriber_id].erase(it);
652 void _receive_regex_subscription_forwarded(
653 std::shared_ptr<const middleware::SerializationSubscriptionRegex> subscription)
655 _subscribe_regex(subscription);
658 void _subscribe_regex(
659 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
661 if (regex_subscriptions_.empty())
664 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
667 template <
typename Data,
int scheme>
674 std::string _make_fully_qualified_identifier(
const std::string& type_name,
int scheme,
675 const std::string&
group)
681 template <
typename Data,
int scheme>
689 std::string _make_identifier(
const std::string& type_name,
int scheme,
const std::string&
group,
// Splits a '/'-delimited identifier into five elements and returns them as a
// tuple. The caller in _poll unpacks the tuple as
// (group, scheme, type, process, thread).
//
// The scan starts at index 1, so the identifier is expected to begin with a
// '/' (index 0 is never inspected).
//
// std::stoi / std::stoull throw std::invalid_argument or std::out_of_range
// when an element is not a valid number; nothing here catches them.
696 std::tuple<std::string, int, std::string, int, std::size_t>
697 parse_identifier(
const std::string& identifier)
699 const int number_elements = 5;
// Position of the '/' that precedes the element currently being extracted.
700 std::string::size_type previous_slash = 0;
701 std::vector<std::string> elem;
702 for (
auto i = 0; i < number_elements; ++i)
// NOTE(review): if the identifier has fewer than five further '/' characters,
// find() returns std::string::npos. previous_slash then becomes npos, and
// `previous_slash + 1` wraps to 0 on the next iteration, so the search
// silently restarts from the beginning instead of reporting an error.
704 auto slash_pos = identifier.find(
'/', previous_slash + 1);
// Element text is everything strictly between the two slashes.
705 elem.push_back(identifier.substr(previous_slash + 1, slash_pos - (previous_slash + 1)));
706 previous_slash = slash_pos;
// Third element is returned as a string, the fourth is parsed as a decimal
// int, and the fifth is parsed as a hexadecimal (base 16) unsigned value.
709 elem[2], std::stoi(elem[3]), std::stoull(elem[4],
nullptr, 16));
// Data members. Declaration order matters here: C++ initializes non-static
// members in the order they are declared, and the constructors build
// zmq_main_ from zmq_context_ and zmq_read_thread_ from cfg_, zmq_context_
// and zmq_alive_, so those three must stay declared before their dependents.

// Configuration stored by value; zmq_read_thread_ is constructed from it and
// keeps a const reference to a config object.
713 const protobuf::InterProcessPortalConfig cfg_;
// Thread object that runs zmq_read_thread_.run(); created with
// std::make_unique after construction.
715 std::unique_ptr<std::thread> zmq_thread_;
// Flag passed by reference to the read thread; starts out true.
716 std::atomic<bool> zmq_alive_{
true};
// ZeroMQ context, constructed with cfg.zeromq_number_io_threads().
717 zmq::context_t zmq_context_;
718 InterProcessPortalMainThread zmq_main_;
719 InterProcessPortalReadThread zmq_read_thread_;
// Subscriptions made directly on this portal, keyed by identifier string.
// A multimap, so one identifier can hold several subscriptions.
722 std::unordered_multimap<std::string,
723 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
724 portal_subscriptions_;
// Forwarded subscriptions, keyed by identifier string; at most one entry per
// identifier.
726 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
727 forwarder_subscriptions_;
729 std::string, std::unordered_map<std::string,
typename decltype(
730 forwarder_subscriptions_)::const_iterator>>
731 forwarder_subscription_identifiers_;
// Regex subscriptions, keyed by the subscription's subscriber_id(); a
// multimap, so one subscriber id can hold several regex subscriptions.
733 std::unordered_multimap<std::string,
734 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
735 regex_subscriptions_;
// NOTE(review): these two maps have the same types as the schemes_buffer and
// threads_buffer parameters of make_identifier, so they are presumably the
// per-portal string caches handed to it - confirm at the call sites.
737 std::unordered_map<int, std::string> schemes_;
738 std::unordered_map<std::thread::id, std::string> threads_;
746 Router(zmq::context_t& context,
const protobuf::InterProcessPortalConfig& cfg)
747 : context_(context), cfg_(cfg)
762 zmq::context_t& context_;
763 const protobuf::InterProcessPortalConfig& cfg_;
769 Manager(zmq::context_t& context,
const protobuf::InterProcessPortalConfig& cfg,
772 Manager(zmq::context_t& context,
const protobuf::InterProcessPortalConfig& cfg,
773 const Router& router,
const protobuf::InterProcessManagerHold& hold)
774 :
Manager(context, cfg, router)
776 for (
const auto& req_c : hold.required_client()) required_clients_.insert(req_c);
// Declaration only; the definition is not visible in this listing.
// Takes a ManagerRequest by const reference and returns a ManagerResponse by
// value.
// NOTE(review): presumably consults reported_clients_ / required_clients_ to
// decide the hold state placed in the response - confirm in the definition.
781 protobuf::ManagerResponse
handle_request(
const protobuf::ManagerRequest& pb_request);
788 std::set<std::string> reported_clients_;
789 std::set<std::string> required_clients_;
791 zmq::context_t& context_;
792 const protobuf::InterProcessPortalConfig& cfg_;
795 std::vector<zmq::pollitem_t> poll_items_;
799 SOCKET_SUBSCRIBE = 1,
806 std::unique_ptr<zmq::socket_t> manager_socket_;
807 std::unique_ptr<zmq::socket_t> subscribe_socket_;
808 std::unique_ptr<zmq::socket_t> publish_socket_;
812 protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
816 std::string zmq_filter_rep_{
818 protobuf::ManagerResponse,
819 middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
822 std::string(1,
'\0')};
825 template <
typename InnerTransporter = m
iddleware::NullTransporter>
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Class that holds additional metadata and callback functions related to a publication (and is optional...
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
void set_lock_action(logger_lock::LockAction lock_action)
InterProcessPortalImplementation(const protobuf::InterProcessPortalConfig &cfg)
bool hold_state()
When using hold functionality, returns whether the system is holding (true) and thus waiting for all ...
InterProcessPortalImplementation(InnerTransporter &inner, const protobuf::InterProcessPortalConfig &cfg)
void ready()
When using hold functionality, call when the process is ready to receive publications (typically done...
~InterProcessPortalImplementation()
std::deque< protobuf::InprocControl > & control_buffer()
void set_publish_cfg(const protobuf::Socket &cfg)
~InterProcessPortalMainThread()
void unsubscribe(const std::string &identifier)
InterProcessPortalMainThread(zmq::context_t &context)
void subscribe(const std::string &identifier)
void publish(const std::string &identifier, const char *bytes, int size, bool ignore_buffer=false)
void send_control_msg(const protobuf::InprocControl &control)
bool recv(protobuf::InprocControl *control_msg, zmq_recv_flags_type flags=zmq_recv_flags_type())
void set_hold_state(bool hold)
InterProcessPortalReadThread(const protobuf::InterProcessPortalConfig &cfg, zmq::context_t &context, std::atomic< bool > &alive, std::shared_ptr< std::condition_variable_any > poller_cv)
~InterProcessPortalReadThread()
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router, const protobuf::InterProcessManagerHold &hold)
protobuf::Socket subscribe_socket_cfg()
Manager(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg, const Router &router)
protobuf::Socket publish_socket_cfg()
protobuf::ManagerResponse handle_request(const protobuf::ManagerRequest &pb_request)
std::atomic< unsigned > sub_port
unsigned last_port(zmq::socket_t &socket)
Router(zmq::context_t &context, const protobuf::InterProcessPortalConfig &cfg)
Router & operator=(Router &)=delete
std::atomic< unsigned > pub_port
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
goby::util::logger::GroupSetter group(std::string n)
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
std::string thread_id(std::thread::id i=std::this_thread::get_id())
constexpr goby::middleware::Group manager_response
constexpr goby::middleware::Group manager_request
@ PROCESS_THREAD_WILDCARD
std::string identifier_part_to_string(int i)
void setup_socket(zmq::socket_t &socket, const protobuf::Socket &cfg)
const std::string & id_component(const Key &k, std::unordered_map< Key, std::string > &map)
Given key, find the string in the map, or create it (to_string) and store it, and return the string.
std::string make_identifier(const std::string &type_name, int scheme, const std::string &group, IdentifierWildcard wildcard, const std::string &process, std::unordered_map< int, std::string > *schemes_buffer=nullptr, std::unordered_map< std::thread::id, std::string > *threads_buffer=nullptr)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
type
Generic JSON types used in JWTs.
static int from_string(const std::string &s)
Convert from a string to a marshalling scheme id.
static std::string to_string(int e)
Convert a known marshalling scheme to a human-readable string or an unknown scheme to the string repr...
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.
std::chrono::time_point< SystemClock > time_point
static time_point now() noexcept
Returns the current system time unless SimulatorSettings::using_sim_time is set to true,...
std::chrono::microseconds duration
Duration type.