Goby3  3.1.5
2024.05.14
driver_thread.h
Go to the documentation of this file.
1 // Copyright 2019-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
26 
27 #include <algorithm>
28 #include <chrono>
29 #include <cstddef>
30 #include <map>
31 #include <memory>
32 #include <ostream>
33 #include <set>
34 #include <string>
35 #include <vector>
36 
37 #include <boost/units/quantity.hpp>
38 
43 #include "goby/middleware/group.h"
51 #include "goby/time/convert.h"
52 #include "goby/time/steady_clock.h"
53 #include "goby/time/system_clock.h"
54 #include "goby/time/types.h"
56 
57 namespace goby
58 {
59 namespace acomms
60 {
61 namespace protobuf
62 {
63 class ModemTransmission;
64 } // namespace protobuf
65 } // namespace acomms
66 
67 namespace middleware
68 {
69 template <typename Data> class Publisher;
70 
71 namespace protobuf
72 {
73 inline size_t data_size(const SerializerTransporterMessage& msg) { return msg.data().size(); }
74 
75 inline bool operator==(const SerializerTransporterMessage& a, const SerializerTransporterMessage& b)
76 {
77  return (a.key().serialize_time() == b.key().serialize_time() &&
78  a.key().marshalling_scheme() == b.key().marshalling_scheme() &&
79  a.key().type() == b.key().type() && a.key().group() == b.key().group() &&
80  a.data() == b.data());
81 }
82 
83 inline bool operator<(const SerializerTransporterMessage& a, const SerializerTransporterMessage& b)
84 {
85  if (a.key().serialize_time() != b.key().serialize_time())
86  return a.key().serialize_time() < b.key().serialize_time();
87  else if (a.key().marshalling_scheme() != b.key().marshalling_scheme())
88  return a.key().marshalling_scheme() < b.key().marshalling_scheme();
89  else if (a.key().type() != b.key().type())
90  return a.key().type() < b.key().type();
91  else if (a.key().group() != b.key().group())
92  return a.key().group() < b.key().group();
93  else
94  return a.data() < b.data();
95 }
96 
97 } // namespace protobuf
98 
99 namespace intervehicle
100 {
101 namespace protobuf
102 {
103 inline bool operator==(const TransporterConfig& a, const TransporterConfig& b)
104 {
105  return a.SerializeAsString() == b.SerializeAsString();
106 }
107 
108 } // namespace protobuf
109 
// Builds a SerializerTransporterMessage describing one publication and returns
// it in a std::shared_ptr.
//
// The key is filled with: marshalling scheme DCCL, the group as a string and as
// its numeric value, the current SystemClock time (as MicroTime) as the
// serialize time, and a copy of publisher.cfg(). The data field receives a
// std::string copied from `bytes`.
//
// NOTE(review): this listing skips source lines 114 and 120 (the numbering
// jumps 113 -> 115 and 118 -> 121). `bytes` is used below but its definition is
// not visible here; presumably it holds the serialized form of `d` -- confirm
// against the real header. Any statement on line 120 is likewise not shown.
110 template <typename Data>
111 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
112 serialize_publication(const Data& d, const Group& group, const Publisher<Data>& publisher)
113 {
    // Heap-allocated copy of the byte range; handed to the message at the end.
    // NOTE(review): raw `new` ahead of make_shared -- if make_shared (or anything
    // before set_allocated_data) throws, this string is leaked. Confirm whether
    // that matters here.
115  auto* sbytes = new std::string(bytes.begin(), bytes.end());
116  auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
117 
118  auto* key = msg->mutable_key();
119  key->set_marshalling_scheme(MarshallingScheme::DCCL);
121  key->set_group(std::string(group));
122  key->set_group_numeric(group.numeric());
    // Timestamp the message with the time of serialization.
123  auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
124  key->set_serialize_time_with_units(now);
125  *key->mutable_cfg() = publisher.cfg();
    // Presumably transfers ownership of sbytes to msg (protobuf set_allocated_*
    // convention) -- TODO confirm.
126  msg->set_allocated_data(sbytes);
127  return msg;
128 }
129 
// NOTE(review): the listing omits source line 130, which presumably carries the
// `class ModemDriverThread` head for the base-clause below (the constructor
// name further down supports this) -- confirm against the real header.
// Other listing lines are also elided inside this class (136-137, 151, 156,
// 183, 208, 215), so some declarations appear incomplete here.
//
// Thread type configured by a PortalConfig::LinkConfig and using
// InterProcessForwarder<InterThreadTransporter> as its transporter template
// argument.
131  : public goby::middleware::Thread<intervehicle::protobuf::PortalConfig::LinkConfig,
132  InterProcessForwarder<InterThreadTransporter>>
133 {
134  public:
    // Element type stored in the buffers used by this thread.
135  using buffer_data_type = goby::middleware::protobuf::SerializerTransporterMessage;
138 
    // Constructed from the link configuration; defined out of line.
139  ModemDriverThread(const intervehicle::protobuf::PortalConfig::LinkConfig& cfg);
    // Overrides the base-class loop(); defined out of line.
140  void loop() override;
    // Returns buffer_.size() converted to int. The declaration of buffer_ is not
    // visible in this listing (presumably on an elided line).
141  int tx_queue_size() { return buffer_.size(); }
142 
143  private:
    // The following handlers are declared here and defined out of line.
144  void _data_request(goby::acomms::protobuf::ModemTransmission* msg);
145  void _buffer_message(
146  const std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>& msg);
147  void _receive(const goby::acomms::protobuf::ModemTransmission& rx_msg);
148  void _forward_subscription(intervehicle::protobuf::Subscription subscription);
149  void _accept_subscription(const intervehicle::protobuf::Subscription& subscription);
    // NOTE(review): listing line 151 (a middle parameter) is elided here.
150  void _expire_value(const goby::time::SteadyClock::time_point now,
152  intervehicle::protobuf::ExpireData::ExpireReason reason);
153 
    // Primary overload: builds a buffer id from a DCCL id and a numeric group.
154  subbuffer_id_type _create_buffer_id(unsigned dccl_id, unsigned group);
155 
    // Overload for a transporter key: derives the DCCL id from key.type() via
    // DCCLSerializerParserHelperBase::id() and pairs it with key.group_numeric().
    // NOTE(review): the return-type line (listing line 156) is elided.
157  _create_buffer_id(const goby::middleware::protobuf::SerializerTransporterKey& key)
158  {
159  return _create_buffer_id(detail::DCCLSerializerParserHelperBase::id(key.type()),
160  key.group_numeric());
161  }
162 
    // Overload for a subscription: uses its dccl_id() and group().
163  subbuffer_id_type _create_buffer_id(const intervehicle::protobuf::Subscription& subscription)
164  {
165  return _create_buffer_id(subscription.dccl_id(), subscription.group());
166  }
167 
168  void _try_create_or_update_buffer(modem_id_type dest_id, const subbuffer_id_type& buffer_id);
169 
    // Our modem id bitwise-ANDed with the configured subnet mask.
170  modem_id_type _broadcast_id() { return cfg().modem_id() & cfg().subnet_mask(); }
171 
172  // id within subnet
    // Computed as the given id minus _broadcast_id().
173  modem_id_type _id_within_subnet(modem_id_type id) { return id - _broadcast_id(); }
174 
175  // full id
    // Inverse of _id_within_subnet(): adds _broadcast_id() back.
176  modem_id_type _full_id(modem_id_type id_in_subnet) { return id_in_subnet + _broadcast_id(); }
177 
    // True when dest_id and our modem id are equal after masking both with the
    // subnet mask. When they differ, a "Dest: ... is not in subnet" message is
    // streamed before returning false.
178  bool _dest_is_in_subnet(modem_id_type dest_id)
179  {
180  bool dest_in_subnet =
181  (dest_id & cfg().subnet_mask()) == (cfg().modem_id() & cfg().subnet_mask());
    // NOTE(review): listing line 183 (the head of the streamed statement, i.e.
    // the stream being written to) is elided -- presumably the Goby logger.
182  if (!dest_in_subnet)
184  << "Dest: " << dest_id
185  << " is not in subnet (our id: " << cfg().modem_id()
186  << ", mask: " << cfg().subnet_mask() << ")" << std::endl;
187 
188  return dest_in_subnet;
189  }
190 
191  void _publish_subscription_report(const intervehicle::protobuf::Subscription& changed);
192 
193  private:
    // Owned transporter objects.
194  std::unique_ptr<InterThreadTransporter> interthread_;
195  std::unique_ptr<InterProcessForwarder<InterThreadTransporter>> interprocess_;
196 
    // Transporter keys indexed by buffer id (multiple keys per id allowed).
197  std::multimap<subbuffer_id_type, goby::middleware::protobuf::SerializerTransporterKey>
198  publisher_buffer_cfg_;
199 
    // Per modem id: subscriptions indexed by buffer id.
200  std::map<modem_id_type, std::multimap<subbuffer_id_type, intervehicle::protobuf::Subscription>>
201  subscriber_buffer_cfg_;
202 
    // Per buffer id: set of modem ids (presumably destinations for which a
    // subbuffer has been created -- confirm in the implementation).
203  std::map<subbuffer_id_type, std::set<modem_id_type>> subbuffers_created_;
204 
205  goby::middleware::protobuf::SerializerTransporterKey subscription_key_;
206  std::set<modem_id_type> subscription_subbuffers_;
207 
209 
    // Values keyed by frame number (presumably awaiting acknowledgment, per the
    // member name -- confirm in the implementation).
210  using frame_type = int;
211  std::map<frame_type, std::vector<goby::acomms::DynamicBuffer<buffer_data_type>::Value>>
212  pending_ack_;
213 
    // Owned modem driver instance.
214  std::unique_ptr<goby::acomms::ModemDriverBase> driver_;
216 
217  std::string glog_group_;
218 
    // Shared across all instances; maps a string to an opaque pointer
    // (presumably plugin library handles -- TODO confirm).
219  static std::map<std::string, void*> driver_plugins_;
220 
221  goby::time::SteadyClock::time_point next_modem_report_time_;
    // const: must be set in the constructor's initializer list.
222  const goby::time::SteadyClock::duration modem_report_interval_;
223 };
224 
225 } // namespace intervehicle
226 } // namespace middleware
227 } // namespace goby
228 
229 #endif
size_type size() const
Size of the buffer (that is, sum of the subbuffer sizes)
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
Definition: mac_manager.h:51
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: publisher.h:40
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition: publisher.h:81
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition: thread.h:61
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
goby::acomms::DynamicBuffer< buffer_data_type >::subbuffer_id_type subbuffer_id_type
ModemDriverThread(const intervehicle::protobuf::PortalConfig::LinkConfig &cfg)
goby::middleware::protobuf::SerializerTransporterMessage buffer_data_type
goby::util::logger::GroupSetter group(std::string n)
bool operator==(const TransporterConfig &a, const TransporterConfig &b)
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
size_t data_size(const SerializerTransporterMessage &msg)
Definition: driver_thread.h:73
bool operator==(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:98
static unsigned id(CharIterator begin, CharIterator end)
std::chrono::time_point< SteadyClock > time_point
Definition: steady_clock.h:45
std::chrono::microseconds duration
Duration type.
Definition: steady_clock.h:42