souffle  2.0.2-371-g6315b36
ParallelUtil.h
/*
 * Souffle - A Datalog Compiler
 * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved
 * Licensed under the Universal Permissive License v 1.0 as shown at:
 * - https://opensource.org/licenses/UPL
 * - <souffle root>/licenses/SOUFFLE-UPL.txt
 */

/************************************************************************
 *
 * @file ParallelUtil.h
 *
 * A set of utilities abstracting from the underlying parallel library.
 * Currently supported APIs: OpenMP and Cilk
 *
 ***********************************************************************/

#pragma once

#include <atomic>

#ifdef _OPENMP

/**
 * Implementation of parallel control flow constructs utilizing OpenMP
 */

#include <omp.h>

#ifdef __APPLE__
#define pthread_yield pthread_yield_np
#endif

// support for a parallel region
#define PARALLEL_START _Pragma("omp parallel") {
#define PARALLEL_END }

// support for parallel loops
#define pfor _Pragma("omp for schedule(dynamic)") for

// spawn and sync are processed sequentially (overhead too expensive)
#define task_spawn
#define task_sync

// section start / end => corresponding OpenMP pragmas
// NOTE: disabled since it causes performance losses
//#define SECTIONS_START _Pragma("omp parallel sections") {
// NOTE: we stick to flat-level parallelism since it is faster due to thread pooling
#define SECTIONS_START {
#define SECTIONS_END }

// the markers for a single section
//#define SECTION_START _Pragma("omp section") {
#define SECTION_START {
#define SECTION_END }

// a macro to create an operation context
#define CREATE_OP_CONTEXT(NAME, INIT) [[maybe_unused]] auto NAME = INIT;
#define READ_OP_CONTEXT(NAME) NAME

#else

// support for a parallel region => sequential execution
#define PARALLEL_START {
#define PARALLEL_END }

// support for parallel loops => simple sequential loop
#define pfor for

// spawn and sync not supported
#define task_spawn
#define task_sync

// sections are processed sequentially
#define SECTIONS_START {
#define SECTIONS_END }

// sections are inlined
#define SECTION_START {
#define SECTION_END }

// a macro to create an operation context
#define CREATE_OP_CONTEXT(NAME, INIT) [[maybe_unused]] auto NAME = INIT;
#define READ_OP_CONTEXT(NAME) NAME

// mark as sequential
#define IS_SEQUENTIAL

#endif

#ifndef IS_SEQUENTIAL
#define IS_PARALLEL
#endif

#ifdef IS_PARALLEL
#define MAX_THREADS (omp_get_max_threads())
#else
#define MAX_THREADS (1)
#endif

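// Usage sketch (not part of the original header), assuming a hypothetical
// container 'relation' and a per-thread context created via CREATE_OP_CONTEXT:
// the macros expand to OpenMP pragmas when _OPENMP is defined and to plain
// sequential code otherwise.
//
//   PARALLEL_START
//       CREATE_OP_CONTEXT(ctxt, relation.createContext());
//       pfor(auto it = relation.begin(); it < relation.end(); ++it) {
//           process(*it, READ_OP_CONTEXT(ctxt));  // 'process' is a placeholder
//       }
//   PARALLEL_END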
#ifdef IS_PARALLEL

#include <mutex>

namespace souffle {

/**
 * A small utility class for implementing simple locks.
 */
class Lock {
    // the underlying mutex
    std::mutex mux;

public:
    struct Lease {
        Lease(std::mutex& mux) : mux(&mux) {
            mux.lock();
        }
        Lease(Lease&& other) : mux(other.mux) {
            other.mux = nullptr;
        }
        Lease(const Lease& other) = delete;
        ~Lease() {
            if (mux != nullptr) {
                mux->unlock();
            }
        }

    protected:
        std::mutex* mux;
    };

    // acquires the lock for the life-cycle of the returned guard
    Lease acquire() {
        return Lease(mux);
    }

    void lock() {
        mux.lock();
    }

    bool try_lock() {
        return mux.try_lock();
    }

    void unlock() {
        mux.unlock();
    }
};

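// Usage sketch (not part of the original header): the Lease returned by
// acquire() holds the mutex and releases it in its destructor, so a critical
// section is just a scope. 'sharedData' is a hypothetical shared object.
//
//   Lock dataLock;
//   {
//       auto lease = dataLock.acquire();
//       sharedData.update();
//   }  // lease destroyed here => mutex released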
// /* valuable source: http://locklessinc.com/articles/locks/ */

namespace detail {

/* Pause instruction to prevent excess processor bus usage */
#ifdef __x86_64__
#define cpu_relax() asm volatile("pause\n" : : : "memory")
#else
#define cpu_relax() asm volatile("" : : : "memory")
#endif

/**
 * A utility class managing waiting operations for spin locks.
 */
class Waiter {
    int i = 0;

public:
    Waiter() = default;

    /**
     * Conducts a wait operation.
     */
    void operator()() {
        ++i;
        if ((i % 1000) == 0) {
            // there was no progress => let others work
            pthread_yield();
        } else {
            // relax this CPU
            cpu_relax();
        }
    }
};
}  // namespace detail

/* compare: http://en.cppreference.com/w/cpp/atomic/atomic_flag */
class SpinLock {
    std::atomic<int> lck{0};

public:
    SpinLock() = default;

    void lock() {
        detail::Waiter wait;
        while (!try_lock()) {
            wait();
        }
    }

    bool try_lock() {
        int should = 0;
        return lck.compare_exchange_weak(should, 1, std::memory_order_acquire);
    }

    void unlock() {
        lck.store(0, std::memory_order_release);
    }
};

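// Usage sketch (not part of the original header): SpinLock provides the usual
// lock()/try_lock()/unlock() interface, so it also works with std::lock_guard.
// 'counter' is a hypothetical shared variable.
//
//   SpinLock spin;
//   {
//       std::lock_guard<SpinLock> guard(spin);
//       ++counter;
//   }  // guard destroyed => spin lock released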
/**
 * A read/write lock for increased access performance on a
 * read-heavy use case.
 */
class ReadWriteLock {
    /**
     * Based on paper:
     *         Scalable Reader-Writer Synchronization
     *         for Shared-Memory Multiprocessors
     *
     * Layout of the lock:
     *      31        ...             2                    1                    0
     *      +-------------------------+--------------------+--------------------+
     *      | interested reader count |   waiting writer   | active writer flag |
     *      +-------------------------+--------------------+--------------------+
     */

    std::atomic<int> lck{0};

public:
    ReadWriteLock() = default;

    void start_read() {
        // add reader
        auto r = lck.fetch_add(4, std::memory_order_acquire);

        // wait until there is no writer any more
        detail::Waiter wait;
        while (r & 0x3) {
            // release reader
            end_read();

            // wait a bit
            wait();

            // apply as a reader again
            r = lck.fetch_add(4, std::memory_order_acquire);

        }  // while there is a writer => spin
    }

    void end_read() {
        lck.fetch_sub(4, std::memory_order_release);
    }

    void start_write() {
        detail::Waiter wait;

        // set wait-for-write bit
        auto stat = lck.fetch_or(2, std::memory_order_acquire);
        while (stat & 0x2) {
            wait();
            stat = lck.fetch_or(2, std::memory_order_acquire);
        }

        // the caller may starve here ...
        int should = 2;
        while (!lck.compare_exchange_strong(
                should, 1, std::memory_order_acquire, std::memory_order_relaxed)) {
            wait();
            should = 2;
        }
    }

    bool try_write() {
        int should = 0;
        return lck.compare_exchange_strong(should, 1, std::memory_order_acquire, std::memory_order_relaxed);
    }

    void end_write() {
        lck.fetch_sub(1, std::memory_order_release);
    }

    bool try_upgrade_to_write() {
        int should = 4;
        return lck.compare_exchange_strong(should, 1, std::memory_order_acquire, std::memory_order_relaxed);
    }

    void downgrade_to_read() {
        // delete write bit + set num readers to 1
        lck.fetch_add(3, std::memory_order_release);
    }
};

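// Usage sketch (not part of the original header): readers and writers bracket
// their critical sections explicitly; there is no RAII guard for this lock.
// 'table' and 'key' are hypothetical placeholders.
//
//   ReadWriteLock rwlock;
//
//   // reader side
//   rwlock.start_read();
//   auto value = table.lookup(key);
//   rwlock.end_read();
//
//   // writer side
//   rwlock.start_write();
//   table.insert(key, value);
//   rwlock.end_write();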
/**
 * An implementation of an optimistic r/w lock.
 */
class OptimisticReadWriteLock {
    /**
     * The version number utilized for the synchronization.
     *
     * Usage:
     *      - even version numbers are stable versions, not being updated
     *      - odd version numbers are temporary versions, currently being updated
     */
    std::atomic<int> version{0};

public:
    /**
     * The lease utilized to link start and end of read phases.
     */
    class Lease {
        friend class OptimisticReadWriteLock;
        int version;

    public:
        Lease(int version = 0) : version(version) {}
        Lease(const Lease& lease) = default;
        Lease& operator=(const Lease& other) = default;
        Lease& operator=(Lease&& other) = default;
    };

    /**
     * A default constructor initializing the lock.
     */
    OptimisticReadWriteLock() = default;

    /**
     * Starts a read phase, making sure that there is currently no
     * active concurrent modification going on. The resulting lease
     * enables the invoking process to later-on verify that no
     * concurrent modifications took place.
     */
    Lease start_read() {
        detail::Waiter wait;

        // get a snapshot of the lease version
        auto v = version.load(std::memory_order_acquire);

        // spin while there is a write in progress
        while ((v & 0x1) == 1) {
            // wait for a moment
            wait();
            // get an updated version
            v = version.load(std::memory_order_acquire);
        }

        // done
        return Lease(v);
    }

    /**
     * Tests whether there have been concurrent modifications since
     * the given lease has been issued.
     *
     * @return true if no updates have been conducted, false otherwise
     */
    bool validate(const Lease& lease) {
        // check whether the version number has changed in the meantime
        std::atomic_thread_fence(std::memory_order_acquire);
        return lease.version == version.load(std::memory_order_relaxed);
    }

    /**
     * Ends a read phase by validating the given lease.
     *
     * @return true if no updates have been conducted since the
     *         issuing of the lease, false otherwise
     */
    bool end_read(const Lease& lease) {
        // check lease in the end
        return validate(lease);
    }

    /**
     * Starts a write phase on this lock by ensuring exclusive access
     * and invalidating any existing read lease.
     */
    void start_write() {
        detail::Waiter wait;

        // set last bit => make it odd
        auto v = version.fetch_or(0x1, std::memory_order_acquire);

        // check for concurrent writes
        while ((v & 0x1) == 1) {
            // wait for a moment
            wait();
            // get an updated version
            v = version.fetch_or(0x1, std::memory_order_acquire);
        }

        // done
    }

    /**
     * Tries to start a write phase unless there is a currently ongoing
     * write operation. In this case no write permission will be obtained.
     *
     * @return true if write permission has been granted, false otherwise.
     */
    bool try_start_write() {
        auto v = version.fetch_or(0x1, std::memory_order_acquire);
        return !(v & 0x1);
    }

    /**
     * Updates a read-lease to a write permission by a) validating that the
     * given lease is still valid and b) making sure that there is no currently
     * ongoing write operation.
     *
     * @return true if the lease was still valid and write permissions could
     *         be granted, false otherwise.
     */
    bool try_upgrade_to_write(const Lease& lease) {
        auto v = version.fetch_or(0x1, std::memory_order_acquire);

        // check whether write privileges have been gained
        if (v & 0x1) return false;  // there is another writer already

        // check whether there was no write since the gain of the read lock
        if (lease.version == v) return true;

        // if there was, undo write update
        abort_write();

        // operation failed
        return false;
    }

    /**
     * Aborts a write operation by reverting to the version number before
     * starting the ongoing write, thereby re-validating existing leases.
     */
    void abort_write() {
        // reset version number
        version.fetch_sub(1, std::memory_order_release);
    }

    /**
     * Ends a write operation by giving up the associated exclusive access
     * to the protected data and abandoning the provided write permission.
     */
    void end_write() {
        // update version number another time
        version.fetch_add(1, std::memory_order_release);
    }

    /**
     * Tests whether currently write permissions have been granted to any
     * client by this lock.
     *
     * @return true if so, false otherwise
     */
    bool is_write_locked() const {
        return version & 0x1;
    }
};

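// Usage sketch (not part of the original header): an optimistic read repeats
// its work whenever end_read() reports that a concurrent write invalidated the
// lease; writers bracket their update as usual. 'table' and 'key' are
// hypothetical placeholders.
//
//   OptimisticReadWriteLock olock;
//
//   // optimistic read: retry until the snapshot was not disturbed by a writer
//   int result;
//   do {
//       auto lease = olock.start_read();
//       result = table.lookup(key);
//   } while (!olock.end_read(lease));
//
//   // exclusive write
//   olock.start_write();
//   table.insert(key, result);
//   olock.end_write();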
#else

namespace souffle {

/**
 * A small utility class for implementing simple locks.
 */
struct Lock {
    class Lease {};

    // no locking if there is no parallel execution
    Lease acquire() {
        return Lease();
    }

    void lock() {}

    bool try_lock() {
        return true;
    }

    void unlock() {}
};

/**
 * A 'sequential' non-locking implementation for a spin lock.
 */
class SpinLock {
public:
    SpinLock() = default;

    void lock() {}

    bool try_lock() {
        return true;
    }

    void unlock() {}
};

class ReadWriteLock {
public:
    ReadWriteLock() = default;

    void start_read() {}

    void end_read() {}

    void start_write() {}

    bool try_write() {
        return true;
    }

    void end_write() {}

    bool try_upgrade_to_write() {
        return true;
    }

    void downgrade_to_read() {}
};

/**
 * A 'sequential' non-locking implementation for an optimistic r/w lock.
 */
class OptimisticReadWriteLock {
public:
    class Lease {};

    OptimisticReadWriteLock() = default;

    Lease start_read() {
        return Lease();
    }

    bool validate(const Lease& /*lease*/) {
        return true;
    }

    bool end_read(const Lease& /*lease*/) {
        return true;
    }

    void start_write() {}

    bool try_start_write() {
        return true;
    }

    bool try_upgrade_to_write(const Lease& /*lease*/) {
        return true;
    }

    void abort_write() {}

    void end_write() {}

    bool is_write_locked() const {
        return true;
    }
};

#endif

/**
 * Obtains a reference to the lock synchronizing output operations.
 */
inline Lock& getOutputLock() {
    static Lock outputLock;
    return outputLock;
}

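// Usage sketch (not part of the original header): serializing output from
// concurrent workers through the shared output lock.
//
//   {
//       auto lease = getOutputLock().acquire();
//       std::cout << "message from a worker\n";
//   }  // lease destroyed => output lock released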
}  // end of namespace souffle