Toast++  1.0.2 (r.539)
Forward and inverse modelling in optical tomography
task.h
1 // -*-C++-*-
2 
3 #ifndef __TASK_H
4 #define __TASK_H
5 
6 #ifdef TOAST_THREAD
7 
8 #include <pthread.h>
9 
// Compile-time thread count constant.
// NOTE(review): no use of this macro is visible in this header - confirm
// which translation units still rely on it.
#define NUMTHREAD 2
11 
12 
// Mutex convenience macros.
// With TOAST_PARALLEL defined they map directly onto the pthread mutex API;
// without it every macro expands to nothing, so guarded code compiles
// unchanged in a build without TOAST_PARALLEL.
#ifdef TOAST_PARALLEL
// Define a mutex named `m', statically initialised.
#define INITMUTEX(m) pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER
// Same as INITMUTEX, but with static storage class.
#define STATICINITMUTEX(m) static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER
// `m' is forwarded as-is to the pthread call, so it must be a pointer to
// the mutex (e.g. MUTEXLOCK(&mymutex)).
#define MUTEXLOCK(m) pthread_mutex_lock (m)
#define MUTEXUNLOCK(m) pthread_mutex_unlock (m)
#else
// No-op variants: the argument is discarded and never evaluated.
#define INITMUTEX(m)
#define STATICINITMUTEX(m)
#define MUTEXLOCK(m)
#define MUTEXUNLOCK(m)
#endif
24 
// Argument record received by the worker functions passed to
// Task::Multiprocess (their signature is void func(task_data*)).
typedef struct {
    int proc;   // presumably the index of this worker - TODO confirm in task.cc
    int np;     // presumably the total number of workers - TODO confirm
    void *data; // opaque user pointer
} task_data;
30 
// Declared here, defined elsewhere.
// NOTE(review): presumably initialises the threading layer for `nth'
// threads - confirm against the definition.
void Task_Init (int nth);
32 
33 class MATHLIB Task {
34 public:
35  Task() {}
36 
37  static int nProcessor();
38  // number of processors available
39 
40  static void SetThreadCount (int _nthread) { nthread = _nthread; }
41  // set the number of threads to use for multiprocessing tasks.
42  // Default is nProcessor()
43 
44  static int GetThreadCount () { return nthread; }
45  // return current thread count setting
46 
47  static double GetThreadCPUTiming () { return ttime; }
48  // returns user time [sec] spent by threads inside Multiprocess().
49  // Note that the interpretation of this time is platform-dependent:
50  // For Linux, this is the time spent by the master thread. For Sun and
51  // SGI, it is the sum of all threads.
52 
53  static double GetThreadWallTiming () { return wtime; }
54  // Wall clock (real) time spent inside Multiprocess()
55 
56  static void Multiprocess (void (*func)(task_data*), void *data, int np = 0);
57  // run function 'func' in parallel. User data 'data' are passed to
58  // each instance of 'func'. 'np' is the number of threads to create. If
59  // np==0 then nthread is used
60 
61  static bool IsMultiprocessing() { return is_multiprocessing; }
62 
63  inline static void UserMutex_lock() { pthread_mutex_lock (&user_mutex); }
64  inline static void UserMutex_unlock() {pthread_mutex_unlock (&user_mutex);}
65 
66 private:
67  static int nthread;
68  static double ttime; // accumulated cpu time spent multiprocessing
69  static double wtime; // accumulated wall clock time spent multiprocessing
70  static pthread_mutex_t user_mutex;
71  static bool is_multiprocessing; // we are currently in Multiprocess
72 };
73 
// ===========================================================================
// class ThreadPool
76 
// One queued work item of the thread pool. Items are chained into a
// singly linked list through `next'.
typedef struct tpool_work {
    void (*routine)(void*,int,int); // task
    void *arg;                      // task arguments
    int idx0, idx1;                 // task index range
    int id;                         // task no. in sequence
    int *counter;                   // pointer to sequence task counter
    struct tpool_work *next;        // following item in the list
} tpool_work_t;
85 
86 typedef struct tpool {
87  int num_threads; // number of threads
88  int queue_size; // current queue size
89  pthread_t *threads; // array of worker threads
90  tpool_work_t *queue_head, *queue_tail;
91 
92  pthread_mutex_t queue_lock;
93  pthread_cond_t queue_not_empty;
94  pthread_cond_t thread_done;
95 } tpool_t;
96 
97 class ThreadPool {
98 public:
99  ThreadPool (int num_threads);
100  // Construct a pool with `num_threads' threads
101 
102  void ProcessSequence (void (*routine)(void*,int,int), void *arg,
103  int from, int to, int grain=1);
104  // Calls `routine(arg,i)' for a sequence from <= i < to of indices
105  // `grain' defines the granularity, i.e. the number of indices assigned
106  // to each task
107  // Function returns when complete sequence is processed
108 
109  inline void LockUserMutex() { pthread_mutex_lock (&user_lock); }
110  inline void UnlockUserMutex() { pthread_mutex_unlock (&user_lock); }
111 
112 private:
113  tpool_t *tpool; // pool properties
114  pthread_mutex_t user_lock; // user-space mutex
115 };
116 
117 
// ===========================================================================
// class ThreadPool2
120 
// Data shared by all workers of a ThreadPool2.
typedef struct {
    void(*task)(int,void*); // task function; same signature as the `func'
                            // argument of ThreadPool2::Invoke
    void *context;          // opaque pointer; same type as Invoke's `context'
    pthread_mutex_t mutex;  // general-purpose mutex to be used by workers
} THREAD_GLOBAL;
126 
127 typedef struct {
128  int nth; // thread index
129  pthread_t thread; // thread handle
130  pthread_mutex_t wakeup_mutex; // locked by worker during task processing
131  pthread_mutex_t done_mutex;
132  pthread_cond_t wakeup_cond;
133  pthread_cond_t done_cond;
134  bool wakeup;
135  bool done;
136  THREAD_GLOBAL *tg; // pointer to global pool data
137 } THREAD_DATA;
138 
139 class ThreadPool2 {
140 public:
141  ThreadPool2 (int num_threads);
142  ~ThreadPool2 ();
143  static void Initialise (int num_threads);
144  static ThreadPool2 *Pool() { return g_tpool2; }
145 
146  void MutexLock() { pthread_mutex_lock (&tg.mutex); }
147  void MutexUnlock() { pthread_mutex_unlock (&tg.mutex); }
148  inline int NumThread() const { return nthread; }
149 
150  void Invoke (void(*func)(int,void*), void *context);
151 
152 private:
153  int nthread; // number of threads
154  THREAD_GLOBAL tg; // global pool data
155  THREAD_DATA *td; // array of worker threads
156  static ThreadPool2 *g_tpool2;
157 
158  static void *tpool_thread (void *context);
159 };
160 
161 #ifndef __TASK_CC
162 #ifndef OLD_PARALLEL
163 void TPool_Init (int nt);
164 extern ThreadPool *g_tpool;
165 #endif
166 #endif
167 
168 #endif // TOAST_THREAD
169 #endif // !__TASK_H