iLab Neuromorphic Robotics Toolkit  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ModulePortImpl.H
Go to the documentation of this file.
1 /*! @file
2  @author Laurent Itti
3  @copyright GNU Public License (GPL v3)
4  @section License
5  @verbatim
6  // ////////////////////////////////////////////////////////////////////////
7  // The iLab Neuromorphic Robotics Toolkit (NRT) //
8  // Copyright 2010-2012 by the University of Southern California (USC) //
9  // and the iLab at USC. //
10  // //
11  // iLab - University of Southern California //
12  // Hedco Neurociences Building, Room HNB-10 //
13  // Los Angeles, Ca 90089-2520 - USA //
14  // //
15  // See http://ilab.usc.edu for information about this project. //
16  // ////////////////////////////////////////////////////////////////////////
17  // This file is part of The iLab Neuromorphic Robotics Toolkit. //
18  // //
19  // The iLab Neuromorphic Robotics Toolkit is free software: you can //
20  // redistribute it and/or modify it under the terms of the GNU General //
21  // Public License as published by the Free Software Foundation, either //
22  // version 3 of the License, or (at your option) any later version. //
23  // //
24  // The iLab Neuromorphic Robotics Toolkit is distributed in the hope //
25  // that it will be useful, but WITHOUT ANY WARRANTY; without even the //
26  // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
27  // PURPOSE. See the GNU General Public License for more details. //
28  // //
29  // You should have received a copy of the GNU General Public License //
30  // along with The iLab Neuromorphic Robotics Toolkit. If not, see //
31  // <http://www.gnu.org/licenses/>. //
32  // ////////////////////////////////////////////////////////////////////////
33  @endverbatim */
34 
35 
36 #ifndef INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MODULEPORTIMPL_H
37 #define INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MODULEPORTIMPL_H
38 
39 #include <nrt/Core/Debugging/Log.H>
42 
43 // Define a bunch of macros
45 
46 #include <nrt/External/cereal/types/string.hpp>
47 
48 // ######################################################################
49 // ######################################################################
50 // ######################################################################
51 
52 template <class Port, class Msg, class Ret> inline
55 launchRemoteCallback(nrt::blackboard::MessageSubscriberRemoteCallback rcb,
56  std::shared_future<void> fut, std::string const transactionkey)
57 {
58  // First, wait on the shared future, so that we know that the message has arrived at the remote
59  // blackboard. This will throw if there is any problem with transmitting to the remote blackboard:
60  fut.get();
61 
62  // Now, run the remote callback, and get a string result:
63  std::shared_ptr<std::string const> retstr = rcb(transactionkey);
64 
65  // Deserialize the message and return it, this will throw if anything goes wrong with the deserialization:
66  std::istringstream is(*retstr);
67  nrt::iarchive archive(is);
68  std::shared_ptr<Ret> m(new Ret);
69  archive(*m);
70 
71  return m;
72 }
73 
74 // ######################################################################
75 template <class Port, class Msg, class Ret> template <typename... Args> inline
78 {
79  return std::unique_ptr<Msg>(new Msg(std::forward<Args>(args)...));
80 }
81 
82 // ######################################################################
83 template <class Port, class Msg> inline
86 launchRemoteCallback(nrt::blackboard::MessageSubscriberRemoteCallback rcb,
87  std::shared_future<void> fut, std::string const transactionkey)
88 {
89  // First, wait on the shared future, so that we know that the message has arrived at the remote
90  // blackboard. This will throw if there is any problem with transmitting to the remote blackboard:
91  fut.get();
92 
93  // Now, run the remote callback, and get a string result, which we ignore since we return nothing (should be
94  // empty anyway):
95  rcb(transactionkey);
96 }
97 
98 // ######################################################################
99 template <class Port, class Msg> template <typename... Args> inline
102 {
103  return std::unique_ptr<Msg>(new Msg(std::forward<Args>(args)...));
104 }
105 
106 // ######################################################################
107 template <class Port, typename T> inline
108 typename nrt::MessagePosting<Port, nrt::Message<T>, void>::RetPtr
110 launchRemoteCallback(nrt::blackboard::MessageSubscriberRemoteCallback rcb,
111  std::shared_future<void> fut, std::string const transactionkey)
112 {
113  // First, wait on the shared future, so that we know that the message has arrived at the remote
114  // blackboard. This will throw if there is any problem with transmitting to the remote blackboard:
115  fut.get();
116 
117  // Now, run the remote callback, and get a string result, which we ignore since we return nothing (should be
118  // empty anyway):
119  rcb(transactionkey);
120 }
121 
122 // ######################################################################
123 template <class Port, typename T> template <typename... Args> inline
124 typename nrt::MessagePosting<Port, nrt::Message<T>, void>::MsgUptr
125 nrt::MessagePosting<Port, nrt::Message<T>, void>::make_message(Args && ... args)
126 {
127  return std::unique_ptr<nrt::Message<T> >(new nrt::Message<T>(std::forward<Args>(args)...));
128 }
129 
130 // ######################################################################
131 // ######################################################################
132 // ######################################################################
133 
134 // ######################################################################
135 // General case
136 // ######################################################################
137 template <class Port, class Msg, class Ret> inline
139  std::shared_ptr<Msg const>(msg)
140 { }
141 
142 // ######################################################################
143 template <class Port, class Msg, class Ret> inline
147 {
148  nrt::Blackboard::instance().incrementActiveCallbackCount();
150  nrt::Blackboard::instance().decrementActiveCallbackCount();
151  return ret;
152 }
153 
154 // ######################################################################
155 template <class Port, class Msg, class Ret> template <typename... Args> inline
158 {
159  return std::unique_ptr<Ret>(new Ret(std::forward<Args>(args)...));
160 }
161 
162 // ######################################################################
163 // Specialization for void return
164 // ######################################################################
165 template <class Port, class Msg> inline
167  std::shared_ptr<Msg const>(msg)
168 { }
169 
170 // ######################################################################
171 template <class Port, class Msg> inline
173 {
174  nrt::Blackboard::instance().incrementActiveCallbackCount();
175  func();
176  nrt::Blackboard::instance().decrementActiveCallbackCount();
177 }
178 
179 // ######################################################################
180 template <class Port, class Msg> template <typename... Args> inline
182 { }
183 
184 // ######################################################################
185 // Specialization for nrt::Message<T> message and void return
186 // ######################################################################
187 template <class Port, typename T> inline
189  std::shared_ptr<nrt::Message<T> const>(msg)
190 { }
191 
192 // ######################################################################
193 template <class Port, typename T> inline
195 runBoundOnMessage(std::function<void()> func)
196 {
197  nrt::Blackboard::instance().incrementActiveCallbackCount();
198  func();
199  nrt::Blackboard::instance().decrementActiveCallbackCount();
200 }
201 
202 // ######################################################################
203 template <class Port, typename T> template <typename... Args> inline
204 void nrt::MessageSubscription<Port, nrt::Message<T>, void>::make_return_message(Args && ... args)
205 { }
206 
207 // ######################################################################
208 // ######################################################################
209 // ######################################################################
210 
211 // SimpleMessageSubscription implementation
212 
213 template <typename T> inline
215 SimpleMessageSubscription(std::shared_ptr<typename nrt::Message<T> const> msg) :
216  nrt::MessageSubscription<SimpleMessageSubscription<T>, nrt::Message<T>, void>(msg)
217 { }
218 
219 template <typename T> inline
221 SimpleMessageSubscription(std::shared_ptr<T const> msg) :
222  nrt::MessageSubscription<SimpleMessageSubscription<T>, T, void>(msg)
223 { }
224 
225 // ######################################################################
226 // ######################################################################
227 // ######################################################################
228 
229 // MessagePosterCore inlined implementation
230 
231 // ######################################################################
232 template <class Posting> inline
234 {
235  typedef typename Posting::MsgType Msg;
236  typedef typename Posting::RetType Ret;
237 
238  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value,
239  "Posting::MsgType must derive from nrt::MessageBase");
240  static_assert(std::is_same<void, Ret>::value || std::is_base_of<nrt::MessageBase, Ret>::value,
241  "Posting::RetType must be void or derive from nrt::MessageBase");
242 
243  // Initialize our MessagePosterCoreBase:
244  msgtype = nrt::MessageType<Msg>();
245  rettype = nrt::MessageType<Ret>();
246  module = uid().str();
247  portname = Posting::portname();
248  description = Posting::description();
249  topic = "";
250  sequence = -1; // the Blackboard will set that up upon registration
251  splittable = Posting::isSplittable;
252 
253  // Register with our Blackboard:
254  nrt::Blackboard::instance().registerMessagePoster<Posting>(this);
255 }
256 
257 // ######################################################################
258 template <class Posting> inline
260 MessagePosterCore(std::string const & mod, std::string const & portna, std::string const & descr)
261 {
262  typedef typename Posting::MsgType Msg;
263  typedef typename Posting::RetType Ret;
264 
265  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value,
266  "Posting::MsgType must derive from nrt::MessageBase");
267  static_assert(std::is_same<void, Ret>::value || std::is_base_of<nrt::MessageBase, Ret>::value,
268  "Posting::RetType must be void or derive from nrt::MessageBase");
269 
270  // Initialize our MessagePosterCoreBase:
271  msgtype = nrt::MessageType<Msg>();
272  rettype = nrt::MessageType<Ret>();
273  module = mod;
274  portname = portna;
275  description = descr;
276  topic = "";
277  sequence = -1; // the Blackboard will set that up upon registration
278  splittable = Posting::isSplittable;
279 
280  // Register with our Blackboard:
281  nrt::Blackboard::instance().registerMessagePoster<Posting>(this);
282 }
283 
284 // ######################################################################
285 template <class Posting> inline
287 { nrt::Blackboard::instance().unRegisterMessagePoster<Posting>(this); }
288 
289 // ######################################################################
290 template <class Posting> inline
292 {
293 #define BBWHAT std::string("module [") + module + " poster [" + portname + "]"
294 
295  if (Posting::isSplittable == false) BBTHROW(UnsplittablePort);
296 
297  // Get unique write access to our internals:
298  boost::unique_lock<boost::shared_mutex> lck(mtx);
299 
300  // Ignore if we are already split:
301  if (splitPosters) return;
302 
303  // Create the ports, they will register with the blackboard at construction:
304  NRT_BBDEBUG("create for module " << module << " portname " << portname << " msg " << msgtype);
305 
306  splitPosters = Posting::MsgType::createSubPosters(module, portname);
307 
308 #undef BBWHAT
309 }
310 
311 // ######################################################################
312 template <class Posting> inline
314 {
315 #define BBWHAT std::string("module [") + module + " poster [" + portname + "]"
316 
317  // Get unique write access to our internals:
318  boost::unique_lock<boost::shared_mutex> lck(mtx);
319 
320  // Simply invalidate our splitPosters, if any:
321  if (splitPosters)
322  {
323  NRT_BBDEBUG("delete for module " << module << " portname " << portname << " msg " << msgtype);
324  splitPosters.reset();
325  }
326 
327 #undef BBWHAT
328 }
329 
330 // ######################################################################
331 template <class Posting> inline
333 {
334  boost::shared_lock<boost::shared_mutex> lck(mtx);
335  if (splitPosters) return true; else return false;
336 }
337 
338 // ######################################################################
339 template <class Posting> template <typename... Args> inline
340 typename Posting::MsgUptr
342 {
343  return Posting::make_message(std::forward<Args>(args)...);
344 }
345 
346 // ######################################################################
347 template <class Posting> inline
349 nrt::MessagePosterCore<Posting>::post(std::unique_ptr<typename Posting::MsgType> & m) const
350 {
351  typedef typename Posting::MsgType Msg;
352 
353  // Grab the unique_ptr and make a shared_ptr out of it for our internal use only:
354  std::shared_ptr<Msg const> msg = std::move(m);
355 
357 }
358 
359 // ######################################################################
360 template <class Posting> inline
362 nrt::MessagePosterCore<Posting>::post(std::unique_ptr<typename Posting::MsgType> && m) const
363 {
364  typedef typename Posting::MsgType Msg;
365 
366  // Grab the unique_ptr and make a shared_ptr out of it for our internal use only:
367  std::shared_ptr<Msg const> msg = std::move(m);
368 
370 }
371 
372 // ######################################################################
373 template <class Posting> inline
375 nrt::MessagePosterCore<Posting>::repost(std::shared_ptr<typename Posting::MsgType const> & m) const
376 {
377  typedef typename Posting::MsgType Msg;
378 
379  // Grab the shared_ptr contents and invalidate the original:
380  std::shared_ptr<Msg const> msg(nullptr);
381 
382  msg.swap(m);
383 
385 }
386 
387 // ######################################################################
388 template <class Posting> inline
390 nrt::MessagePosterCore<Posting>::doPost(std::shared_ptr<typename Posting::MsgType const> msg) const
391 {
392 #define BBWHAT std::string("posting [") + portname + "]"
393 
394  // Get a vector of results ready:
395  PostResultsType res;
396 
397  // Drop the post if we are not running() anymore (trying to stop...)
398  if (nrt::Blackboard::instance().running() == false)
399  { NRT_WARNING("Dropped " << BBWHAT << " while not in running() state."); return res; }
400 
401  // post!
402  this->doPostInternal(res.futures, msg);
403 
404  // return the future results:
405  return res;
406 
407 #undef BBWHAT
408 }
409 
410 // ######################################################################
411 template <class Posting> inline
412 void nrt::MessagePosterCore<Posting>::doPostInternal(std::list<std::future<typename Posting::RetPtr> > & fut,
413  std::shared_ptr<typename Posting::MsgType const> msg) const
414 {
415 #define BBWHAT std::string("posting [") + portname + "]"
416 
417  typedef typename Posting::MsgType Msg;
418 
419  // Store the message on our local message pool:
420  nrt::Blackboard::instance().storeMessage<Msg>(msg, messagekey);
421 
422  {
423  // Get shared read access to our internals:
424  boost::shared_lock<boost::shared_mutex> lck(mtx);
425 
426  // #################### Whole message post:
427 
428  // If we have local callbacks, launch them first:
429  for (auto const & lcb : localCallbacks) fut.push_back(nrt::async(lcb, msg));
430 
431  // If we have remote callbacks, serialize the message once, and then and launch them all:
432  if (remotePostData.size())
433  {
434  // Serialize the message only once:
435  std::shared_ptr<std::string> sermsg;
436  try
437  {
438  std::ostringstream archiveStream;
439  nrt::oarchive archive(archiveStream);
440  archive( *(msg.get()) );
441  sermsg.reset(new std::string(archiveStream.str()));
442  }
443  catch (cereal::Exception const & e)
444  {
445  BBTHROWX(MessageSerializationError, e.what());
446  }
447  catch (...)
448  {
449  BBTHROW(MessageSerializationError);
450  }
451 
452  // Create a unique key for this transaction; this is needed in case two threads call the present doPostInternal()
453  // simultaneously, so that we don't confuse the messages on the receiver end. We will just use the bbuid,
454  // moduleuid, and the address of the serialized message as our key:
455  std::ostringstream keystream; keystream << module << '|' << sermsg;
456  std::string const transactionkey = keystream.str();
457 
458  // For each relevant remote bb, launch the call to remotePost and get a shared_future<void> out of it:
459  for (auto & rpd : remotePostData) {
460 
461  size_t const numcb = rpd.second.remoteCallbacks.size();
462 
463  // Dispatch the serialized message to that remote blackboard:
464  std::shared_future<void> f = nrt::async(rpd.second.remotePostHandler, sermsg, transactionkey, numcb);
465 
466  // Now for each callback, wait on our shared future, launch the remote callback, get the serialized result, and
467  // deserialize it. Launch the remote callbacks:
468  for (auto const & rcb : rpd.second.remoteCallbacks)
469  fut.push_back(nrt::async(Posting::launchRemoteCallback, rcb, f, transactionkey));
470  }
471  }
472 
473  // If we have remote subscribers that want the nrt::AnyMessage version of this message, wrap the message, serialize
474  // the message once, and then and launch all the callbacks. FIXME: if a remote blackboard has subscribers for both
475  // the native message and the nrt::AnyMessage wrapped version of it, we will end up transmitting it twice (once as
476  // native and once wrapped in nrt::AnyMessage; though the types are different, the contents are deep down the
477  // same). This is because in general we need to wrap Msg into nrt::AnyMessage here (at the posting site) as the
478  // remote may not necessariy know how to de-serialize type Msg, if it only has subscribers to AnyMessage:
479  if (remotePostDataAny.size())
480  {
481  // Wrap the message into nrt::AnyMessage:
482  std::shared_ptr<nrt::AnyMessage const> msgany(new nrt::AnyMessage(msg));
483 
484  // Serialize the AnyMessage only once:
485  std::shared_ptr<std::string> sermsg;
486  try
487  {
488  std::ostringstream archiveStream;
489  nrt::oarchive archive(archiveStream);
490  archive(*msgany.get());
491  sermsg.reset(new std::string(archiveStream.str()));
492  }
493  catch (cereal::Exception const & e)
494  {
495  BBTHROWX(MessageSerializationError, e.what());
496  }
497  catch (...)
498  {
499  BBTHROW(MessageSerializationError);
500  }
501 
502  // Create a unique key for this transaction; this is needed in case two threads call the present doPostInternal()
503  // simultaneously, so that we don't confuse the messages on the receiver end. We will just use the bbuid,
504  // moduleuid, and the address of the serialized message as our key:
505  std::ostringstream keystream; keystream << module << '|' << sermsg;
506  std::string const transactionkey = keystream.str();
507 
508  // For each relevant remote bb, launch the call to remotePost and get a shared_future<void> out of it:
509  for (auto & rpd : remotePostDataAny) {
510 
511  size_t const numcb = rpd.second.remoteCallbacks.size();
512 
513  // Dispatch the serialized message to that remote blackboard:
514  std::shared_future<void> f = nrt::async(rpd.second.remotePostHandler, sermsg, transactionkey, numcb);
515 
516  // Now for each callback, wait on our shared future, launch the remote callback, get the serialized result, and
517  // deserialize it. Launch the remote callbacks:
518  for (auto const & rcb : rpd.second.remoteCallbacks)
519  fut.push_back(nrt::async(Posting::launchRemoteCallback, rcb, f, transactionkey));
520  }
521  }
522 
523  // #################### Split message post, if we are split (recursive):
524  if (splitPosters) msg->doSplitPost(*splitPosters, fut);
525 
526  NRT_BBDEBUG("Dispatched [" << nrt::MessageType<Msg>() << "] to " << localCallbacks.size() <<
527  " local callbacks and " << remotePostData.size() + remotePostDataAny.size() << " remote callbacks.");
528  }
529 #undef BBWHAT
530 }
531 
532 // ######################################################################
533 template <class Posting> inline
535 {
536  typedef typename Posting::MsgType Msg;
537 
538  // Delete any matching message from our local message pool:
539  nrt::Blackboard::instance().expungeMessage<Msg>(messagekey);
540 }
541 
542 // ######################################################################
543 template <class Posting> inline
545 make_connector(std::string const & name, std::string const & namespc, nrt::ConnectorType const type,
546  std::string const & topicfilter, std::string const & topic) const
547 {
548  return nrt::Blackboard::instance().createConnector(nrt::ConnectorFlavor::Poster, msgtype, rettype,
549  name, namespc, type, topic, topicfilter);
550 }
551 
552 // ######################################################################
553 template <class Posting> inline
555 { localCallbacks.clear(); }
556 
557 // ######################################################################
558 template <class Posting> inline
560 {
561 #define BBWHAT std::string("poster [") + portname + "]"
562 
563  typedef typename Posting::MsgPtr MsgPtr;
564  typedef typename Posting::RetPtr RetPtr;
565 
566  // *** If message types match exactly, direct connection (this includes nrt::AnyMessage to nrt::AnyMessage):
567  if (this->msgtype == sub->msgtype)
568  {
569  NRT_BBDEBUG("msg to msg: " << this->msgtype << " to " << sub->msgtype);
570 
571  // Get the full derived callback wrapper:
572  nrt::MessageSubscriberCallbackWrapper<MsgPtr, RetPtr> * cbwrap =
573  dynamic_cast<nrt::MessageSubscriberCallbackWrapper<MsgPtr, RetPtr> * >(sub->callbackWrapperBase.get());
574  if (!cbwrap) BBTHROW(InternalInconsistency);
575 
576  // Grab its callback and push it onto our vector of callbacks:
577  localCallbacks.push_back(cbwrap->callback);
578  }
579 
580  // *** Posting a given MsgType, subscribing to nrt::AnyMessage:
581  else if (sub->msgtype == "nrt::AnyMessage")
582  {
583  NRT_BBDEBUG("msg to any: " << this->msgtype << " to " << sub->msgtype);
584 
585  nrt::MessageSubscriberCallbackWrapper<nrt::blackboard::AnyMsgPtr, RetPtr> * cbwrap =
586  dynamic_cast<nrt::MessageSubscriberCallbackWrapper<nrt::blackboard::AnyMsgPtr, RetPtr> * >
587  (sub->callbackWrapperBase.get());
588  if (!cbwrap) BBTHROWX(InternalInconsistency, "Ooops, unexpected subscriber type");
589 
590  // Grab its callback and push it onto our vector of callbacks, after some wrapping:
591  std::function<RetPtr(nrt::blackboard::AnyMsgPtr)> cb = cbwrap->callback;
592  localCallbacks.push_back( [cb](MsgPtr msg) -> RetPtr
593  { std::shared_ptr<nrt::AnyMessage const> am(new nrt::AnyMessage(msg)); return cb(am); }
594  );
595  }
596 
597  // *** Posting AnyMessage, subscribing to a given message type:
598  else if (this->msgtype == "nrt::AnyMessage")
599  {
600  NRT_BBDEBUG("any to msg: " << this->msgtype << " to " << sub->msgtype << " ret = " << this->rettype);
601 
602  // Get the full derived callback wrapper:
603  nrt::MessageSubscriberCallbackAnyWrapper<RetPtr> * cbwrapany =
604  dynamic_cast<nrt::MessageSubscriberCallbackAnyWrapper<RetPtr> * >(sub->callbackAnyWrapperBase.get());
605  if (!cbwrapany) BBTHROW(InternalInconsistency);
606 
607  // Grab its callback and push it onto our vector of callbacks:
608  // FIXME: we do one un-necessary (pointer) cast of the message here because we are testing for message type at
609  // runtime rather than compile time (which would be in the posting I guess)
610  std::function<RetPtr(nrt::blackboard::AnyMsgPtr)> cb = cbwrapany->callbackany;
611 
612  localCallbacks.push_back( [cb](MsgPtr msg) -> RetPtr {
613  std::shared_ptr<nrt::AnyMessage const> amsg = std::dynamic_pointer_cast<nrt::AnyMessage const>(msg);
614  if (!amsg) throw nrt::exception::BlackboardException
615  (nrt::exception::BlackboardException::BadAnyMessageCast, "Oooops", "bad AnyMessage cast");
616  return cb(amsg); }
617  );
618  }
619  else
620  BBTHROWX(InternalInconsistency, "Ooops, type confusion");
621 
622 #undef BBWHAT
623 }
624 
625 // ######################################################################
626 template <class T> inline
629 { }
630 
631 // ######################################################################
632 template <class T> inline
633 nrt::SplitMessagePoster<T>::SplitMessagePoster(std::string const & mod, std::string const & portna,
634  std::string const & descr) :
635  nrt::MessagePosterCore<SimpleMessagePosting<T> >(mod, portna, descr)
636 { }
637 
638 // ######################################################################
639 template <class T> inline
641 { }
642 
643 // ######################################################################
644 // If trying to post something that's nrt::Message<T>, just post it
645 // Otherwise, wrap it
646 template <typename T> template <typename TT> inline
647 typename std::enable_if<! std::is_same<typename nrt::SimpleMessagePosting<T>::MsgType, TT>::value, void>::type
648 nrt::SplitMessagePoster<T>::doPostInternalWrapper(std::list<std::future<void> > & fut, std::shared_ptr<TT> msg) const
649 { this->doPostInternal(fut, std::shared_ptr<nrt::Message<T> const>(new nrt::Message<T>(msg))); }
650 
651 template <typename T> template <typename TT> inline
652 typename std::enable_if<std::is_same<typename nrt::SimpleMessagePosting<T>::MsgType, TT>::value, void>::type
653 nrt::SplitMessagePoster<T>::doPostInternalWrapper(std::list<std::future<void> > & fut, std::shared_ptr<TT> msg) const
654 { this->doPostInternal(fut, msg); }
655 
656 // ######################################################################
657 // ######################################################################
658 // ######################################################################
659 
660 // MessageCheckerCore inlined implementation
661 
662 // ######################################################################
663 template <class Checking> inline
665 {
666  typedef typename Checking::MsgType Msg;
667 
668  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Checking::MsgType must derive from nrt::MessageBase");
669 
670  // Initialize our MessageCheckerCoreBase:
671  msgtype = nrt::MessageType<Msg>();
672  module = uid().str();
673  portname = Checking::portname();
674  description = Checking::description();
675  topicfilt = "";
676  sequence = -1; // the Blackboard will set that up upon registration
677  splittable = Checking::isSplittable;
678 
679  nrt::Blackboard::instance().registerMessageChecker<Checking>(this);
680 }
681 
682 // ######################################################################
683 template <class Checking> inline
685 MessageCheckerCore(std::string const & mod, std::string const & portna, std::string const & descr)
686 {
687  typedef typename Checking::MsgType Msg;
688 
689  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Checking::MsgType must derive from nrt::MessageBase");
690 
691  // Initialize our MessageCheckerCoreBase:
692  msgtype = nrt::MessageType<Msg>();
693  module = mod;
694  portname = portna;
695  description = descr;
696  topicfilt = "";
697  sequence = -1; // the Blackboard will set that up upon registration
698  splittable = Checking::isSplittable;
699 
700  nrt::Blackboard::instance().registerMessageChecker<Checking>(this);
701 }
702 
703 // ######################################################################
704 template <class Checking> inline
706 { return false; }
707 
708 // ######################################################################
709 template <class Checking> inline
711 { nrt::Blackboard::instance().unRegisterMessageChecker<Checking>(this); }
712 
713 // ######################################################################
714 template <class Checking> inline
717 {
718 #define BBWHAT std::string("checking [") + portname + "]"
719 
720  typedef typename Checking::MsgType Msg;
721 
723 
724  // Drop the check if we are not running() anymore (trying to stop...)
725  if (nrt::Blackboard::instance().running() == false)
726  { NRT_WARNING("Dropped " << BBWHAT << " while not in running() state."); return results; }
727 
728  // get shared read access to our internals:
729  boost::shared_lock<boost::shared_mutex> lck(mtx);
730 
731  // first, get the local results:
732  if (localMessageKeys.size()) {
733  std::string const checkkey = module + '|' + portname;
734 
735  for (auto const mkey : localMessageKeys) {
736  // Get the message from the blackboard, if any was posted (otherwise, will be null):
737  std::shared_ptr<nrt::MessageBase const> msgbase = nrt::Blackboard::instance().getMessage(mkey, mcp, checkkey);
738  if (msgbase) {
739  // Cast message back to its full derived type:
740  std::shared_ptr<Msg const> msg = std::dynamic_pointer_cast<Msg const>(msgbase);
741  if (!msg) BBTHROW(InternalInconsistency);
742 
743  results.messages.push(msg);
744  }
745  }
746  }
747 
748  // Now the remote messages:
749  for (auto const & func : remoteCheckCalls) {
750  std::function<std::shared_ptr<std::vector<std::shared_ptr<Msg const> > >()>
751  doitfunc = [func, mcp, this]() {
752  // Create a vector of results:
753  std::shared_ptr<std::vector<std::shared_ptr<Msg const> > > retval(new std::vector<std::shared_ptr<Msg const> >);
754 
755  // get the remote messages as serialized bytes:
756  NRT_BBDEBUG("Initiating remote check() request over the network...");
757  std::shared_ptr<std::string const> msgstr = func(mcp);
758  NRT_BBDEBUG("Done with remote check() request over the network.");
759 
760  // deserialize the archive:
761  try {
762  std::istringstream is(*msgstr);
763  nrt::iarchive archive(is);
764  size_t num;
765  archive( num );
766  for (size_t ii = 0; ii < num; ++ii)
767  {
768  Msg *m = new Msg;
769  archive( *m );
770  retval->push_back(std::shared_ptr<Msg const>(m));
771  }
772  }
773  catch (cereal::Exception const & e)
774  {
775  BBTHROWX(MessageSerializationError, e.what());
776  }
777  catch (...)
778  {
779  BBTHROW(MessageSerializationError);
780  }
781 
782  return retval;
783  };
784 
785  // Launch the remote check() call and store a future to it:
786  results.futures.push_back(nrt::async(doitfunc));
787  }
788 
789  return results;
790 
791 #undef BBWHAT
792 }
793 
794 // ######################################################################
795 template <class Checking> inline
797 make_connector(std::string const & name, std::string const & namespc, nrt::ConnectorType const type,
798  std::string const & topic, std::string const & topicfilter) const
799 {
800  return nrt::Blackboard::instance().createConnector(nrt::ConnectorFlavor::Checker, msgtype, "",
801  name, namespc, type, topic, topicfilter);
802 }
803 
804 // ######################################################################
805 // ######################################################################
806 // ######################################################################
807 
808 // MessageSubscriberCore inlined implementation
809 
810 template <class MsgPtr, class RetPtr> inline
811 nrt::MessageSubscriberCallbackWrapper<MsgPtr, RetPtr>::~MessageSubscriberCallbackWrapper()
812 { }
813 
814 template <class MsgPtr> inline
815 nrt::MessageSubscriberCallbackWrapper<MsgPtr, void>::~MessageSubscriberCallbackWrapper()
816 { }
817 
818 template <class RetPtr> inline
819 nrt::MessageSubscriberCallbackWrapper<nrt::blackboard::AnyMsgPtr, RetPtr>::~MessageSubscriberCallbackWrapper()
820 { }
821 
822 inline
823 nrt::MessageSubscriberCallbackWrapper<nrt::blackboard::AnyMsgPtr, void>::~MessageSubscriberCallbackWrapper()
824 { }
825 
826 template <class RetPtr> inline
827 nrt::MessageSubscriberCallbackAnyWrapper<RetPtr>::~MessageSubscriberCallbackAnyWrapper()
828 { }
829 
830 inline
831 nrt::MessageSubscriberCallbackAnyWrapper<void>::~MessageSubscriberCallbackAnyWrapper()
832 { }
833 
834 // ######################################################################
835 template <class Subscription> inline
837 {
838  typedef typename Subscription::MsgType Msg;
839  typedef typename Subscription::RetType Ret;
840 
841  // Initialize our MessageSubscriberCoreBase:
842  msgtype = nrt::MessageType<Msg>();
843  rettype = nrt::MessageType<Ret>();
844  module = uid().str();
845  portname = Subscription::portname();
846  description = Subscription::description();
847  topicfilt = "";
848  sequence = -1; // the Blackboard will set that up upon registration
849  splittable = Subscription::isSplittable;
850 
851  // Construct our callback wrappers:
852  constructCallback();
853 
854  // Now that we are ready for action, register with our blackboard:
855  nrt::Blackboard::instance().registerMessageSubscriber<Subscription>(this);
856 }
857 
858 // ######################################################################
859 template <class Subscription> inline
861 MessageSubscriberCore(std::string const & mod, std::string const & portna, std::string const & descr)
862 {
863  typedef typename Subscription::MsgType Msg;
864  typedef typename Subscription::RetType Ret;
865 
866  // Initialize our MessageSubscriberCoreBase:
867  msgtype = nrt::MessageType<Msg>();
868  rettype = nrt::MessageType<Ret>();
869  module = mod;
870  portname = portna;
871  description = descr;
872  topicfilt = "";
873  sequence = -1; // the Blackboard will set that up upon registration
874  splittable = Subscription::isSplittable;
875 
876  // Construct our callback wrappers:
877  constructCallback();
878 
879  // Now that we are ready for action, register with our blackboard:
880  nrt::Blackboard::instance().registerMessageSubscriber<Subscription>(this);
881 }
882 
883 // ######################################################################
884 template <class Subscription> inline
886 {
887  typedef typename Subscription::MsgType Msg;
888  typedef typename Subscription::RetType Ret;
889 
890  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value,
891  "Subscription::MsgType must derive from nrt::MessageBase");
892  static_assert(std::is_same<void, Ret>::value || std::is_base_of<nrt::MessageBase, Ret>::value,
893  "Subscription::RetType must be void or derive from nrt::MessageBase");
894 
895  std::string const bbwhere = "onMessage("+msgtype+" : "+rettype+") callback for subscriber port ["+
896  portname+"] of module [";
897 
898  std::shared_ptr<MessageSubscriberCallbackWrapper<typename Subscription::MsgPtr, typename Subscription::RetPtr> >
899  cbwrap(new MessageSubscriberCallbackWrapper<typename Subscription::MsgPtr, typename Subscription::RetPtr>);
900 
901  // Create a semi-bound callback to the onMessage() function of that subscriber. We bind the template arg, object
902  // pointer, and function pointer, and we leave open the Message and return for later binding when we use the function:
903  cbwrap->callback =
904  [this, bbwhere](typename Subscription::MsgPtr msg)
905  {
906  try
907  {
908  // Run the onMessage() function. We need a bit of trickery around it to make sure that we call
909  // incrementActiveCallbackCount() on the Blackboard before the call, and decrementActiveCallbackCount() after
910  // the call, but we need to be able to handle both void and message returns, hence the runBoundOnMessage()
911  // wrapper here. Note that since we increment the count in runBoundOnMessage() and then may throw, we will
912  // decrement the count in our catch() statements below:
914  {
915  nrt::Component * comp = dynamic_cast<nrt::Component *>(this);
916  std::string const cname = comp ? comp->meta().str() : this->uid().str();
917 
918  nrt::Event evt = nrt::SystemProfiler::instance().event(bbwhere + cname + ']');
919  return Subscription::
920  runBoundOnMessage(std::bind(&nrt::MessageSubscriberCore<Subscription>::onMessage, this, Subscription(msg)));
921  }
922  else
923  {
924  return Subscription::
925  runBoundOnMessage(std::bind(&nrt::MessageSubscriberCore<Subscription>::onMessage, this, Subscription(msg)));
926  }
927  }
928  catch (...)
929  {
930  nrt::Blackboard::instance().decrementActiveCallbackCount();
931  nrt::Component * comp = dynamic_cast<nrt::Component *>(this);
932  std::string const cname = comp ? comp->meta().str() : this->uid().str();
933  nrt::blackboard::catchAndRethrow(bbwhere + cname + ']', static_cast<nrt::ModuleBase *>(this));
934  throw; // keep gcc happy, catchAndRethrow() already rethrows for us...
935  }
936  };
937 
938  // Hook up our callback wrapper:
939  this->callbackWrapperBase = cbwrap;
940 
941  // And now the version for incoming nrt::AnyMessage:
942  typename Subscription::MsgPtr phony; // to deduce type of template constructor of MessageSubscriberCallbackAnyWrapper
943  std::shared_ptr<MessageSubscriberCallbackAnyWrapper<typename Subscription::RetPtr> >
944  cbwrapany(new MessageSubscriberCallbackAnyWrapper<typename Subscription::RetPtr>(phony));
945 
946  cbwrapany->callbackany = [this, bbwhere](nrt::blackboard::AnyMsgPtr msgany)
947  {
948  try
949  {
950  // first, extract underlying message type from the received nrt::AnyMessage:
951  std::shared_ptr<Msg const> msgcast = msgany->get<Msg>();
952  if (!msgcast) throw nrt::exception::BlackboardException
953  (nrt::exception::BlackboardException::BadAnyMessageCast, "Oooops", "bad AnyMessage cast");
954 
955  // Run the onMessage() function. We need a bit of trickery around it to make sure that we call
956  // incrementActiveCallbackCount() on the Blackboard before the call, and decrementActiveCallbackCount() after
957  // the call, but we need to be able to handle both void and message returns, hence the runBoundOnMessage()
958  // wrapper here. Note that since we increment the count in runBoundOnMessage() and then may throw, we will
959  // decrement the count in our catch() statements below:
960  if (nrt::SystemProfiler::instance().profileBlackboard)
961  {
962  nrt::Component * comp = dynamic_cast<nrt::Component *>(this);
963  std::string const cname = comp ? comp->meta().str() : this->uid().str();
964 
965  nrt::Event evt = nrt::SystemProfiler::instance().event(bbwhere + cname + ']');
966  return Subscription::
967  runBoundOnMessage(std::bind(&nrt::MessageSubscriberCore<Subscription>::onMessage, this,
968  Subscription(msgcast)));
969  }
970  else
971  {
972  return Subscription::
973  runBoundOnMessage(std::bind(&nrt::MessageSubscriberCore<Subscription>::onMessage, this,
974  Subscription(msgcast)));
975  }
976  }
977  catch (...)
978  {
979  nrt::Blackboard::instance().decrementActiveCallbackCount();
980  nrt::Component * comp = dynamic_cast<nrt::Component *>(this);
981  std::string const cname = comp ? comp->meta().str() : this->uid().str();
982  nrt::blackboard::catchAndRethrow(bbwhere + cname + ']', static_cast<nrt::ModuleBase *>(this));
983  throw; // keep gcc happy, catchAndRethrow() already rethrows for us...
984  }
985  };
986 
987  this->callbackAnyWrapperBase = cbwrapany;
988 }
989 
990 // ######################################################################
991 template <class Subscription> inline
993 { nrt::Blackboard::instance().unRegisterMessageSubscriber<Subscription>(this); }
994 
995 // ######################################################################
996 template <class Subscription> inline
998 {
999 #define BBWHAT std::string("module [") + module + " subscriber [" + portname + "]"
1000 
1001  if (Subscription::isSplittable == false) BBTHROW(UnsplittablePort);
1002  NRT_BBDEBUG("create for module " << module << " portname " << portname << " msg " << msgtype);
1003 
1004  // Get unique write access to our internals:
1005  boost::unique_lock<boost::shared_mutex> lck(mtx);
1006 
1007  // Ignore if we are already split:
1008  if (splitSubscribers) return;
1009 
1010  // Allocate our split received message:
1011  splitReceiveMessage.reset(new typename Subscription::MsgType());
1012 
1013  // Allocate our bitfield, and initialize to nothing received:
1014  splitReceiveBits.reset(new std::array<bool, numSplitReceiveParts>());
1015  splitReceiveBits->fill(false);
1016 
1017  // Create the ports, they will register with the blackboard at construction:
1018  splitSubscribers = splitReceiveMessage->createSubSubscribers(this, module, portname);
1019 
1020 #undef BBWHAT
1021 }
1022 
1023 // ######################################################################
1024 template <class Subscription> inline
1026 {
1027 #define BBWHAT std::string("module [") + module + " subscriber [" + portname + "]"
1028 
1029  // Get unique write access to our internals:
1030  boost::unique_lock<boost::shared_mutex> lck(mtx);
1031 
1032  if (splitSubscribers)
1033  {
1034  NRT_BBDEBUG("delete for module " << module << " portname " << portname << " msg " << msgtype);
1035 
1036  // In case we have a partially filled message here, we need to first notify the subs so that they stop blocking:
1037  abortSplitReceive();
1038  }
1039 
1040  // Kill our sub-ports, partial message, and bitfield:
1041  splitSubscribers.reset();
1042  splitReceiveMessage.reset();
1043  splitReceiveBits.reset();
1044 
1045 #undef BBWHAT
1046 }
1047 // ######################################################################
1048 template <class Subscription> inline
1050 {
1051  boost::shared_lock<boost::shared_mutex> lck(mtx);
1052 
1053  if (splitSubscribers) return true; else return false;
1054 }
1055 
1056 // ######################################################################
1057 template <class Subscription> template <typename... Args> inline
1060 {
1061  return Subscription::make_return_message(std::forward<Args>(args)...);
1062 }
1063 
1064 // ######################################################################
1065 template <class Subscription> inline
1067 make_connector(std::string const & name, std::string const & namespc, nrt::ConnectorType const type,
1068  std::string const & topic, std::string const & topicfilter) const
1069 {
1070  return nrt::Blackboard::instance().createConnector(nrt::ConnectorFlavor::Subscriber, msgtype, "",
1071  name, namespc, type, topic, topicfilter);
1072 }
1073 
1074 
1075 // ######################################################################
1076 template <class Subscription> inline
1078 {
1079  std::unique_lock<std::mutex> ulck(splitDataMutex);
1080 
1081  // Assert the corresponding bit:
1082  (*splitReceiveBits)[bitnum] = true;
1083 
1084  // Are all our bits set? if not, just block on our condition variable:
1085  for (bool b : *splitReceiveBits)
1086  if (b == false)
1087  {
1088  // not all bits set yet. First acquire the mutex associated with our condition variable:
1089  std::unique_lock<std::mutex> pulck(splitReceiveMutex);
1090 
1091  // Then release our lock onto our data so others can bring more data:
1092  ulck.unlock();
1093 
1094  // Then block on our condition variable, which will be notified when message is complete:
1095  splitReceiveCond.wait(pulck);
1096 
1097  // By the time we unlock here, we have nothing left to do
1098  return;
1099  }
1100 
1101  // All right, the message is complete! First trigger our callback and block until it's complete. Note that we are
1102  // still holding ulck here. If onMessage() throws we are just going to let that through (??)
1103  onMessage(Subscription(splitReceiveMessage));
1104 
1105  // Zero-out our bitfield so we are ready for the next message:
1106  splitReceiveBits->fill(false);
1107 
1108  // Notify all previously blocked subs that we are done:
1109  splitReceiveCond.notify_all();
1110 }
1111 
1112 // ######################################################################
1113 template <class Subscription> inline
1115 {
1116 #define BBWHAT std::string("module [") + module + " subscriber [" + portname + "]"
1117 
1118  if (splitSubscribers)
1119  {
1120  NRT_BBDEBUG(BBWHAT);
1121 
1122  // In case we have a partially filled message here, we need to first notify the subs so that they stop
1123  // blocking. As we do this there might be more in the queue that will then block. So we iterate a few times...
1124  for (int i = 0; i < 10; ++i)
1125  {
1126  splitReceiveBits->fill(false);
1127  splitReceiveCond.notify_all(); // free up our blocked sub-subscribers
1128  std::this_thread::sleep_for(std::chrono::milliseconds(100));
1129  bool finished = true; for (bool b : *splitReceiveBits) if (b) { finished = false; break; }
1130  if (finished) break;
1131  }
1132  }
1133 
1134 #undef BBWHAT
1135 }
1136 
1137 // ######################################################################
1138 // ######################################################################
1139 template <typename T> inline
1140 nrt::SplitMessageSubscriber<T, false>::
1141 SplitMessageSubscriber(nrt::MessageSubscriberCoreBase * parent, std::shared_ptr<T> & field, size_t const bitnum,
1142  std::string const & mod, std::string const & portna, std::string const & descr) :
1143  nrt::MessageSubscriberCore<nrt::SimpleMessageSubscription<T, false> >(mod, portna, descr),
1144  itsParent(parent), itsField(field), itsBitnum(bitnum)
1145 { }
1146 
1147 template <typename T> inline
1148 nrt::SplitMessageSubscriber<T, false>::~SplitMessageSubscriber()
1149 { }
1150 
1151 template <typename T> inline
1153 {
1154  // make sure we don't run several instances of the present function concurrently:
1155  std::unique_lock<std::mutex> ulck(itsOnMessageMtx);
1156 
1157  // Stuff the message into our parent's temporary partial composite message, update the appropriate bitfield,
1158  // and block until the partial message is complete and the parent's onMessage() gets triggered:
1159  itsField = msg->value_;
1160 
1161  // Notify the parent. If we are the last field, then onMessage() will trigger on the parent and by the time it returns
1162  // we'll be all done here. Otherwise, parent message not yet complete, and we will block here until it is:
1163  itsParent->notifySplitReceive(itsBitnum);
1164 }
1165 
1166 template <typename T> inline
1167 nrt::SplitMessageSubscriber<T, true>::
1168 SplitMessageSubscriber(nrt::MessageSubscriberCoreBase * parent, std::shared_ptr<T> & field, size_t const bitnum,
1169  std::string const & mod, std::string const & portna, std::string const & descr) :
1170  nrt::MessageSubscriberCore<nrt::SimpleMessageSubscription<T, true> >(mod, portna, descr),
1171  itsParent(parent), itsField(field), itsBitnum(bitnum)
1172 { }
1173 
1174 template <typename T> inline
1175 nrt::SplitMessageSubscriber<T, true>::~SplitMessageSubscriber()
1176 { }
1177 
1178 template <typename T> inline
1180 {
1181  // make sure we don't run several instances of the present function concurrently:
1182  std::unique_lock<std::mutex> ulck(itsOnMessageMtx);
1183 
1184  // Stuff the message into our parent's temporary partial composite message, update the appropriate bitfield,
1185  // and block until the partial message is complete and the parent's onMessage() gets triggered:
1186  itsField = std::const_pointer_cast<T>(msg);
1187 
1188  // Notify the parent. If we are the last field, then onMessage() will trigger on the parent and by the time it returns
1189  // we'll be all done here. Otherwise, parent message not yet complete, and we will block here until it is:
1190  itsParent->notifySplitReceive(itsBitnum);
1191 }
1192 
1193 // Un-define a bunch of macros
1195 
1196 #endif // INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MODULEPORTIMPL_H