iLab Neuromorphic Robotics Toolkit  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MessageCheckerResultsImpl.H
Go to the documentation of this file.
1 /*! @file
2  @author Laurent Itti
3  @copyright GNU Public License (GPL v3)
4  @section License
5  @verbatim
6  // ////////////////////////////////////////////////////////////////////////
7  // The iLab Neuromorphic Robotics Toolkit (NRT) //
8  // Copyright 2010-2012 by the University of Southern California (USC) //
9  // and the iLab at USC. //
10  // //
11  // iLab - University of Southern California //
12  // Hedco Neurosciences Building, Room HNB-10 //
13  // Los Angeles, Ca 90089-2520 - USA //
14  // //
15  // See http://ilab.usc.edu for information about this project. //
16  // ////////////////////////////////////////////////////////////////////////
17  // This file is part of The iLab Neuromorphic Robotics Toolkit. //
18  // //
19  // The iLab Neuromorphic Robotics Toolkit is free software: you can //
20  // redistribute it and/or modify it under the terms of the GNU General //
21  // Public License as published by the Free Software Foundation, either //
22  // version 3 of the License, or (at your option) any later version. //
23  // //
24  // The iLab Neuromorphic Robotics Toolkit is distributed in the hope //
25  // that it will be useful, but WITHOUT ANY WARRANTY; without even the //
26  // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
27  // PURPOSE. See the GNU General Public License for more details. //
28  // //
29  // You should have received a copy of the GNU General Public License //
30  // along with The iLab Neuromorphic Robotics Toolkit. If not, see //
31  // <http://www.gnu.org/licenses/>. //
32  // ////////////////////////////////////////////////////////////////////////
33  @endverbatim */
34 
35 
36 #ifndef INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MESSAGECHECKERRESULTSIMPL_H
37 #define INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MESSAGECHECKERRESULTSIMPL_H
38 
39 // MessageCheckerResults inlined implementation
40 
41 template <class Msg> inline
43  itsBeginUsed(false)
44 { }
45 
46 // ######################################################################
47 template <class Msg> inline
49  itsBeginUsed(other.itsBeginUsed)
50 {
51  std::swap(messages, other.messages);
52  std::swap(readyFutures, other.readyFutures);
53  std::swap(futures, other.futures);
54 }
55 
56 // ######################################################################
57 template <class Msg> inline
60 {
61  if (this != &other) {
62  itsBeginUsed = other.itsBeginUsed;
63  messages.clear(); readyFutures.clear(); futures.clear();
64  std::swap(messages, other.messages);
65  std::swap(readyFutures, other.readyFutures);
66  std::swap(futures, other.futures);
67  }
68  return *this;
69 }
70 
71 // ######################################################################
72 template <class Msg> inline
74 {
75  // Block until all results are available, if any pending result is still present; then run a get on each result so
76  // that we will propagate any exception in the results:
77  waitall();
78 
79  while (! exhausted_core(std::chrono::microseconds(0)) ) get();
80 }
81 
82 // ######################################################################
83 template <class Msg> inline
84 bool nrt::MessageCheckerResults<Msg>::exhausted_core(std::chrono::microseconds const maxwait)
85 {
86  // If we have some unpacked results ready to go, then we are not exhausted:
87  if (messages.empty() == false) return false;
88 
89  // If we have some futures ready to go, then we are not exhausted:
90  if (readyFutures.empty() == false) return false;
91 
92  // Otherwise, if no more futures, we know we have exhausted all data:
93  if (futures.empty()) return true;
94 
95  // Otherwise, let's wait and see whether we can say something about any future that is ready:
96  std::chrono::time_point<std::chrono::system_clock> const stoptime = std::chrono::system_clock::now() + maxwait;
97 
98  auto itr = futures.begin();
99  while (itr != futures.end())
100  if (itr->wait_until(stoptime) == std::future_status::ready) { readyFutures.push_back(std::move(*itr)); itr = futures.erase(itr); }
101  else ++itr;
102 
103  // If we have some futures ready to go, then we are not exhausted:
104  if (readyFutures.empty() == false) return false;
105 
106  // Otherwise, if no more futures, we know we have exhausted all data:
107  if (futures.empty()) return true;
108 
109  // If we have not returned at this point, then we have no ready results, but we still have some futures which are not
110  // ready yet, so the only thing we can say is that we may have more data coming in, and consequently here we return
111  // that we are not exhausted yet:
112  return false;
113 }
114 
115 // ######################################################################
116 template <class Msg> inline
117 bool nrt::MessageCheckerResults<Msg>::exhausted(std::chrono::microseconds const maxwait)
118 {
119  // Anything we can say without waiting?
120  if (exhausted_core(std::chrono::microseconds(0))) return true;
121 
122  // Anything we can say with waiting, if a non-zero maxwait was given? Note that if our previous attempt revealed some
123  // new ready results, this call will return right away without waiting:
124  if (maxwait != std::chrono::microseconds(0) && exhausted_core(maxwait)) return true;
125 
126  // If we have not returned at this point, then either we have found some new ready results or we still have some
127  // futures but they are not ready yet, so we are not exhausted:
128  return false;
129 }
130 
131 // ######################################################################
132 template <class Msg> inline
134 {
135  // Move any now-ready futures onto the ready queue
136  auto itr = futures.begin();
137  while (itr != futures.end())
138  if (itr->is_ready()) { readyFutures.push_back(std::move(*itr)); itr = futures.erase(itr); }
139  else ++itr;
140 
141  return ( (messages.empty() == false) || (readyFutures.empty() == false) );
142 }
143 
144 // ######################################################################
145 template <class Msg> inline
147 { return ( ! exhausted_core(std::chrono::microseconds(0)) ); }
148 
149 // ######################################################################
150 template <class Msg> inline
151 bool nrt::MessageCheckerResults<Msg>::waitall(std::chrono::microseconds const maxwait)
152 {
153  // Wait for a while and transfer any new ready results out of the futures and into the reday futures. Thus we know
154  // that all results are ready when our futures list has become empty:
155  exhausted_core(maxwait);
156 
157  if (futures.empty()) return true;
158 
159  return false;
160 }
161 
162 // ######################################################################
163 template <class Msg> inline
164 typename std::shared_ptr<Msg const> nrt::MessageCheckerResults<Msg>::get()
165 {
166  // Do we have any previously unpacked result ready to go, if so, consume and return the first one:
167  if (messages.empty() == false) {
168  std::shared_ptr<Msg const> ret = messages.front(); messages.pop();
169  return ret;
170  }
171 
172  while(true) {
173  // Do we have any ready futures? If so, grab all their messages, which may throw. Since some of them may be empty,
174  // we here tentatively loop and grab everything we can (without waiting), not just the first message that comes:
175  while (readyFutures.empty() == false) {
176  // Pop the first ready future out:
177  std::future<std::shared_ptr<std::vector<typename std::shared_ptr<Msg const> > > > ret =
178  std::move(readyFutures.front());
179  readyFutures.pop_front();
180 
181  // Now do a get on it (which may throw):
182  std::shared_ptr<std::vector<typename std::shared_ptr<Msg const> > > vec = ret.get();
183 
184  // Is there anything in the results? If so, transfer all messages into our vector of ready results:
185  if (vec) for (auto & ptr : *vec) if (ptr) messages.push(ptr);
186 
187  // Do we now have any new result ready to go, if so, consume and return the first one:
188  if (messages.empty() == false) {
189  std::shared_ptr<Msg const> ret = messages.front(); messages.pop();
190  return ret;
191  }
192  }
193 
194  // Ok, so we don't have any messages unpacked, maybe we are exhausted (note that the call to exhausted will wait if
195  // and as necessary)?
196  if (exhausted_core(std::chrono::hours::max())) return std::shared_ptr<Msg const>();
197 
198  // All right, the call to exhausted() should have unveiled new ready futures, so just try again...
199  }
200 }
201 
202 // ######################################################################
203 template <class Msg>
204 class nrt::MessageCheckerResults<Msg>::ResultFuture
205 {
206  public:
207  operator typename std::shared_ptr<Msg const>() { return itsMCR->get(); }
208  typename std::shared_ptr<Msg const> get()
209  { return itsMCR->get(); }
210 
211  private:
212  friend class nrt::MessageCheckerResults<Msg>;
213  ResultFuture(nrt::MessageCheckerResults<Msg> *mcr) :
214  itsMCR(mcr)
215  { }
216 
218 
219 };
220 
221 // ######################################################################
222 template<class Msg>
224 {
225  if (itsBeginUsed) throw std::range_error("Tried to get begin() from a MessageCheckerResults twice");
226  itsBeginUsed = true;
227  return iterator(this, this->exhausted());
228 }
229 
230 // ######################################################################
231 template<class Msg>
233 {
234  return iterator(this, true);
235 }
236 
237 // ######################################################################
238 template<class Msg>
239 class nrt::MessageCheckerResults<Msg>::iterator
240 {
241  public:
242  iterator(iterator const& other) :
243  itsMCR(other.itsMCR), itsIsEnd(other.itsIsEnd), itsReadyForDeref(other.itsReadyForDeref)
244  { }
245 
246  iterator& operator++()
247  {
248  itsReadyForDeref = true;
249  return *this;
250  }
251 
252  iterator operator++(int)
253  {
254  itsReadyForDeref = true;
255  return *this;
256  }
257 
258  bool operator==(iterator const& other) const
259  {
260  if (other.itsIsEnd && itsMCR->exhausted()) return true;
261  return false;
262  }
263 
264  bool operator!=(iterator const& other) const
265  {
266  return !(*this == other);
267  }
268 
269  ResultFuture operator*()
270  {
271  if(itsIsEnd)
272  throw std::range_error("Tried to dereference a MessageCheckerResults::end() iterator");
273  if(!itsReadyForDeref)
274  throw std::range_error("Tried to dereference a MessageCheckerResults::iterator twice without incrementing");
275 
276  ResultFuture res(itsMCR);
277  itsReadyForDeref = false;
278  return std::move(res);
279  }
280 
281  private:
282  friend class MessageCheckerResults<Msg>;
283 
284  iterator(MessageCheckerResults<Msg> *mcr, bool isEnd) : itsMCR(mcr), itsIsEnd(isEnd), itsReadyForDeref(true)
285  { }
286 
287  MessageCheckerResults<Msg> *itsMCR;
288  bool itsIsEnd;
289  bool itsReadyForDeref;
290 };
291 
292 #endif // INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MESSAGECHECKERRESULTSIMPL_H