Index   Main   Namespaces   Classes   Hierarchy   Annotated   Files   Compound   Global   Pages  

SgThreadedWorker.h

Go to the documentation of this file.
00001 //----------------------------------------------------------------------------
00002 /** @file SgThreadedWorker.h */
00003 //----------------------------------------------------------------------------
00004 
00005 #ifndef SG_THREADEDWORKER_HPP
00006 #define SG_THREADEDWORKER_HPP
00007 
#include "SgDebug.h"
#include <cstddef>
#include <utility>
#include <vector>
#include <boost/thread.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/shared_ptr.hpp>
00013 
00014 //----------------------------------------------------------------------------
00015 
00016 template<typename I, typename O, typename W>
00017 class SgThreadedWorker
00018 {
00019 public:
00020     
00021     SgThreadedWorker(std::vector<W>& workers);
00022 
00023     ~SgThreadedWorker();
00024 
00025     void DoWork(const std::vector<I>& work, 
00026                 std::vector<std::pair<I,O> >& output);
00027     
00028 private:
00029 
00030     void StartDoingWork();
00031 
00032     void WaitForThreadsToFinish();
00033 
00034     void TellThreadsToQuit();
00035 
00036     friend class Thread;
00037 
00038     /** Copyable object run in a boost::thread. */
00039     class Thread
00040     {
00041     public:
00042         Thread(std::size_t threadId, W& worker, 
00043                SgThreadedWorker<I,O,W>& threadedWork);
00044 
00045         void operator()();
00046 
00047     private:
00048 
00049         std::size_t m_id;
00050 
00051         W& m_worker;
00052             
00053         SgThreadedWorker<I,O,W>& m_boss;
00054     };
00055 
00056     /** Flag telling threads to exit. */
00057     bool m_quit;
00058 
00059     /** Threads must lock this mutex before getting work from list. */
00060     boost::mutex m_workMutex;
00061 
00062     /** Threads must lock this mutex before updating output. */
00063     boost::mutex m_outputMutex;
00064 
00065     /** Threads block on this barrier until told to start. */
00066     boost::barrier m_startWork;
00067 
00068     /** Threads block on this barrier until all are finished. */
00069     boost::barrier m_workFinished;
00070 
00071     /** Index of next problem to solve. */
00072     std::size_t m_workIndex;
00073 
00074     /** Problems to solve. */
00075     const std::vector<I>* m_workToDo;
00076 
00077     /** Solved problems. */
00078     std::vector<std::pair<I,O> >* m_output;
00079 
00080     /** The threads. */
00081     std::vector<boost::shared_ptr<boost::thread> > m_threads;
00082 };
00083 
00084 //----------------------------------------------------------------------------
00085 
00086 template<typename I, typename O, typename W>
00087 SgThreadedWorker<I,O,W>::SgThreadedWorker(std::vector<W>& workers)
00088     : m_quit(false),
00089       m_startWork(static_cast<unsigned int>(workers.size() + 1)),
00090       m_workFinished(static_cast<unsigned int>(workers.size() + 1))
00091 {
00092     for (std::size_t i = 0; i < workers.size(); ++i)
00093     {
00094         Thread runnable((int)i, workers[i], *this);
00095         boost::shared_ptr<boost::thread> thread(new boost::thread(runnable));
00096         m_threads.push_back(thread);
00097     }
00098 }
00099 
00100 template<typename I, typename O, typename W>
00101 SgThreadedWorker<I,O,W>::~SgThreadedWorker()
00102 {
00103     TellThreadsToQuit();
00104     for (std::size_t i = 0; i < m_threads.size(); ++i)
00105     {
00106         m_threads[i]->join();
00107         SgDebug() << "SgThreadedWorker: joined " << i << '\n';
00108     }
00109 }
00110 
00111 template<typename I, typename O, typename W>
00112 void SgThreadedWorker<I,O,W>::DoWork(const std::vector<I>& work,
00113                                    std::vector<std::pair<I,O> >& output)
00114 {
00115     m_workToDo = &work;
00116     m_workIndex = 0;
00117     m_output = &output;
00118     SgDebug() << "SgThreadedWorker::DoWork(): Processing " 
00119               << work.size() << " jobs." << '\n';
00120     StartDoingWork();
00121     WaitForThreadsToFinish();
00122 }
00123 
00124 template<typename I, typename O, typename W>
00125 SgThreadedWorker<I,O,W>::Thread::Thread(std::size_t threadId, W& worker, 
00126                                       SgThreadedWorker<I,O,W>& threadedWorker)
00127     : m_id(threadId),
00128       m_worker(worker),
00129       m_boss(threadedWorker)
00130 {
00131 }
00132 
00133 template<typename I, typename O, typename W>
00134 void SgThreadedWorker<I,O,W>::Thread::operator()()
00135 {
00136     while (true)
00137     {
00138         m_boss.m_startWork.wait();
00139         if (m_boss.m_quit) 
00140             break;
00141         //SgDebug() << "[" << m_id << "]: starting..."  << '\n';
00142         while (true)
00143         {
00144             bool finished = false;
00145             const I* currentWork = 0;
00146             {
00147                 boost::mutex::scoped_lock lock(m_boss.m_workMutex);
00148                 if (m_boss.m_workIndex < m_boss.m_workToDo->size())
00149                     currentWork = &(*m_boss.m_workToDo)[m_boss.m_workIndex++];
00150                 else
00151                     finished = true;
00152             }
00153             if (finished)
00154                 break;
00155             O answer = m_worker(*currentWork);
00156             {
00157                 boost::mutex::scoped_lock lock(m_boss.m_outputMutex);
00158                 m_boss.m_output
00159                     ->push_back(std::make_pair(*currentWork, answer));
00160             }
00161         }
00162         //SgDebug() << "[" << m_id << "]: finished." << '\n';
00163         m_boss.m_workFinished.wait();
00164     }
00165 }
00166 
00167 template<typename I, typename O, typename W>
00168 void SgThreadedWorker<I,O,W>::StartDoingWork()
00169 {
00170     m_startWork.wait();
00171 }
00172 
00173 template<typename I, typename O, typename W>
00174 void SgThreadedWorker<I,O,W>::WaitForThreadsToFinish()
00175 {
00176     m_workFinished.wait();
00177 }
00178 
00179 template<typename I, typename O, typename W>
00180 void SgThreadedWorker<I,O,W>::TellThreadsToQuit()
00181 {
00182     m_quit = true;
00183     m_startWork.wait();
00184 }
00185 
00186 //----------------------------------------------------------------------------
00187 
00188 #endif // SG_THREADEDWORKER_HPP


Sun Mar 13 2011 Doxygen 1.7.1