00001
00002
00003
00004
00005 #ifndef SG_THREADEDWORKER_HPP
00006 #define SG_THREADEDWORKER_HPP
00007
00008 #include "SgDebug.h"
00009 #include <boost/thread.hpp>
00010 #include <boost/thread/barrier.hpp>
00011 #include <boost/thread/mutex.hpp>
00012 #include <boost/shared_ptr.hpp>
00013
00014
00015
00016 template<typename I, typename O, typename W>
00017 class SgThreadedWorker
00018 {
00019 public:
00020
00021 SgThreadedWorker(std::vector<W>& workers);
00022
00023 ~SgThreadedWorker();
00024
00025 void DoWork(const std::vector<I>& work,
00026 std::vector<std::pair<I,O> >& output);
00027
00028 private:
00029
00030 void StartDoingWork();
00031
00032 void WaitForThreadsToFinish();
00033
00034 void TellThreadsToQuit();
00035
00036 friend class Thread;
00037
00038
00039 class Thread
00040 {
00041 public:
00042 Thread(std::size_t threadId, W& worker,
00043 SgThreadedWorker<I,O,W>& threadedWork);
00044
00045 void operator()();
00046
00047 private:
00048
00049 std::size_t m_id;
00050
00051 W& m_worker;
00052
00053 SgThreadedWorker<I,O,W>& m_boss;
00054 };
00055
00056
00057 bool m_quit;
00058
00059
00060 boost::mutex m_workMutex;
00061
00062
00063 boost::mutex m_outputMutex;
00064
00065
00066 boost::barrier m_startWork;
00067
00068
00069 boost::barrier m_workFinished;
00070
00071
00072 std::size_t m_workIndex;
00073
00074
00075 const std::vector<I>* m_workToDo;
00076
00077
00078 std::vector<std::pair<I,O> >* m_output;
00079
00080
00081 std::vector<boost::shared_ptr<boost::thread> > m_threads;
00082 };
00083
00084
00085
00086 template<typename I, typename O, typename W>
00087 SgThreadedWorker<I,O,W>::SgThreadedWorker(std::vector<W>& workers)
00088 : m_quit(false),
00089 m_startWork(static_cast<unsigned int>(workers.size() + 1)),
00090 m_workFinished(static_cast<unsigned int>(workers.size() + 1))
00091 {
00092 for (std::size_t i = 0; i < workers.size(); ++i)
00093 {
00094 Thread runnable((int)i, workers[i], *this);
00095 boost::shared_ptr<boost::thread> thread(new boost::thread(runnable));
00096 m_threads.push_back(thread);
00097 }
00098 }
00099
00100 template<typename I, typename O, typename W>
00101 SgThreadedWorker<I,O,W>::~SgThreadedWorker()
00102 {
00103 TellThreadsToQuit();
00104 for (std::size_t i = 0; i < m_threads.size(); ++i)
00105 {
00106 m_threads[i]->join();
00107 SgDebug() << "SgThreadedWorker: joined " << i << '\n';
00108 }
00109 }
00110
00111 template<typename I, typename O, typename W>
00112 void SgThreadedWorker<I,O,W>::DoWork(const std::vector<I>& work,
00113 std::vector<std::pair<I,O> >& output)
00114 {
00115 m_workToDo = &work;
00116 m_workIndex = 0;
00117 m_output = &output;
00118 SgDebug() << "SgThreadedWorker::DoWork(): Processing "
00119 << work.size() << " jobs." << '\n';
00120 StartDoingWork();
00121 WaitForThreadsToFinish();
00122 }
00123
00124 template<typename I, typename O, typename W>
00125 SgThreadedWorker<I,O,W>::Thread::Thread(std::size_t threadId, W& worker,
00126 SgThreadedWorker<I,O,W>& threadedWorker)
00127 : m_id(threadId),
00128 m_worker(worker),
00129 m_boss(threadedWorker)
00130 {
00131 }
00132
00133 template<typename I, typename O, typename W>
00134 void SgThreadedWorker<I,O,W>::Thread::operator()()
00135 {
00136 while (true)
00137 {
00138 m_boss.m_startWork.wait();
00139 if (m_boss.m_quit)
00140 break;
00141
00142 while (true)
00143 {
00144 bool finished = false;
00145 const I* currentWork = 0;
00146 {
00147 boost::mutex::scoped_lock lock(m_boss.m_workMutex);
00148 if (m_boss.m_workIndex < m_boss.m_workToDo->size())
00149 currentWork = &(*m_boss.m_workToDo)[m_boss.m_workIndex++];
00150 else
00151 finished = true;
00152 }
00153 if (finished)
00154 break;
00155 O answer = m_worker(*currentWork);
00156 {
00157 boost::mutex::scoped_lock lock(m_boss.m_outputMutex);
00158 m_boss.m_output
00159 ->push_back(std::make_pair(*currentWork, answer));
00160 }
00161 }
00162
00163 m_boss.m_workFinished.wait();
00164 }
00165 }
00166
00167 template<typename I, typename O, typename W>
00168 void SgThreadedWorker<I,O,W>::StartDoingWork()
00169 {
00170 m_startWork.wait();
00171 }
00172
00173 template<typename I, typename O, typename W>
00174 void SgThreadedWorker<I,O,W>::WaitForThreadsToFinish()
00175 {
00176 m_workFinished.wait();
00177 }
00178
00179 template<typename I, typename O, typename W>
00180 void SgThreadedWorker<I,O,W>::TellThreadsToQuit()
00181 {
00182 m_quit = true;
00183 m_startWork.wait();
00184 }
00185
00186
00187
00188 #endif // SG_THREADEDWORKER_HPP