iLab Neuromorphic Robotics Toolkit  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
BlackboardImpl.H
Go to the documentation of this file.
1 /*! @file
2  @author Laurent Itti
3  @copyright GNU Public License (GPL v3)
4  @section License
5  @verbatim
6  // ////////////////////////////////////////////////////////////////////////
7  // The iLab Neuromorphic Robotics Toolkit (NRT) //
8  // Copyright 2010-2012 by the University of Southern California (USC) //
9  // and the iLab at USC. //
10  // //
11  // iLab - University of Southern California //
12  // Hedco Neurosciences Building, Room HNB-10 //
13  // Los Angeles, Ca 90089-2520 - USA //
14  // //
15  // See http://ilab.usc.edu for information about this project. //
16  // ////////////////////////////////////////////////////////////////////////
17  // This file is part of The iLab Neuromorphic Robotics Toolkit. //
18  // //
19  // The iLab Neuromorphic Robotics Toolkit is free software: you can //
20  // redistribute it and/or modify it under the terms of the GNU General //
21  // Public License as published by the Free Software Foundation, either //
22  // version 3 of the License, or (at your option) any later version. //
23  // //
24  // The iLab Neuromorphic Robotics Toolkit is distributed in the hope //
25  // that it will be useful, but WITHOUT ANY WARRANTY; without even the //
26  // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
27  // PURPOSE. See the GNU General Public License for more details. //
28  // //
29  // You should have received a copy of the GNU General Public License //
30  // along with The iLab Neuromorphic Robotics Toolkit. If not, see //
31  // <http://www.gnu.org/licenses/>. //
32  // ////////////////////////////////////////////////////////////////////////
33  @endverbatim */
34 
35 
36 
37 #ifndef INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_BLACKBOARDIMPL_H
38 #define INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_BLACKBOARDIMPL_H
39 
40 #include <nrt/Core/Debugging/Log.H>
41 
42 // Define a bunch of macros
44 
45 // ######################################################################
46 // ######################################################################
47 // ######################################################################
48 
49 template <class Posting> inline
50 void nrt::Blackboard::registerMessagePoster(nrt::MessagePosterCore<Posting> * pos)
51 {
52  typedef typename Posting::MsgType Msg;
53  typedef typename Posting::RetType Ret;
54 
55  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Msg must derive from nrt::MessageBase");
56  static_assert(std::is_same<void, Ret>::value || std::is_base_of<nrt::MessageBase, Ret>::value,
57  "Ret must be void or derive from nrt::MessageBase");
58 
59  // Register the message type are return type:
60  registerMessageType(pos->msgtype);
61  registerMessageType(pos->rettype);
62 
63  {
64  // Get unique write access and update our tables, also get a unique lock onto our poster so we don't change our
65  // tables while it is active. We lock both at once to avoid deadlocks:
66  boost::unique_lock<boost::shared_mutex> ulck(itsModMtx, boost::defer_lock);
67  boost::unique_lock<boost::shared_mutex> pulck(pos->mtx, boost::defer_lock);
68  boost::lock(ulck, pulck);
69 
70  // Create a remote check() serving function for this message type, since we post that type:
71  itsServeRemoteCheckFuncMap[pos->msgtype] = &nrt::Blackboard::doServeRemoteCheck<Msg>;
72 
73  // Store info about this poster for later use:
74  itsPosters[pos->module][pos->portname] = pos;
75 
76  // Update this poster's sequence number:
77  int seq = -1; for (auto const & p : itsPosters[pos->module]) seq = std::max(seq, p.second->sequence);
78  pos->sequence = seq + 1;
79 
80  // Create a message key for this local poster:
81  pos->messagekey = itsMessageKeys.create(pos->module+'|'+pos->portname);
82  }
83 
84  NRT_BBDEBUG("Registered [" << pos->module << '|' << pos->portname << ']');
85 
86  // Let the blackboard federation know about this change:
87  sendUpdateToBlackboardFederation();
88 }
89 
90 // ######################################################################
91 template <class Posting> inline
92 void nrt::Blackboard::unRegisterMessagePoster(nrt::MessagePosterCore<Posting> * pos)
93 {
94  typedef typename Posting::MsgType Msg;
95  typedef typename Posting::RetType Ret;
96 
97  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Msg must derive from nrt::MessageBase");
98  static_assert(std::is_same<void, Ret>::value || std::is_base_of<nrt::MessageBase, Ret>::value,
99  "Ret must be void or derive from nrt::MessageBase");
100 
101  // First kill our split posters, if any:
102  pos->deleteSubPosters();
103 
104  {
105  // Get unique write access and update our tables, also get a unique lock onto our poster so we don't change our
106  // tables while it is active. We lock both at once to avoid deadlocks:
107  boost::unique_lock<boost::shared_mutex> ulck(itsModMtx, boost::defer_lock);
108  boost::unique_lock<boost::shared_mutex> pulck(pos->mtx, boost::defer_lock);
109  boost::lock(ulck, pulck);
110 
111  // Cleanup itsPosters as appropriate:
112  auto moditr = itsPosters.find(pos->module);
113  if (moditr == itsPosters.end()) NRT_WARNING("Module not found: " << pos->module);
114  else {
115  auto & pmap = moditr->second;
116  auto pitr = pmap.find(pos->portname);
117  if (pitr == pmap.end()) NRT_WARNING("Poster not found: " << pos->portname); else pmap.erase(pitr);
118  if (pmap.empty()) itsPosters.erase(moditr);
119  }
120 
121  // Cleanup any message from this poster, if any was posted:
122  auto mitr = itsMessages.find(pos->messagekey); if (mitr != itsMessages.end()) itsMessages.erase(mitr);
123 
124  // Forget about this message key:
125  itsMessageKeys.remove(pos->messagekey);
126  }
127 
128  // And finally unregister this message type an dreturn type:
129  unRegisterMessageType(pos->msgtype);
130  unRegisterMessageType(pos->rettype);
131 
132  NRT_BBDEBUG("Un-registered [" << pos->module << '|' << pos->portname << ']');
133 
134  // Let the blackboard federation know about this change:
135  sendUpdateToBlackboardFederation();
136 }
137 
138 // ######################################################################
139 template <class Checking> inline
140 void nrt::Blackboard::registerMessageChecker(nrt::MessageCheckerCore<Checking> * chk)
141 {
142  typedef typename Checking::MsgType Msg;
143  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Msg must derive from nrt::MessageBase");
144 
145  // Register the message type:
146  registerMessageType(chk->msgtype);
147 
148  {
149  // Get unique write access and update our tables, also get a unique lock onto our checker so we don't change our
150  // tables while it is active. We lock both at once to avoid deadlocks:
151  boost::unique_lock<boost::shared_mutex> ulck(itsModMtx, boost::defer_lock);
152  boost::unique_lock<boost::shared_mutex> culck(chk->mtx, boost::defer_lock);
153  boost::lock(ulck, culck);
154 
155  // Encapsulate and store info about this checker:
156  itsCheckers[chk->module][chk->portname] = chk;
157 
158  // Update this checker's sequence number:
159  int seq = -1; for (auto const & c : itsCheckers[chk->module]) seq = std::max(seq, c.second->sequence);
160  chk->sequence = seq + 1;
161  }
162 
163  NRT_BBDEBUG("Registered [" << chk->module << '|' << chk->portname << ']');
164 
165  // Let the blackboard federation know about this change:
166  sendUpdateToBlackboardFederation();
167 }
168 
169 // ######################################################################
170 template <class Checking> inline
171 void nrt::Blackboard::unRegisterMessageChecker(nrt::MessageCheckerCore<Checking> * chk)
172 {
173  typedef typename Checking::MsgType Msg;
174  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Msg must derive from nrt::MessageBase");
175 
176  // If we are running, we have to be very careful here that some posters might have us in their pre-compiled
177  // hitlists. We need to clear those hitlists before we un-register. To avoid having additional locks that all ports
178  // would need to acquire before messages are sent, we will incur the cost here by first clearing out our topic filter
179  // and waiting until that's done. Make sure the code below stays in sync with setTopicFilter() of the port:
180  if (running())
181  {
182  bool needUpdate = false;
183  {
184  // Lock the port up and set the topic filter:
185  boost::unique_lock<boost::shared_mutex> ulck(chk->mtx);
186  NRT_BBDEBUG("Clear topic filter on [" << chk->portname << ']');
187  if (chk->topicfilt.empty() == false) { chk->topicfilt = ""; needUpdate = true; }
188  }
189 
190  // Let the blackboard federation know about this change, wait until it has gone through:
191  if (needUpdate) nrt::Blackboard::instance().sendSynchronousUpdateToBlackboardFederation();
192  }
193 
194  {
195  // Get unique write access and update our tables, also get a unique lock onto our checker so we don't change our
196  // tables while it is active. We lock both at once to avoid deadlocks:
197  boost::unique_lock<boost::shared_mutex> ulck(itsModMtx, boost::defer_lock);
198  boost::unique_lock<boost::shared_mutex> culck(chk->mtx, boost::defer_lock);
199  boost::lock(ulck, culck);
200 
201  // cleanup itsCheckers as appropriate:
202  auto moditr = itsCheckers.find(chk->module);
203  if (moditr == itsCheckers.end()) NRT_WARNING("Module not found: " << chk->module);
204  else {
205  auto & cmap = moditr->second;
206  auto citr = cmap.find(chk->portname);
207  if (citr == cmap.end()) NRT_WARNING("Checker not found: " << chk->portname); else cmap.erase(citr);
208  if (cmap.empty()) itsCheckers.erase(moditr);
209  }
210  }
211 
212  // And finally unregister this message type:
213  unRegisterMessageType(chk->msgtype);
214 
215  NRT_BBDEBUG("Un-registered [" << chk->module << '|' << chk->portname << ']');
216 
217  // Let the blackboard federation know about this change:
218  sendUpdateToBlackboardFederation();
219 }
220 
221 // ######################################################################
222 template <class Subscription> inline
223 void nrt::Blackboard::registerMessageSubscriber(nrt::MessageSubscriberCore<Subscription> * sub)
224 {
225  typedef typename Subscription::MsgType Msg;
226  typedef typename Subscription::RetType Ret;
227 
228  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Msg must derive from nrt::MessageBase");
229  static_assert(std::is_same<void, Ret>::value || std::is_base_of<nrt::MessageBase, Ret>::value,
230  "Ret must be void or derive from nrt::MessageBase");
231 
232  // Register the message type and return type:
233  registerMessageType(sub->msgtype);
234  registerMessageType(sub->rettype);
235 
236  {
237  // Get unique write access and update our tables, also get a unique lock onto our subscriber so we don't change our
238  // tables while it is active. We lock both at once to avoid deadlocks:
239  boost::unique_lock<boost::shared_mutex> ulck(itsModMtx, boost::defer_lock);
240  boost::unique_lock<boost::shared_mutex> sulck(sub->mtx, boost::defer_lock);
241  boost::lock(ulck, sulck);
242 
243  // Since we are now subscribing to this MsgType, we need to be able to serve remote posts for it:
244  itsServeRemotePostFuncMap[sub->msgtype] = &nrt::Blackboard::doServeRemotePost<Msg>;
245 
246  // Keep a pointer to this subscriber for later use:
247  itsSubscribers[sub->module][sub->portname] = sub;
248 
249  // Update this subscriber's sequence number:
250  int seq = -1; for (auto const & s : itsSubscribers[sub->module]) seq = std::max(seq, s.second->sequence);
251  sub->sequence = seq + 1;
252  }
253 
254  NRT_BBDEBUG("Registered [" << sub->module << '|' << sub->portname << ']');
255 
256  // Let the blackboard federation know about this change:
257  sendUpdateToBlackboardFederation();
258 }
259 
260 // ######################################################################
261 template <class Subscription> inline
262 void nrt::Blackboard::unRegisterMessageSubscriber(nrt::MessageSubscriberCore<Subscription> * sub)
263 {
264  typedef typename Subscription::MsgType Msg;
265  typedef typename Subscription::RetType Ret;
266 
267  static_assert(std::is_base_of<nrt::MessageBase, Msg>::value, "Msg must derive from nrt::MessageBase");
268  static_assert(std::is_same<void, Ret>::value || std::is_base_of<nrt::MessageBase, Ret>::value,
269  "Ret must be void or derive from nrt::MessageBase");
270 
271  // If we are running, we have to be very careful here that some posters might have us in their pre-compiled
272  // hitlists. We need to clear those hitlists before we un-register. To avoid having additional locks that all ports
273  // would need to acquire before messages are sent, we will incur the cost here by first clearing out our topic filter
274  // and waiting until that's done. Make sure the code below stays in sync with setTopicFilter() of the port:
275  if (running())
276  {
277  bool needUpdate = false;
278  {
279  // Lock the port up and set the topic filter:
280  boost::unique_lock<boost::shared_mutex> ulck(sub->mtx);
281  NRT_BBDEBUG("Clear topic filter on [" << sub->portname << ']');
282  if (sub->topicfilt.empty() == false) { sub->topicfilt = ""; needUpdate = true; }
283  }
284 
285  // Let the blackboard federation know about this change, wait until it has gone through:
286  if (needUpdate) nrt::Blackboard::instance().sendSynchronousUpdateToBlackboardFederation();
287  }
288 
289  // First kill our split subscribers, if any. This will also free any onMessage() call that was blocked on some of our
290  // sub-ports while waiting for a complete composite message:
291  sub->deleteSubSubscribers();
292 
293  {
294  // Get unique write access and update our tables, also get a unique lock onto our subscriber so we don't change our
295  // tables while it is active. We lock both at once to avoid deadlocks:
296  boost::unique_lock<boost::shared_mutex> ulck(itsModMtx, boost::defer_lock);
297  boost::unique_lock<boost::shared_mutex> sulck(sub->mtx, boost::defer_lock);
298  boost::lock(ulck, sulck);
299 
300  // cleanup itsSubscribers as appropriate:
301  auto moditr = itsSubscribers.find(sub->module);
302  if (moditr == itsSubscribers.end()) NRT_WARNING("Module not found: " << sub->module);
303  else {
304  auto & smap = moditr->second;
305  auto sitr = smap.find(sub->portname);
306  if (sitr == smap.end()) NRT_WARNING("Subscriber not found: " << sub->portname); else smap.erase(sitr);
307  if (smap.empty()) itsSubscribers.erase(moditr);
308  }
309  }
310 
311  // And finally unregister this message type and return type:
312  unRegisterMessageType(sub->msgtype);
313  unRegisterMessageType(sub->rettype);
314 
315  NRT_BBDEBUG("Un-registered [" << sub->module << '|' << sub->portname << ']');
316 
317  // Let the blackboard federation know about this change:
318  sendUpdateToBlackboardFederation();
319 }
320 
321 // ######################################################################
322 template <class Msg> inline
323 void nrt::Blackboard::storeMessage(std::shared_ptr<Msg const> msg, nrt::blackboard::msgkey mkey)
324 {
325 #define BBWHAT "message [" + nrt::MessageType<Msg>() + "]"
326 
327  // let's store the new posted message on our blackboard, overwriting any previous message that is a full match for the
328  // key, so that we can serve possible future check() requests for that message:
329  boost::unique_lock<boost::shared_mutex> ulck(itsMsgMtx);
330  itsMessages[mkey] = nrt::blackboard::MessageData(msg);
331 
332 #undef BBWHAT
333 }
334 
335 // ######################################################################
336 template <class Msg> inline
337 void nrt::Blackboard::expungeMessage(nrt::blackboard::msgkey mkey)
338 {
339 #define BBWHAT "message [" + nrt::MessageType<Msg>() + "]"
340 
341  // let's store the new posted message on our blackboard, overwriting any previous message that is a full match for the
342  // key, so that we can serve possible future check() requests for that message:
343  boost::unique_lock<boost::shared_mutex> ulck(itsMsgMtx);
344  auto itr = itsMessages.find(mkey);
345  if (itr != itsMessages.end()) itsMessages.erase(itr);
346 
347 #undef BBWHAT
348 }
349 
350 // ######################################################################
351 template <class Msg> inline
352 void nrt::Blackboard::doServeRemotePost(std::string const & bytes, std::string const & rmodulepportname,
353  std::string const & transactionkey, size_t const numcallbacks)
354 {
355 #define BBWHAT "message ["+nrt::MessageType<Msg>()+"] from ["+rmodulepportname+\
356  "] transaction key ["+transactionkey+"]"
357 
358  // First, de-serialize the message (will throw if malformed):
359  std::shared_ptr<Msg const> msg(new Msg);
360  try
361  {
362  std::istringstream is(bytes);
363  nrt::iarchive ar(is);
364  ar( *(const_cast<Msg *>(msg.get())) );
365  }
366  catch (cereal::Exception const & e)
367  {
368  BBTHROWX(MessageSerializationError, e.what());
369  }
370  catch (...)
371  {
372  BBTHROW(MessageSerializationError);
373  }
374 
375  // Store the message and the number of callbacks on our list of remote post transactions that are in progress:
376  {
377  boost::unique_lock<boost::shared_mutex> ulck(itsPrpMtx);
378 
379  // Store pending post message for future use by incoming callback requests. We will garbage-collect this in
380  // processSummaryUpdateFromBlackboardFederation() by detecting dead transactions:
381  itsPendingRemotePostMessages[transactionkey] = nrt::blackboard::RemotePostMessageData(msg, numcallbacks);
382  }
383 
384  // we stop here, individual remote callback requests will then be triggered by the sender
385 
386 #undef BBWHAT
387 }
388 
389 // ######################################################################
390 template <class Msg> inline
391 void nrt::Blackboard::doServeRemoteCheck(std::string const & rmodulecportname, nrt::MessageCheckerPolicy mcp,
392  std::string & bytes)
393 {
394 #define BBWHAT "message ["+nrt::MessageType<Msg>()+"] for remote checker ["+rmodulecportname+"]"
395 
396  NRT_BBDEBUG("Serving remote check()...");
397  std::string const msgtype = nrt::MessageType<Msg>();
398  typedef typename std::shared_ptr<Msg const> ResultData;
399 
400  std::ostringstream os;
401  nrt::oarchive archive(os);
402 
403  std::shared_ptr<std::vector<ResultData> > localmatches(new std::vector<ResultData>());
404  {
405  // Get shared lock onto our tables:
406  boost::shared_lock<boost::shared_mutex> lck(itsModMtx);
407 
408  auto itr = itsRemoteCheckHitList.find(rmodulecportname);
409  if (itr == itsRemoteCheckHitList.end()) BBTHROW(UnknownChecker);
410 
411  for (auto const mkey : itr->second) {
412  // Get the base pointer to the message:
413  std::shared_ptr<nrt::MessageBase const> msgbase = getMessage(mkey, mcp, rmodulecportname);
414  if (msgbase) {
415  // Cast message back to its full derived type:
416  std::shared_ptr<Msg const> msg = std::dynamic_pointer_cast<Msg const>(msgbase);
417  if (!msg) BBTHROW(InternalInconsistency);
418 
419  localmatches->push_back(msg);
420  }
421  }
422  }
423 
424  size_t const n = localmatches->size();
425  try
426  {
427  NRT_BBDEBUG("Serializing " << n << " messages of type [" << msgtype << "]");
428  archive( n );
429  for (ResultData const & d : *localmatches)
430  {
431  Msg *mm = const_cast<Msg*>(d.get());
432  archive( *mm );
433  }
434 
435  // Move the contents of the archive into our output variable:
436  bytes = os.str();
437  }
438  catch (cereal::Exception const & e)
439  {
440  BBTHROWX(MessageSerializationError, e.what());
441  }
442  catch (...)
443  {
444  BBTHROW(MessageSerializationError);
445  }
446 
447  NRT_BBDEBUG("Done with remote check()");
448 
449 #undef BBWHAT
450 }
451 
452 // ######################################################################
453 template <class Mod> inline
454 std::shared_ptr<Mod> nrt::Blackboard::addModule(std::string const & instanceName, std::string const & namespc)
455 {
456 #define BBWHAT "While attempting to add Module of type [" + nrt::demangledName<Mod>() + "], instance [" + \
457  instanceName + "], namespace [" + namespc + ']'
458 
459  // Keep this code in sync with module and component versions
460 
461  // Enforce that Mod derives from Module:
462  static_assert(std::is_base_of<nrt::Module, Mod>::value, "Mod must derive from nrt::Module");
463 
464  // First, create the sub and put it into a shared_ptr:
465  std::shared_ptr<Mod> module(new Mod(instanceName));
466 
467  // Set the namespace:
468  if (namespc.empty() == false) module->setNamespace(namespc);
469 
470  // Then add it as a sub-component to us:
471  {
472  boost::upgrade_lock<boost::shared_mutex> uplck(itsSubMtx);
473 
474  std::string const inst = module->meta().instanceName;
475 
476  for (std::shared_ptr<nrt::Component> const & c : itsSubComponents)
477  if (inst == c->meta().instanceName) BBTHROW(BadInstanceName);
478 
479  NRT_MODDEBUG("Adding Module [" << module->meta().str() << ']');
480 
481  {
482  boost::upgrade_to_unique_lock<boost::shared_mutex> ulck(uplck);
483  itsSubComponents.push_back(module);
484  module->itsParent = this;
485  }
486  }
487 
488  // Register the module with the Blackboard; this will also update the federation:
489  registerModule(module.get());
490 
491  // Finally bring it to our runstate:
492  if (itsInitialized) module->init();
493  if (itsStarted) module->start();
494  if (itsRunning) module->launch();
495 
496  return module;
497 
498 #undef BBWHAT
499 }
500 
501 // ######################################################################
502 template <class Mod> inline
503 std::shared_ptr<Mod> nrt::Blackboard::getModule(std::string const & instanceName) const
504 {
505 #define BBWHAT "While attempting to get Module of type [" + nrt::demangledName<Mod>() + "], instance [" + \
506  instanceName + ']'
507 
508  // Keep this code in sync with module and component versions
509 
510  // Enforce that Mod derives from Module:
511  static_assert(std::is_base_of<nrt::Module, Mod>::value, "Mod must derive from nrt::Module");
512 
513  boost::shared_lock<boost::shared_mutex> lck(itsSubMtx);
514 
515  for (std::shared_ptr<nrt::Component> c : itsSubComponents)
516  if (instanceName == c->meta().instanceName)
517  {
518  std::shared_ptr<Mod> ret = std::dynamic_pointer_cast<Mod>(c);
519  if (ret) return ret; // correct type
520  BBTHROWX(UnknownModule, "Module/Component ["+instanceName+"] is not of type ["+nrt::demangledName<Mod>()+']');
521  }
522 
523  BBTHROW(UnknownModule);
524 
525 #undef BBWHAT
526 }
527 
528 // ######################################################################
529 template <class Mod>
530 void nrt::Blackboard::removeModule(std::shared_ptr<Mod> & module)
531 {
532  // Keep this code in sync with module and component versions
533 
534  // Enforce that Mod derives from Module:
535  static_assert(std::is_base_of<nrt::Module, Mod>::value, "Mod must derive from nrt::Module");
536 
537  boost::upgrade_lock<boost::shared_mutex> uplck(itsSubMtx);
538 
539  for (auto itr = itsSubComponents.begin(); itr != itsSubComponents.end(); ++itr)
540  if (itr->get() == module.get())
541  {
542  NRT_BBDEBUG("Removing Module [" << module->meta().str() << ']');
543 
544  boost::upgrade_to_unique_lock<boost::shared_mutex> ulck(uplck);
545  module->prepareForDeletion();
546  itsSubComponents.erase(itr);
547 
548  // Update the blackboard:
549  unRegisterModule(module.get());
550 
551  if (module.use_count() > 1)
552  NRT_WARNING(module.use_count() - 1 << " additional external shared_ptr references exist to "
553  "Module [" << module->meta().str() << "]. Module was NOT deleted.");
554 
555  module.reset(); // nuke the shared_ptr, this should yield a delete unless use_count was > 1
556 
557  return;
558  }
559 
560  NRT_WARNING("Module [" << module->meta().str() << "] not found. Ignored.");
561 }
562 
563 
564 // Un-define a bunch of macros
566 
567 #endif // INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_BLACKBOARDIMPL_H
568