iLab Neuromorphic Robotics Toolkit  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ThreadPoolImpl.H
Go to the documentation of this file.
1 /*! @file
2  @author Randolph Voorhies
3  @copyright GNU Public License (GPL v3)
4  @section License
5  @verbatim
6  // ////////////////////////////////////////////////////////////////////////
7  // The iLab Neuromorphic Robotics Toolkit (NRT) //
8  // Copyright 2010-2012 by the University of Southern California (USC) //
9  // and the iLab at USC. //
10  // //
11  // iLab - University of Southern California //
12  // Hedco Neurosciences Building, Room HNB-10 //
13  // Los Angeles, Ca 90089-2520 - USA //
14  // //
15  // See http://ilab.usc.edu for information about this project. //
16  // ////////////////////////////////////////////////////////////////////////
17  // This file is part of The iLab Neuromorphic Robotics Toolkit. //
18  // //
19  // The iLab Neuromorphic Robotics Toolkit is free software: you can //
20  // redistribute it and/or modify it under the terms of the GNU General //
21  // Public License as published by the Free Software Foundation, either //
22  // version 3 of the License, or (at your option) any later version. //
23  // //
24  // The iLab Neuromorphic Robotics Toolkit is distributed in the hope //
25  // that it will be useful, but WITHOUT ANY WARRANTY; without even the //
26  // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
27  // PURPOSE. See the GNU General Public License for more details. //
28  // //
29  // You should have received a copy of the GNU General Public License //
30  // along with The iLab Neuromorphic Robotics Toolkit. If not, see //
31  // <http://www.gnu.org/licenses/>. //
32  // ////////////////////////////////////////////////////////////////////////
33  @endverbatim */
34 
35 
36 
37 #ifndef INCLUDE_NRT_CORE_DESIGN_DETAILS_THREADPOOLIMPL_H
38 #define INCLUDE_NRT_CORE_DESIGN_DETAILS_THREADPOOLIMPL_H
#include <exception>

40 // ######################################################################
41 struct nrt::ThreadPool::JobTicket
42 {
43  inline JobTicket(size_t const num) : itsNumJobs(num), itsNumCompleted(0) { }
44 
45  std::promise<void> itsPromise;
46  size_t itsNumJobs;
47  size_t itsNumCompleted;
48  std::mutex itsMtx;
49 };
50 
51 // ######################################################################
52 inline nrt::ThreadPool::ThreadPool(size_t const numThreads)
53 {
54  this->resize(numThreads);
55 }
56 
57 // ######################################################################
58 inline void nrt::ThreadPool::resize(size_t const numThreads)
59 {
60  boost::unique_lock<boost::shared_mutex> ulck(itsMtx);
61 
62  while (itsThreadPool.size() < numThreads)
63  {
64  itsRunningMap[itsThreadPool.size()].reset(new bool(true));
65  itsThreadPool.
66  push_back(std::thread(std::bind(&nrt::ThreadPool::workerThread, this, itsRunningMap[itsThreadPool.size()])));
67  }
68  while (itsThreadPool.size() > numThreads)
69  {
70  *itsRunningMap[itsThreadPool.size() -1] = false;
71  (itsThreadPool.end()-1)->detach();
72  itsThreadPool.pop_back();
73  }
74 }
75 
76 // ######################################################################
78 {
79  boost::unique_lock<boost::shared_mutex> ulck(itsMtx);
80 
81  for (size_t i = 0; i < itsThreadPool.size(); ++i)
82  if (itsThreadPool[i].joinable()) itsThreadPool[i].detach();
83 }
84 
85 // ######################################################################
86 template <> inline
87 std::future<void> nrt::ThreadPool::pushJob(std::function<void()> job)
88 {
89  std::shared_ptr<nrt::ThreadPool::JobTicket> ticket(new JobTicket(1));
90 
91  std::future<void> f = ticket->itsPromise.get_future();
92 
93  itsWorkQueue.push(std::make_pair(job, ticket));
94 
95  return f;
96 }
97 
98 // ######################################################################
99 template <class RETURN_TYPE> inline
100 std::future<RETURN_TYPE> nrt::ThreadPool::pushJob(std::function<RETURN_TYPE()> job)
101 {
102  if(itsThreadPool.empty())
103  throw nrt::exception::SerializableException("Cannot push job onto thread pool of zero size");
104 
105  // Make a bogus ticket for the work queue.
106  std::shared_ptr<nrt::ThreadPool::JobTicket> ticket(new JobTicket(1));
107 
108  std::shared_ptr<std::promise<RETURN_TYPE>> p(new std::promise<RETURN_TYPE>);
109  std::future<RETURN_TYPE> f = p->get_future();
110 
111  std::function<void()> jobWrapper = [p,job](){p->set_value(job());};
112 
113  itsWorkQueue.push(std::make_pair(jobWrapper, ticket));
114 
115  return f;
116 }
117 
118 // ######################################################################
119 inline std::future<void> nrt::ThreadPool::pushJobs(std::vector<std::function<void()>> jobs)
120 {
121  if(itsThreadPool.empty())
122  throw nrt::exception::SerializableException("Cannot push jobs onto thread pool of zero size");
123 
124  std::shared_ptr<nrt::ThreadPool::JobTicket> ticket(new JobTicket(jobs.size()));
125  std::future<void> fut = ticket->itsPromise.get_future();
126 
127  for (auto & j : jobs) itsWorkQueue.push(std::make_pair(j, ticket));
128 
129  return fut;
130 }
131 
132 // ######################################################################
134 {
135  while (itsWorkQueue.size()) usleep(10000);
136 
137  // Push empty jobs to wake up sleeping threads
138  for (size_t i = 0; i < itsThreadPool.size(); ++i)
139  {
140  *(itsRunningMap[i]) = false;
141  pushJob<void>([](){});
142  }
143 
144  for (size_t i = 0; i < itsThreadPool.size(); ++i) itsThreadPool[i].join();
145  itsThreadPool.resize(0);
146 }
147 
148 // ######################################################################
149 inline void nrt::ThreadPool::workerThread(std::shared_ptr<bool> running)
150 {
151  while (*running)
152  {
153  auto job = itsWorkQueue.pop();
154  auto fun = job.first;
155  auto ticket = job.second;
156 
157  fun();
158 
159  std::lock_guard<std::mutex> lck(ticket->itsMtx);
160  ticket->itsNumCompleted++;
161  if (ticket->itsNumCompleted == ticket->itsNumJobs) ticket->itsPromise.set_value();
162  }
163 }
164 
165 // ######################################################################
167 {
168  boost::shared_lock<boost::shared_mutex> lck(itsMtx);
169  return itsThreadPool.size();
170 }
171 
172 // ######################################################################
174 {
175  return itsWorkQueue.size();
176 }
177 
178 // ######################################################################
180 {
181  boost::unique_lock<boost::shared_mutex> ulck(itsMtx);
182 
183  for( auto worker : itsRunningMap )
184  if( !worker.second )
185  return false;
186 
187  return true;
188 }
189 
190 #endif // INCLUDE_NRT_CORE_DESIGN_DETAILS_THREADPOOLIMPL_H