source: git/gfanlib/gfanlib_paralleltraverser.cpp @ c987db

spielwiese
Last change on this file since c987db was 15813d, checked in by Hans Schoenemann <hannes@…>, 8 years ago
format
  • Property mode set to 100644
File size: 14.3 KB
Line 
1// Parallel abstract graph traverser. Contributed by Bjarne Knudsen.
2
#include <atomic>
//#include <chrono>
#include <condition_variable>
#include <deque>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include "gfanlib_paralleltraverser.h"
12
13/*
14  TODO:
15
16  - Consider giving JobTransfers an affinity to a specific thread.
17
18  - Consider if the amount of entries to be copied in the stack for
19    deep searches can be reduced.
20
21  - Consider making a more compact stack for limited in and out edge
22    counts.
23 */
24
25using namespace std;
26
27namespace gfan{
28
// One frame of the traversal stack: the bookkeeping kept for a single
// state that has been reached during a traversal.
struct TraverseState {
  // How many outgoing ("next") edges this state has.
  int  next_count;

  // Index of the outgoing edge that was followed most recently. Frames
  // are created with -1 here, i.e. before any edge has been followed.
  int  next_index;

  // Index of the incoming ("prev") edge that leads back to the state
  // this one was reached from. The start frame is created with -1.
  int  prev_index;

  TraverseState( int  next_count,
                 int  next_index,
                 int  prev_index )
    : next_count(next_count),
      next_index(next_index),
      prev_index(prev_index)
  {
  }
};
49
50
51void  traverse_simple_recursive( Traverser*  traverser )
52{
53  int  count  =  traverser->getEdgeCountNext();
54
55  traverser->collectInfo();
56
57  for (int i = 0; i < count; i++) {
58    int  prev_index  =  traverser->moveToNext(i, true);
59
60    if (prev_index == 0) {
61      // Only traverse each state once, so do it for the zero'th
62      // parent.
63      traverse_simple_recursive(traverser);
64    }
65
66    traverser->moveToPrev(prev_index);
67  }
68}
69
70
71// This function creates a stack that represents the full job to be
72// done. Info is also collected for the start state.
73vector<TraverseState>*  create_first_job_stack( Traverser*  traverser )
74{
75  vector<TraverseState>*  stack  =  new vector<TraverseState>();
76
77  stack->push_back(TraverseState(traverser->getEdgeCountNext(), -1, -1));
78
79  traverser->collectInfo();
80
81  return stack;
82}
83
84
85void  traverse_simple_stack( Traverser*  traverser )
86{
87  // The stack holds information about what we have done so far
88  vector<TraverseState>*  stack  =  create_first_job_stack(traverser);
89
90  while (!stack->empty()) {
91    stack->back().next_index++;
92
93    TraverseState  state  =  stack->back();
94
95    if (state.next_index == state.next_count || traverser->aborting /* Added by Anders */) {
96      if (state.prev_index != -1) {
97        traverser->moveToPrev(state.prev_index);
98      }
99      stack->pop_back();
100    }
101    else {
102      int  prev_index  =  traverser->moveToNext(state.next_index, true);
103
104      if (prev_index == 0) {
105        traverser->collectInfo();
106        stack->push_back(TraverseState(traverser->getEdgeCountNext(), -1, prev_index));
107      }
108      else {
109        traverser->moveToPrev(prev_index);
110      }
111    }
112  }
113
114  delete stack;
115}
116
117
118void  traverse_simple( Traverser*  traverser )
119{
120  traverse_simple_stack(traverser);
121}
122
123
124// A Job holds a traverser and a stack representing the job. The stack
125// is changed along with the traverser, so it represents the state of
126// the traverser.
127class Job {
128  Traverser*  traverser;
129
130  vector<TraverseState>*  stack;
131
132  // The lowest index in the stack where there is some unfinished
133  // work. -1 if there is no unfinished work.
134  int  first_split;
135
136  // For a given starting point, find the first index of the stack
137  // with unfinished work. -1 if there is no unfinished work.
138  static int  find_first_split( vector<TraverseState>*  stack,
139                                int                     start )
140  {
141    auto  it = stack->begin();
142
143    it += start;
144    while (it != stack->end() && it->next_index == it->next_count - 1) {
145      start++;
146      it++;
147    }
148
149    if (it == stack->end()) {
150      return -1;
151    }
152    else if (it->next_index == -1 && it->next_count == 1) {
153      return -1;
154    }
155
156    return  start;
157  }
158
159
160public:
161        bool aborting;                                                                                                                // Added by Anders
162  // Create a new Job. if first_split is not set (or it is -2), the
163  // first split will be found.
164  Job( vector<TraverseState>*  stack = new vector<TraverseState>(),
165       int                     first_split = -2 )
166  :aborting(false)                                                                                                                // Added by Anders
167  {
168    if (first_split == -2) {
169      first_split = find_first_split(stack, 0);
170    }
171
172    this->stack        =  stack;
173    this->first_split  =  first_split;
174  }
175
176
177  ~Job( void )
178  {
179    delete stack;
180  }
181
182
183  // Set the traverser and move it to the start of this job based on
184  // its last job.
185  void  setTraverser( Traverser*  traverser,
186                      Job*        last_job )
187  {
188    vector<TraverseState>*  last_stack  =  last_job->stack;
189    unsigned int            i;
190
191    this->traverser = traverser;
192
193    // Find the first state where the stacks differ:
194    for (i = 0; i < last_stack->size() && i < stack->size(); i++) {
195      if ((*stack)[i].next_index != (*last_stack)[i].next_index) {
196        break;
197      }
198    }
199
200    if (i > 0) {
201      // TODO: why is this needed?
202      i--;
203    }
204
205    // roll back to the division point
206    while (last_stack->size() > i + 1) {
207      traverser->moveToPrev(last_stack->back().prev_index);
208      last_stack->pop_back();
209    }
210    if (!last_stack->empty()) {
211      last_stack->pop_back();
212    }
213
214    // go forward so the traverser represents the new job
215    for (; i < stack->size() - 1; i++) {
216      traverser->moveToNext((*stack)[i].next_index, false);
217    }
218  }
219
220
221  // This function does some work and returns false when there is no
222  // more work. Otherwise does at least step_count steps and returns
223  // true when there is a subjob available.
224  bool  step( int  step_count )
225  {
226    int  steps  =  0;
227
228    // keep going if there is no subjob available
229    while (steps < step_count || first_split == -1) {
230      if (stack->empty()) {
231        return false;
232      }
233
234      stack->back().next_index++;
235
236      TraverseState  state  =  stack->back();
237
238      if (state.next_index == state.next_count || aborting /* Added by Anders */) {
239        if (state.prev_index != -1) {
240          traverser->moveToPrev(state.prev_index);
241        }
242        stack->pop_back();
243
244        if (first_split == -1) {
245          // There is no more work to do in the previous states
246          return false;
247        }
248      }
249      else {
250        if ((int) stack->size() - 1 == first_split) {
251          if (state.next_index == state.next_count - 1) {
252            first_split = -1;
253          }
254        }
255
256        int  prev_index  =  traverser->moveToNext(state.next_index, true);
257        aborting=traverser->aborting;                                                /* Added by Anders */
258        steps++;
259
260        if (prev_index == 0) {
261          int  count  =  traverser->getEdgeCountNext();
262
263          traverser->collectInfo();
264
265          if (first_split == -1 && count > 1) {
266            first_split = stack->size();
267          }
268
269          stack->push_back(TraverseState(count, -1, prev_index));
270        }
271        else {
272          traverser->moveToPrev(prev_index);
273        }
274      }
275    }
276
277    return  true;
278  }
279
280
281  // Get a new subjob of the current job and adjust the current job so
282  // it does not overlap with the new subjob. first_split may not be
283  // -1 when calling this function.
284  Job*  getSubjob( void )
285  {
286    // We can assume that first_split >= 0.
287    auto  it = stack->begin();
288
289    it += first_split + 1;
290    // this new job will do the rest
291    vector<TraverseState>*  new_stack  =  new vector<TraverseState>(stack->begin(), it);
292    it--;
293
294    // limit the existing job
295    it->next_count = it->next_index + 1;
296
297    Job* new_job =  new Job(new_stack, find_first_split(new_stack, first_split));
298
299    first_split = find_first_split(stack, first_split);
300
301    return new_job;
302  }
303
304
305  void  print( void )
306  {
307    cout << "--cc-nn-pp----------" << endl;
308    for(vector<TraverseState>::const_iterator state=stack->begin();state!=stack->end();state++){
309      //    for (TraverseState state : *stack) {
310      cout << "  " << setw(2) << state->next_count << " " << setw(2) << state->next_index
311           << " " << setw(2) << state->prev_index << endl;
312    }
313    cout << "--------------------" << endl;
314  }
315};
316
317
318// This class is used to safely transfer a job from one thread to another
319class JobTransfer {
320  bool  is_set;
321
322  Job*  job;
323
324  mutex  mtx;
325
326  condition_variable  cond;
327
328public:
329  JobTransfer( void )
330  {
331    is_set  =  false;
332  }
333
334
335  void  setJob( Job*  job )
336  {
337    mtx.lock();
338
339    this->job  =  job;
340    is_set     =  true;
341
342    // notify should be done after unlock according to
343    // http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one
344    // but doing so seems to cause a deadlock with 8 threads using
345    // SubsetTraversers of size 16 on my laptop
346    cond.notify_one();
347    mtx.unlock();
348  }
349
350
351  Job*  getJob( void )
352  {
353    unique_lock<mutex>  lock(mtx);
354
355    while (!is_set) {
356      cond.wait(lock);
357    }
358
359    lock.unlock();
360
361    return job;
362  }
363};
364
365
366class JobCentral;
367
368// A struct for holding all the information needed by a thread.
369struct ThreadContext {
370  JobCentral*  central;
371
372  Traverser*  traverser;
373
374  int  step_count;
375
376  ThreadContext( JobCentral*  central,
377                 Traverser*   traverser,
378                 int          step_count )
379  {
380    this->central     =  central;
381    this->traverser   =  traverser;
382    this->step_count  =  step_count;
383  }
384};
385
386
387void  work( ThreadContext*  context );
388
389
390// This class is used running the whole threaded traversal and for
391// exchanging jobs between threads
392class JobCentral {
393  ThreadContext**  contexts;
394
395  int  context_count;
396
397  int  step_count;
398
399  Job*  first_job;
400
401  mutex  mtx;
402
403  // This queue is used for transferring jobs between threads. When a
404  // thread requests a job, an empty transfer is put in this queue. It
405  // will then be picked up by a thread that has a subjob
406  // available. The subjob is given to the JobTransfer and is then
407  // received by the thread needing it.
408  deque<JobTransfer*>*  transfers;
409
410public:
411        bool aborting;                                                                        // Added by Anders
412  // step_count is the number of algorithm steps taken between
413  // possible job transfers. This value should be high (e.g. 100) if
414  // the traverser is very fast. If the traverser is slow, step_count
415  // should be one.
416  JobCentral( Traverser**  traversers,
417              int          count,
418              int          step_count )
419  :aborting(false)                                                                        // Added by Anders
420  {
421    context_count  =  count;
422    contexts       =  new ThreadContext*[count];
423
424    for (int i = 0; i < count; i++) {
425      contexts[i] = new ThreadContext(this, traversers[i], step_count);
426    }
427
428    transfers = new deque<JobTransfer*>();
429  }
430
431
432  ~JobCentral( void )
433  {
434    delete transfers;
435    for (int i = 0; i < context_count; i++) {
436      delete contexts[i];
437    }
438    delete[] contexts;
439  }
440
441
442  // Run a job in multiple threads. This function is usually just
443  // called once with a job representing everything to be done.
444  void  runJob( Job*  job )
445  {
446    thread**  thr  = new thread*[context_count];
447
448    this->first_job  =  job;
449
450    for (int i = 0; i < context_count; i++) {
451      thr[i] = new thread(work, contexts[i]);
452    }
453
454    for (int i = 0; i < context_count; i++) {
455      thr[i]->join();
456      delete thr[i];
457    }
458
459    delete[] thr;
460  }
461
462
463  bool  hasTransfer( void )
464  {
465        return !transfers->empty();
466  }
467
468
469  // If no threads are requesting jobs, the return value will be
470  // NULL. Otherwise a JobTransfer will be returned ready for
471  // receiving a new job.
472  JobTransfer*  getTransfer( void )
473  {
474    JobTransfer*  transfer  =  NULL;
475
476    mtx.lock();
477
478    if (!transfers->empty()) {
479      transfer = transfers->back();
480      transfers->pop_back();
481    }
482
483    mtx.unlock();
484
485    return  transfer;
486  }
487
488
489  // Requst a job from another thread. The return value is NULL if all
490  // jobs are done.
491  Job*  getJob( void )
492  {
493    Job*  job  =  NULL;
494
495    mtx.lock();
496
497    if (first_job != NULL) {
498      job        =  first_job;
499      first_job  =  NULL;
500    }
501    else if ((int) transfers->size() < context_count - 1) {
502      JobTransfer*  transfer =  new JobTransfer();
503
504      transfers->push_front(transfer);
505
506      mtx.unlock();
507
508      job =  transfer->getJob();
509
510      delete transfer;
511
512      // return now because the mutex is already unlocked
513      return job;
514    }
515    else {
516      // We are fully done
517
518      for(deque<JobTransfer*>::const_iterator tr=transfers->begin();tr!=transfers->end();tr++){
519        //      for (JobTransfer*  tr : *transfers) {
520        (*tr)->setJob(NULL);
521      }
522    }
523
524    mtx.unlock();
525
526    return job;
527  }
528};
529
530
531// Do the actual work
532void  work( ThreadContext*  context )
533{
534  Traverser*   traverser   =  context->traverser;
535  JobCentral*  central     =  context->central;
536  Job*         job;
537  Job*         last_job   =  new Job();
538
539  while ((job = central->getJob()) != NULL) {
540    job->setTraverser(traverser, last_job);
541
542    int  step_count  =  central->hasTransfer() ? 1 : context->step_count;
543
544    if(central->aborting)job->aborting=true;                        // Added by Anders
545
546    while (job->step(step_count)) {
547      if(job->aborting)central->aborting=true;                        // Added by Anders
548      JobTransfer*  transfer  =  central->getTransfer();
549
550      if (transfer != NULL) {
551        transfer->setJob(job->getSubjob());
552      }
553
554      step_count  =  central->hasTransfer() ? 1 : context->step_count;
555    }
556
557    delete last_job;
558    last_job = job;
559  }
560
561  delete last_job;
562}
563
564
565// Do the actual work
566void  work2( ThreadContext*  context )
567{
568  Traverser*   traverser   =  context->traverser;
569  JobCentral*  central     =  context->central;
570  int          step_count  =  context->step_count;
571  Job*         job;
572  Job*         last_job   =  new Job();
573
574  while ((job = central->getJob()) != NULL) {
575    job->setTraverser(traverser, last_job);
576
577    while (job->step(step_count)) {
578      JobTransfer*  transfer  =  central->getTransfer();
579
580      if (transfer != NULL) {
581        transfer->setJob(job->getSubjob());
582      }
583    }
584
585    delete last_job;
586    last_job = job;
587  }
588
589  delete last_job;
590}
591
592
593void  traverse_threaded( Traverser**  traversers,
594                         int          count,
595                         int          step_count )
596{
597  JobCentral*  central  =  new JobCentral(traversers, count, step_count);
598
599  central->runJob(new Job(create_first_job_stack(traversers[0])));
600
601  delete central;
602}
603}
Note: See TracBrowser for help on using the repository browser.