24 #ifndef GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
25 #define GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
36 #include <boost/asio/write.hpp>
37 #include <boost/system/error_code.hpp>
55 class InterThreadTransporter;
56 template <
typename InnerTransporter>
class InterProcessForwarder;
66 template <
typename ProtobufEndpo
int,
typename ASIOEndpo
int>
69 ProtobufEndpoint pb_ep;
70 pb_ep.set_addr(asio_ep.address().to_string());
71 pb_ep.set_port(asio_ep.port());
77 PubSubLayer subscribe_layer,
typename IOConfig,
typename SocketType,
78 template <
class>
class ThreadType,
bool use_indexed_groups =
false>
81 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
82 IOConfig, SocketType, ThreadType, use_indexed_groups>,
83 line_in_group, publish_layer, use_indexed_groups>,
85 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
86 IOConfig, SocketType, ThreadType, use_indexed_groups>,
87 line_out_group, subscribe_layer, use_indexed_groups>
95 : ThreadType<IOConfig>(
config, this->loop_max_frequency(), index),
97 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
98 SocketType, ThreadType, use_indexed_groups>,
99 line_in_group, publish_layer, use_indexed_groups>(index),
101 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
102 SocketType, ThreadType, use_indexed_groups>,
103 line_out_group, subscribe_layer, use_indexed_groups>(index),
107 auto data_out_callback =
108 [
this](std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) {
109 if (!io_msg->has_index() || io_msg->index() == this->index())
115 this->
template subscribe_out<goby::middleware::protobuf::IOData>(data_out_callback);
117 if (!glog_group_added_)
120 glog_group_added_ =
true;
127 incoming_mail_notify_thread_.reset(
new std::thread([
this]() {
128 while (this->alive())
130 std::unique_lock<std::mutex>
lock(incoming_mail_notify_mutex_);
131 this->interthread().cv()->wait(
lock);
137 this->set_name(thread_name_);
144 std::lock_guard<std::mutex> l(incoming_mail_notify_mutex_);
145 this->interthread().cv()->notify_all();
147 incoming_mail_notify_thread_->join();
148 incoming_mail_notify_thread_.reset();
156 if (incoming_mail_notify_thread_)
157 incoming_mail_notify_thread_->detach();
159 auto status = std::make_shared<protobuf::IOStatus>();
163 this->
template unsubscribe_out<goby::middleware::protobuf::IOData>();
166 template <
class IOThreadImplementation>
168 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg);
171 void write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
174 goby::glog <<
group(glog_group_) <<
"(" << io_msg->data().size() <<
"B) <"
175 << ((this->index() == -1) ? std::string() :
std::to_string(this->index()))
176 <<
" " << io_msg->ShortDebugString() << std::endl;
177 if (io_msg->data().empty())
179 if (!socket_ || !socket_->is_open())
187 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
188 *io_msg->mutable_data() = bytes;
194 std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
196 if (this->index() != -1)
197 io_msg->set_index(this->index());
201 << ((this->index() == -1) ? std::string() :
std::to_string(this->index()))
202 <<
" " << io_msg->ShortDebugString() << std::endl;
204 this->publish_in(io_msg);
232 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
241 void loop()
override;
245 std::unique_ptr<SocketType> socket_;
253 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
255 std::string glog_group_;
256 std::string thread_name_;
257 bool glog_group_added_{
false};
260 template <
class IOThreadImplementation>
262 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
264 boost::asio::async_write(
265 this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
267 [this_thread, io_msg](
const boost::system::error_code& ec, std::size_t bytes_transferred) {
268 if (!ec && bytes_transferred > 0)
270 this_thread->handle_write_success(bytes_transferred);
274 this_thread->handle_write_error(ec);
288 template <
class>
class ThreadType,
bool use_indexed_groups>
290 subscribe_layer, IOConfig, SocketType, ThreadType,
291 use_indexed_groups>::try_open()
295 socket_.reset(
new SocketType(io_));
305 backoff_interval_ = min_backoff_interval_;
307 auto status = std::make_shared<protobuf::IOStatus>();
308 if (this->index() != -1)
309 status->set_index(this->index());
319 next_open_attempt_ = now + backoff_interval_;
321 catch (
const std::exception&
e)
323 auto status = std::make_shared<protobuf::IOStatus>();
324 if (this->index() != -1)
325 status->set_index(this->index());
328 goby::middleware::protobuf::IOError&
error = *
status->mutable_error();
329 error.set_code(goby::middleware::protobuf::IOError::IO__INIT_FAILURE);
330 error.set_text(
e.what() + std::string(
": config (") + this->cfg().ShortDebugString() +
")");
334 <<
"Failed to open/configure socket/serial_port: "
335 <<
error.ShortDebugString() << std::endl;
337 if (backoff_interval_ < max_backoff_interval_)
338 backoff_interval_ *= 2.0;
341 next_open_attempt_ = now + backoff_interval_;
344 << backoff_interval_ / std::chrono::seconds(1)
345 <<
" seconds" << std::endl;
354 template <
class>
class ThreadType,
bool use_indexed_groups>
356 subscribe_layer, IOConfig, SocketType, ThreadType,
357 use_indexed_groups>::loop()
359 if (socket_ && socket_->is_open())
369 if (now > next_open_attempt_)
386 template <
class>
class ThreadType,
bool use_indexed_groups>
388 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
389 use_indexed_groups>::handle_read_error(
const boost::system::error_code& ec)
391 auto status = std::make_shared<protobuf::IOStatus>();
392 if (this->index() != -1)
393 status->set_index(this->index());
396 goby::middleware::protobuf::IOError& error = *
status->mutable_error();
397 error.set_code(goby::middleware::protobuf::IOError::IO__READ_FAILURE);
398 error.set_text(ec.message());
402 <<
"Failed to read from the socket/serial_port: "
403 << error.ShortDebugString() << std::endl;
412 template <
class>
class ThreadType,
bool use_indexed_groups>
414 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
415 use_indexed_groups>::handle_write_error(
const boost::system::error_code& ec)
417 auto status = std::make_shared<protobuf::IOStatus>();
418 if (this->index() != -1)
419 status->set_index(this->index());
422 goby::middleware::protobuf::IOError& error = *
status->mutable_error();
423 error.set_code(goby::middleware::protobuf::IOError::IO__WRITE_FAILURE);
424 error.set_text(ec.message());
428 <<
"Failed to write to the socket/serial_port: "
429 << error.ShortDebugString() << std::endl;
simple exception class for goby applications
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
boost::asio::io_context & mutable_io()
const std::string & glog_group()
void initialize() override
void write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
void handle_read_success(std::size_t bytes_transferred, const std::string &bytes)
SocketType & mutable_socket()
Access the (mutable) socket (or serial_port) object.
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)=0
Starts an asynchronous write from data published.
bool socket_is_open()
Does the socket exist and is it open?
void handle_read_error(const boost::system::error_code &ec)
void handle_read_success(std::size_t bytes_transferred, std::shared_ptr< goby::middleware::protobuf::IOData > io_msg)
void handle_write_error(const boost::system::error_code &ec)
virtual void async_read()=0
Starts an asynchronous read on the socket.
IOThread(const IOConfig &config, int index, std::string glog_group="i/o")
Constructs the thread.
virtual void open_socket()=0
Opens the newly created socket/serial_port.
friend void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
void handle_write_success(std::size_t bytes_transferred)
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
goby::util::logger::GroupSetter group(std::string n)
detail namespace with internal helper functions
@ error
throw a parse_error exception in case of a tag
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::DriverConfig, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::abc::protobuf::Config >, 11, false > config
void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
ProtobufEndpoint endpoint_convert(const ASIOEndpoint &asio_ep)
std::string to_string(goby::middleware::protobuf::Layer layer)
std::recursive_mutex mutex
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.
std::chrono::time_point< SteadyClock > time_point
static time_point now() noexcept
Returns the current steady time unless SimulatorSettings::using_sim_time == true in which case a simu...
std::chrono::microseconds duration
Duration type.