My Project
parallelcxx11.hh
Go to the documentation of this file.
1 /* -*- mia-c++ -*-
2  *
3  * This file is part of MIA - a toolbox for medical image analysis
4  * Copyright (c) Leipzig, Madrid 1999-2017 Gert Wollny
5  *
6  * MIA is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with MIA; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #ifndef mia_core_parallelcxx11_hh
22 #define mia_core_parallelcxx11_hh
23 
24 #include <mia/core/defines.hh>
25 
26 #include <thread>
27 #include <atomic>
28 #include <mutex>
29 #include <cassert>
30 #include <vector>
31 
33 
34 typedef std::mutex CMutex;
35 typedef std::recursive_mutex CRecursiveMutex;
36 
37 
39 {
40 public:
41  static int get_max_tasks();
42  static void set_max_tasks(int mt);
43 private:
44  static int max_tasks;
45 };
46 
47 #define ATOMIC std::atomic
48 
/**
   \brief Lock that keeps a mutex locked for the life time of the lock object.

   The constructor calls lock() on the mutex, the destructor calls unlock()
   unless the lock was given up earlier by calling release().
   The lock object can not be copied.

   \tparam Mutex a type that provides lock() and unlock()
*/
template <typename Mutex>
class TScopedLock
{
public:
        /// Lock \a m and remember that this object owns the lock.
        TScopedLock(Mutex& m): m_mutex(m), own_lock(false)
        {
                m_mutex.lock();
                // only set after lock() returned, i.e. after the lock is really held
                own_lock = true;
        }

        TScopedLock(const TScopedLock<Mutex>& other) = delete;
        TScopedLock& operator = (const TScopedLock<Mutex>& other) = delete;

        /// Unlock the mutex if it is still owned by this object.
        ~TScopedLock()
        {
                if (own_lock)
                        m_mutex.unlock();
        }

        /// Unlock the mutex before the end of the scope; further calls do nothing.
        void release()
        {
                if (own_lock) {
                        own_lock = false;
                        m_mutex.unlock();
                }
        }
private:
        Mutex& m_mutex;
        bool own_lock;
};
79 
82 
/**
   \brief Half open integer range [begin, end) that hands out work packages.

   Worker threads call get_next_workpackage() on one shared range object;
   an atomic counter makes sure that every package is handed out only once.
*/
class C1DParallelRange
{
public:
        /**
           \param begin first index of the range
           \param end index behind the last element, must not be smaller than \a begin
           \param block number of indices per work package; values smaller
           than one are treated as one, because a package size of zero would
           only ever produce empty packages.
        */
        C1DParallelRange(int begin, int end, int block = 1):
                m_begin(begin),
                m_end(end),
                m_block(block > 0 ? block : 1),
                m_current_wp(0)
        {
                assert(begin <= end);
        }

        /// Copy constructor; std::atomic is not copyable, so the counter value is loaded explicitly.
        C1DParallelRange(const C1DParallelRange& orig):
                m_begin(orig.m_begin),
                m_end(orig.m_end),
                m_block(orig.m_block),
                m_current_wp(orig.m_current_wp.load())
        {
        }

        /**
           Hand out the next package of at most m_block indices.
           \returns the sub-range to work on; it is empty() when all indices
           of this range have already been handed out.
        */
        C1DParallelRange get_next_workpackage()
        {
                // The limits are evaluated with 64 bit, because begin + block
                // may exceed the int range when m_end is close to INT_MAX.
                const long long wp = m_current_wp++;
                const long long first = m_begin + wp * m_block;

                if (first >= m_end)
                        return C1DParallelRange(m_end, m_end, 1);

                const long long behind = first + m_block;
                const long long last = behind < m_end ? behind : m_end;
                return C1DParallelRange(static_cast<int>(first), static_cast<int>(last), 1);
        }

        /// \returns true if the range contains no index
        bool empty() const
        {
                return m_begin >= m_end;
        }

        /// \returns the first index of the range
        int begin() const
        {
                return m_begin;
        }

        /// \returns the index behind the last element of the range
        int end() const
        {
                return m_end;
        }

private:
        int m_begin;
        int m_end;
        int m_block;
        std::atomic<int> m_current_wp;
};
141 
// Worker loop of pfor: fetch work packages from the shared range and
// apply f to each one until an empty package signals the end.
// The functor f must actually be passed by value because a copy must
// be used.
//coverity[PASS_BY_VALUE]
template <typename Range, typename Func>
void pfor_callback(Range& range, Func f)
{
        for (;;) {
                // a new object per round: Range does not have to be assignable
                Range package = range.get_next_workpackage();

                if (package.empty())
                        return;

                f(package);
        }
}
157 
158 template <typename Range, typename Func>
159 void pfor(Range range, const Func& f)
160 {
161  int max_treads = CMaxTasks::get_max_tasks();
162  std::thread::hardware_concurrency();
163  std::vector<std::thread> threads;
164 
165  for (int i = 0; i < max_treads; ++i) {
166  threads.push_back(std::thread(pfor_callback<Range, Func>, std::ref(range), f));
167  }
168 
169  for (int i = 0; i < max_treads; ++i) {
170  threads[i].join();
171  }
172 };
173 
/**
   \brief Accumulator shared by the worker threads of preduce().

   Stores the identity element and the current reduced value. Merging a
   partial result with reduce() is protected by a mutex.

   \tparam V type of the reduced value
*/
template <typename V>
class ReduceValue
{
public:
        typedef V Value;

        /// \param i identity element; it is also the initial reduced value
        ReduceValue(const Value& i): identity(i), value(i)
        {
        }

        /**
           Merge a partial result into the stored value while holding the mutex.
           \param v partial result
           \param r binary operation, called as r(v, stored value)
        */
        template <typename Reduce>
        void reduce(const Value& v, Reduce r)
        {
                std::lock_guard<std::mutex> lock(mutex);
                value = r(v, value);
        }

        /// \returns the identity element given at construction
        const Value& get_identity() const
        {
                return identity;
        }

        /// \returns the current reduced value; this read does not take the mutex
        const Value& get_reduced() const
        {
                return value;
        }
private:
        mutable std::mutex mutex;
        Value identity;
        Value value;
};
202 
203 // The functor f must actually be passed by value because a copy must
204 // be used.
205 //coverity[PASS_BY_VALUE]
206 template <typename Range, typename Value, typename Func, typename Reduce>
207 void preduce_callback(Range& range, ReduceValue<Value>& v, Func f, Reduce r)
208 {
209  Value value = v.get_identity();
210 
211  while (true) {
212  Range wp = range.get_next_workpackage();
213 
214  if (!wp.empty())
215  value = f(wp, value);
216  else
217  break;
218  }
219 
220  v.reduce(value, r);
221 }
222 
223 template <typename Range, typename Value, typename Func, typename Reduce>
224 Value preduce(Range range, Value identity, const Func& f, Reduce r)
225 {
226  int max_treads = CMaxTasks::get_max_tasks();
227  ReduceValue<Value> value(identity);
228  std::vector<std::thread> threads;
229 
230  for (int i = 0; i < max_treads; ++i) {
231  threads.push_back(std::thread(preduce_callback<Range, Value, Func, Reduce>,
232  std::ref(range), std::ref(value), f, r));
233  }
234 
235  for (int i = 0; i < max_treads; ++i) {
236  threads[i].join();
237  }
238 
239  return value.get_reduced();
240 };
241 
243 
244 #endif
C1DParallelRange get_next_workpackage()
C1DParallelRange(int begin, int end, int block=1)
int begin() const
bool empty() const
C1DParallelRange(const C1DParallelRange &orig)
static int get_max_tasks()
static void set_max_tasks(int mt)
ReduceValue(const Value &i)
const Value & get_reduced() const
const Value & get_identity() const
void reduce(const Value &v, Reduce r)
void release()
TScopedLock & operator=(const TScopedLock< Mutex > &other)=delete
TScopedLock(Mutex &m)
TScopedLock(const TScopedLock< Mutex > &other)=delete
#define NS_MIA_BEGIN
convenience define to start the mia namespace
Definition: defines.hh:33
#define EXPORT_CORE
Macro to manage Visual C++ style dllimport/dllexport.
Definition: defines.hh:101
#define NS_MIA_END
convenience define to end the mia namespace
Definition: defines.hh:36
void preduce_callback(Range &range, ReduceValue< Value > &v, Func f, Reduce r)
std::recursive_mutex CRecursiveMutex
void pfor(Range range, const Func &f)
TScopedLock< CRecursiveMutex > CRecursiveScopedLock
std::mutex CMutex
void pfor_callback(Range &range, Func f)
Value preduce(Range range, Value identity, const Func &f, Reduce r)
TScopedLock< CMutex > CScopedLock