Goby3  3.1.5
2024.05.14
serialization_handlers.h
Go to the documentation of this file.
1 // Copyright 2016-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 // James D. Turner <james.turner@nrl.navy.mil>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_MIDDLEWARE_TRANSPORT_SERIALIZATION_HANDLERS_H
26 #define GOBY_MIDDLEWARE_TRANSPORT_SERIALIZATION_HANDLERS_H
27 
28 #include <chrono>
29 #include <memory>
30 #include <regex>
31 #include <thread>
32 #include <unordered_map>
33 
34 #include "goby/exception.h"
35 #include "goby/util/binary.h"
36 
37 #include "goby/middleware/common.h"
41 
42 #include "interface.h"
43 #include "null.h"
44 
45 namespace goby
46 {
47 namespace middleware
48 {
// Selector class for enabling the SerializationHandlerBase::post() override signature.
// The partial specializations of this template in this file are enabled on
// std::is_void<Metadata>; this primary template itself declares no members.
50 template <typename Metadata, typename Enable = void> class SerializationHandlerPostSelector
51 {
52 };
53 
// Specialization enabled when Metadata is void: the post() overloads take only a
// begin/end pair of serialized bytes (no metadata argument).
// NOTE(review): the class-head line (listing line 56) is missing from this listing.
55 template <typename Metadata>
57  typename std::enable_if_t<std::is_void<Metadata>::value>>
58 {
59  public:
61  virtual ~SerializationHandlerPostSelector() = default;
62 
    // Pure virtual post() for std::string iterators; returns an iterator of the same
    // type as its arguments.
63  virtual std::string::const_iterator post(std::string::const_iterator b,
64  std::string::const_iterator e) const = 0;
    // The std::vector<char> overload is compiled out when _LIBCPP_VERSION is defined
    // (libc++) -- presumably its iterator type would clash with another overload
    // there; TODO confirm.
65 #ifndef _LIBCPP_VERSION
66  virtual std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
67  std::vector<char>::const_iterator e) const = 0;
68 #endif
    // Pure virtual post() for raw character pointers.
69  virtual const char* post(const char* b, const char* e) const = 0;
70 };
71 
// Specialization enabled when Metadata is NOT void: every post() overload takes an
// additional `const Metadata&` argument after the begin/end pair of bytes.
// NOTE(review): the class-head line (listing line 74) is missing from this listing.
73 template <typename Metadata>
75  typename std::enable_if_t<!std::is_void<Metadata>::value>>
76 {
77  public:
79  virtual ~SerializationHandlerPostSelector() = default;
80 
    // Pure virtual post() for std::string iterators plus metadata.
81  virtual std::string::const_iterator post(std::string::const_iterator b,
82  std::string::const_iterator e,
83  const Metadata& metadata) const = 0;
    // The std::vector<char> overload is compiled out when _LIBCPP_VERSION is defined
    // (libc++) -- presumably to avoid an overload clash there; TODO confirm.
84 #ifndef _LIBCPP_VERSION
85  virtual std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
86  std::vector<char>::const_iterator e,
87  const Metadata& metadata) const = 0;
88 #endif
    // Pure virtual post() for raw character pointers plus metadata.
89  virtual const char* post(const char* b, const char* e, const Metadata& metadata) const = 0;
90 };
91 
// Base class for handling posting callbacks for serialized data types
// (interprocess and outer).
// Declares the pure virtual accessors every handler provides (type name,
// subscribed group, marshalling scheme, subscription action) and records the
// id of the thread that constructed the handler.
// NOTE(review): the class-head line (listing line 96), the enum head (line 107)
// and line 111 are missing from this listing.
95 template <typename Metadata = void>
97 {
98  public:
100  virtual ~SerializationHandlerBase() = default;
101 
    // Name of the data type handled, and the group this handler is subscribed to.
102  virtual const std::string& type_name() const = 0;
103  virtual const Group& subscribed_group() const = 0;
104 
    // Integer identifier of the marshalling scheme.
105  virtual int scheme() const = 0;
106 
    // Enumerators of SubscriptionAction (the type returned by action() below).
108  {
109  SUBSCRIBE,
110  UNSUBSCRIBE,
112  };
113  virtual SubscriptionAction action() const = 0;
114 
    // Id of the thread that constructed this object (captured by the default
    // member initializer of thread_id_).
115  std::thread::id thread_id() const { return thread_id_; }
    // Defaults to goby::middleware::thread_id(thread_id_); virtual so that
    // derived handlers can supply a different id.
116  virtual std::string subscriber_id() const { return subscriber_id_; }
117 
118  private:
119  const std::thread::id thread_id_{std::this_thread::get_id()};
    // Relies on thread_id_ being declared (and therefore initialized) first.
120  const std::string subscriber_id_{goby::middleware::thread_id(thread_id_)};
121 };
122 
// Equality of two handlers s1 and s2: true only when scheme(), type_name(),
// subscribed_group() and action() all compare equal.
// NOTE(review): the signature lines (listing lines 124-125) are missing from this
// listing; presumably this is operator== on SerializationHandlerBase<Metadata> --
// confirm against the original header.
123 template <typename Metadata>
126 {
127  return s1.scheme() == s2.scheme() && s1.type_name() == s2.type_name() &&
128  s1.subscribed_group() == s2.subscribed_group() && s1.action() == s2.action();
129 }
130 
// Represents a subscription to a serialized data type (interprocess layer).
// Holds a user handler, the type name reported by
// SerializerParserHelper<Data, scheme_id>::type_name(), the subscribed group and
// a Subscriber<Data>.
// NOTE(review): the class head (listing line 136), the constructor signature
// (lines 141-142) and the action() signature/body (lines 168, 170) are missing
// from this listing.
135 template <typename Data, int scheme_id>
137 {
138  public:
    // Callback invoked with each parsed message that passes the group check.
139  typedef std::function<void(std::shared_ptr<const Data> data)> HandlerType;
140 
    // Constructor (signature start missing from the listing): stores the handler,
    // group and subscriber, and caches the type name.
143  const Subscriber<Data>& subscriber = Subscriber<Data>())
144  : handler_(handler),
145  type_name_(SerializerParserHelper<Data, scheme_id>::type_name()),
146  group_(group),
147  subscriber_(subscriber)
148  {
149  }
150 
    // The three post() overrides differ only in iterator type; all forward to _post().
151  // handle an incoming message
152  std::string::const_iterator post(std::string::const_iterator b,
153  std::string::const_iterator e) const override
154  {
155  return _post(b, e);
156  }
157 
158 #ifndef _LIBCPP_VERSION
159  std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
160  std::vector<char>::const_iterator e) const override
161  {
162  return _post(b, e);
163  }
164 #endif
165 
166  const char* post(const char* b, const char* e) const override { return _post(b, e); }
167 
    // Presumably the action() override; its signature and return statement are
    // missing from this listing.
169  {
171  }
172 
173  // getters
174  const std::string& type_name() const override { return type_name_; }
175  const Group& subscribed_group() const override { return group_; }
    // The scheme is the compile-time template argument.
176  int scheme() const override { return scheme_id; }
177 
178  private:
    // Parses one message from the given byte range and, if a handler is set and the
    // group computed by subscriber_.group(*msg) equals subscribed_group(), passes
    // the message to the handler. Returns actual_end as written by parse().
    // NOTE(review): *msg is dereferenced without a null check -- confirm that
    // SerializerParserHelper::parse() never returns a null pointer.
179  template <typename CharIterator>
180  CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end) const
181  {
182  CharIterator actual_end;
183  auto msg = SerializerParserHelper<Data, scheme_id>::parse(bytes_begin, bytes_end,
184  actual_end, type_name_);
185 
186  if (subscribed_group() == subscriber_.group(*msg) && handler_)
187  handler_(msg);
188 
189  return actual_end;
190  }
191 
192  private:
193  HandlerType handler_;
194  const std::string type_name_;
195  const Group group_;
196  Subscriber<Data> subscriber_;
197 };
198 
// Represents a subscription to a serialized data type (intervehicle layer).
// Derives from SerializationHandlerBase<intervehicle::protobuf::Header>, so every
// post() overload also receives the intervehicle header as metadata.
// NOTE(review): the class-head line (listing line 204) and the start of the
// constructor signature (lines 210-211) are missing from this listing.
203 template <typename Data, int scheme_id>
205  : public SerializationHandlerBase<intervehicle::protobuf::Header>
206 {
207  public:
    // Callback invoked with each parsed message that passes the group check.
208  typedef std::function<void(std::shared_ptr<const Data> data)> HandlerType;
209 
    // Constructor (signature start missing from the listing): stores the handler,
    // group and subscriber, and caches the type name.
212  const Subscriber<Data>& subscriber = Subscriber<Data>())
213  : handler_(handler),
214  type_name_(SerializerParserHelper<Data, scheme_id>::type_name()),
215  group_(group),
216  subscriber_(subscriber)
217  {
218  }
219 
    // The three post() overrides differ only in iterator type; all forward to _post().
220  // handle an incoming message
221  std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e,
222  const intervehicle::protobuf::Header& header) const override
223  {
224  return _post(b, e, header);
225  }
226 
227 #ifndef _LIBCPP_VERSION
228  std::vector<char>::const_iterator
229  post(std::vector<char>::const_iterator b, std::vector<char>::const_iterator e,
230  const intervehicle::protobuf::Header& header) const override
231  {
232  return _post(b, e, header);
233  }
234 #endif
235 
236  const char* post(const char* b, const char* e,
237  const intervehicle::protobuf::Header& header) const override
238  {
239  return _post(b, e, header);
240  }
241 
    // action() override: the visible return expression ends in
    // SubscriptionAction::SUBSCRIBE (listing lines 242 and 245 are missing).
243  action() const override
244  {
246  intervehicle::protobuf::Header>::SubscriptionAction::SUBSCRIBE;
247  }
248 
249  // getters
250  const std::string& type_name() const override { return type_name_; }
251  const Group& subscribed_group() const override { return group_; }
    // The scheme is the compile-time template argument.
252  int scheme() const override { return scheme_id; }
253 
254  private:
    // Parses one message from the given byte range, hands the header to
    // subscriber_.set_link_data(*msg, header), then -- if a handler is set and
    // subscriber_.group(*msg) equals subscribed_group() -- passes the message to
    // the handler. Returns actual_end as written by parse().
    // NOTE(review): *msg is dereferenced without a null check -- confirm that
    // SerializerParserHelper::parse() never returns a null pointer.
255  template <typename CharIterator>
256  CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end,
257  const intervehicle::protobuf::Header& header) const
258  {
259  CharIterator actual_end;
260  auto msg = SerializerParserHelper<Data, scheme_id>::parse(bytes_begin, bytes_end,
261  actual_end, type_name_);
262 
    // set_link_data runs before the group check, i.e. for every parsed message.
263  subscriber_.set_link_data(*msg, header);
264 
265  if (subscribed_group() == subscriber_.group(*msg) && handler_)
266  handler_(msg);
267 
268  return actual_end;
269  }
270 
271  private:
272  HandlerType handler_;
273  const std::string type_name_;
274  const Group group_;
275  Subscriber<Data> subscriber_;
276 };
277 
// Represents a callback for a published data type (e.g. acked_func or expired_func).
// The handler receives the parsed message by const reference together with the
// Metadata passed to post().
// NOTE(review): the class-head line (listing line 280), the first constructor's
// signature (line 285) and the action() signature/body (lines 316, 318) are
// missing from this listing.
279 template <typename Data, int scheme_id, typename Metadata>
281 {
282  public:
283  typedef std::function<void(const Data& data, const Metadata& md)> HandlerType;
284 
    // First constructor (signature missing): type name comes from the no-argument
    // SerializerParserHelper::type_name().
286  : handler_(handler), type_name_(SerializerParserHelper<Data, scheme_id>::type_name())
287  {
288  }
289 
    // Second constructor: type name comes from SerializerParserHelper::type_name(data),
    // i.e. it is derived from the supplied data instance.
290  PublisherCallback(HandlerType handler, const Data& data)
291  : handler_(handler), type_name_(SerializerParserHelper<Data, scheme_id>::type_name(data))
292  {
293  }
294 
    // The three post() overrides differ only in iterator type; all forward to _post().
295  // handle an incoming message
296  std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e,
297  const Metadata& md) const override
298  {
299  return _post(b, e, md);
300  }
301 
302 #ifndef _LIBCPP_VERSION
303  std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
304  std::vector<char>::const_iterator e,
305  const Metadata& md) const override
306  {
307  return _post(b, e, md);
308  }
309 #endif
310 
311  const char* post(const char* b, const char* e, const Metadata& md) const override
312  {
313  return _post(b, e, md);
314  }
315 
    // Presumably the action() override; its signature and return statement are
    // missing from this listing.
317  {
319  }
320 
321  // getters
322  const std::string& type_name() const override { return type_name_; }
    // Always the broadcast group (see the initializer of group_ below).
323  const Group& subscribed_group() const override { return group_; }
324  int scheme() const override { return scheme_id; }
325 
326  private:
    // Parses one message from the given byte range and, if a handler is set, calls
    // it with the dereferenced message and the metadata. No group check is made
    // here. Returns actual_end as written by parse().
    // NOTE(review): *msg is dereferenced without a null check -- confirm that
    // SerializerParserHelper::parse() never returns a null pointer.
327  template <typename CharIterator>
328  CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end, const Metadata& md) const
329  {
330  CharIterator actual_end;
331  auto msg = SerializerParserHelper<Data, scheme_id>::parse(bytes_begin, bytes_end,
332  actual_end, type_name_);
333 
334  if (handler_)
335  handler_(*msg, md);
336  return actual_end;
337  }
338 
339  private:
340  HandlerType handler_;
341  const std::string type_name_;
342  Group group_{Group(Group::broadcast_group)};
343 };
344 
// Represents an unsubscription to a serialized data type (interprocess and outer
// layers). It carries only the type name and group; every post() overload throws.
// NOTE(review): the class-head line (listing line 350), the constructor signature
// (line 353) and the action() signature/body (lines 377, 379) are missing from
// this listing.
349 template <typename Data, int scheme_id>
351 {
352  public:
    // Constructor (signature missing): caches the type name and stores the group.
354  : type_name_(SerializerParserHelper<Data, scheme_id>::type_name()), group_(group)
355  {
356  }
357 
    // post() is not meaningful for this handler: each overload unconditionally
    // throws goby::Exception and never returns a value.
358  std::string::const_iterator post(std::string::const_iterator b,
359  std::string::const_iterator e) const override
360  {
361  throw(goby::Exception("Cannot call post on an UnSubscription"));
362  }
363 
364 #ifndef _LIBCPP_VERSION
365  std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
366  std::vector<char>::const_iterator e) const override
367  {
368  throw(goby::Exception("Cannot call post on an UnSubscription"));
369  }
370 #endif
371 
372  const char* post(const char* b, const char* e) const override
373  {
374  throw(goby::Exception("Cannot call post on an UnSubscription"));
375  }
376 
    // Presumably the action() override; its signature and return statement are
    // missing from this listing.
378  {
380  }
381 
382  // getters
383  const std::string& type_name() const override { return type_name_; }
384  const Group& subscribed_group() const override { return group_; }
385  int scheme() const override { return scheme_id; }
386 
387  private:
388  const std::string type_name_;
389  const Group group_;
390 };
391 
// Represents a regex subscription to a serialized data type (interprocess and
// outer layers). Incoming messages are matched by scheme, type-name regex and
// group regex; matching messages are handed to the handler as raw bytes.
// NOTE(review): the class-head line (listing line 393), the end of the HandlerType
// typedef (line 398) and the first line of the `if` condition in post() (line 416)
// are missing from this listing.
// NOTE(review): std::set and std::function are used here, but <set> and
// <functional> are not among the #include lines visible in this listing --
// confirm they are included (listing lines 38-40 are not shown).
394 {
395  public:
    // Handler signature: raw bytes, scheme id, type name and group of the message.
396  typedef std::function<void(const std::vector<unsigned char>&, int scheme,
397  const std::string& type, const Group& group)>
399 
    // Both regexes default to ".*".
400  SerializationSubscriptionRegex(HandlerType handler, const std::set<int>& schemes,
401  const std::string& type_regex = ".*",
402  const std::string& group_regex = ".*")
403  : handler_(handler), schemes_(schemes), type_regex_(type_regex), group_regex_(group_regex)
404  {
405  }
406 
    // Replace the stored regexes; std::regex::assign recompiles the pattern.
407  void update_type_regex(const std::string& type_regex) { type_regex_.assign(type_regex); }
408  void update_group_regex(const std::string& group_regex) { group_regex_.assign(group_regex); }
409 
410  // handle an incoming message
411  // return true if posted
    // Uses std::regex_match, so the regex must match the whole type / group string.
412  template <typename CharIterator>
413  bool post(CharIterator bytes_begin, CharIterator bytes_end, int scheme, const std::string& type,
414  const std::string& group) const
415  {
    // The start of this condition is missing from the listing; the visible part
    // requires schemes_.count(scheme) (possibly OR-ed with a missing term) and both
    // regex matches.
417  schemes_.count(scheme)) &&
418  std::regex_match(type, type_regex_) && std::regex_match(group, group_regex_))
419  {
    // Copy the bytes into an owned buffer before invoking the handler.
    // NOTE(review): handler_ is invoked without the `if (handler_)` guard used by
    // the typed handlers in this file; an empty std::function would throw
    // std::bad_function_call -- confirm callers always pass a handler.
420  std::vector<unsigned char> data(bytes_begin, bytes_end);
421  handler_(data, scheme, type, goby::middleware::DynamicGroup(group));
422  return true;
423  }
424  else
425  {
426  return false;
427  }
428  }
429 
    // Id of the constructing thread and the subscriber id derived from it.
430  std::thread::id thread_id() const { return thread_id_; }
431  std::string subscriber_id() const { return subscriber_id_; }
432 
433  private:
434  HandlerType handler_;
435  const std::set<int> schemes_;
436  std::regex type_regex_;
437  std::regex group_regex_;
438  const std::thread::id thread_id_{std::this_thread::get_id()};
    // Relies on thread_id_ being declared (and therefore initialized) first.
439  const std::string subscriber_id_{goby::middleware::thread_id(thread_id_)};
440 };
441 
// Represents an unsubscription to all subscribed data for a given thread.
// Carries no data beyond the id of the thread that constructed it and the
// subscriber id derived from that thread id.
// NOTE(review): the class-head line (listing line 443) is missing from this listing.
444 {
445  public:
446  std::thread::id thread_id() const { return thread_id_; }
447  std::string subscriber_id() const { return subscriber_id_; }
448 
449  private:
    // Captured at construction time on the constructing thread.
450  const std::thread::id thread_id_{std::this_thread::get_id()};
    // Relies on thread_id_ being declared (and therefore initialized) first.
451  const std::string subscriber_id_{goby::middleware::thread_id(thread_id_)};
452 };
453 
// Represents a(n) (un)subscription from an InterModuleForwarder.
// Type name, scheme, group and action all come from the stored
// intermodule::protobuf::Subscription; incoming bytes are not parsed but wrapped
// in a protobuf::SerializerTransporterMessage and passed to the handler.
// NOTE(review): the class-head line (listing line 455), the start of the
// constructor signature (line 460), the action() signature (line 483) and its
// return statements (lines 489, 492) are missing from this listing.
456 {
457  public:
458  typedef std::function<void(const protobuf::SerializerTransporterMessage& d)> HandlerType;
459 
    // Constructor: copies the subscription, builds the group from
    // sub_cfg_.key().group() (sub_cfg_ is declared before group_, so it is already
    // initialized) and takes the subscriber id from sub.id().
461  const intermodule::protobuf::Subscription sub)
462  : handler_(handler), sub_cfg_(sub), group_(sub_cfg_.key().group()), subscriber_id_(sub.id())
463  {
464  }
465 
    // The three post() overrides differ only in iterator type; all forward to _post().
466  // handle an incoming message
467  std::string::const_iterator post(std::string::const_iterator b,
468  std::string::const_iterator e) const override
469  {
470  return _post(b, e);
471  }
472 
473 #ifndef _LIBCPP_VERSION
474  std::vector<char>::const_iterator post(std::vector<char>::const_iterator b,
475  std::vector<char>::const_iterator e) const override
476  {
477  return _post(b, e);
478  }
479 #endif
480 
481  const char* post(const char* b, const char* e) const override { return _post(b, e); }
482 
    // action() override: switches on sub_cfg_.action(). SUBSCRIBE shares a label
    // with `default`; UNSUBSCRIBE and UNSUBSCRIBE_ALL share the second label. The
    // return statements for both groups are missing from this listing.
484  {
485  switch (sub_cfg_.action())
486  {
487  default:
488  case intermodule::protobuf::Subscription::SUBSCRIBE:
490  case intermodule::protobuf::Subscription::UNSUBSCRIBE:
491  case intermodule::protobuf::Subscription::UNSUBSCRIBE_ALL:
493  }
494  }
495 
496  // getters
497  const std::string& type_name() const override { return sub_cfg_.key().type(); }
498  const Group& subscribed_group() const override { return group_; }
499  int scheme() const override { return sub_cfg_.key().marshalling_scheme(); }
500 
    // Overrides the base implementation to return the id supplied in the
    // subscription instead of one derived from the constructing thread.
501  std::string subscriber_id() const override { return subscriber_id_; }
502 
503  private:
    // Wraps the whole byte range in a SerializerTransporterMessage (key copied from
    // sub_cfg_) and passes it to the handler. Always returns bytes_end.
504  template <typename CharIterator>
505  CharIterator _post(CharIterator bytes_begin, CharIterator bytes_end) const
506  {
507  protobuf::SerializerTransporterMessage msg;
    // Heap-allocated because set_allocated_data() below takes a std::string*;
    // presumably the message takes ownership of it (protobuf set_allocated_*
    // convention) -- confirm.
    // NOTE(review): if the key copy on the next line throws, sbytes is leaked;
    // consider assigning the key first or filling msg.mutable_data() directly.
508  std::string* sbytes = new std::string(bytes_begin, bytes_end);
509  *msg.mutable_key() = sub_cfg_.key();
510  msg.set_allocated_data(sbytes);
    // NOTE(review): handler_ is invoked without an `if (handler_)` guard.
511  handler_(msg);
512 
513  CharIterator actual_end = bytes_end;
514  return actual_end;
515  }
516 
517  private:
518  HandlerType handler_;
519  intermodule::protobuf::Subscription sub_cfg_;
520  DynamicGroup group_;
    // NOTE(review): thread_id_ has no initializer and is not set in the constructor's
    // initializer list, and is not read in the visible lines of this class --
    // confirm whether it is needed.
521  const std::thread::id thread_id_;
522  const std::string subscriber_id_;
523 };
524 
525 } // namespace middleware
526 } // namespace goby
527 
528 #endif
simple exception class for goby applications
Definition: exception.h:35
Implementation of Group for dynamic (run-time) instantiations. Use Group directly for static (compile...
Definition: group.h:119
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
Definition: group.h:62
Represents a subscription to a serialized data type (intervehicle layer).
const char * post(const char *b, const char *e, const intervehicle::protobuf::Header &header) const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e, const intervehicle::protobuf::Header &header) const override
SerializationHandlerBase< intervehicle::protobuf::Header >::SubscriptionAction action() const override
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e, const intervehicle::protobuf::Header &header) const override
std::function< void(std::shared_ptr< const Data > data)> HandlerType
IntervehicleSerializationSubscription(HandlerType handler, const Group &group=Group(Group::broadcast_group), const Subscriber< Data > &subscriber=Subscriber< Data >())
Represents a callback for a published data type (e.g. acked_func or expired_func)
std::function< void(const Data &data, const Metadata &md)> HandlerType
const Group & subscribed_group() const override
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e, const Metadata &md) const override
const char * post(const char *b, const char *e, const Metadata &md) const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e, const Metadata &md) const override
SerializationHandlerBase< Metadata >::SubscriptionAction action() const override
PublisherCallback(HandlerType handler, const Data &data)
const std::string & type_name() const override
Base class for handling posting callbacks for serialized data types (interprocess and outer)
virtual const std::string & type_name() const =0
virtual SubscriptionAction action() const =0
virtual const Group & subscribed_group() const =0
virtual std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const =0
virtual std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const =0
virtual std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e, const Metadata &metadata) const =0
virtual std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e, const Metadata &metadata) const =0
virtual const char * post(const char *b, const char *e, const Metadata &metadata) const =0
Selector class for enabling SerializationHandlerBase::post() override signature based on whether the ...
Represents a(n) (un)subscription from an InterModuleForwarder.
SerializationInterModuleSubscription(HandlerType handler, const intermodule::protobuf::Subscription sub)
const char * post(const char *b, const char *e) const override
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const override
SerializationHandlerBase<>::SubscriptionAction action() const override
std::function< void(const protobuf::SerializerTransporterMessage &d)> HandlerType
Represents a regex subscription to a serialized data type (interprocess and outer layers).
SerializationSubscriptionRegex(HandlerType handler, const std::set< int > &schemes, const std::string &type_regex=".*", const std::string &group_regex=".*")
void update_type_regex(const std::string &type_regex)
void update_group_regex(const std::string &group_regex)
std::function< void(const std::vector< unsigned char > &, int scheme, const std::string &type, const Group &group)> HandlerType
bool post(CharIterator bytes_begin, CharIterator bytes_end, int scheme, const std::string &type, const std::string &group) const
Represents a subscription to a serialized data type (interprocess layer).
SerializationSubscription(HandlerType handler, const Group &group=Group(Group::broadcast_group), const Subscriber< Data > &subscriber=Subscriber< Data >())
const std::string & type_name() const override
const char * post(const char *b, const char *e) const override
SerializationHandlerBase<>::SubscriptionAction action() const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const override
std::function< void(std::shared_ptr< const Data > data)> HandlerType
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const override
Represents an unsubscription to all subscribed data for a given thread.
Represents an unsubscription to a serialized data type (interprocess and outer layers).
std::string::const_iterator post(std::string::const_iterator b, std::string::const_iterator e) const override
const std::string & type_name() const override
SerializationHandlerBase<>::SubscriptionAction action() const override
const char * post(const char *b, const char *e) const override
std::vector< char >::const_iterator post(std::vector< char >::const_iterator b, std::vector< char >::const_iterator e) const override
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:37
goby::util::logger::GroupSetter group(std::string n)
typename std::enable_if< B, T >::type enable_if_t
Definition: json.hpp:3079
bool operator==(const Group &a, const Group &b)
Definition: group.h:105
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition: cstr.h:65
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition: common.h:53
constexpr T e
Definition: constants.h:35
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
type
Generic JSON types used in JWTs.
Definition: jwt.h:2072
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:98
static std::shared_ptr< DataType > parse(CharIterator bytes_begin, CharIterator bytes_end, CharIterator &actual_end, const std::string &type=type_name())
Given a beginning and end iterator to bytes, parse the data and return it.
Definition: interface.h:129