iLab Neuromorphic Robotics Toolkit  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
BottomlessThreadPoolImpl.H
Go to the documentation of this file.
1 /*! @file
2  @author Randolph Voorhies
3  @copyright GNU Public License (GPL v3)
4  @section License
5  @verbatim
6  // ////////////////////////////////////////////////////////////////////////
7  // The iLab Neuromorphic Robotics Toolkit (NRT) //
8  // Copyright 2010-2012 by the University of Southern California (USC) //
9  // and the iLab at USC. //
10  // //
11  // iLab - University of Southern California //
12  // Hedco Neurociences Building, Room HNB-10 //
13  // Los Angeles, Ca 90089-2520 - USA //
14  // //
15  // See http://ilab.usc.edu for information about this project. //
16  // ////////////////////////////////////////////////////////////////////////
17  // This file is part of The iLab Neuromorphic Robotics Toolkit. //
18  // //
19  // The iLab Neuromorphic Robotics Toolkit is free software: you can //
20  // redistribute it and/or modify it under the terms of the GNU General //
21  // Public License as published by the Free Software Foundation, either //
22  // version 3 of the License, or (at your option) any later version. //
23  // //
24  // The iLab Neuromorphic Robotics Toolkit is distributed in the hope //
25  // that it will be useful, but WITHOUT ANY WARRANTY; without even the //
26  // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
27  // PURPOSE. See the GNU General Public License for more details. //
28  // //
29  // You should have received a copy of the GNU General Public License //
30  // along with The iLab Neuromorphic Robotics Toolkit. If not, see //
31  // <http://www.gnu.org/licenses/>. //
32  // ////////////////////////////////////////////////////////////////////////
33  @endverbatim */
34 
35 #ifndef INCLUDE_NRT_CORE_UTIL_DETAILS_BOTTOMLESSTHREADPOOLIMPL_H
36 #define INCLUDE_NRT_CORE_UTIL_DETAILS_BOTTOMLESSTHREADPOOLIMPL_H
37 
38 // ######################################################################
39 //nrt::BottomlessThreadPool::Worker::Worker(BottomlessThreadPool * sourcePool, std::shared_ptr<Channel> channel, size_t id, std::function<void()> job) :
40 inline nrt::BottomlessThreadPool::Worker::Worker(BottomlessThreadPool * sourcePool, std::shared_ptr<Channel> channel, size_t id, FunctionWrapper && job) :
41  itsChannel(channel),
42  itsId(id),
43  itsSourcePool(sourcePool),
44  itsRunning(true),
45  itsThread(&Worker::workThread, this, std::move(job))
46 { }
47 
48 // ######################################################################
49 inline nrt::BottomlessThreadPool::Worker::~Worker()
50 {
51  if(itsSourcePool)
52  {
53  {
54  std::unique_lock<std::mutex> _(itsSourcePool->itsMtx);
55  itsRunning = false;
56  }
57  itsChannel->condition.notify_one();
58  itsThread.join();
59  }
60 
61 }
62 
63 // ######################################################################
64 inline void nrt::BottomlessThreadPool::Worker::workThread(FunctionWrapper && initialJob)
65 //void nrt::BottomlessThreadPool::Worker::workThread(std::function<void()> initialJob)
66 {
67  initialJob();
68  while(itsRunning)
69  {
70  {
71  std::unique_lock<std::mutex> lock(itsSourcePool->itsMtx);
72  itsSourcePool->itsOpenChannels.push(itsId);
73  itsChannel->condition.wait(lock);
74  }
75 
76  itsChannel->job();
77  itsChannel->job = [](){};
78  }
79 }
80 
81 // ######################################################################
83 {
84  for(size_t i = 0; i < initialThreads; ++i)
85  {
86  std::shared_ptr<Channel> channel(new Channel);
87  itsChannels.push_back(channel);
88  itsWorkers.emplace_back(std::unique_ptr<Worker>(new Worker(this, channel, itsChannels.size()-1, [](){})));
89  }
90 }
91 
92 // ######################################################################
94 {
95 }
96 
97 // ######################################################################
98 template <class Func> inline
99 std::future<typename std::result_of<Func()>::type>
100 nrt::BottomlessThreadPool::pushJob( Func f )
101 {
102  typedef typename std::result_of<Func()>::type result_type;
103 
104  std::packaged_task<result_type()> task( std::move( f ) );
105  std::future<result_type> result( task.get_future() );
106 
107  std::lock_guard<std::mutex> lock(itsMtx);
108  if(itsOpenChannels.size() == 0)
109  {
110  std::shared_ptr<Channel> channel(new Channel);
111  itsChannels.push_back(channel);
112  itsWorkers.emplace_back(std::unique_ptr<Worker>(new Worker(this, channel, itsChannels.size()-1, std::move( task ))));
113  }
114  else
115  {
116  size_t channelIdx = itsOpenChannels.front();
117  itsOpenChannels.pop();
118 
119  std::shared_ptr<Channel> channel = itsChannels[channelIdx];
120 
121  channel->job = std::move( task );
122  channel->condition.notify_one();
123  }
124 
125  return result;
126 }
127 
128 // ######################################################################
129 /*template<class ReturnType>
130 std::future<ReturnType> nrt::BottomlessThreadPool::pushJob(std::function<ReturnType()> job)
131 {
132  auto promise = std::make_shared<std::promise<ReturnType>>();
133  std::future<ReturnType> future = promise->get_future();
134 
135  pushJobWrapper([promise, job](){ promise->set_value(job()); });
136 
137  return future;
138 }*/
139 
140 
141 // ######################################################################
143 {
144  std::unique_lock<std::mutex> lock(itsMtx);
145  return itsChannels.size();
146 }
147 
148 // ######################################################################
149 /*void nrt::BottomlessThreadPool::pushJobWrapper(std::function<void()> jobWrapper)
150 {
151  std::lock_guard<std::mutex> lock(itsMtx);
152  if(itsOpenChannels.size() == 0)
153  {
154  std::shared_ptr<Channel> channel(new Channel);
155  itsChannels.push_back(channel);
156  itsWorkers.emplace_back(std::unique_ptr<Worker>(new Worker(this, channel, itsChannels.size()-1, jobWrapper)));
157  }
158  else
159  {
160  size_t channelIdx = itsOpenChannels.front();
161  itsOpenChannels.pop();
162 
163  std::shared_ptr<Channel> channel = itsChannels[channelIdx];
164 
165  channel->job = jobWrapper;
166  channel->condition.notify_one();
167  }
168 }
169 */
170 
171 // ######################################################################
172 /*template<> inline
173 std::future<void> nrt::BottomlessThreadPool::pushJob(std::function<void()> job)
174 {
175  auto promise = std::make_shared<std::promise<void>>();
176  std::future<void> future = promise->get_future();
177 
178  pushJobWrapper([promise, job]() { job(); promise->set_value(); });
179 
180  return future;
181 }
182 */
183 
184 #endif // INCLUDE_NRT_CORE_UTIL_DETAILS_BOTTOMLESSTHREADPOOLIMPL_H