24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERMODULE_H
29 #include <sys/types.h>
48 inline bool operator<(
const SerializerTransporterKey& k1,
const SerializerTransporterKey& k2)
50 return k1.marshalling_scheme() != k2.marshalling_scheme()
51 ? (k1.marshalling_scheme() < k2.marshalling_scheme())
52 : (k1.type() != k2.type()
53 ? (k1.type() < k2.type())
54 : (k1.group() != k2.group() ? (k1.group() < k2.group()) :
false));
58 template <
typename Derived,
typename InnerTransporter>
65 template <
typename InnerTransporter>
82 template <
typename Data,
int scheme>
88 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
89 goby::middleware::protobuf::SerializerTransporterMessage
msg;
90 auto* key =
msg.mutable_key();
92 key->set_marshalling_scheme(
scheme);
94 key->set_group(std::string(
group));
95 msg.set_allocated_data(sbytes);
97 *key->mutable_cfg() = publisher.
cfg();
98 this->
inner().template publish<Base::to_portal_group_>(
msg);
101 template <
typename Data,
int scheme>
102 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
const Group&
group,
105 if (subscriptions_.empty())
108 protobuf::SerializerTransporterMessage>(
109 [
this](
const protobuf::SerializerTransporterMessage&
msg) {
110 auto range = subscriptions_.equal_range(
msg.key());
111 for (
auto it = range.first; it != range.second; ++it)
112 { it->second->post(
msg.data().begin(),
msg.data().end()); }
115 auto local_subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
117 middleware::Subscriber<Data>(goby::middleware::protobuf::TransporterConfig(),
118 [=](
const Data& d) {
return group; }));
120 using goby::middleware::intermodule::protobuf::Subscription;
121 Subscription subscription;
123 subscription.mutable_key()->set_marshalling_scheme(
scheme);
125 subscription.mutable_key()->set_group(std::string(
group));
126 subscription.set_action(Subscription::SUBSCRIBE);
128 this->
inner().template publish<Base::to_portal_group_>(subscription);
130 subscriptions_.insert(std::make_pair(subscription.key(), local_subscription));
133 template <
typename Data,
int scheme>
void _unsubscribe(
const Group&
group)
135 using goby::middleware::intermodule::protobuf::Subscription;
136 Subscription unsubscription;
138 unsubscription.mutable_key()->set_marshalling_scheme(
scheme);
140 unsubscription.mutable_key()->set_group(std::string(
group));
141 unsubscription.set_action(Subscription::UNSUBSCRIBE);
142 this->
inner().template publish<Base::to_portal_group_>(unsubscription);
144 subscriptions_.erase(unsubscription.key());
146 if (subscriptions_.empty())
149 protobuf::SerializerTransporterMessage>();
152 void _unsubscribe_all()
154 using goby::middleware::intermodule::protobuf::Subscription;
155 Subscription unsubscription;
157 unsubscription.set_action(Subscription::UNSUBSCRIBE_ALL);
158 this->
inner().template publish<Base::to_portal_group_>(unsubscription);
160 subscriptions_.clear();
163 protobuf::SerializerTransporterMessage>();
175 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
181 std::multimap<protobuf::SerializerTransporterKey,
182 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
186 template <
typename Derived,
typename InnerTransporter>
200 using goby::middleware::intermodule::protobuf::Subscription;
201 using goby::middleware::protobuf::SerializerTransporterMessage;
202 this->
inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
203 [
this](
const SerializerTransporterMessage& d) {
204 static_cast<Derived*
>(
this)->_receive_publication_forwarded(d);
207 this->
inner().template subscribe<Base::to_portal_group_, Subscription>(
208 [
this](
const Subscription& s) {
209 auto on_subscribe = [
this](
const SerializerTransporterMessage& d) {
210 this->
inner().template publish<Base::from_portal_group_>(d);
212 auto sub = std::make_shared<SerializationInterModuleSubscription>(on_subscribe, s);
216 case Subscription::SUBSCRIBE:
217 case Subscription::UNSUBSCRIBE:
218 static_cast<Derived*
>(
this)->_receive_subscription_forwarded(sub);
220 case Subscription::UNSUBSCRIBE_ALL:
221 static_cast<Derived*
>(
this)->_unsubscribe_all(s.id());
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
InnerTransporter & inner()
Implements the forwarder concept for the intermodule layer.
virtual ~InterModuleForwarder()
InterModuleForwarder(InnerTransporter &inner)
Construct a forwarder for the intermodule layer.
InterModulePortalBase(InnerTransporter &inner)
virtual ~InterModulePortalBase()
Base class for implementing transporters (both portal and forwarder) for the interprocess layer.
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer
void unsubscribe_all()
Unsubscribe from all current subscriptions.
static constexpr Group from_portal_group_
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
goby::util::logger::GroupSetter group(std::string n)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
std::string full_process_and_thread_id(std::thread::id i=std::this_thread::get_id())
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static std::string type_name()
The marshalling scheme specific string name for this type.