00001 //---------------------------------------------------------------------------- 00002 /** @file SgThreadedWorker.h */ 00003 //---------------------------------------------------------------------------- 00004 00005 #ifndef SG_THREADEDWORKER_HPP 00006 #define SG_THREADEDWORKER_HPP 00007 00008 #include "SgDebug.h" 00009 #include <boost/thread.hpp> 00010 #include <boost/thread/barrier.hpp> 00011 #include <boost/thread/mutex.hpp> 00012 #include <boost/shared_ptr.hpp> 00013 00014 //---------------------------------------------------------------------------- 00015 00016 template<typename I, typename O, typename W> 00017 class SgThreadedWorker 00018 { 00019 public: 00020 00021 SgThreadedWorker(std::vector<W>& workers); 00022 00023 ~SgThreadedWorker(); 00024 00025 void DoWork(const std::vector<I>& work, 00026 std::vector<std::pair<I,O> >& output); 00027 00028 private: 00029 00030 void StartDoingWork(); 00031 00032 void WaitForThreadsToFinish(); 00033 00034 void TellThreadsToQuit(); 00035 00036 friend class Thread; 00037 00038 /** Copyable object run in a boost::thread. */ 00039 class Thread 00040 { 00041 public: 00042 Thread(std::size_t threadId, W& worker, 00043 SgThreadedWorker<I,O,W>& threadedWork); 00044 00045 void operator()(); 00046 00047 private: 00048 00049 std::size_t m_id; 00050 00051 W& m_worker; 00052 00053 SgThreadedWorker<I,O,W>& m_boss; 00054 }; 00055 00056 /** Flag telling threads to exit. */ 00057 bool m_quit; 00058 00059 /** Threads must lock this mutex before getting work from list. */ 00060 boost::mutex m_workMutex; 00061 00062 /** Threads must lock this mutex before updating output. */ 00063 boost::mutex m_outputMutex; 00064 00065 /** Threads block on this barrier until told to start. */ 00066 boost::barrier m_startWork; 00067 00068 /** Threads block on this barrier until all are finished. */ 00069 boost::barrier m_workFinished; 00070 00071 /** Index of next problem to solve. */ 00072 std::size_t m_workIndex; 00073 00074 /** Problems to solve. */ 00075 const std::vector<I>* m_workToDo; 00076 00077 /** Solved problems. */ 00078 std::vector<std::pair<I,O> >* m_output; 00079 00080 /** The threads. */ 00081 std::vector<boost::shared_ptr<boost::thread> > m_threads; 00082 }; 00083 00084 //---------------------------------------------------------------------------- 00085 00086 template<typename I, typename O, typename W> 00087 SgThreadedWorker<I,O,W>::SgThreadedWorker(std::vector<W>& workers) 00088 : m_quit(false), 00089 m_startWork(static_cast<unsigned int>(workers.size() + 1)), 00090 m_workFinished(static_cast<unsigned int>(workers.size() + 1)) 00091 { 00092 for (std::size_t i = 0; i < workers.size(); ++i) 00093 { 00094 Thread runnable((int)i, workers[i], *this); 00095 boost::shared_ptr<boost::thread> thread(new boost::thread(runnable)); 00096 m_threads.push_back(thread); 00097 } 00098 } 00099 00100 template<typename I, typename O, typename W> 00101 SgThreadedWorker<I,O,W>::~SgThreadedWorker() 00102 { 00103 TellThreadsToQuit(); 00104 for (std::size_t i = 0; i < m_threads.size(); ++i) 00105 { 00106 m_threads[i]->join(); 00107 SgDebug() << "SgThreadedWorker: joined " << i << '\n'; 00108 } 00109 } 00110 00111 template<typename I, typename O, typename W> 00112 void SgThreadedWorker<I,O,W>::DoWork(const std::vector<I>& work, 00113 std::vector<std::pair<I,O> >& output) 00114 { 00115 m_workToDo = &work; 00116 m_workIndex = 0; 00117 m_output = &output; 00118 SgDebug() << "SgThreadedWorker::DoWork(): Processing " 00119 << work.size() << " jobs." << '\n'; 00120 StartDoingWork(); 00121 WaitForThreadsToFinish(); 00122 } 00123 00124 template<typename I, typename O, typename W> 00125 SgThreadedWorker<I,O,W>::Thread::Thread(std::size_t threadId, W& worker, 00126 SgThreadedWorker<I,O,W>& threadedWorker) 00127 : m_id(threadId), 00128 m_worker(worker), 00129 m_boss(threadedWorker) 00130 { 00131 } 00132 00133 template<typename I, typename O, typename W> 00134 void SgThreadedWorker<I,O,W>::Thread::operator()() 00135 { 00136 while (true) 00137 { 00138 m_boss.m_startWork.wait(); 00139 if (m_boss.m_quit) 00140 break; 00141 //SgDebug() << "[" << m_id << "]: starting..." << '\n'; 00142 while (true) 00143 { 00144 bool finished = false; 00145 const I* currentWork = 0; 00146 { 00147 boost::mutex::scoped_lock lock(m_boss.m_workMutex); 00148 if (m_boss.m_workIndex < m_boss.m_workToDo->size()) 00149 currentWork = &(*m_boss.m_workToDo)[m_boss.m_workIndex++]; 00150 else 00151 finished = true; 00152 } 00153 if (finished) 00154 break; 00155 O answer = m_worker(*currentWork); 00156 { 00157 boost::mutex::scoped_lock lock(m_boss.m_outputMutex); 00158 m_boss.m_output 00159 ->push_back(std::make_pair(*currentWork, answer)); 00160 } 00161 } 00162 //SgDebug() << "[" << m_id << "]: finished." << '\n'; 00163 m_boss.m_workFinished.wait(); 00164 } 00165 } 00166 00167 template<typename I, typename O, typename W> 00168 void SgThreadedWorker<I,O,W>::StartDoingWork() 00169 { 00170 m_startWork.wait(); 00171 } 00172 00173 template<typename I, typename O, typename W> 00174 void SgThreadedWorker<I,O,W>::WaitForThreadsToFinish() 00175 { 00176 m_workFinished.wait(); 00177 } 00178 00179 template<typename I, typename O, typename W> 00180 void SgThreadedWorker<I,O,W>::TellThreadsToQuit() 00181 { 00182 m_quit = true; 00183 m_startWork.wait(); 00184 } 00185 00186 //---------------------------------------------------------------------------- 00187 00188 #endif // SG_THREADEDWORKER_HPP