24 #ifndef GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
25 #define GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
28 #include <type_traits>
29 #include <unordered_map>
45 inline bool operator==(
const DynamicBufferConfig& a,
const DynamicBufferConfig& b)
47 return a.SerializeAsString() == b.SerializeAsString();
62 template <
typename Container>
size_t data_size(
const Container& c) {
return c.size(); }
68 using size_type =
typename std::deque<T>::size_type;
99 void update(
const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
101 using goby::acomms::protobuf::DynamicBufferConfig;
103 throw(
goby::Exception(
"Configuration vector must not be empty for DynamicSubBuffer"));
109 std::result_of<decltype (&DynamicBufferConfig::ttl)(DynamicBufferConfig)>::
type;
110 using value_base_type =
111 std::result_of<decltype (&DynamicBufferConfig::value_base)(DynamicBufferConfig)>::
type;
113 ttl_type ttl_sum = 0;
114 ttl_type ttl_divisor = 0;
115 value_base_type value_base_sum = 0;
116 value_base_type value_base_divisor = 0;
118 for (
const auto&
cfg : cfgs)
120 if (
cfg.has_ack_required() && (!cfg_.has_ack_required() ||
cfg.ack_required()))
121 cfg_.set_ack_required(
cfg.ack_required());
122 if (
cfg.has_blackout_time() &&
123 (!cfg_.has_blackout_time() ||
cfg.blackout_time() < cfg_.blackout_time()))
124 cfg_.set_blackout_time(
cfg.blackout_time());
125 if (
cfg.has_max_queue() &&
126 (!cfg_.has_max_queue() ||
cfg.max_queue() > cfg_.max_queue()))
127 cfg_.set_max_queue(
cfg.max_queue());
128 if (
cfg.has_newest_first() && (!cfg_.has_newest_first() ||
cfg.newest_first()))
129 cfg_.set_newest_first(
cfg.newest_first());
133 ttl_sum +=
cfg.ttl();
137 if (
cfg.has_value_base())
139 value_base_sum +=
cfg.value_base();
140 ++value_base_divisor;
145 cfg_.set_ttl(ttl_sum / ttl_divisor);
146 if (value_base_divisor > 0)
147 cfg_.set_value_base(value_base_sum / value_base_divisor);
151 goby::time::convert_duration<typename Clock::duration>(cfg_.blackout_time_with_units());
155 const goby::acomms::protobuf::DynamicBufferConfig&
cfg()
const {
return cfg_; }
163 Value&
top(
typename Clock::time_point reference = Clock::now(),
164 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
166 for (
auto& datum_pair : data_)
168 auto& datum_last_access = datum_pair.first;
169 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
171 last_access_ = reference;
172 datum_last_access = last_access_;
173 return datum_pair.second;
180 size_t top_size(
typename Clock::time_point reference = Clock::now(),
181 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
183 for (
const auto& datum_pair : data_)
185 const auto& datum_last_access = datum_pair.first;
186 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
187 return data_size(datum_pair.second.data);
195 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
197 for (
const auto& datum_pair : data_)
199 const auto& datum_last_access = datum_pair.first;
200 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
221 std::pair<double, ValueResult>
222 top_value(
typename Clock::time_point reference = Clock::now(),
223 size_type max_bytes = std::numeric_limits<size_type>::max(),
224 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
230 return std::make_pair(-std::numeric_limits<double>::infinity(),
233 return std::make_pair(-std::numeric_limits<double>::infinity(),
238 return std::make_pair(-std::numeric_limits<double>::infinity(),
241 using Duration = std::chrono::microseconds;
243 double dt = std::chrono::duration_cast<Duration>(reference - last_access_).count();
244 double ttl = goby::time::convert_duration<Duration>(cfg_.ttl_with_units()).count();
245 double v_b = cfg_.value_base();
247 double v = v_b * dt / ttl;
254 bool in_blackout(
typename Clock::time_point reference = Clock::now())
const
257 goby::time::convert_duration<typename Clock::duration>(cfg_.blackout_time_with_units());
259 return reference <= (last_access_ + blackout);
262 bool empty()
const {
return data_.empty(); }
268 void pop() { data_.pop_front(); }
275 std::vector<Value>
push(
const T& t,
typename Clock::time_point reference = Clock::now())
277 std::vector<Value> exceeded;
279 if (cfg_.newest_first())
280 data_.push_front(std::make_pair(zero_point_,
Value({reference, t})));
282 data_.push_back(std::make_pair(zero_point_,
Value({reference, t})));
284 while (data_.size() > cfg_.max_queue())
286 exceeded.push_back(data_.back().second);
295 std::vector<Value>
expire(
typename Clock::time_point reference = Clock::now())
297 std::vector<Value> expired;
299 auto ttl = goby::time::convert_duration<typename Clock::duration>(cfg_.ttl_with_units());
300 if (cfg_.newest_first())
302 while (!data_.empty() && reference > (data_.back().second.push_time + ttl))
304 expired.push_back(data_.back().second);
310 while (!data_.empty() && reference > (data_.front().second.push_time + ttl))
312 expired.push_back(data_.front().second);
327 for (
auto it = data_.begin(), end = data_.end(); it != end; ++it)
329 const auto& datum_pair = it->second;
330 if (datum_pair == value)
337 if (cfg_.newest_first() && datum_pair.push_time < value.push_time)
339 else if (!cfg_.newest_first() && datum_pair.push_time > value.push_time)
346 goby::acomms::protobuf::DynamicBufferConfig cfg_;
349 std::deque<std::pair<typename Clock::time_point, Value>> data_;
350 typename Clock::time_point last_access_{Clock::now()};
352 typename Clock::time_point zero_point_{std::chrono::seconds(0)};
356 template <
typename T,
typename Clock = goby::time::SteadyClock>
class DynamicBuffer
362 glog_priority_group_ =
"goby::acomms::buffer::priority::" +
std::to_string(
id);
386 const goby::acomms::protobuf::DynamicBufferConfig& cfg)
388 create(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
398 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
400 if (sub_.count(dest_id) && sub_.at(dest_id).count(sub_id))
401 throw(
goby::Exception(
"Subbuffer ID: " + sub_id +
" already exists."));
412 const goby::acomms::protobuf::DynamicBufferConfig& cfg)
414 replace(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
423 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
426 create(dest_id, sub_id, cfgs);
435 const goby::acomms::protobuf::DynamicBufferConfig& cfg)
437 update(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
446 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
448 auto it = sub_[dest_id].find(sub_id);
449 if (it != sub_[dest_id].end())
450 it->second.update(cfgs);
452 create(dest_id, sub_id, cfgs);
461 sub_[dest_id].erase(sub_id);
471 std::vector<Value> exceeded;
473 for (
const auto&
e : sub_exceeded)
481 for (
const auto& sub_id_p : sub_)
483 for (
const auto& sub_p : sub_id_p.second)
485 if (!sub_p.second.empty())
497 for (
const auto& sub_id_p : sub_)
499 for (
const auto& sub_p : sub_id_p.second)
size += sub_p.second.size();
511 size_type max_bytes = std::numeric_limits<size_type>::max(),
512 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
517 glog <<
group(glog_priority_group_) <<
"Starting priority contest (dest: "
520 <<
", max_bytes: " << max_bytes <<
"):" << std::endl;
522 typename std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>::iterator
524 double winning_value = -std::numeric_limits<double>::infinity();
526 auto now = Clock::now();
530 "DynamicBuffer::top() has no queues with this destination"));
534 : sub_.find(dest_id),
537 : ++sub_.find(dest_id);
538 sub_id_it != sub_id_end; ++sub_id_it)
540 for (
auto sub_it = sub_id_it->second.begin(), sub_end = sub_id_it->second.end();
541 sub_it != sub_end; ++sub_it)
545 std::tie(value, result) = sub_it->second.top_value(now, max_bytes, ack_timeout);
547 std::string value_or_reason;
555 value_or_reason =
"empty";
559 value_or_reason =
"blackout";
563 value_or_reason =
"too large";
567 value_or_reason =
"ack wait";
572 <<
" [dest: " << sub_id_it->first
573 <<
", n: " << sub_it->second.size()
574 <<
"]: " << value_or_reason << std::endl;
576 if (value > winning_value)
578 winning_value = value;
579 winning_sub = sub_it;
580 dest_id = sub_id_it->first;
585 if (winning_value == -std::numeric_limits<double>::infinity())
587 "DynamicBuffer::top() has no queue with a winning value"));
589 const auto& top_p = winning_sub->second.top(now, ack_timeout);
591 <<
" (" <<
data_size(top_p.data) <<
"B)" << std::endl;
593 return {dest_id, winning_sub->first, top_p.push_time, top_p.data};
603 return sub(value.modem_id, value.subbuffer_id).erase({value.push_time, value.data});
611 auto now = Clock::now();
612 std::vector<Value> expired;
613 for (
auto& sub_id_p : sub_)
615 for (
auto& sub_p : sub_id_p.second)
617 auto sub_expired = sub_p.second.expire(now);
618 for (
const auto&
e : sub_expired)
619 expired.push_back({sub_id_p.first, sub_p.first,
e.push_time,
e.data});
630 if (!sub_.count(dest_id) || !sub_.at(dest_id).count(sub_id))
632 " does not exist, must call create(...) first."));
633 return sub_.at(dest_id).at(sub_id);
638 std::map<modem_id_type, std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>> sub_;
640 std::string glog_priority_group_;
641 static std::atomic<int> count_;
645 template <
typename T,
typename Clock> std::atomic<int> DynamicBuffer<T, Clock>::count_(0);
simple exception class for goby applications
~DynamicBufferNoDataException()
DynamicBufferNoDataException(const std::string &reason)
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
bool empty() const
Is this buffer empty (that is, are all subbuffers empty)?
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Replace an existing subbuffer merging the given configuration (See DynamicSubBuffer() for details)
std::vector< Value > expire()
Erase any values that have exceeded their time-to-live.
typename DynamicSubBuffer< T, Clock >::size_type size_type
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update an existing subbuffer without removing the messsages (or creates the buffer if it doesn't alre...
void update(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Update an existing subbuffer without removing the messsages.
bool erase(const Value &value)
Erase a value.
void replace(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Replace an existing subbuffer with the given configuration (any messages in the subbuffer will be era...
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a new subbuffer merging the given configuration (See DynamicSubBuffer() for details)
DynamicSubBuffer< T, Clock > & sub(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Reference a given subbuffer.
Value top(modem_id_type dest_id=goby::acomms::QUERY_DESTINATION_ID, size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the top value in a priority contest between all subbuffers.
std::vector< Value > push(const Value &fvt)
Push a new message to the buffer.
void create(modem_id_type dest_id, const subbuffer_id_type &sub_id, const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a new subbuffer with the given configuration.
void remove(modem_id_type dest_id, const subbuffer_id_type &sub_id)
Remove an existing subbuffer.
std::string subbuffer_id_type
size_type size() const
Size of the buffer (that is, sum of the subbuffer sizes)
Represents a time-dependent priority queue for a single group of messages (e.g. for a single DCCL ID)
Value & top(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0))
Returns the value at the top of the queue that hasn't been sent wihin ack_timeout.
@ ALL_MESSAGES_WAITING_FOR_ACK
DynamicSubBuffer(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Create a subbuffer merging two or more configuration objects.
const goby::acomms::protobuf::DynamicBufferConfig & cfg() const
Return the aggregate configuration.
std::vector< Value > push(const T &t, typename Clock::time_point reference=Clock::now())
Push a value to the queue.
bool erase(const Value &value)
Erase a value.
bool in_blackout(typename Clock::time_point reference=Clock::now()) const
Returns if buffer is in blackout.
typename std::deque< T >::size_type size_type
bool empty() const
Returns if this queue is empty.
std::vector< Value > expire(typename Clock::time_point reference=Clock::now())
Erase any values that have exceeded their time-to-live.
void update(const std::vector< goby::acomms::protobuf::DynamicBufferConfig > &cfgs)
Update the configurations without clearing the buffer.
bool all_waiting_for_ack(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
returns true if all messages have been sent within ack_timeout of the reference provided and thus non...
void pop()
Pop the value on the top of the queue.
size_type size() const
Retrieves the size of the queue.
size_t top_size(typename Clock::time_point reference=Clock::now(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Returns the size (in bytes) of the top of the queue that hasn't been sent within ack_timeout.
std::pair< double, ValueResult > top_value(typename Clock::time_point reference=Clock::now(), size_type max_bytes=std::numeric_limits< size_type >::max(), typename Clock::duration ack_timeout=std::chrono::microseconds(0)) const
Provides the numerical priority value based on this subbuffer's base priority, time-to-live (ttl) and...
DynamicSubBuffer(const goby::acomms::protobuf::DynamicBufferConfig &cfg)
Create a subbuffer with the given configuration.
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
goby::util::logger::GroupSetter group(std::string n)
bool operator==(const ModemTransmission &a, const ModemTransmission &b)
size_t data_size(const Container &c)
constexpr int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
type
Generic JSON types used in JWTs.
Clock::time_point push_time
subbuffer_id_type subbuffer_id
Clock::time_point push_time
bool operator==(const Value &a) const