25 #ifndef GOBY_ACOMMS_QUEUE_QUEUE_MANAGER_H
26 #define GOBY_ACOMMS_QUEUE_QUEUE_MANAGER_H
36 #include <boost/signals2/signal.hpp>
37 #include <google/protobuf/descriptor.h>
55 class ModemTransmission;
80 void set_cfg(
const protobuf::QueueManagerConfig& cfg);
83 void merge_cfg(
const protobuf::QueueManagerConfig& cfg);
88 template <
typename ProtobufMessage>
89 void add_queue(
const protobuf::QueuedMessageEntry& queue_cfg)
91 add_queue(ProtobufMessage::descriptor(), queue_cfg);
95 void add_queue(
const google::protobuf::Descriptor* desc,
96 const protobuf::QueuedMessageEntry& queue_cfg);
109 const protobuf::QueuedMessageMeta* meta);
158 template <
typename ProtobufMessage>
void info(std::ostream* os)
const
160 info(ProtobufMessage::descriptor(), os);
164 void info(
const google::protobuf::Descriptor* desc, std::ostream* os)
const;
172 std::string
msg_string(
const google::protobuf::Descriptor* desc)
174 return desc->full_name() +
" (" + goby::util::as<std::string>(codec_->id(desc)) +
")";
182 unsigned dccl_id = codec_->id(
msg.GetDescriptor());
183 if (!queues_.count(dccl_id))
187 return queues_[dccl_id]->meta_from_msg(
msg);
194 boost::signals2::signal<void(
const protobuf::ModemTransmission& ack_msg,
216 boost::signals2::signal<void(
const protobuf::ModemTransmission& request_msg,
218 signal_data_on_demand;
223 boost::signals2::signal<void(protobuf::QueueSize size)> signal_queue_size_change;
227 boost::signals2::signal<void(protobuf::QueuedMessageMeta* meta,
232 boost::signals2::signal<void(
const protobuf::QueuedMessageMeta& meta,
241 void qsize(
Queue* q);
244 Queue* find_next_sender(
const protobuf::ModemTransmission& message,
const std::string& data,
245 bool first_user_frame);
248 void clear_packet(
const protobuf::ModemTransmission& message);
251 void process_modem_ack(
const protobuf::ModemTransmission& ack_msg);
253 goby::acomms::protobuf::NetworkAck::AckType ack_type);
256 std::string encode_repeated(
const std::list<QueuedMessage>& msgs);
257 std::list<QueuedMessage> decode_repeated(
const std::string& orig_bytes);
258 unsigned size_repeated(
const std::list<QueuedMessage>& msgs);
263 std::map<unsigned, std::shared_ptr<Queue> > queues_;
267 std::multimap<unsigned, Queue*> waiting_for_ack_;
270 unsigned packet_ack_{0};
274 std::set<ModemId> network_ack_src_ids_;
275 std::set<ModemId> route_additional_modem_ids_;
278 std::map<ModemId, std::string> encrypt_rules_;
280 protobuf::QueueManagerConfig cfg_;
284 std::string glog_push_group_;
285 std::string glog_pop_group_;
286 std::string glog_priority_group_;
287 std::string glog_out_group_;
288 std::string glog_in_group_;
292 class ManipulatorManager
297 manips_.insert(std::make_pair(
id, manip));
303 std::multimap<unsigned int, goby::acomms::protobuf::Manipulator>::const_iterator;
304 std::pair<iterator, iterator> p = manips_.equal_range(
id);
306 for (
auto it = p.first; it != p.second; ++it)
308 if (it->second == manip)
315 void clear() { manips_.clear(); }
320 std::multimap<unsigned, goby::acomms::protobuf::Manipulator> manips_;
323 ManipulatorManager manip_manager_;
Exception class for libdccl.
provides an API to the goby-acomms Queuing Library.
QueueManager()
constructor
void set_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and overwrite completely if present) the current configuration. (protobuf::QueueManagerConfig de...
const std::string & glog_out_group()
const std::string & glog_push_group()
void add_queue(const protobuf::QueuedMessageEntry &queue_cfg)
Add a DCCL queue for use with QueueManager. Note that the queue must be added before receiving messag...
~QueueManager()=default
destructor
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
void info_all(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of all loaded queues.
void handle_modem_data_request(protobuf::ModemTransmission *msg)
Finds data to send to the modem.
std::string msg_string(const google::protobuf::Descriptor *desc)
const std::string & glog_in_group()
const std::string & glog_pop_group()
protobuf::QueuedMessageMeta meta_from_msg(const google::protobuf::Message &msg)
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
void push_message(const google::protobuf::Message &new_message, const protobuf::QueuedMessageMeta *meta)
void info(const google::protobuf::Descriptor *desc, std::ostream *os) const
An alterative form for getting information for Queues for message types not known at compile-time ("d...
void flush_queue(const protobuf::QueueFlush &flush)
Flush (delete all messages in) a queue.
void do_work()
Calculates which messages have expired and emits the goby::acomms::QueueManager::signal_expire as nec...
void add_queue(const google::protobuf::Descriptor *desc, const protobuf::QueuedMessageEntry &queue_cfg)
Alternative method for adding Queues when using Dynamic Protobuf Messages.
void info(std::ostream *os) const
Writes a human readable summary (including DCCLCodec info) of the queue for the provided DCCL type to...
void merge_cfg(const protobuf::QueueManagerConfig &cfg)
Set (and merge "repeat" fields) the current configuration. (protobuf::QueueManagerConfig defined in a...
int modem_id()
The current modem ID (MAC address) of this node.
const std::string & glog_priority_group()
std::ostream & operator<<(std::ostream &os, const MACManager &mac)
The global namespace for the Goby project.
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg