iLab Neuromorphic Robotics Toolkit  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ModulePortHelpers.H
Go to the documentation of this file.
1 /*! @file
2  @author Laurent Itti
3  @copyright GNU Public License (GPL v3)
4  @section License
5  @verbatim
6  // ////////////////////////////////////////////////////////////////////////
7  // The iLab Neuromorphic Robotics Toolkit (NRT) //
8  // Copyright 2010-2012 by the University of Southern California (USC) //
9  // and the iLab at USC. //
10  // //
11  // iLab - University of Southern California //
12  // Hedco Neurociences Building, Room HNB-10 //
13  // Los Angeles, Ca 90089-2520 - USA //
14  // //
15  // See http://ilab.usc.edu for information about this project. //
16  // ////////////////////////////////////////////////////////////////////////
17  // This file is part of The iLab Neuromorphic Robotics Toolkit. //
18  // //
19  // The iLab Neuromorphic Robotics Toolkit is free software: you can //
20  // redistribute it and/or modify it under the terms of the GNU General //
21  // Public License as published by the Free Software Foundation, either //
22  // version 3 of the License, or (at your option) any later version. //
23  // //
24  // The iLab Neuromorphic Robotics Toolkit is distributed in the hope //
25  // that it will be useful, but WITHOUT ANY WARRANTY; without even the //
26  // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
27  // PURPOSE. See the GNU General Public License for more details. //
28  // //
29  // You should have received a copy of the GNU General Public License //
30  // along with The iLab Neuromorphic Robotics Toolkit. If not, see //
31  // <http://www.gnu.org/licenses/>. //
32  // ////////////////////////////////////////////////////////////////////////
33  @endverbatim */
34 
35 
36 #ifndef INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MODULEPORTHELPERS_H
37 #define INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MODULEPORTHELPERS_H
38 
39 // Define Message, MessagePosterResults and MessageCheckerResults
46 
47 #include <sys/types.h> // for pid_t
48 #include <boost/thread/shared_mutex.hpp> // for shared_mutex, shared_lock and friends
49 #include <map>
50 #include <array>
51 
52 namespace nrt
53 {
54  class Blackboard;
55  class Module;
56  class BlackboardManager;
57  class MessagePosterCoreBase;
58  class MessageCheckerCoreBase;
59  class MessageSubscriberCoreBase;
60  template <class T> class ParameterCore;
61 
62  //! Policy flag that can be passed to check() to indicate what to check for and how
63  /*! A policy flag for MessageChecker that controls which messages will be returned, as well as the blocking behavior
64  of a call to check().
65  \ingroup module */
67  {
68  Any, //!< Any messages, regardless of whether they have already been returned to a previous call to check()
69  Unseen, //!< Only unseen messages, those returned to a previous call to check() will be skipped
70  AnyBlock, //!< Like MessageCheckerPolicy::Any but will block until at least one result is available
71  UnseenBlock //!< Like MessageCheckerPolicy::Unseenbut will block until at least one result is available
72  };
73 
74  //! Module Parameter port type, used to create ports in Modules for Parameters
75  /*! \ingroup module */
76  enum class ModuleParamPort
77  {
78  Poster, //!< Poster parameter port type
79  Checker, //!< Checker parameter port type
80  Subscriber //!< Subscriber parameter port type
81  };
82 
83  namespace blackboard
84  {
85  // Message key type
86  typedef size_t msgkey;
87 
88  // MessageSubscriber remote post handlers
89  typedef std::function<void(std::shared_ptr<std::string const> msg, std::string const & transactionkey,
90  size_t const numcallbacks)>
91  MessageSubscriberRemotePostHandler;
92 
93  // MessageSubscriber remote callbacks
94  typedef std::function<std::shared_ptr<std::string const>(std::string const & transactionkey)>
95  MessageSubscriberRemoteCallback;
96 
97  // MessageSubscriber local callback for execution while serving a remote post
98  typedef std::function<void(std::shared_ptr<nrt::MessageBase const> msg, std::string & retstr)>
99  MessageSubscriberCallbackForRemote;
100 
101  // MessageChecker remote call
102  typedef std::function<std::shared_ptr<std::string const>(nrt::MessageCheckerPolicy const)>
103  MessageCheckerRemoteCall;
104 
105  // Catch and rethrow an exception, intended for use by ports only
106  void catchAndRethrow(std::string const & bbwhere, nrt::ModuleBase * mod);
107 
108  // Message pointer type to an nrt::AnyMessage
109  typedef typename std::shared_ptr<nrt::AnyMessage const> AnyMsgPtr;
110  }
111 
112  // ######################################################################
113  // ######################################################################
114  // ######################################################################
115  //! A Posting port is a unique binding of a sent Message type and a returned Message type to a port class
116  /*! Module objects can post() on a given Posting port if and only if they derive from the corresponding MessagePoster
117  of the Posting. Typically, Msg and Ret should derive from nrt::MessageBase, with a special case allowed for Ret
118  being void. See MessagePoster for how to use post(). A convenience macro is provided (in
119  details/ModulePortHelpers.H) to easily declare a posting class:
120 
121  @code
122  NRT_DECLARE_MESSAGEPOSTER_PORT(PortName, MsgType, RetType, Description);
123  @endcode
124 
125  Where PortName is the name of the class that will embody your Posting port, MsgType is the type of posted message
126  (must derive from nrt::MessageBase), RetType is the type of message returned by any subscriber (callback) that
127  will respond to posts on this Posting (must derive from nrt::MessageBase or be void), and Description is a plain
128  C-style string describing your Posting. For example:
129 
130  @code
131  class MessageA : public nrt::MessageBase { }; // define some message type A
132 
133  NRT_DECLARE_MESSAGEPOSTER_PORT(OutputA, MessageA, void, "MessageA Output"); // Our port is named OutputA
134  @endcode
135 
136  The reason for declaring a new Posting type is to allow one to have several postings with identical message and
137  return types, but different descriptions and port classes. Although these deal with identical messages, they may
138  post in different namespaces and on different topics (see the definition of MessagePoster) and thus achieve
139  different functions.
140 
141  \ingroup module */
142  template <class Port, class Msg, class Ret>
144  {
145  public:
146  typedef Port PortType;
147  typedef Msg MsgType; //!< The outgoing message type
148  typedef Ret RetType; //!< The return message type
149 
150  typedef typename std::unique_ptr<Msg> MsgUptr; //!< Outgoing message pointer type, as passed to post()
151  typedef typename std::shared_ptr<Msg const> MsgPtr; //!< Outgoing message pointer type, as received by callbacks
152  typedef std::unique_ptr<Ret> RetUptr; //!< Return message pointer type, as returned by callback
153  typedef std::shared_ptr<Ret const> RetPtr; //!< Return message pointer type, as received back by poster
154 
155  typedef std::function<RetPtr(MsgPtr)> CallbackFuncType; //!< Callback function type
156 
157  static bool const isSplittable = false; //!< Postings with non-void returns cannot be split
158 
159  //! Allocate a message and return a unique_ptr to it, to be used by post()
160  /*! All given args are forwarded to the message constructor. */
161  template <typename... Args> static
162  MsgUptr make_message(Args && ... args);
163 
164  private:
165  template <class Posting> friend class MessagePosterCore;
166 
167  static RetPtr
168  launchRemoteCallback(nrt::blackboard::MessageSubscriberRemoteCallback rcb,
169  std::shared_future<void> fut, std::string const transactionkey);
170  };
171 
172  //! Specialization of MessagePosting for void return type
173  template <class Port, class Msg>
174  class MessagePosting<Port, Msg, void>
175  {
176  public:
177  typedef Port PortType;
178  typedef Msg MsgType; //!< The outgoing message type
179  typedef void RetType; //!< The return message type (void)
180 
181  typedef typename std::unique_ptr<Msg> MsgUptr; //!< Outgoing message pointer type, as passed to post()
182  typedef typename std::shared_ptr<Msg const> MsgPtr; //!< Outgoing message pointer type, as received by callbacks
183  typedef void RetUptr; //!< Return message pointer type, as returned by callback
184  typedef void RetPtr; //!< Return message pointer type, as received by poster
185 
186  typedef std::function<void(MsgPtr)> CallbackFuncType;
187 
188  static bool const isSplittable = Msg::isComposite; //!< CompositeMessage can be split
189 
190  //! Allocate a message and return a unique_ptr to it, to be used by post()
191  /*! All given args are forwarded to the message constructor. */
192  template <typename... Args> static
193  MsgUptr make_message(Args && ... args);
194 
195  private:
196  template <class Posting> friend class MessagePosterCore;
197 
198  static RetPtr
199  launchRemoteCallback(nrt::blackboard::MessageSubscriberRemoteCallback rcb,
200  std::shared_future<void> fut, std::string const transactionkey);
201  };
202 
203  //! Specialization of MessagePosting for void return type and nrt::Message<T> message
204  template <class Port, typename T>
205  class MessagePosting<Port, nrt::Message<T>, void>
206  {
207  public:
208  typedef Port PortType;
209  typedef typename nrt::Message<T> MsgType; //!< Outgoing message type
210  typedef void RetType; //!< The return message type (void)
211 
212  typedef typename std::unique_ptr<MsgType> MsgUptr; //!< Outgoing message pointer type, as passed to post()
213  typedef typename std::shared_ptr<MsgType const> MsgPtr; //!< Outgoing message ptr type, as received by callbacks
214  typedef void RetUptr; //!< Return message pointer type, as returned by callback
215  typedef void RetPtr; //!< Return message pointer type, as received by poster
216 
217  typedef std::function<void(MsgPtr)> CallbackFuncType;
218 
219  static bool const isSplittable = false; //!< Those messages are atomic and non-splittable
220 
221  //! Allocate a message and return a unique_ptr to it, to be used by post()
222  /*! All given args are forwarded to the message constructor. */
223  template <typename... Args> static
224  MsgUptr make_message(Args && ... args);
225 
226  private:
227  template <class Posting> friend class MessagePosterCore;
228 
229  static RetPtr
230  launchRemoteCallback(nrt::blackboard::MessageSubscriberRemoteCallback rcb,
231  std::shared_future<void> fut, std::string const transactionkey);
232  };
233 
234  // ######################################################################
235  // ######################################################################
236  // ######################################################################
237  //! A Checking is a unique binding of a Message type to a port class
238  /*! Module objects can check() on a given Checking port if and only if they derive from the corresponding
239  MessageChecker of the Checking. Msg must derive from nrt::MessageBase. See MessageChecker for how to use
240  check(). A convenience macro is provided (in details/ModuleHelpers.H) to easily declare a checking class:
241 
242  @code
243  NRT_DECLARE_MESSAGECHECKER_PORT(PortName, MsgType, Description);
244  @endcode
245 
246  Where PortName is the name of the class that will embody your Checking, MsgType is the type of message that we
247  wish to check for (must derive from nrt::MessageBase), and Description is a plain C-style string describing your
248  Checking. For example:
249 
250  @code
251  class MessageA : public nrt::MessageBase { }; // define some message type A
252 
253  NRT_DECLARE_MESSAGECHECKER_PORT(AsyncInputA, MessageA, "MessageA Async Input"); // Our port is named AsyncInputA
254  @endcode
255 
256  The reason for declaring a new Checking type is to allow one to have several ones with identical message type, but
257  different descriptions and different port classes. Although these deal with identical messages, they may check in
258  different namespaces and on different topics (see the definition of MessageChecker) and thus achieve different
259  functions.
260 
261  \ingroup module */
262  template <class Port, class Msg>
264  {
265  public:
266  typedef Port PortType;
267  typedef Msg MsgType; //!< The message type
268  static bool const isSplittable = false; //!< Cannot split any check() for now...
269  };
270 
271  // ######################################################################
272  // ######################################################################
273  // ######################################################################
274  //! A Subscription is a unique binding of a received Message type and a returned Message type to a port class
275  /*! Module objects must implement an onMessage() callback function on a given Subscription port if and only if they
276  derive from the corresponding MessageSubscriber of the Subscription. Typically, Msg and Ret should derive from
277  nrt::MessageBase, with a special case allowed for Ret being void. See MessageSubscriber for how to define
278  onMessage(). A convenience macro is provided (in details/ModuleHelpers.H) to easily declare a subscription class:
279 
280  @code
281  NRT_DECLARE_MESSAGESUBSCRIBER_PORT(PortName, MsgType, RetType, Description);
282  @endcode
283 
284  Where PortName is the name of the class that will embody your Subscription, MsgType is the type of posted message
285  (must derive from nrt::MessageBase), RetType is the type of message returned by your subscriber (callback) that
286  will respond to posts on any matching Posting (must derive from nrt::MessageBase or be void), and Description is a
287  plain C-style string describing your Subscription. For example:
288 
289  @code
290  class MessageA : public nrt::MessageBase { }; // define some message type A
291  class MessageB : public nrt::MessageBase { }; // define some message type B
292 
293  NRT_DECLARE_MESSAGESUBSCRIBER_PORT(InputA, MessageA, MessageB, "MessageA Input, returns MessageB results");
294  @endcode
295 
296  The reason for declaring a new Subscription type for each port is to allow one to have several subscribers and
297  callbacks with identical message and return types, but different descriptions and different port classes. Although
298  these deal with identical messages, they may subscribe to them in different namespaces and on different topics
299  (see the definition of MessageSubscriber) and thus achieve different functionalities.
300 
301  \ingroup module */
302  template <class Port, class Msg, class Ret>
303  class MessageSubscription : public std::shared_ptr<Msg const>
304  {
305  public:
306  //! Construct from a shared_ptr to a message
307  MessageSubscription(std::shared_ptr<Msg const> msg);
308 
309  typedef Port PortType;
310  typedef Msg MsgType; //!< The outgoing message type
311  typedef Ret RetType; //!< The return message type
312 
313  typedef std::shared_ptr<Msg const> MsgPtr; //!< The outgoing message pointer type
314  typedef std::shared_ptr<Ret const> RetPtr; //!< The return message pointer type, as received by poster
315  typedef std::unique_ptr<Ret> RetUptr; //!< The return message pointer type, as returned by callback
316 
317  static bool const isSplittable = false; //!< Subscriptions with non-void returns cannot be split
318 
319  // Run the already bound onMessage() function, informing the blackboard
320  static RetPtr runBoundOnMessage(std::function<RetUptr()> func);
321 
322  //! Allocate a return message and return a unique_ptr to it, to be used in a callback as return value
323  /*! All given args are forwarded to the return message constructor. */
324  template <typename... Args> static
325  RetUptr make_return_message(Args && ... args);
326  };
327 
328  //! Specialization of MessageSubscription for void return type
329  template <class Port, class Msg>
330  class MessageSubscription<Port, Msg, void> : public std::shared_ptr<Msg const>
331  {
332  public:
333  //! Construct from a shared_ptr to a message
334  MessageSubscription(std::shared_ptr<Msg const> msg);
335 
336  typedef Port PortType;
337  typedef Msg MsgType; //!< The outgoing message type
338  typedef void RetType; //!< The return message type (void)
339 
340  typedef std::shared_ptr<Msg const> MsgPtr; //!< The outgoing message pointer type
341  typedef void RetPtr; //!< The return message pointer type, as received by poster
342  typedef void RetUptr; //!< The return message pointer type, as returned by callback
343 
344  static bool const isSplittable = Msg::isComposite; //!< CompositeMessage can be split
345 
346  // Run the already bound onMessage() function, informing the blackboard
347  static RetPtr runBoundOnMessage(std::function<RetUptr()> func);
348 
349  //! Allocate a return message and return a unique_ptr to it, to be used in a callback as return value
350  /*! This is a no-op for subscriptions with void return type */
351  template <typename... Args> static
352  RetUptr make_return_message(Args && ... args);
353  };
354 
355  //! Specialization of MessageSubscription for void return type and nrt::Message<T> message
356  template <class Port, typename T>
357  class MessageSubscription<Port, nrt::Message<T>, void> : public std::shared_ptr<nrt::Message<T> const>
358  {
359  public:
360  //! Construct from a shared_ptr to a message
361  MessageSubscription(std::shared_ptr<nrt::Message<T> const> msg);
362 
363  typedef Port PortType;
364  typedef typename nrt::Message<T> MsgType; //!< The outgoing message type
365  typedef void RetType; //!< The return message type (void)
366 
367  typedef std::shared_ptr<MsgType const> MsgPtr; //!< The outgoing message pointer type
368  typedef void RetPtr; //!< The return message pointer type, as received by poster
369  typedef void RetUptr; //!< The return message pointer type, as returned by callback
370 
371  static bool const isSplittable = false; //!< Atomic message, not splittable
372 
373  // Run the already bound onMessage() function, informing the blackboard
374  static RetPtr runBoundOnMessage(std::function<RetUptr()> func);
375 
376  //! Allocate a return message and return a unique_ptr to it, to be used in a callback as return value
377  /*! This is a no-op for subscriptions with void return type */
378  template <typename... Args> static
379  RetUptr make_return_message(Args && ... args);
380  };
381 
382 } // namespace nrt
383 
384 // ######################################################################
385 
386 // Convenience macro to define a Posting using CRTP
387 #define NRT_DECLARE_MESSAGEPOSTER_PORT(PortName, MsgType, RetType, Description) \
388  struct PortName##__NRT__PORT__ : public nrt::MessagePosting<PortName##__NRT__PORT__, MsgType, RetType> { \
389  constexpr static char const* description() { return Description; } \
390  constexpr static char const* portname() { return #PortName; } \
391  }; \
392  class PortName : public nrt::MessagePosterCore<PortName##__NRT__PORT__> { };
393 
394 // Convenience macro to define a Checking using CRTP
395 #define NRT_DECLARE_MESSAGECHECKER_PORT(PortName, MsgType, Description) \
396  struct PortName##__NRT__PORT__ : public nrt::MessageChecking<PortName##__NRT__PORT__, MsgType> { \
397  constexpr static char const* description() { return Description; } \
398  constexpr static char const* portname() { return #PortName; } \
399  }; \
400  class PortName : public nrt::MessageCheckerCore<PortName##__NRT__PORT__> { };
401 
402 // Convenience macro to define a Subscription
403 #define NRT_DECLARE_MESSAGESUBSCRIBER_PORT(PortName, MsgType, RetType, Description) \
404  struct PortName##__NRT__PORT__ : public nrt::MessageSubscription<PortName##__NRT__PORT__, MsgType, RetType> { \
405  PortName##__NRT__PORT__(std::shared_ptr<MsgType const> msg) : \
406  nrt::MessageSubscription<PortName##__NRT__PORT__, MsgType, RetType>(msg) { } \
407  constexpr static char const* description() { return Description; } \
408  constexpr static char const* portname() { return #PortName; } \
409  }; \
410  struct PortName : public nrt::MessageSubscriberCore<PortName##__NRT__PORT__> { };
411 
412 // ######################################################################
413 namespace nrt
414 {
415  // ######################################################################
416  // ######################################################################
417  // ######################################################################
418  // Split port helpers:
419  // ######################################################################
420 
421  //! Simple CRTP struct for a MessagePosting, used by split posters
422  template <typename T, bool isMessage = std::is_base_of<nrt::MessageBase, T>::value> struct SimpleMessagePosting;
423 
424  //! Simple CRTP struct for a MessagePosting associated with a nrt::Message<T> and void return type
425  template <typename T>
426  struct SimpleMessagePosting<T, false> : public nrt::MessagePosting<SimpleMessagePosting<T>, nrt::Message<T>, void>
427  { };
428 
429  //! Simple CRTP struct for a MessagePosting associated with a T (should derive from MessageBase) and void return type
430  template <typename T>
431  struct SimpleMessagePosting<T, true> : public nrt::MessagePosting<SimpleMessagePosting<T>, T, void>
432  { };
433 
434  // ######################################################################
435  //! Simple CRTP struct for a MessageSubscription, used by split subscribers
436  template <typename T, bool isMessage = std::is_base_of<nrt::MessageBase, T>::value> struct SimpleMessageSubscription;
437 
438  //! Simple CRTP struct for a MessageSubscription, used by split subscribers
439  template <typename T>
440  struct SimpleMessageSubscription<T, false> :
441  public nrt::MessageSubscription<SimpleMessageSubscription<T>, nrt::Message<T>, void>
442  {
443  //! Construct from a shared_ptr to a message
444  SimpleMessageSubscription(std::shared_ptr<typename nrt::Message<T> const> msg);
445  };
446 
447  //! Simple CRTP struct for a MessageSubscription, used by split subscribers
448  template <typename T>
449  struct SimpleMessageSubscription<T, true> :
450  public nrt::MessageSubscription<SimpleMessageSubscription<T>, T, void>
451  {
452  //! Construct from a shared_ptr to a message
453  SimpleMessageSubscription(std::shared_ptr<T const> msg);
454  };
455 
456 
457  // ######################################################################
458  // ######################################################################
459  // ######################################################################
460  // MessagePoster helpers
461 
462  //! Generic variadic class template definition of MessagePoster
463  /*! \see MessagePoster<Posting, Tail ...> for detailed documentation */
464  template <class ... Msgs> class MessagePoster { };
465 
466  //! Special case to terminate variadic template inheritance recursion
467  /*! \see MessagePoster<Posting, Tail ...> for detailed documentation */
468  template <> class MessagePoster<> { };
469 
470  // ######################################################################
471  //! Base class for a MessagePoster
472  /*! This class encapsulates all the information about the poster which is not dependent upon the MessagePosting type
473  \ingroup module */
475  {
476  public:
477  //! Virtual destructor for safe inheritance
478  virtual ~MessagePosterCoreBase();
479 
480  //! Set topic of a MessagePoster port
481  /*! All subsequents post() calls on this MessagePoster will post their messages on the specified topic. */
482  void setTopic(std::string const & topic_);
483 
484  //! Create our split sub-ports
485  virtual void createSubPosters() = 0;
486 
487  //! Delete our split sub-ports
488  virtual void deleteSubPosters() = 0;
489 
490  //! Return true if we currently have split-sub-posters
491  virtual bool isSplit() const = 0;
492 
493  protected:
494  //! Only the Blackboard and derived classes can access our data
495  friend class nrt::Blackboard;
496 
497  std::string msgtype; //!< Type of Message we post
498  std::string rettype; //!< Type of Message we expect back from subscribers
499  std::string module; //!< Module which we belong to
500  std::string portname; //!< String name for our Posting port class
501  std::string description; //!< String human description for our Posting
502  std::string topic; //!< Topic into which we post
503  int sequence; //!< Poster sequence number within module used for ordering of ports in the GUI
504  bool splittable; //!< Can this port be split into sub-ports?
505 
506  //! Clear our internal vector of local callbacks
507  virtual void clearLocalCallbacks() = 0;
508 
509  //! Add the callback from the given MessageSubscriber to our vector of local callbacks
510  virtual void addLocalCallback(MessageSubscriberCoreBase * sub) = 0;
511 
512  struct RemotePostData
513  {
514  //! Remote post handler that gets triggered when we post a message, one per remote blackboard
515  nrt::blackboard::MessageSubscriberRemotePostHandler remotePostHandler;
516 
517  //! List of callbacks to remote MessageSubscriber objects that get triggered when we post a message
518  std::vector<nrt::blackboard::MessageSubscriberRemoteCallback> remoteCallbacks;
519  };
520 
521  //! Map of remote post handlers that get triggered when we post a message, one per remote blackboard
522  std::map<std::string /* bbuid */, RemotePostData> remotePostData;
523 
524  //! Map of remote post handlers, one per remote blackboard that has subscribers for nrt::AnyMessage version
525  std::map<std::string /* bbuid */, RemotePostData> remotePostDataAny;
526 
527  //! Pre-computed key to our Blackboard's internal repository of messages
528  nrt::blackboard::msgkey messagekey;
529 
530  mutable boost::shared_mutex mtx; //<! Shared mutex to protect our internals
531  };
532 
533  template <class T> class SplitMessagePoster;
534 
535  // ######################################################################
536  //! Module objects which derive from MessagePoster will be allowed to post Message objects
537  /*! \ingroup module */
538  template <class Posting>
539  class MessagePosterCore : public virtual ModuleBase, public MessagePosterCoreBase
540  {
541  public:
542  //! Constructor, will register us with the Blackboard
544 
545  //! Destructor, will un-register us with the Blackboard
546  virtual ~MessagePosterCore();
547 
548  //! Create our split sub-ports
549  virtual void createSubPosters();
550 
551  //! Delete our split sub-ports
552  virtual void deleteSubPosters();
553 
554  //! Return true if we currently have split-sub-posters
555  virtual bool isSplit() const;
556 
557  //! Our underlying posting type (used by Connectors)
558  typedef Posting PostingType;
559 
560  //! Type for the results of a post
562 
563  //! Allocate a message and return a unique_ptr to it, to be used by post()
564  /*! All given args are forwarded to the message constructor. */
565  template <typename... Args>
566  typename Posting::MsgUptr make_message(Args && ... args) const;
567 
568  //! Post a Message to the blackboard
569  /*! The post() method sends the given message to the Blackboard Federation and triggers any local and remote
570  callbacks, run in in parallel threads, of any Modules who have subscribed to the matching message/return types,
571  who are in the same namespace as the poster, and whose Topic Filter matches the Topic of the poster.
572 
573  Because post() takes a unique_ptr to the message, it has very low overhead in transmitting the message to local
574  subscribers (no copy of message data invoved). Users should therefore feel free to include large data structures
575  (such as an Image) in their messages. However, post() can also transparently trigger callbacks on remote
576  machines, which is more costly as both the posted Message and the resulting return Message must be serialized
577  and transported over the network. Thus, users must be careful to pay attention to bandwidth limitations when
578  constructing distributed systems. Proper use of message/return types, namespaces, and topics can help maximize
579  throughput and minimize network congestion.
580 
581  post() returns quickly, i.e., almost immediately as the only thing it does is to dispatch the message to a
582  pre-computed list of callbacks, if any, where each callback runs in a parallel thread, and possibly it also
583  serializes the message once for dispatch to a pre-computed list of remote callbacks, if any. Callbacks are
584  executed in parallel threads and may each take variable amount of time to complete. The futures in
585  MessagePosterResults allow you to check for completion of each callback, see the documentation of
586  MessagePosterResults.
587 
588  Note that an exception will be thrown if you try to post() from a MessagePoster that is not attached to a
589  Module, or if you try to post() while the system in not in the running() state. Have a look at Component.H and
590  Manager.H for some explanations about the running() state.
591 
592  @param msg A unique pointer to the message that will be posted. Note that once you post the message, the pointer
593  will be invalidated (will point to nullptr), as inside post() the Blackboard will take ownership of the pointee
594  and will std::move it out of the unique_ptr. That is, your message is gone as far as your access to it. This is
595  by design, to discourage you from modifying the message after it has been posted (and while it may be
596  serializing or transmitting over the network).
597 
598  @returns MessagePosterResults<Posting>, see the MessagePosterResults documentation for usage. Note that if you
599  just ignore the results (i.e., just write Posting::post(msg) without grabbing the results), this is equivalent
600  to doing a waitgetall() on those results. */
601  PostResultsType post(std::unique_ptr<typename Posting::MsgType> & m) const;
602 
603  //! Post a Message to the blackboard, move semantics
604  /*! This version of post() behavies just like the other one except for the move semantics. This allows posting of
605  unique_ptr to messages that are returned by functions. For example:
606  @code
607  MyPoster::post(MyPoster::make_message(some_data));
608  @endcode */
609  PostResultsType post(std::unique_ptr<typename Posting::MsgType> && m) const;
610 
611  //! Re-post a received message
612  /*! This should only used by dispatcher or queueing modules that wish to re-post on a different port messages
613  received on some port. Functionality is identical to post() except that we here accept a shared_ptr<Msg const>
614  instead of a unique_ptr<Msg const>. To discourage modification of the message in the same spirit as the
615  original use of unique_ptr, repost() will invalidate your shared_ptr (will internally swap it for a shared_ptr
616  to a null message).
617 
618  When a message is re-posted, no deep copy of the message data occurs, but all the internal bookkeeping makes
619  it look as if it was a brand new message posted in the normal way (e.g., with respect to checkers,
620  subscribers, etc). This is elegant and efficient, but be sure to respect the constness of the
621  "Posting::MsgType const" semantics of your shared_ptr and to not modify the message contents. Here are two
622  simple scenarios: a module receives messages on a subscriber port, queues them up, possibly re-orders them,
623  etc and at some point wants to post them. Using repost() is appropriate in this case. Now if the module also
624  wants to, say, update some of the message contents, such as setting an ID or an order field based on the
625  re-ordering that has occurred, then repost() is not appropriate, you should instead make a deep copy of the
626  message (e.g., construct a new one from the old one using the copy constructor), then modify that copy, and
627  then do a regular post() of the modified copy. */
628  PostResultsType repost(std::shared_ptr<typename Posting::MsgType const> & m) const;
629 
630  //! Expunge from the Blackboard any previously posted message
631  /*! This clears from the Blackboard any previously posted message from that poster. Note that the message is
632  deleted even if the poster's topic has changed. This is useful to reclaim some memory on the Blackboard if a
633  large message is known to not be needed in the future. But it is not necessary in normal operation as only the
634  most recent message for each poster is stored on the Blackboard, so memory usage does not grow
635  uncontrollably. Another interesting use of expunge is to make sure that some blocking check() will perform as
636  desired. */
637  void expunge() const;
638 
639  //! Create a connector for this port's Posting type on the master Blackboard
640  /*! \return The connector uid. This is the only way to "access" the connector, as it lives hidden deep inside the
641  master Blackboard. The Blackboard class exposes functions that allow one to set connector topics, etc using
642  the connector uid to address a particular connector.
643 
644  Beware of the order of the topic and topicfilter arguments as they differ between the poster checker and
645  subscriber version of make_connector. Think of the first one as directly connected to the physical port, and
646  the other one as the other end of the connector. */
647  std::string make_connector(std::string const & name, std::string const & namespc, nrt::ConnectorType const type,
648  std::string const & topicfilter, std::string const & topic) const;
649 
650  private:
651  template <class ...> friend class nrt::MessagePoster;
652  friend class nrt::Module;
653  friend class nrt::BlackboardManager;
654  template <class T> friend class nrt::ParameterCore;
655  friend class nrt::MessageBase;
656  template <class T> friend class nrt::SplitMessagePoster;
657 
658  PostResultsType doPost(std::shared_ptr<typename Posting::MsgType const> m) const;
659 
660  //! Internal split posting of a shared_ptr<Posting::MsgType>, that's a direct post
661  void doPostInternal(std::list<std::future<typename Posting::RetPtr> > & fut,
662  std::shared_ptr<typename Posting::MsgType const> m) const;
663 
664  //! Clear our internal vector of local callbacks
665  virtual void clearLocalCallbacks();
666 
667  //! Add the callback from the given MessageSubscriber to our vector of local callbacks
668  virtual void addLocalCallback(MessageSubscriberCoreBase * sub);
669 
670  //! List of callbacks to local MessageSubscriber objects that get triggered when we post a message
671  std::vector<typename Posting::CallbackFuncType> localCallbacks;
672 
673  //! Special constructor used only by dynamic ports (Parameter ports, split ports)
674  MessagePosterCore(std::string const & mod, std::string const & portna, std::string const & descr);
675 
676  //! Holder for our split sub-poster ports, if any
677  std::shared_ptr<typename Posting::MsgType::SplitPostersTuple> splitPosters;
678  };
679 
680  // ######################################################################
681  //! Split message poster, implemented by MessagePoster ports that use splittable CompositeMessage messages
682  /*! \ingroup module */
683  template <typename T>
685  {
686  public:
687  //! Constructor, will register us with the Blackboard
689 
690  //! Special constructor used only by dynamic ports (Parameter ports, split ports)
691  SplitMessagePoster(std::string const & mod, std::string const & portna, std::string const & descr);
692 
693  //! Destructor, will un-register us with the Blackboard
694  virtual ~SplitMessagePoster();
695 
696  //private:
697 
698  //! Internal split posting of a shared_ptr<T>, we may have to wrap T into an nrt::Message<T>
699  template <typename TT>
700  typename std::enable_if< ! std::is_same<typename SimpleMessagePosting<T>::MsgType, TT>::value, void>::type
701  doPostInternalWrapper(std::list<std::future<void> > & fut, std::shared_ptr<TT> msg) const;
702 
703  //! Internal split posting of a shared_ptr<T>, we may have to wrap T into an nrt::Message<T>
704  template <typename TT>
705  typename std::enable_if<std::is_same<typename SimpleMessagePosting<T>::MsgType, TT>::value, void>::type
706  doPostInternalWrapper(std::list<std::future<void> > & fut, std::shared_ptr<TT> msg) const;
707  };
708 
709  // ######################################################################
710  // ######################################################################
711  // ######################################################################
712  // MessageChecker helpers
713 
714  //! Generic variadic MessageChecker class template
715  /*! \see MessageChecker<Checker, Tail ...> for detailed documentation */
716  template <class ... Checkers> class MessageChecker { };
717 
718  //! Special case to terminate recursion
719  /*! \see MessageChecker<Checker, Tail ...> for detailed documentation */
720  template <> class MessageChecker<> { };
721 
722  // ######################################################################
723  //! Base class for MessageChecker
724  /*! \ingroup module */
726  {
727  public:
728  //! Virtual destructor for safe inheritance
729  virtual ~MessageCheckerCoreBase();
730 
731  //! Set the Topic Filter for this checking
732  /*! A Topic Filter is embodied into an std::regex which is used to match a check() call to corresponding topics
733  that were used by post(). Note that in addition the namespaces must match exactly. */
734  void setTopicFilter(std::string const & topicfilt_);
735 
736  //! Return true if we currently have split-sub-checkers
737  virtual bool isSplit() const = 0;
738 
739  protected:
740  //! Only the Blackboard and derived classes can access our data
741  friend class nrt::Blackboard;
742 
743  std::string msgtype; //!< Type of Message we check for
744  std::string module; //!< Module which we belong to
745  std::string portname; //!< String name for our Checking port class
746  std::string description; //!< String human description for our Checking
747  std::string topicfilt; //!< TopicFilter with which we check()
748  int sequence; //!< Checker sequence number within module used for ordering of ports in the GUI
749  bool splittable; //!< Can this port be split into sub-ports?
750 
751  std::vector<nrt::blackboard::msgkey> localMessageKeys;
752  std::vector<nrt::blackboard::MessageCheckerRemoteCall> remoteCheckCalls;
753 
754  mutable boost::shared_mutex mtx; //<! Shared mutex to protect our internals
755  };
756 
757  // ######################################################################
758  //! Module objects which derive from MessageChecker will be allowed to post Message objects
759  /*! \ingroup module */
760  template <class Checking>
761  class MessageCheckerCore : public virtual ModuleBase, public MessageCheckerCoreBase
762  {
763  public:
764  //! Our underlying subscription type (used by Connectors)
765  typedef Checking CheckingType;
766 
767  //! Constructor, will register us with the Blackboard
769 
770  //! Destructor, will un-register us with the Blackboard
771  virtual ~MessageCheckerCore();
772 
773  //! Return true if we currently have split-sub-checkers
774  /*! Currently, this is always false, no support yet for split checkers */
775  virtual bool isSplit() const;
776 
777  //! Check for a message on the Blackboard
778  /*! \param mcp policy, either only return new messages which have not been returned yet by previous identical
779  check() calls from that exact identical caller, or return all matching messages, previously seen or not.
780 
781  Note that the declaration of check() is a bit obfuscated by the SFINAE paradigm used to allow multiple
782  checkers in one Module; the simplified declaration would look essentially like this:
783 
784  @code
785  template <class Checking>
786  nrt::MessageCheckerResults<typename Checking::MsgType>
787  check(MessageCheckerPolicy const mcp = nrt::MessageCheckerPolicy::Unseen);
788  @endcode
789 
790  Note that an exception will be thrown if you try to check() from a MessageChecker that is not attached to a
791  Module, or if you try to check() while the system in not in the running() state. Have a look at Component.H
792  and Manager.H for some explanations about the running() state. */
795 
796  //! Create a connector for this port's Checking type on the master Blackboard
797  /*! \return The connector uid. This is the only way to "access" the connector, as it lives hidden deep inside the
798  master Blackboard. The Blackboard class exposes functions that allow one to set connector topics, etc using
799  the connector uid to address a particular connector.
800 
801  Beware of the order of the topic and topicfilter arguments as they differ between the poster checker and
802  subscriber version of make_connector. Think of the first one as directly connected to the physical port, and
803  the other one as the other end of the connector. */
804  std::string make_connector(std::string const & name, std::string const & namespc, nrt::ConnectorType const type,
805  std::string const & topic, std::string const & topicfilter) const;
806 
807  private:
808  template <class ...> friend class nrt::MessageChecker;
809  friend class nrt::Blackboard;
810  template <class T> friend class nrt::ParameterCore;
811 
812  //! Special constructor used only by Parameter ports
813  MessageCheckerCore(std::string const & mod, std::string const & portna, std::string const & descr);
814  };
815 
816  // ######################################################################
817  // ######################################################################
818  // ######################################################################
819  // MessageSubscriber helpers
820 
821  //! Variadic-template MessageSubscriber class
822  /*! \see nrt::MessageSubscriber<Subscription, Tail ...> for detailed documentation */
823  template <class ... Msgs> class MessageSubscriber { };
824 
825  //! Variadic-template MessageSubscriber class
826  /*! Special case with empty template parameter pack to terminate recursion over the variadic template parameter pack
827  of nrt::MessageSubscriber.
828  \see nrt::MessageSubscriber<Subscription, Tail ...> for detailed documentation */
829  template <> class MessageSubscriber<> { };
830 
831  // ######################################################################
832  class MessageSubscriberCallbackWrapperBase
833  {
834  public:
835  virtual ~MessageSubscriberCallbackWrapperBase();
836 
837  nrt::blackboard::MessageSubscriberCallbackForRemote callbackForRemote;
838  };
839 
840  // ######################################################################
841  //! Base class for a MessageSubscriber
842  /*! \ingroup module */
844  {
845  public:
846  //! Virtual destructor for safe inheritance
847  virtual ~MessageSubscriberCoreBase();
848 
849  //! Set the TopicFilter for this subscriber
850  /*! A Topic Filter is embodied into an std::regex which is used to match our subscription to corresponding topics
851  used by post(). Note that in addition the namespaces must match exactly. */
852  void setTopicFilter(std::string const & topicfilt_);
853 
854  //! Create our split sub-ports
855  virtual void createSubSubscribers() = 0;
856 
857  //! Delete our split sub-ports
858  virtual void deleteSubSubscribers() = 0;
859 
860  //! Return true if we currently have split-sub-subscribers
861  virtual bool isSplit() const = 0;
862 
863  protected:
864  //! Only the Blackboard and related classes can access our data
865  friend class nrt::Blackboard;
866  template <class Posting> friend class MessagePosterCore;
867  template <class T, bool ISMSG> friend class SplitMessageSubscriber;
868 
869  std::string msgtype; //!< Type of Message we subscribe to
870  std::string rettype; //!< Type of Message we return from our callback
871  std::string module; //!< Module which we belong to
872  std::string portname; //!< String name for our Subscription port class
873  std::string description; //!< String human description for our Subscription
874  std::string topicfilt; //!< TopicFilter that we subscribe to
875  int sequence; //!< Subscriber sequence number within module used for ordering of ports in the GUI
876  bool splittable; //!< Can this port be split into sub-ports?
877 
878  mutable boost::shared_mutex mtx; //<! Shared mutex to protect our internals
879 
880  std::shared_ptr<nrt::MessageSubscriberCallbackWrapperBase> callbackWrapperBase;
881 
882  std::shared_ptr<nrt::MessageSubscriberCallbackWrapperBase> callbackAnyWrapperBase;
883 
884  //! Notification called by each sub-subscriber when it receives a split field
885  virtual void notifySplitReceive(size_t const bitnum) = 0;
886 
887  //! Order to abort a split receive, e.g., upon destruction or stop()
888  virtual void abortSplitReceive() = 0;
889  };
890 
891  // ######################################################################
892  template <class MsgPtr, class RetPtr>
893  class MessageSubscriberCallbackWrapper : public nrt::MessageSubscriberCallbackWrapperBase
894  {
895  public:
896  MessageSubscriberCallbackWrapper()
897  {
898  this->callbackForRemote = [&](std::shared_ptr<nrt::MessageBase const> msg, std::string & retstr)
899  {
900  std::shared_ptr<typename MsgPtr::element_type> msgcast =
901  std::dynamic_pointer_cast<typename MsgPtr::element_type>(msg);
902 
903  // Run the callback function, which returns a Subscription::RetPtr, and return that:
904  RetPtr ret = this->callback(msgcast);
905 
906  // Serialize the result message to string and return it:
907  std::ostringstream os;
908  nrt::oarchive archive(os);
909  archive( *(ret.get()) );
910 
911  // Move the contents of the archive into our output variable:
912  retstr = os.str();
913  };
914  }
915 
916  virtual ~MessageSubscriberCallbackWrapper();
917 
918  protected:
919  template <class Posting> friend class MessagePosterCore;
920  template <class Subscription> friend class MessageSubscriberCore;
921 
922  //! A hook to our callback, by incoming/outgoing message pointer type rather than by Subscription
923  std::function<RetPtr(MsgPtr)> callback;
924  };
925 
926  // ######################################################################
927  // Specialization for void return, non-void message
928  template <class MsgPtr>
929  class MessageSubscriberCallbackWrapper<MsgPtr, void> : public nrt::MessageSubscriberCallbackWrapperBase
930  {
931  public:
932  MessageSubscriberCallbackWrapper()
933  {
934  this->callbackForRemote = [&](std::shared_ptr<nrt::MessageBase const> msg, std::string & retstr)
935  {
936  std::shared_ptr<typename MsgPtr::element_type> msgcast =
937  std::dynamic_pointer_cast<typename MsgPtr::element_type>(msg);
938 
939  // Run the callback function, and that's it since it returns nothing:
940  this->callback(msgcast);
941  };
942  }
943 
944  virtual ~MessageSubscriberCallbackWrapper();
945 
946  protected:
947  template <class Posting> friend class MessagePosterCore;
948  template <class Subscription> friend class MessageSubscriberCore;
949 
950  //! A hook to our callback, by incoming/outgoing message pointer type rather than by Subscription
951  std::function<void(MsgPtr)> callback;
952  };
953 
954  // ######################################################################
955  // Specialization for nrt::AnyMessage message
956  template <class RetPtr>
957  class MessageSubscriberCallbackWrapper<nrt::blackboard::AnyMsgPtr, RetPtr> :
958  public nrt::MessageSubscriberCallbackWrapperBase
959  {
960  public:
961  MessageSubscriberCallbackWrapper()
962  {
963  this->callbackForRemote = [&](std::shared_ptr<nrt::MessageBase const> msg, std::string & retstr)
964  {
965  nrt::blackboard::AnyMsgPtr msgcast = std::dynamic_pointer_cast<nrt::AnyMessage const>(msg);
966 
967  // Run the callback function, which returns a Subscription::RetPtr, and return that:
968  RetPtr ret = this->callback(msgcast);
969 
970  // Serialize the result message to string and return it:
971  std::ostringstream os; nrt::oarchive archive(os);
972  archive( *(ret.get()) );
973 
974  // Move the contents of the archive into our output variable:
975  retstr = os.str();
976  };
977  }
978 
979  virtual ~MessageSubscriberCallbackWrapper();
980 
981  protected:
982  template <class Posting> friend class MessagePosterCore;
983  template <class Subscription> friend class MessageSubscriberCore;
984 
985  //! A hook to our callback, by incoming/outgoing message pointer type rather than by Subscription
986  std::function<RetPtr(nrt::blackboard::AnyMsgPtr)> callback;
987  };
988 
989  // ######################################################################
990  // Specialization for void return, nrt::AnyMessage message
991  template <>
992  class MessageSubscriberCallbackWrapper<nrt::blackboard::AnyMsgPtr, void> :
993  public nrt::MessageSubscriberCallbackWrapperBase
994  {
995  public:
996  MessageSubscriberCallbackWrapper()
997  {
998  this->callbackForRemote = [&](std::shared_ptr<nrt::MessageBase const> msg, std::string & retstr)
999  {
1000  nrt::blackboard::AnyMsgPtr msgcast = std::dynamic_pointer_cast<nrt::AnyMessage const>(msg);
1001 
1002  // Run the callback function, and that's it since it returns nothing:
1003  this->callback(msgcast);
1004  };
1005  }
1006 
1007  virtual ~MessageSubscriberCallbackWrapper();
1008 
1009  protected:
1010  template <class Posting> friend class MessagePosterCore;
1011  template <class Subscription> friend class MessageSubscriberCore;
1012 
1013  //! A hook to our callback, by incoming/outgoing message pointer type rather than by Subscription
1014  std::function<void(nrt::blackboard::AnyMsgPtr)> callback;
1015  };
1016 
1017  // ######################################################################
1018  // ######################################################################
1019  template <class RetPtr>
1020  class MessageSubscriberCallbackAnyWrapper : public nrt::MessageSubscriberCallbackWrapperBase
1021  {
1022  public:
1023  template <class MsgPtr>
1024  MessageSubscriberCallbackAnyWrapper(MsgPtr & phony) // required to be able to construct...
1025  {
1026  this->callbackForRemote = [&](std::shared_ptr<nrt::MessageBase const> msg, std::string & retstr)
1027  {
1028  // Cast from MessageBase to AnyMessage, the callback itself will then convert to native type:
1029  std::shared_ptr<nrt::AnyMessage const> msgany = std::dynamic_pointer_cast<nrt::AnyMessage const>(msg);
1030 
1031  // Run the callback function, which returns a Subscription::RetPtr, and return that:
1032  RetPtr ret = this->callbackany(msgany);
1033 
1034  // Serialize the result message to string and return it:
1035  std::ostringstream os; nrt::oarchive archive(os);
1036  archive( *(ret.get()) );
1037 
1038  // Move the contents of the archive into our output variable:
1039  retstr = os.str();
1040  };
1041  }
1042 
1043  virtual ~MessageSubscriberCallbackAnyWrapper();
1044 
1045  protected:
1046  template <class Posting> friend class MessagePosterCore;
1047  template <class Subscription> friend class MessageSubscriberCore;
1048 
1049  //! A hook to our callback, by incoming/outgoing message pointer type rather than by Subscription
1050  std::function<RetPtr(nrt::blackboard::AnyMsgPtr)> callbackany;
1051  };
1052 
1053  // ######################################################################
1054  // Specialization for void return, non-void message
1055  template <>
1056  class MessageSubscriberCallbackAnyWrapper<void> : public nrt::MessageSubscriberCallbackWrapperBase
1057  {
1058  public:
1059  template <class MsgPtr>
1060  MessageSubscriberCallbackAnyWrapper(MsgPtr & phony) // required to be able to construct...
1061  {
1062  this->callbackForRemote = [&](std::shared_ptr<nrt::MessageBase const> msg, std::string & retstr)
1063  {
1064  // Cast from MessageBase to AnyMessage, the callback itself will then convert to native type:
1065  std::shared_ptr<nrt::AnyMessage const> msgany = std::dynamic_pointer_cast<nrt::AnyMessage const>(msg);
1066 
1067  // Run the callback function, and that's it since it returns nothing:
1068  this->callbackany(msgany);
1069  };
1070  }
1071 
1072  virtual ~MessageSubscriberCallbackAnyWrapper();
1073 
1074  protected:
1075  template <class Posting> friend class MessagePosterCore;
1076  template <class Subscription> friend class MessageSubscriberCore;
1077 
1078  //! A hook to our callback, by incoming/outgoing message pointer type rather than by Subscription
1079  std::function<void(nrt::blackboard::AnyMsgPtr)> callbackany;
1080  };
1081 
1082  // ######################################################################
1083  //! Core class for the variadic MessageSubscriber template
1084  /*! \ingroup module */
1085  template <class Subscription>
1086  class MessageSubscriberCore : public virtual ModuleBase, public MessageSubscriberCoreBase
1087  {
1088  public:
1089  //! Our underlying subscription type (used by Connectors)
1090  typedef Subscription SubscriptionType;
1091 
1092  //! Type used for incoming messages in onMessage
1093  /*! We just use Subscription and allow people to manipulate Subscription objects just as if they were shared_ptr
1094  to messages. This is so that we can define onMessage() with just one argument instead of 2 somewhet redundant
1095  ones. */
1096  typedef Subscription InPtr;
1097 
1098  //! Type that onMessage() returns (typically, void or std::unique_ptr<ReturnMessageType>)
1099  typedef typename Subscription::RetUptr OutPtr;
1100 
1101  //! Constructor, will register this subscriber with the Blackboard
1103 
1104  //! Virtual destructor for safe inheritance, will un-register this subscriber with the Blackboard
1105  virtual ~MessageSubscriberCore();
1106 
1107  //! Create our split sub-ports
1108  virtual void createSubSubscribers();
1109 
1110  //! Delete our split sub-ports
1111  virtual void deleteSubSubscribers();
1112 
1113  //! Return true if we currently have split-sub-subscribers
1114  virtual bool isSplit() const;
1115 
1116  //! Allocate a return message and return a unique_ptr to it, to be used in a callback as return value
1117  /*! All given args are forwarded to the return message constructor. */
1118  template <typename... Args>
1119  OutPtr make_return_message(Args && ... args) const;
1120 
1121  //! Create a connector for this port's Checking type on the master Blackboard
1122  /*! \return The connector uid. This is the only way to "access" the connector, as it lives hidden deep inside the
1123  master Blackboard. The Blackboard class exposes functions that allow one to set connector topics, etc using
1124  the connector uid to address a particular connector.
1125 
1126  Beware of the order of the topic and topicfilter arguments as they differ between the poster checker and
1127  subscriber version of make_connector. Think of the first one as directly connected to the physical port, and
1128  the other one as the other end of the connector. */
1129  std::string make_connector(std::string const & name, std::string const & namespc, nrt::ConnectorType const type,
1130  std::string const & topic, std::string const & topicfilter) const;
1131 
1132  protected:
1133  friend class nrt::Blackboard;
1134  template <class T> friend class nrt::ParameterCore;
1135 
1136  //! Callback called by the Blackboard when a relevant Message is posted.
1137  /*! Derived classes must implement this function. */
1138  virtual OutPtr onMessage(InPtr msg) = 0;
1139 
1140  //! Special constructor used only by dynamic ports (Parameter, split subscribers, etc)
1141  MessageSubscriberCore(std::string const & mod, std::string const & portna, std::string const & descr);
1142 
1143  //! Notification called by each sub-subscriber when it receives a split field
1144  virtual void notifySplitReceive(size_t const bitnum);
1145 
1146  //! Order to abort a split receive, e.g., upon destruction or stop()
1147  virtual void abortSplitReceive();
1148 
1149  private:
1150  // Construct our callback wrapper
1151  void constructCallback();
1152 
1153  //! Holder for our split sub-subscriber ports, if any
1154  std::shared_ptr<typename Subscription::MsgType::SplitSubscribersTuple> splitSubscribers;
1155 
1156  //! Partially received message that gets filled by our split subscribers
1157  std::shared_ptr<typename Subscription::MsgType> splitReceiveMessage;
1158 
1159  //! Number of split message parts
1160  static constexpr size_t numSplitReceiveParts =
1161  std::tuple_size<typename Subscription::MsgType::SplitSubscribersTuple>::value;
1162 
1163  //! Array indicating which split message parts have been received so far
1164  std::shared_ptr<std::array<bool, numSplitReceiveParts> > splitReceiveBits;
1165 
1166  //! Mutex for split receiving, associated with condition variable that blocks partial
1167  std::mutex splitReceiveMutex;
1168 
1169  //! Condition variable to block partial sub-ports until we trigger onMessage() once all parts are received
1170  std::condition_variable splitReceiveCond;
1171 
1172  //! Mutex to protect our partial message and bitfield
1173  std::mutex splitDataMutex;
1174  };
1175 
1176 
1177  // ######################################################################
1178  //! Subscriber helper class for split subscriptions
1179  /*! \ingroup module */
1180  template <typename T>
1181  class SplitMessageSubscriber<T, true> :
1182  public nrt::MessageSubscriberCore<nrt::SimpleMessageSubscription<T, true> >
1183  {
1184  public:
1185  SplitMessageSubscriber(nrt::MessageSubscriberCoreBase * parent, std::shared_ptr<T> & field,
1186  size_t const bitnum, std::string const & mod, std::string const & portna,
1187  std::string const & descr);
1188 
1189  virtual ~SplitMessageSubscriber();
1190 
1191  virtual void onMessage(nrt::SimpleMessageSubscription<T, true> msg);
1192 
1193  private:
1194  nrt::MessageSubscriberCoreBase * itsParent;
1195  std::shared_ptr<T> & itsField;
1196  size_t const itsBitnum;
1197  std::mutex itsOnMessageMtx;
1198  };
1199 
1200  //! Subscriber helper class for split subscriptions
1201  /*! \ingroup module */
1202  template <typename T>
1203  class SplitMessageSubscriber<T, false> :
1204  public nrt::MessageSubscriberCore<nrt::SimpleMessageSubscription<T, false> >
1205  {
1206  public:
1207  SplitMessageSubscriber(nrt::MessageSubscriberCoreBase * parent, std::shared_ptr<T> & field,
1208  size_t const bitnum, std::string const & mod, std::string const & portna,
1209  std::string const & descr);
1210 
1211  virtual ~SplitMessageSubscriber();
1212 
1213  virtual void onMessage(nrt::SimpleMessageSubscription<T, false> msg);
1214 
1215  private:
1216  nrt::MessageSubscriberCoreBase * itsParent;
1217  std::shared_ptr<T> & itsField;
1218  size_t const itsBitnum;
1219  std::mutex itsOnMessageMtx;
1220  };
1221 
1222 
1223 }; // namespace nrt
1224 
1225 #endif // INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MODULEPORTHELPERS_H
1226