Goby3  3.1.5
2024.05.14
liaison_container.h
Go to the documentation of this file.
1 // Copyright 2013-2022:
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 // File authors:
6 // Toby Schneider <toby@gobysoft.org>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
26 #define GOBY_ZEROMQ_LIAISON_LIAISON_CONTAINER_H
27 
28 #include <queue>
29 
30 #include <Wt/WColor>
31 #include <Wt/WContainerWidget>
32 #include <Wt/WText>
33 #include <Wt/WTimer>
34 
36 
37 #include "goby/middleware/group.h"
40 
41 namespace goby
42 {
43 namespace zeromq
44 {
45 const Wt::WColor goby_blue(28, 159, 203); // colour built from components (28, 159, 203); presumably red, green, blue — confirm against Wt::WColor
46 const Wt::WColor goby_orange(227, 96, 52); // colour built from components (227, 96, 52); presumably red, green, blue — confirm against Wt::WColor
47 
/// Returns the fixed identifier "liaison_internal_publish_socket".
///
/// NOTE(review): the declaration line was missing from the rendered listing;
/// the name and return type are taken from the symbol index of that listing.
/// `inline` is added because the function is defined in a header — confirm
/// against the repository copy.
inline std::string liaison_internal_publish_socket_name()
{
    return "liaison_internal_publish_socket";
}
/// Returns the fixed identifier "liaison_internal_subscribe_socket".
///
/// NOTE(review): the declaration line was missing from the rendered listing;
/// the name and return type are taken from the symbol index of that listing.
/// `inline` is added because the function is defined in a header — confirm
/// against the repository copy.
inline std::string liaison_internal_subscribe_socket_name()
{
    return "liaison_internal_subscribe_socket";
}
56 
57 class LiaisonContainer : public Wt::WContainerWidget
58 {
59  public:
61  {
62  setStyleClass("fill");
63  /* addWidget(new Wt::WText("<hr/>")); */
64  /* addWidget(name_); */
65  /* addWidget(new Wt::WText("<hr/>")); */
66  }
67 
69  {
70  goby::glog.is_debug2() && goby::glog << "~LiaisonContainer(): " << name() << std::endl;
71  }
72 
73  void set_name(const Wt::WString& name) { name_.setText(name); }
74 
75  const Wt::WString& name() { return name_.text(); }
76 
77  virtual void focus() {}
78  virtual void unfocus() {}
79  virtual void cleanup() {}
80 
81  private:
82  Wt::WText name_;
83 };
84 
85 template <typename Derived, typename GobyThread>
87 {
88  public:
89  LiaisonContainerWithComms(const goby::apps::zeromq::protobuf::LiaisonConfig& cfg)
90  {
91  static std::atomic<int> index(0);
92  index_ = index++;
93 
94  // copy configuration
95  auto thread_lambda = [this, cfg]() {
96  {
97  std::lock_guard<std::mutex> l(goby_thread_mutex);
98  goby_thread_ =
99  std::make_unique<GobyThread>(static_cast<Derived*>(this), cfg, index_);
100  }
101 
102  try
103  {
104  goby_thread_->run(thread_alive_);
105  }
106  catch (...)
107  {
108  thread_exception_ = std::current_exception();
109  }
110 
111  {
112  std::lock_guard<std::mutex> l(goby_thread_mutex);
113  goby_thread_.reset();
114  }
115  };
116 
117  thread_ = std::unique_ptr<std::thread>(new std::thread(thread_lambda));
118 
119  // wait for thread to be created
120  while (goby_thread() == nullptr) usleep(1000);
121 
122  comms_timer_.setInterval(1 / cfg.update_freq() * 1.0e3);
123  comms_timer_.timeout().connect(
124  [this](const Wt::WMouseEvent&) { this->process_from_comms(); });
125  comms_timer_.start();
126  }
127 
129  {
130  thread_alive_ = false;
131  thread_->join();
132 
133  if (thread_exception_)
134  {
135  goby::glog.is_warn() && goby::glog << "Comms thread had an uncaught exception"
136  << std::endl;
137  std::rethrow_exception(thread_exception_);
138  }
139 
140  goby::glog.is_debug2() && goby::glog << "~LiaisonContainerWithComms(): " << name()
141  << std::endl;
142  }
143 
144  void post_to_wt(std::function<void()> func)
145  {
146  std::lock_guard<std::mutex> l(comms_to_wt_mutex);
147  comms_to_wt_queue.push(func);
148  }
149 
151  {
152  std::lock_guard<std::mutex> l(wt_to_comms_mutex);
153  while (!wt_to_comms_queue.empty())
154  {
155  wt_to_comms_queue.front()();
156  wt_to_comms_queue.pop();
157  }
158  }
159 
160  protected:
161  GobyThread* goby_thread()
162  {
163  std::lock_guard<std::mutex> l(goby_thread_mutex);
164  return goby_thread_.get();
165  }
166 
167  void post_to_comms(std::function<void()> func)
168  {
169  std::lock_guard<std::mutex> l(wt_to_comms_mutex);
170  wt_to_comms_queue.push(func);
171  }
172 
174  {
175  std::lock_guard<std::mutex> l(comms_to_wt_mutex);
176  while (!comms_to_wt_queue.empty())
177  {
178  comms_to_wt_queue.front()();
179  comms_to_wt_queue.pop();
180  }
181  }
182 
183  void update_comms_freq(double hertz)
184  {
185  comms_timer_.stop();
186  comms_timer_.setInterval(1 / hertz * 1.0e3);
187  comms_timer_.start();
188  }
189 
190  private:
191  // for comms
192  std::mutex comms_to_wt_mutex;
193  std::queue<std::function<void()>> comms_to_wt_queue;
194  std::mutex wt_to_comms_mutex;
195  std::queue<std::function<void()>> wt_to_comms_queue;
196 
197  // only protects the unique_ptr, not the underlying thread
198  std::mutex goby_thread_mutex;
199  std::unique_ptr<GobyThread> goby_thread_{nullptr};
200 
201  int index_;
202  std::unique_ptr<std::thread> thread_;
203  std::atomic<bool> thread_alive_{true};
204  std::exception_ptr thread_exception_;
205 
206  Wt::WTimer comms_timer_;
207 };
208 
209 template <typename WtContainer>
211  : public goby::middleware::SimpleThread<goby::apps::zeromq::protobuf::LiaisonConfig>
212 {
213  public:
214  LiaisonCommsThread(WtContainer* container,
215  const goby::apps::zeromq::protobuf::LiaisonConfig& config, int index)
216  : goby::middleware::SimpleThread<goby::apps::zeromq::protobuf::LiaisonConfig>(
217  config, config.update_freq() * boost::units::si::hertz, index),
218  container_(container)
219  {
220  }
221 
222  void loop() override
223  {
224  // goby::glog.is_debug3() && goby::glog << "LiaisonCommsThread " << this->index() << " loop()"
225  // << std::endl;
226  container_->process_from_wt();
227  }
228 
229  private:
230  WtContainer* container_;
231 };
232 } // namespace zeromq
233 } // namespace goby
234 #endif
Implements Thread for a three layer middleware setup ([ intervehicle [ interprocess [ interthread ] ]...
Definition: simple_thread.h:46
SimpleThread(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg, double loop_freq_hertz=0, int index=-1)
Construct a thread with a given configuration, optionally a loop frequency and/or index.
Definition: simple_thread.h:58
int index() const
Definition: thread.h:145
LiaisonCommsThread(WtContainer *container, const goby::apps::zeromq::protobuf::LiaisonConfig &config, int index)
LiaisonContainerWithComms(const goby::apps::zeromq::protobuf::LiaisonConfig &cfg)
void post_to_wt(std::function< void()> func)
void post_to_comms(std::function< void()> func)
void set_name(const Wt::WString &name)
extern ::PROTOBUF_NAMESPACE_ID::internal::ExtensionIdentifier< ::goby::acomms::protobuf::DriverConfig, ::PROTOBUF_NAMESPACE_ID::internal::MessageTypeTraits< ::goby::acomms::abc::protobuf::Config >, 11, false > config
std::recursive_mutex mutex
const Wt::WColor goby_blue(28, 159, 203)
const Wt::WColor goby_orange(227, 96, 52)
std::string liaison_internal_publish_socket_name()
std::string liaison_internal_subscribe_socket_name()
The global namespace for the Goby project.
util::FlexOstream glog
Access the Goby logger through this object.