24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
37 #include <boost/units/quantity.hpp>
63 class ModemTransmission;
69 template <
typename Data>
class Publisher;
73 inline size_t data_size(
const SerializerTransporterMessage&
msg) {
return msg.data().size(); }
75 inline bool operator==(
const SerializerTransporterMessage& a,
const SerializerTransporterMessage& b)
77 return (a.key().serialize_time() == b.key().serialize_time() &&
78 a.key().marshalling_scheme() == b.key().marshalling_scheme() &&
79 a.key().type() == b.key().type() && a.key().group() == b.key().group() &&
80 a.data() == b.data());
83 inline bool operator<(
const SerializerTransporterMessage& a,
const SerializerTransporterMessage& b)
85 if (a.key().serialize_time() != b.key().serialize_time())
86 return a.key().serialize_time() < b.key().serialize_time();
87 else if (a.key().marshalling_scheme() != b.key().marshalling_scheme())
88 return a.key().marshalling_scheme() < b.key().marshalling_scheme();
89 else if (a.key().type() != b.key().type())
90 return a.key().type() < b.key().type();
91 else if (a.key().group() != b.key().group())
92 return a.key().group() < b.key().group();
94 return a.data() < b.data();
99 namespace intervehicle
103 inline bool operator==(
const TransporterConfig& a,
const TransporterConfig& b)
105 return a.SerializeAsString() == b.SerializeAsString();
// Function template returning a shared_ptr to a SerializerTransporterMessage. The visible
// statements create the message, fill in its key (group, numeric group, serialize time,
// publisher configuration) and attach a payload built from `bytes`.
// NOTE(review): this listing has lost lines - the function name and parameter list, the
// definition of `bytes`, any statements between the visible ones, and the return are not
// shown. `group` and `publisher` are presumably parameters - confirm against the real header.
110 template <
typename Data>
111 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
// Copy the serialized bytes into a heap-allocated std::string; this raw pointer is passed
// to set_allocated_data() below.
// NOTE(review): if anything throws between this `new` and set_allocated_data() (e.g. the
// make_shared call), the string would leak - consider building the string directly in the message.
115 auto* sbytes =
new std::string(bytes.begin(), bytes.end());
116 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
// Mutable access to the message's key, filled in by the statements that follow.
118 auto* key =
msg->mutable_key();
// Store the group both as a string and in its numeric form.
121 key->set_group(std::string(
group));
122 key->set_group_numeric(
group.numeric());
// Stamp the key with the current SystemClock time, taken as a MicroTime quantity.
123 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
124 key->set_serialize_time_with_units(now);
// Copy the publisher's configuration into the key.
125 *key->mutable_cfg() = publisher.
cfg();
// Hand the heap string to the message (protobuf set_allocated_* takes ownership of the pointer).
126 msg->set_allocated_data(sbytes);
132 InterProcessForwarder<InterThreadTransporter>>
// Member function declarations (definitions are not part of this listing).
// NOTE(review): the purposes below are inferred from names and parameter types only - confirm
// against the implementation file.

// Takes a mutable ModemTransmission pointer; presumably fills it with data to send - TODO confirm.
144 void _data_request(goby::acomms::protobuf::ModemTransmission*
msg);
// Takes a shared pointer to an immutable SerializerTransporterMessage; presumably queues it - TODO confirm.
145 void _buffer_message(
146 const std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>&
msg);
// Takes a received ModemTransmission by const reference.
147 void _receive(
const goby::acomms::protobuf::ModemTransmission& rx_msg);
// Takes the Subscription by value (unlike _accept_subscription, which takes a const reference).
148 void _forward_subscription(intervehicle::protobuf::Subscription subscription);
149 void _accept_subscription(
const intervehicle::protobuf::Subscription& subscription);
// NOTE(review): the line below is the tail of a declaration whose name and leading
// parameters are missing from this listing.
152 intervehicle::protobuf::ExpireData::ExpireReason reason);
// Overload of _create_buffer_id taking a SerializerTransporterKey. The visible fragment
// passes key.group_numeric() as the last argument of a call.
// NOTE(review): the return type and the start of the body are missing from this listing.
157 _create_buffer_id(
const goby::middleware::protobuf::SerializerTransporterKey& key)
160 key.group_numeric());
163 subbuffer_id_type _create_buffer_id(
const intervehicle::protobuf::Subscription& subscription)
165 return _create_buffer_id(subscription.dccl_id(), subscription.group());
// Body fragment of a subnet membership check for `dest_id`.
// NOTE(review): the enclosing function signature and its braces are missing from this listing.

// The destination is in our subnet when its id and our modem id agree on every bit
// selected by the configured subnet mask.
180 bool dest_in_subnet =
181 (dest_id &
cfg().subnet_mask()) == (
cfg().modem_id() &
cfg().subnet_mask());
// Log message reporting the destination id, our modem id and the mask.
// NOTE(review): the head of this statement (the stream being written to and any condition
// guarding it, presumably `!dest_in_subnet`) is missing from this listing - confirm.
184 <<
"Dest: " << dest_id
185 <<
" is not in subnet (our id: " <<
cfg().modem_id()
186 <<
", mask: " <<
cfg().subnet_mask() <<
")" << std::endl;
188 return dest_in_subnet;
// Declaration only; takes the Subscription that changed by const reference.
191 void _publish_subscription_report(
const intervehicle::protobuf::Subscription& changed);
// Owned transporters: an inter-thread transporter and an inter-process forwarder
// parameterized on that inter-thread transporter type.
194 std::unique_ptr<InterThreadTransporter> interthread_;
195 std::unique_ptr<InterProcessForwarder<InterThreadTransporter>> interprocess_;
// Serializer keys indexed by subbuffer id (multimap: several keys may share one id).
197 std::multimap<subbuffer_id_type, goby::middleware::protobuf::SerializerTransporterKey>
198 publisher_buffer_cfg_;
// Per modem id, the subscriptions indexed by subbuffer id.
200 std::map<modem_id_type, std::multimap<subbuffer_id_type, intervehicle::protobuf::Subscription>>
201 subscriber_buffer_cfg_;
// For each subbuffer id, a set of modem ids; presumably the destinations for which that
// subbuffer has been created - TODO confirm.
203 std::map<subbuffer_id_type, std::set<modem_id_type>> subbuffers_created_;
// Key and modem-id set; presumably used for subscription forwarding messages - TODO confirm.
205 goby::middleware::protobuf::SerializerTransporterKey subscription_key_;
206 std::set<modem_id_type> subscription_subbuffers_;
// Frame identifiers are plain ints.
210 using frame_type =
int;
// Map from frame to a vector of DynamicBuffer values.
// NOTE(review): the member name that completes this declaration is missing from this listing.
211 std::map<frame_type, std::vector<goby::acomms::DynamicBuffer<buffer_data_type>::Value>>
// Owned modem driver, held through its ModemDriverBase interface.
214 std::unique_ptr<goby::acomms::ModemDriverBase> driver_;
// presumably the logger group name used by this thread - TODO confirm.
217 std::string glog_group_;
// Static map from string to opaque pointer; presumably handles of loaded driver plugin
// libraries keyed by name - TODO confirm.
219 static std::map<std::string, void*> driver_plugins_;
std::string subbuffer_id_type
size_type size() const
Size of the buffer (that is, sum of the subbuffer sizes)
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Class that holds additional metadata and callback functions related to a publication (and is optional...
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
const intervehicle::protobuf::PortalConfig::LinkConfig & cfg() const
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
goby::acomms::DynamicBuffer< buffer_data_type >::subbuffer_id_type subbuffer_id_type
ModemDriverThread(const intervehicle::protobuf::PortalConfig::LinkConfig &cfg)
goby::middleware::protobuf::SerializerTransporterMessage buffer_data_type
goby::util::logger::GroupSetter group(std::string n)
bool operator==(const TransporterConfig &a, const TransporterConfig &b)
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
size_t data_size(const SerializerTransporterMessage &msg)
bool operator==(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
static unsigned id(CharIterator begin, CharIterator end)
std::chrono::time_point< SteadyClock > time_point
std::chrono::microseconds duration
Duration type.