iLab Neuromorphic Robotics Toolkit  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MessagePosterResultsImpl.H
Go to the documentation of this file.
1 /*! @file
2  @author Laurent Itti
3  @copyright GNU Public License (GPL v3)
4  @section License
5  @verbatim
6  // ////////////////////////////////////////////////////////////////////////
7  // The iLab Neuromorphic Robotics Toolkit (NRT) //
8  // Copyright 2010-2012 by the University of Southern California (USC) //
9  // and the iLab at USC. //
10  // //
11  // iLab - University of Southern California //
12  // Hedco Neurociences Building, Room HNB-10 //
13  // Los Angeles, Ca 90089-2520 - USA //
14  // //
15  // See http://ilab.usc.edu for information about this project. //
16  // ////////////////////////////////////////////////////////////////////////
17  // This file is part of The iLab Neuromorphic Robotics Toolkit. //
18  // //
19  // The iLab Neuromorphic Robotics Toolkit is free software: you can //
20  // redistribute it and/or modify it under the terms of the GNU General //
21  // Public License as published by the Free Software Foundation, either //
22  // version 3 of the License, or (at your option) any later version. //
23  // //
24  // The iLab Neuromorphic Robotics Toolkit is distributed in the hope //
25  // that it will be useful, but WITHOUT ANY WARRANTY; without even the //
26  // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
27  // PURPOSE. See the GNU General Public License for more details. //
28  // //
29  // You should have received a copy of the GNU General Public License //
30  // along with The iLab Neuromorphic Robotics Toolkit. If not, see //
31  // <http://www.gnu.org/licenses/>. //
32  // ////////////////////////////////////////////////////////////////////////
33  @endverbatim */
34 
35 
36 #ifndef INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MESSAGEPOSTERRESULTSIMPL_H
37 #define INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MESSAGEPOSTERRESULTSIMPL_H
38 
39 // MessagePosterResults inlined implementation
40 
41 template <class Posting> inline
43  itsBeginUsed(false)
44 { }
45 
46 // ######################################################################
47 template <class Posting> inline
49  itsBeginUsed(other.itsBeginUsed)
50 {
51  std::swap(readyFutures, other.readyFutures);
52  std::swap(futures, other.futures);
53 }
54 
55 // ######################################################################
56 template <class Posting> inline
59 {
60  if (this != &other)
61  {
62  itsBeginUsed = other.itsBeginUsed;
63  readyFutures.clear(); futures.clear();
64  std::swap(readyFutures, other.readyFutures);
65  std::swap(futures, other.futures);
66  }
67  return *this;
68 }
69 
70 // ######################################################################
71 template <class Posting> inline
73 {
74  // Block until all results are available, if any pending result is still present; then run a get on each result so
75  // that we will propagate any exception in the results:
76  waitgetall();
77 }
78 
79 // ######################################################################
80 template <class Posting> inline
81 bool nrt::MessagePosterResults<Posting>::waitall(std::chrono::microseconds const maxwait)
82 {
83  if (futures.empty()) return true; // nothing to wait for
84 
85  std::chrono::time_point<std::chrono::system_clock> const stoptime = std::chrono::system_clock::now() + maxwait;
86 
87  // wait a bit on each future (if not ready yet) and check the resulting status:
88  auto itr = futures.begin();
89  while (itr != futures.end())
90  if (itr->wait_until(stoptime) == std::future_status::ready)
91  { readyFutures.push_back(std::move(*itr)); itr = futures.erase(itr); }
92  else ++itr;
93 
94  // If we have no more pending futures, then we successfully waited on all of them:
95  return futures.empty();
96 }
97 
98 // ######################################################################
99 template <class Posting> inline
101 {
102  // Move any now-ready futures onto the ready queue
103  auto itr = futures.begin();
104  while (itr != futures.end())
105  if (itr->wait_for(std::chrono::seconds(0))) { readyFutures.push_back(std::move(*itr)); itr = futures.erase(itr); }
106  else ++itr;
107 
108  return (! readyFutures.empty());
109 
110 }
111 
112 // ######################################################################
113 template <class Posting> inline
115 { return readyFutures.size() + futures.size(); }
116 
117 // ######################################################################
118 template <class Posting> inline
120 { return (readyFutures.empty() && futures.empty()); }
121 
122 // ######################################################################
123 template <class Posting> inline
125 { return ( ! (readyFutures.empty() && futures.empty()) ); }
126 
127 // ######################################################################
128 template <class Posting> inline
129 typename Posting::RetPtr nrt::MessagePosterResults<Posting>::get()
130 {
131  while(true) {
132  // Any future ready to go?
133  if (! readyFutures.empty()) {
134  // Pop the future out:
135  std::future<typename Posting::RetPtr> ret = std::move(readyFutures.front());
136  readyFutures.pop_front();
137 
138  // Now do a get on it (which may throw):
139  return ret.get();
140  }
141 
142  // Ok, no futures ready, any futures left at all?
143  if (futures.empty()) throw std::range_error("Tried to get() on empty MessagePosterResults");
144 
145  // All right, some futures left, wait on them for a bit and try again:
146  waitall(std::chrono::microseconds(1000));
147  }
148 }
149 
150 // ######################################################################
151 template <class Posting> inline
153 {
154  // first wait until all done:
155  waitall();
156 
157  // Now run get() on each result, which may throw:
158  while (! readyFutures.empty()) {
159  // Pop the future out:
160  std::future<typename Posting::RetPtr> ret = std::move(readyFutures.front());
161  readyFutures.pop_front();
162 
163  // Now do a get on it (which may throw):
164  ret.get();
165  }
166 }
167 
168 // ######################################################################
169 template <class Posting>
170 class nrt::MessagePosterResults<Posting>::ResultFuture :
171  public std::future<typename Posting::RetPtr>
172 {
173  public:
174  operator typename Posting::RetPtr() { return this->get(); }
175  private:
176  friend class MessagePosterResults<Posting>;
177  ResultFuture(std::future<typename Posting::RetPtr>&& fut) :
178  std::future<typename Posting::RetPtr>(std::move(fut)) { }
179 };
180 
181 // ######################################################################
182 template <class Posting>
183 std::future<typename Posting::RetPtr> nrt::MessagePosterResults<Posting>::takeNextFuture()
184 {
185  while(true) {
186  // Any future ready to go?
187  if (! readyFutures.empty()) {
188  // Pop the future out:
189 
190  std::future<typename Posting::RetPtr> ret = std::move(readyFutures.front());
191 
192  readyFutures.pop_front();
193 
194  // Now, just return it without get()ing
195  return ret;
196  }
197 
198  // Ok, no futures ready, any futures left at all?
199  if (futures.empty()) throw std::range_error("Tried to get() on empty MessagePosterResults");
200 
201  // All right, some futures left, wait on them for a bit and try again:
202  waitall(std::chrono::microseconds(1000));
203  }
204 }
205 
206 // ######################################################################
207 template <class Posting>
209 {
210  if (itsBeginUsed) throw std::range_error("Tried to get begin() from a MessagePosterResults twice");
211  itsBeginUsed = true;
212  return iterator(this, this->empty());
213 }
214 
215 // ######################################################################
216 template<class Posting>
218 {
219  return iterator(this, true);
220 }
221 
222 // ######################################################################
223 template<class Posting>
224 class nrt::MessagePosterResults<Posting>::iterator
225 {
226  public:
227  iterator(iterator const & other) :
228  itsMPR(other.itsMPR), itsIsEnd(other.itsIsEnd), itsReadyForDeref(other.itsReadyForDeref)
229  { }
230 
231  iterator & operator++()
232  {
233  itsReadyForDeref = true;
234  return *this;
235  }
236 
237  iterator operator++(int)
238  {
239  itsReadyForDeref = true;
240  return *this;
241  }
242 
243  bool operator==(iterator const & other) const
244  {
245  if (other.itsIsEnd && itsMPR->empty()) return true;
246  return false;
247  }
248 
249  bool operator!=(iterator const & other) const
250  {
251  return !(*this == other);
252  }
253 
254  ResultFuture operator*()
255  {
256  if (itsIsEnd)
257  throw std::range_error("Tried to dereference a MessagePosterResults::end() iterator");
258  if (!itsReadyForDeref)
259  throw std::range_error("Tried to dereference a MessagePosterResults::iterator twice without incrementing");
260 
261  ResultFuture res(itsMPR->takeNextFuture());
262  itsReadyForDeref = false;
263  return std::move(res);
264  }
265 
266  private:
267  friend class MessagePosterResults<Posting>;
268 
269  iterator(MessagePosterResults<Posting> *mpr, bool isEnd) : itsMPR(mpr), itsIsEnd(isEnd), itsReadyForDeref(true)
270  { }
271 
272  MessagePosterResults<Posting> *itsMPR;
273  bool itsIsEnd;
274  bool itsReadyForDeref;
275 };
276 
277 #endif // INCLUDE_NRT_CORE_BLACKBOARD_DETAILS_MESSAGEPOSTERRESULTSIMPL_H