source: git/Singular/dyn_modules/systhreads/shared.cc @ 5af0c3

fieker-DuValspielwiese
Last change on this file since 5af0c3 was 5af0c3, checked in by Hans Schoenemann <hannes@…>, 5 years ago
fix: static build for systhreads.so
  • Property mode set to 100644
File size: 82.3 KB
Line 
1#include "threadconf.h"
2#ifdef ENABLE_THREADS
3#include <factory/prelude.h>
4#else
5#define SIMPLE_THREAD_VAR
6#endif
7#include <iostream>
8#include "kernel/mod2.h"
9#include "Singular/ipid.h"
10#include "Singular/ipshell.h"
11#include "Singular/links/silink.h"
12#include "Singular/lists.h"
13#include "Singular/blackbox.h"
14#include "Singular/feOpt.h"
15#include "Singular/libsingular.h"
16#include <cstring>
17#include <string>
18#include <errno.h>
19#include <stdio.h>
20#include <vector>
21#include <map>
22#include <iterator>
23#include <queue>
24#include <assert.h>
25#include "thread.h"
26#include "lintree.h"
27
28#include "singthreads.h"
29
30using namespace std;
31
32#ifdef ENABLE_THREADS
33extern char *global_argv0;
34#endif
35
36namespace LibThread {
37
38#ifdef ENABLE_THREADS
39const int have_threads = 1;
40#else
41const int have_threads = 0;
42#endif
43
44class Command {
45private:
46  const char *name;
47  const char *error;
48  leftv result;
49  leftv *args;
50  int argc;
51public:
52  Command(const char *n, leftv r, leftv a)
53  {
54    name = n;
55    result = r;
56    error = NULL;
57    argc = 0;
58    for (leftv t = a; t != NULL; t = t->next) {
59      argc++;
60    }
61    args = (leftv *) omAlloc0(sizeof(leftv) * argc);
62    int i = 0;
63    for (leftv t = a; t != NULL; t = t->next) {
64      args[i++] = t;
65    }
66    result->rtyp = NONE;
67    result->data = NULL;
68  }
69  ~Command() {
70    omFree(args);
71  }
72  void check_argc(int n) {
73    if (error) return;
74    if (argc != n) error = "wrong number of arguments";
75  }
76  void check_argc(int lo, int hi) {
77    if (error) return;
78    if (argc < lo || argc > hi) error = "wrong number of arguments";
79  }
80  void check_argc_min(int n) {
81    if (error) return;
82    if (argc < n) error = "wrong number of arguments";
83  }
84  void check_arg(int i, int type, const char *err) {
85    if (error) return;
86    if (args[i]->Typ() != type) error = err;
87  }
88  void check_init(int i, const char *err) {
89    if (error) return;
90    leftv arg = args[i];
91    if (arg->Data() == NULL || *(void **)(arg->Data()) == NULL)
92      error = err;
93  }
94  void check_arg(int i, int type, int type2, const char *err) {
95    if (error) return;
96    if (args[i]->Typ() != type && args[i]->Typ() != type2) error = err;
97  }
98  int argtype(int i) {
99    return args[i]->Typ();
100  }
101  int nargs() {
102    return argc;
103  }
104  void *arg(int i) {
105    return args[i]->Data();
106  }
107  template <typename T>
108  T *shared_arg(int i) {
109    return *(T **)(arg(i));
110  }
111  long int_arg(int i) {
112    return (long)(args[i]->Data());
113  }
114  void report(const char *err) {
115    error = err;
116  }
117  // intentionally not bool, so we can also do
118  // q = p + test_arg(p, type);
119  int test_arg(int i, int type) {
120    if (i >= argc) return 0;
121    return args[i]->Typ() == type;
122  }
123  void set_result(long n) {
124    result->rtyp = INT_CMD;
125    result->data = (char *)n;
126  }
127  void set_result(const char *s) {
128    result->rtyp = STRING_CMD;
129    result->data = omStrDup(s);
130  }
131  void set_result(int type, void *p) {
132    result->rtyp = type;
133    result->data = (char *) p;
134  }
135  void set_result(int type, long n) {
136    result->rtyp = type;
137    result->data = (char *) n;
138  }
139  void no_result() {
140    result->rtyp = NONE;
141  }
142  bool ok() {
143    return error == NULL;
144  }
145  BOOLEAN status() {
146    if (error) {
147      Werror("%s: %s", name, error);
148    }
149    return error != NULL;
150  }
151  BOOLEAN abort(const char *err) {
152    report(err);
153    return status();
154  }
155};
156
157class SharedObject {
158private:
159  Lock lock;
160  long refcount;
161  int type;
162  std::string name;
163public:
164  SharedObject(): lock(), refcount(0) { }
165  virtual ~SharedObject() { }
166  void set_type(int type_init) { type = type_init; }
167  int get_type() { return type; }
168  void set_name(std::string &name_init) { name = name_init; }
169  void set_name(const char *s) {
170    name = std::string(s);
171  }
172  std::string &get_name() { return name; }
173  void incref(int by = 1) {
174    lock.lock();
175    refcount += 1;
176    lock.unlock();
177  }
178  long decref() {
179    int result;
180    lock.lock();
181    result = --refcount;
182    lock.unlock();
183    return result;
184  }
185  long getref() {
186    return refcount;
187  }
188  virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2) {
189    return TRUE;
190  }
191  virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
192    return TRUE;
193  }
194};
195
196void acquireShared(SharedObject *obj) {
197  obj->incref();
198}
199
200void releaseShared(SharedObject *obj) {
201  if (obj->decref() == 0) {
202    // delete obj;
203  }
204}
205
206typedef std::map<std::string, SharedObject *> SharedObjectTable;
207
208class Region : public SharedObject {
209private:
210  Lock region_lock;
211public:
212  SharedObjectTable objects;
213  Region() : SharedObject(), region_lock(), objects() { }
214  virtual ~Region() { }
215  Lock *get_lock() { return &region_lock; }
216  void lock() {
217    if (!region_lock.is_locked())
218      region_lock.lock();
219  }
220  void unlock() {
221    if (region_lock.is_locked())
222      region_lock.unlock();
223  }
224  int is_locked() {
225    return region_lock.is_locked();
226  }
227};
228
229Lock global_objects_lock;
230SharedObjectTable global_objects;
231Lock master_lock(true);
232Lock name_lock(true);
233SIMPLE_THREAD_VAR long thread_id;
234long thread_counter;
235
236int type_region;
237int type_regionlock;
238int type_channel;
239int type_syncvar;
240int type_atomic_table;
241int type_shared_table;
242int type_atomic_list;
243int type_shared_list;
244int type_thread;
245int type_threadpool;
246int type_job;
247int type_trigger;
248
249typedef SharedObject *SharedObjectPtr;
250typedef SharedObjectPtr (*SharedConstructor)();
251
252SharedObject *makeSharedObject(SharedObjectTable &table,
253  Lock *lock, int type, string &name, SharedConstructor scons)
254{
255  int was_locked = lock->is_locked();
256  SharedObject *result = NULL;
257  if (!was_locked)
258    lock->lock();
259  if (table.count(name)) {
260    result = table[name];
261    if (result->get_type() != type)
262      result = NULL;
263  } else {
264    result = scons();
265    result->set_type(type);
266    result->set_name(name);
267    table.insert(pair<string,SharedObject *>(name, result));
268  }
269  if (!was_locked)
270    lock->unlock();
271  return result;
272}
273
274SharedObject *findSharedObject(SharedObjectTable &table,
275  Lock *lock, string &name)
276{
277  int was_locked = lock->is_locked();
278  SharedObject *result = NULL;
279  if (!was_locked)
280    lock->lock();
281  if (table.count(name)) {
282    result = table[name];
283  }
284  if (!was_locked)
285    lock->unlock();
286  return result;
287}
288
289class Transactional: public SharedObject {
290private:
291  Region *region;
292  Lock *lock;
293protected:
294  int tx_begin() {
295    if (!region)
296      lock->lock();
297    else {
298      if (!lock->is_locked()) {
299        return 0;
300      }
301    }
302    return 1;
303  }
304  void tx_end() {
305    if (!region)
306      lock->unlock();
307  }
308public:
309  Transactional() :
310      SharedObject(), region(NULL), lock(NULL) {
311  }
312  void set_region(Region *region_init) {
313    region = region_init;
314    if (region_init) {
315      lock = region_init->get_lock();
316    } else {
317      lock = new Lock();
318    }
319  }
320  virtual ~Transactional() { if (!region && lock) delete lock; }
321};
322
323class TxTable: public Transactional {
324private:
325  std::map<string, string> entries;
326public:
327  TxTable() : Transactional(), entries() { }
328  virtual ~TxTable() { }
329  int put(string &key, string &value) {
330    int result = 0;
331    if (!tx_begin()) return -1;
332    if (entries.count(key)) {
333      entries[key] = value;
334    } else {
335      entries.insert(pair<string, string>(key, value));
336      result = 1;
337    }
338    tx_end();
339    return result;
340  }
341  int get(string &key, string &value) {
342    int result = 0;
343    if (!tx_begin()) return -1;
344    if (entries.count(key)) {
345      value = entries[key];
346      result = 1;
347    }
348    tx_end();
349    return result;
350  }
351  int check(string &key) {
352    int result;
353    if (!tx_begin()) return -1;
354    result = entries.count(key);
355    tx_end();
356    return result;
357  }
358};
359
360class TxList: public Transactional {
361private:
362  vector<string> entries;
363public:
364  TxList() : Transactional(), entries() { }
365  virtual ~TxList() { }
366  int put(size_t index, string &value) {
367    int result = -1;
368    if (!tx_begin()) return -1;
369    if (index >= 1 && index <= entries.size()) {
370      entries[index-1] = value;
371      result = 1;
372    } else {
373      entries.resize(index+1);
374      entries[index-1] = value;
375      result = 0;
376    }
377    tx_end();
378    return result;
379  }
380  int get(size_t index, string &value) {
381    int result = 0;
382    if (!tx_begin()) return -1;
383    if (index >= 1 && index <= entries.size()) {
384      result = (entries[index-1].size() != 0);
385      if (result)
386        value = entries[index-1];
387    }
388    tx_end();
389    return result;
390  }
391  long size() {
392    long result;
393    if (!tx_begin()) return -1;
394    result = (long) entries.size();
395    tx_end();
396    return result;
397  }
398};
399
400class SingularChannel : public SharedObject {
401private:
402  queue<string> q;
403  Lock lock;
404  ConditionVariable cond;
405public:
406  SingularChannel(): SharedObject(), lock(), cond(&lock) { }
407  virtual ~SingularChannel() { }
408  void send(string item) {
409    lock.lock();
410    q.push(item);
411    cond.signal();
412    lock.unlock();
413  }
414  string receive() {
415    lock.lock();
416    while (q.empty()) {
417      cond.wait();
418    }
419    string result = q.front();
420    q.pop();
421    if (!q.empty())
422      cond.signal();
423    lock.unlock();
424    return result;
425  }
426  long count() {
427    lock.lock();
428    long result = q.size();
429    lock.unlock();
430    return result;
431  }
432};
433
434class SingularSyncVar : public SharedObject {
435private:
436  string value;
437  int init;
438  Lock lock;
439  ConditionVariable cond;
440public:
441  SingularSyncVar(): SharedObject(), init(0), lock(), cond(&lock) { }
442  virtual ~SingularSyncVar() { }
443  void acquire() {
444    lock.lock();
445  }
446  void release() {
447    lock.unlock();
448  }
449  void wait_init() {
450    while (!init)
451      cond.wait();
452  }
453  leftv get() {
454    if (value.size() == 0) return NULL;
455    return LinTree::from_string(value);
456  }
457  void update(leftv val) {
458    value = LinTree::to_string(val);
459    init = 1;
460    cond.broadcast();
461  }
462  int write(string item) {
463    int result = 0;
464    lock.lock();
465    if (!init) {
466      value = item;
467      init = 1;
468      cond.broadcast();
469      result = 1;
470    }
471    lock.unlock();
472    return result;
473  }
474  string read() {
475    lock.lock();
476    while (!init)
477      cond.wait();
478    string result = value;
479    lock.unlock();
480    return result;
481  }
482  int check() {
483    lock.lock();
484    int result = init;
485    lock.unlock();
486    return result;
487  }
488};
489
490void *shared_init(blackbox *b) {
491  return omAlloc0(sizeof(SharedObject *));
492}
493
494void *new_shared(SharedObject *obj) {
495  acquireShared(obj);
496  void *result = omAlloc0(sizeof(SharedObject *));
497  *(SharedObject **)result = obj;
498  return result;
499}
500
501void shared_destroy(blackbox *b, void *d) {
502  SharedObject *obj = *(SharedObject **)d;
503  if (obj) {
504    releaseShared(*(SharedObject **)d);
505    *(SharedObject **)d = NULL;
506  }
507}
508
509void rlock_destroy(blackbox *b, void *d) {
510  SharedObject *obj = *(SharedObject **)d;
511  ((Region *) obj)->unlock();
512  if (obj) {
513    releaseShared(*(SharedObject **)d);
514    *(SharedObject **)d = NULL;
515  }
516}
517
518void *shared_copy(blackbox *b, void *d) {
519  SharedObject *obj = *(SharedObject **)d;
520  void *result = shared_init(b);
521  *(SharedObject **)result = obj;
522  if (obj)
523    acquireShared(obj);
524  return result;
525}
526
527BOOLEAN shared_assign(leftv l, leftv r) {
528  if (r->Typ() == l->Typ()) {
529    if (l->rtyp == IDHDL) {
530      omFree(IDDATA((idhdl)l->data));
531      IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
532    } else {
533      leftv ll=l->LData();
534      if (ll==NULL)
535      {
536        return TRUE; // out of array bounds or similiar
537      }
538      if (ll->data) {
539        shared_destroy(NULL, ll->data);
540        omFree(ll->data);
541      }
542      ll->data = shared_copy(NULL,r->Data());
543    }
544  } else {
545    Werror("assign %s(%d) = %s(%d)",
546        Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
547    return TRUE;
548  }
549  return FALSE;
550}
551
552BOOLEAN rlock_assign(leftv l, leftv r) {
553  if (r->Typ() == l->Typ()) {
554    if (l->rtyp == IDHDL) {
555      omFree(IDDATA((idhdl)l->data));
556      IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
557    } else {
558      leftv ll=l->LData();
559      if (ll==NULL)
560      {
561        return TRUE; // out of array bounds or similiar
562      }
563      rlock_destroy(NULL, ll->data);
564      omFree(ll->data);
565      ll->data = shared_copy(NULL,r->Data());
566    }
567  } else {
568    Werror("assign %s(%d) = %s(%d)",
569        Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
570    return TRUE;
571  }
572  return FALSE;
573}
574
575
576BOOLEAN shared_check_assign(blackbox *b, leftv l, leftv r) {
577  int lt = l->Typ();
578  int rt = r->Typ();
579  if (lt != DEF_CMD && lt != rt) {
580    const char *rn=Tok2Cmdname(rt);
581    const char *ln=Tok2Cmdname(lt);
582    Werror("cannot assign %s (%d) to %s (%d)\n", rn, rt, ln, lt);
583    return TRUE;
584  }
585  return FALSE;
586}
587
588BOOLEAN shared_op2(int op, leftv res, leftv a1, leftv a2) {
589  SharedObject *obj = *(SharedObject **)a1->Data();
590  return obj->op2(op, res, a1, a2);
591}
592
593BOOLEAN shared_op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
594  SharedObject *obj = *(SharedObject **)a1->Data();
595  return obj->op3(op, res, a1, a2, a3);
596}
597
598char *shared_string(blackbox *b, void *d) {
599  char buf[80];
600  SharedObject *obj = *(SharedObject **)d;
601  if (!obj)
602    return omStrDup("<uninitialized shared object>");
603  int type = obj->get_type();
604  string &name = obj->get_name();
605  const char *type_name = "unknown";
606  if (type == type_channel)
607    type_name = "channel";
608  else if (type == type_atomic_table)
609    type_name = "atomic_table";
610  else if (type == type_shared_table)
611    type_name = "shared_table";
612  else if (type == type_atomic_list)
613    type_name = "atomic_list";
614  else if (type == type_shared_list)
615    type_name = "shared_list";
616  else if (type == type_syncvar)
617    type_name = "syncvar";
618  else if (type == type_region)
619    type_name = "region";
620  else if (type == type_regionlock)
621    type_name = "regionlock";
622  else if (type == type_thread) {
623    sprintf(buf, "<thread #%s>", name.c_str());
624    return omStrDup(buf);
625  }
626  else if (type == type_threadpool) {
627    if (name.size() > 0) {
628      name_lock.lock();
629      sprintf(buf, "<threadpool \"%.40s\"@%p>", name.c_str(), obj);
630      name_lock.unlock();
631    } else
632      sprintf(buf, "<threadpool @%p>", obj);
633    return omStrDup(buf);
634  }
635  else if (type == type_job) {
636    if (name.size() > 0) {
637      name_lock.lock();
638      sprintf(buf, "<job \"%.40s\"@%p>", name.c_str(), obj);
639      name_lock.unlock();
640    } else
641      sprintf(buf, "<job @%p>", obj);
642    return omStrDup(buf);
643  }
644  else if (type == type_trigger) {
645    if (name.size() > 0) {
646      name_lock.lock();
647      sprintf(buf, "<trigger \"%.40s\"@%p>", name.c_str(), obj);
648      name_lock.unlock();
649    } else
650      sprintf(buf, "<trigger @%p>", obj);
651    return omStrDup(buf);
652  } else {
653    sprintf(buf, "<unknown type %d>", type);
654    return omStrDup(buf);
655  }
656  sprintf(buf, "<%s \"%.40s\">", type_name, name.c_str());
657  return omStrDup(buf);
658}
659
660char *rlock_string(blackbox *b, void *d) {
661  char buf[80];
662  SharedObject *obj = *(SharedObject **)d;
663  if (!obj)
664    return omStrDup("<uninitialized region lock>");
665  sprintf(buf, "<region lock \"%.40s\">", obj->get_name().c_str());
666  return omStrDup(buf);
667}
668
669void report(const char *fmt, const char *name) {
670  char buf[80];
671  sprintf(buf, fmt, name);
672  WerrorS(buf);
673}
674
675int wrong_num_args(const char *name, leftv arg, int n) {
676  for (int i=1; i<=n; i++) {
677    if (!arg) {
678      report("%s: too few arguments", name);
679      return TRUE;
680    }
681    arg = arg->next;
682  }
683  if (arg) {
684    report("%s: too many arguments", name);
685    return TRUE;
686  }
687  return FALSE;
688}
689
690int not_a_uri(const char *name, leftv arg) {
691  if (arg->Typ() != STRING_CMD) {
692    report("%s: not a valid URI", name);
693    return TRUE;
694  }
695  return FALSE;
696}
697
698int not_a_region(const char *name, leftv arg) {
699  if (arg->Typ() != type_region || !arg->Data()) {
700    report("%s: not a region", name);
701    return TRUE;
702  }
703  return FALSE;
704}
705
706
707char *str(leftv arg) {
708  return (char *)(arg->Data());
709}
710
711SharedObject *consTable() {
712  return new TxTable();
713}
714
715SharedObject *consList() {
716  return new TxList();
717}
718
719SharedObject *consChannel() {
720  return new SingularChannel();
721}
722
723SharedObject *consSyncVar() {
724  return new SingularSyncVar();
725}
726
727SharedObject *consRegion() {
728  return new Region();
729}
730
731static void appendArg(vector<leftv> &argv, string &s) {
732  if (s.size() == 0) return;
733  leftv val = LinTree::from_string(s);
734  if (val->Typ() == NONE) {
735    omFreeBin(val, sleftv_bin);
736    return;
737  }
738  argv.push_back(val);
739}
740
741static void appendArg(vector<leftv> &argv, leftv arg) {
742  argv.push_back(arg);
743}
744
745static void appendArgCopy(vector<leftv> &argv, leftv arg) {
746  leftv val = (leftv) omAlloc0Bin(sleftv_bin);
747  val->Copy(arg);
748  argv.push_back(val);
749}
750
751
752static BOOLEAN executeProc(sleftv &result,
753  const char *procname, const vector<leftv> &argv)
754{
755  leftv procnode = (leftv) omAlloc0Bin(sleftv_bin);
756  procnode->name = omStrDup(procname);
757  procnode->req_packhdl = basePack;
758  int error = procnode->Eval();
759  if (error) {
760    Werror("procedure \"%s\" not found", procname);
761    omFreeBin(procnode, sleftv_bin);
762    return TRUE;
763  }
764  memset(&result, 0, sizeof(result));
765  leftv *tail = &procnode->next;
766  for (int i = 0; i < argv.size(); i++) {
767    *tail = argv[i];
768    tail = &(*tail)->next;
769  }
770  *tail = NULL;
771  error = iiExprArithM(&result, procnode, '(');
772  procnode->CleanUp();
773  omFreeBin(procnode, sleftv_bin);
774  if (error) {
775    Werror("procedure call of \"%s\" failed", procname);
776    return TRUE;
777  }
778  return FALSE;
779}
780
781BOOLEAN makeAtomicTable(leftv result, leftv arg) {
782  if (wrong_num_args("makeAtomicTable", arg, 1))
783    return TRUE;
784  if (not_a_uri("makeAtomicTable", arg))
785    return TRUE;
786  string uri = str(arg);
787  SharedObject *obj = makeSharedObject(global_objects,
788    &global_objects_lock, type_atomic_table, uri, consTable);
789  ((TxTable *) obj)->set_region(NULL);
790  result->rtyp = type_atomic_table;
791  result->data = new_shared(obj);
792  return FALSE;
793}
794
795BOOLEAN makeAtomicList(leftv result, leftv arg) {
796  if (wrong_num_args("makeAtomicList", arg, 1))
797    return TRUE;
798  if (not_a_uri("makeAtomicList", arg))
799    return TRUE;
800  string uri = str(arg);
801  SharedObject *obj = makeSharedObject(global_objects,
802    &global_objects_lock, type_atomic_list, uri, consList);
803  ((TxList *) obj)->set_region(NULL);
804  result->rtyp = type_atomic_list;
805  result->data = new_shared(obj);
806  return FALSE;
807}
808
809BOOLEAN makeSharedTable(leftv result, leftv arg) {
810  if (wrong_num_args("makeSharedTable", arg, 2))
811    return TRUE;
812  if (not_a_region("makeSharedTable", arg))
813    return TRUE;
814  if (not_a_uri("makeSharedTable", arg->next))
815    return TRUE;
816  Region *region = *(Region **) arg->Data();
817  fflush(stdout);
818  string s = str(arg->next);
819  SharedObject *obj = makeSharedObject(region->objects,
820    region->get_lock(), type_shared_table, s, consTable);
821  ((TxTable *) obj)->set_region(region);
822  result->rtyp = type_shared_table;
823  result->data = new_shared(obj);
824  return FALSE;
825}
826
827BOOLEAN makeSharedList(leftv result, leftv arg) {
828  if (wrong_num_args("makeSharedList", arg, 2))
829    return TRUE;
830  if (not_a_region("makeSharedList", arg))
831    return TRUE;
832  if (not_a_uri("makeSharedList", arg->next))
833    return TRUE;
834  Region *region = *(Region **) arg->Data();
835  string s = str(arg->next);
836  SharedObject *obj = makeSharedObject(region->objects,
837    region->get_lock(), type_shared_list, s, consList);
838  ((TxList *) obj)->set_region(region);
839  result->rtyp = type_shared_list;
840  result->data = new_shared(obj);
841  return FALSE;
842}
843
844BOOLEAN makeChannel(leftv result, leftv arg) {
845  if (wrong_num_args("makeChannel", arg, 1))
846    return TRUE;
847  if (not_a_uri("makeChannel", arg))
848    return TRUE;
849  string uri = str(arg);
850  SharedObject *obj = makeSharedObject(global_objects,
851    &global_objects_lock, type_channel, uri, consChannel);
852  result->rtyp = type_channel;
853  result->data = new_shared(obj);
854  return FALSE;
855}
856
857BOOLEAN makeSyncVar(leftv result, leftv arg) {
858  if (wrong_num_args("makeSyncVar", arg, 1))
859    return TRUE;
860  if (not_a_uri("makeSyncVar", arg))
861    return TRUE;
862  string uri = str(arg);
863  SharedObject *obj = makeSharedObject(global_objects,
864    &global_objects_lock, type_syncvar, uri, consSyncVar);
865  result->rtyp = type_syncvar;
866  result->data = new_shared(obj);
867  return FALSE;
868}
869
870BOOLEAN makeRegion(leftv result, leftv arg) {
871  if (wrong_num_args("makeRegion", arg, 1))
872    return TRUE;
873  if (not_a_uri("makeRegion", arg))
874    return TRUE;
875  string uri = str(arg);
876  SharedObject *obj = makeSharedObject(global_objects,
877    &global_objects_lock, type_region, uri, consRegion);
878  result->rtyp = type_region;
879  result->data = new_shared(obj);
880  return FALSE;
881}
882
883BOOLEAN findSharedObject(leftv result, leftv arg) {
884  if (wrong_num_args("findSharedObject", arg, 1))
885    return TRUE;
886  if (not_a_uri("findSharedObject", arg))
887    return TRUE;
888  string uri = str(arg);
889  SharedObject *obj = findSharedObject(global_objects,
890    &global_objects_lock, uri);
891  result->rtyp = INT_CMD;
892  result->data = (char *)(long)(obj != NULL);
893  return FALSE;
894}
895
896BOOLEAN typeSharedObject(leftv result, leftv arg) {
897  if (wrong_num_args("findSharedObject", arg, 1))
898    return TRUE;
899  if (not_a_uri("findSharedObject", arg))
900    return TRUE;
901  string uri = str(arg);
902  SharedObject *obj = findSharedObject(global_objects,
903    &global_objects_lock, uri);
904  int type = obj ? obj->get_type() : -1;
905  const char *type_name = "undefined";
906  if (type == type_channel)
907    type_name = "channel";
908  else if (type == type_atomic_table)
909    type_name = "atomic_table";
910  else if (type == type_shared_table)
911    type_name = "shared_table";
912  else if (type == type_atomic_list)
913    type_name = "atomic_list";
914  else if (type == type_shared_list)
915    type_name = "shared_list";
916  else if (type == type_syncvar)
917    type_name = "syncvar";
918  else if (type == type_region)
919    type_name = "region";
920  else if (type == type_regionlock)
921    type_name = "regionlock";
922  result->rtyp = STRING_CMD;
923  result->data = (char *)(omStrDup(type_name));
924  return FALSE;
925}
926
927BOOLEAN bindSharedObject(leftv result, leftv arg) {
928  if (wrong_num_args("bindSharedObject", arg, 1))
929    return TRUE;
930  if (not_a_uri("bindSharedObject", arg))
931    return TRUE;
932  string uri = str(arg);
933  SharedObject *obj = findSharedObject(global_objects,
934    &global_objects_lock, uri);
935  if (!obj) {
936    WerrorS("bindSharedObject: cannot find object");
937    return TRUE;
938  }
939  result->rtyp = obj->get_type();
940  result->data = new_shared(obj);
941  return FALSE;
942}
943
944BOOLEAN getTable(leftv result, leftv arg) {
945  if (wrong_num_args("getTable", arg, 2))
946    return TRUE;
947  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
948    WerrorS("getTable: not a valid table");
949    return TRUE;
950  }
951  if (arg->next->Typ() != STRING_CMD) {
952    WerrorS("getTable: not a valid table key");
953    return TRUE;
954  }
955  TxTable *table = *(TxTable **) arg->Data();
956  if (!table) {
957    WerrorS("getTable: table has not been initialized");
958    return TRUE;
959  }
960  string key = (char *)(arg->next->Data());
961  string value;
962  int success = table->get(key, value);
963  if (success < 0) {
964    WerrorS("getTable: region not acquired");
965    return TRUE;
966  }
967  if (success == 0) {
968    WerrorS("getTable: key not found");
969    return TRUE;
970  }
971  leftv tmp = LinTree::from_string(value);
972  result->rtyp = tmp->Typ();
973  result->data = tmp->Data();
974  return FALSE;
975}
976
977BOOLEAN inTable(leftv result, leftv arg) {
978  if (wrong_num_args("inTable", arg, 2))
979    return TRUE;
980  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
981    WerrorS("inTable: not a valid table");
982    return TRUE;
983  }
984  if (arg->next->Typ() != STRING_CMD) {
985    WerrorS("inTable: not a valid table key");
986    return TRUE;
987  }
988  TxTable *table = *(TxTable **) arg->Data();
989  if (!table) {
990    WerrorS("inTable: table has not been initialized");
991    return TRUE;
992  }
993  string key = (char *)(arg->next->Data());
994  int success = table->check(key);
995  if (success < 0) {
996    WerrorS("inTable: region not acquired");
997    return TRUE;
998  }
999  result->rtyp = INT_CMD;
1000  result->data = (char *)(long)(success);
1001  return FALSE;
1002}
1003
1004BOOLEAN putTable(leftv result, leftv arg) {
1005  if (wrong_num_args("putTable", arg, 3))
1006    return TRUE;
1007  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
1008    WerrorS("putTable: not a valid table");
1009    return TRUE;
1010  }
1011  if (arg->next->Typ() != STRING_CMD) {
1012    WerrorS("putTable: not a valid table key");
1013    return TRUE;
1014  }
1015  TxTable *table = *(TxTable **) arg->Data();
1016  if (!table) {
1017    WerrorS("putTable: table has not been initialized");
1018    return TRUE;
1019  }
1020  string key = (char *)(arg->next->Data());
1021  string value = LinTree::to_string(arg->next->next);
1022  int success = table->put(key, value);
1023  if (success < 0) {
1024    WerrorS("putTable: region not acquired");
1025    return TRUE;
1026  }
1027  result->rtyp = NONE;
1028  return FALSE;
1029}
1030
1031BOOLEAN getList(leftv result, leftv arg) {
1032  if (wrong_num_args("getList", arg, 2))
1033    return TRUE;
1034  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1035    WerrorS("getList: not a valid list (atomic or shared)");
1036    return TRUE;
1037  }
1038  if (arg->next->Typ() != INT_CMD) {
1039    WerrorS("getList: index must be an integer");
1040    return TRUE;
1041  }
1042  TxList *list = *(TxList **) arg->Data();
1043  if (!list) {
1044    WerrorS("getList: list has not been initialized");
1045    return TRUE;
1046  }
1047  long index = (long)(arg->next->Data());
1048  string value;
1049  int success = list->get(index, value);
1050  if (success < 0) {
1051    WerrorS("getList: region not acquired");
1052    return TRUE;
1053  }
1054  if (success == 0) {
1055    WerrorS("getList: no value at position");
1056    return TRUE;
1057  }
1058  leftv tmp = LinTree::from_string(value);
1059  result->rtyp = tmp->Typ();
1060  result->data = tmp->Data();
1061  return FALSE;
1062}
1063
1064BOOLEAN putList(leftv result, leftv arg) {
1065  if (wrong_num_args("putList", arg, 3))
1066    return TRUE;
1067  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1068    WerrorS("putList: not a valid list (shared or atomic)");
1069    return TRUE;
1070  }
1071  if (arg->next->Typ() != INT_CMD) {
1072    WerrorS("putList: index must be an integer");
1073    return TRUE;
1074  }
1075  TxList *list = *(TxList **) arg->Data();
1076  if (!list) {
1077    WerrorS("putList: list has not been initialized");
1078    return TRUE;
1079  }
1080  long index = (long)(arg->next->Data());
1081  string value = LinTree::to_string(arg->next->next);
1082  int success = list->put(index, value);
1083  if (success < 0) {
1084    WerrorS("putList: region not acquired");
1085    return TRUE;
1086  }
1087  result->rtyp = NONE;
1088  return FALSE;
1089}
1090
1091BOOLEAN lockRegion(leftv result, leftv arg) {
1092  if (wrong_num_args("lockRegion", arg, 1))
1093    return TRUE;
1094  if (not_a_region("lockRegion", arg))
1095    return TRUE;
1096  Region *region = *(Region **)arg->Data();
1097  if (region->is_locked()) {
1098    WerrorS("lockRegion: region is already locked");
1099    return TRUE;
1100  }
1101  region->lock();
1102  result->rtyp = NONE;
1103  return FALSE;
1104}
1105
1106BOOLEAN regionLock(leftv result, leftv arg) {
1107  if (wrong_num_args("lockRegion", arg, 1))
1108    return TRUE;
1109  if (not_a_region("lockRegion", arg))
1110    return TRUE;
1111  Region *region = *(Region **)arg->Data();
1112  if (region->is_locked()) {
1113    WerrorS("lockRegion: region is already locked");
1114    return TRUE;
1115  }
1116  region->lock();
1117  result->rtyp = type_regionlock;
1118  result->data = new_shared(region);
1119  return FALSE;
1120}
1121
1122
1123BOOLEAN unlockRegion(leftv result, leftv arg) {
1124  if (wrong_num_args("unlockRegion", arg, 1))
1125    return TRUE;
1126  if (not_a_region("unlockRegion", arg))
1127    return TRUE;
1128  Region *region = *(Region **)arg->Data();
1129  if (!region->is_locked()) {
1130    WerrorS("unlockRegion: region is not locked");
1131    return TRUE;
1132  }
1133  region->unlock();
1134  result->rtyp = NONE;
1135  return FALSE;
1136}
1137
1138BOOLEAN sendChannel(leftv result, leftv arg) {
1139  if (wrong_num_args("sendChannel", arg, 2))
1140    return TRUE;
1141  if (arg->Typ() != type_channel) {
1142    WerrorS("sendChannel: argument is not a channel");
1143    return TRUE;
1144  }
1145  SingularChannel *channel = *(SingularChannel **)arg->Data();
1146  if (!channel) {
1147    WerrorS("sendChannel: channel has not been initialized");
1148    return TRUE;
1149  }
1150  channel->send(LinTree::to_string(arg->next));
1151  result->rtyp = NONE;
1152  return FALSE;
1153}
1154
1155BOOLEAN receiveChannel(leftv result, leftv arg) {
1156  if (wrong_num_args("receiveChannel", arg, 1))
1157    return TRUE;
1158  if (arg->Typ() != type_channel) {
1159    WerrorS("receiveChannel: argument is not a channel");
1160    return TRUE;
1161  }
1162  SingularChannel *channel = *(SingularChannel **)arg->Data();
1163  if (!channel) {
1164    WerrorS("receiveChannel: channel has not been initialized");
1165    return TRUE;
1166  }
1167  string item = channel->receive();
1168  leftv val = LinTree::from_string(item);
1169  result->rtyp = val->Typ();
1170  result->data = val->Data();
1171  return FALSE;
1172}
1173
1174BOOLEAN statChannel(leftv result, leftv arg) {
1175  if (wrong_num_args("statChannel", arg, 1))
1176    return TRUE;
1177  if (arg->Typ() != type_channel) {
1178    WerrorS("statChannel: argument is not a channel");
1179    return TRUE;
1180  }
1181  SingularChannel *channel = *(SingularChannel **)arg->Data();
1182  if (!channel) {
1183    WerrorS("receiveChannel: channel has not been initialized");
1184    return TRUE;
1185  }
1186  long n = channel->count();
1187  result->rtyp = INT_CMD;
1188  result->data = (char *)n;
1189  return FALSE;
1190}
1191
1192BOOLEAN writeSyncVar(leftv result, leftv arg) {
1193  if (wrong_num_args("writeSyncVar", arg, 2))
1194    return TRUE;
1195  if (arg->Typ() != type_syncvar) {
1196    WerrorS("writeSyncVar: argument is not a syncvar");
1197    return TRUE;
1198  }
1199  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1200  if (!syncvar) {
1201    WerrorS("writeSyncVar: syncvar has not been initialized");
1202    return TRUE;
1203  }
1204  if (!syncvar->write(LinTree::to_string(arg->next))) {
1205    WerrorS("writeSyncVar: variable already has a value");
1206    return TRUE;
1207  }
1208  result->rtyp = NONE;
1209  return FALSE;
1210}
1211
1212BOOLEAN updateSyncVar(leftv result, leftv arg) {
1213  Command cmd("updateSyncVar", result, arg);
1214  cmd.check_argc_min(2);
1215  cmd.check_arg(0, type_syncvar, "first argument must be a syncvar");
1216  cmd.check_init(0, "syncvar has not been initialized");
1217  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
1218  if (cmd.ok()) {
1219    SingularSyncVar *syncvar = cmd.shared_arg<SingularSyncVar>(0);
1220    char *procname = (char *) cmd.arg(1);
1221    arg = arg->next->next;
1222    syncvar->acquire();
1223    syncvar->wait_init();
1224    vector<leftv> argv;
1225    appendArg(argv, syncvar->get());
1226    while (arg) {
1227      appendArgCopy(argv, arg);
1228      arg = arg->next;
1229    }
1230    int error = executeProc(*result, procname, argv);
1231    if (!error) {
1232      syncvar->update(result);
1233    }
1234    syncvar->release();
1235    return error;
1236  }
1237  return cmd.status();
1238}
1239
1240
1241BOOLEAN readSyncVar(leftv result, leftv arg) {
1242  if (wrong_num_args("readSyncVar", arg, 1))
1243    return TRUE;
1244  if (arg->Typ() != type_syncvar) {
1245    WerrorS("readSyncVar: argument is not a syncvar");
1246    return TRUE;
1247  }
1248  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1249  if (!syncvar) {
1250    WerrorS("readSyncVar: syncvar has not been initialized");
1251    return TRUE;
1252  }
1253  string item = syncvar->read();
1254  leftv val = LinTree::from_string(item);
1255  result->rtyp = val->Typ();
1256  result->data = val->Data();
1257  return FALSE;
1258}
1259
1260BOOLEAN statSyncVar(leftv result, leftv arg) {
1261  if (wrong_num_args("statSyncVar", arg, 1))
1262    return TRUE;
1263  if (arg->Typ() != type_syncvar) {
1264    WerrorS("statSyncVar: argument is not a syncvar");
1265    return TRUE;
1266  }
1267  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1268  if (!syncvar) {
1269    WerrorS("statSyncVar: syncvar has not been initialized");
1270    return TRUE;
1271  }
1272  int init = syncvar->check();
1273  result->rtyp = INT_CMD;
1274  result->data = (char *)(long) init;
1275  return FALSE;
1276}
1277
1278void encode_shared(LinTree::LinTree &lintree, leftv val) {
1279  SharedObject *obj = *(SharedObject **)(val->Data());
1280  acquireShared(obj);
1281  lintree.put(obj);
1282}
1283
1284leftv decode_shared(LinTree::LinTree &lintree) {
1285  int type = lintree.get_prev<int>();
1286  SharedObject *obj = lintree.get<SharedObject *>();
1287  leftv result = (leftv) omAlloc0Bin(sleftv_bin);
1288  result->rtyp = type;
1289  result->data = (void *)new_shared(obj);
1290  return result;
1291}
1292
1293void ref_shared(LinTree::LinTree &lintree, int by) {
1294  SharedObject *obj = lintree.get<SharedObject *>();
1295  while (by > 0) {
1296    obj->incref();
1297    by--;
1298  }
1299  while (by < 0) {
1300    obj->decref();
1301    by++;
1302  }
1303}
1304
1305void installShared(int type) {
1306  LinTree::install(type, encode_shared, decode_shared, ref_shared);
1307}
1308
1309void makeSharedType(int &type, const char *name) {
1310  if (type != 0) return;
1311  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1312  b->blackbox_Init = shared_init;
1313  b->blackbox_destroy = shared_destroy;
1314  b->blackbox_Copy = shared_copy;
1315  b->blackbox_String = shared_string;
1316  b->blackbox_Assign = shared_assign;
1317  b->blackbox_CheckAssign = shared_check_assign;
1318  // b->blackbox_Op2 = shared_op2;
1319  // b->blackbox_Op3 = shared_op3;
1320  type = setBlackboxStuff(b, name);
1321  installShared(type);
1322}
1323
1324void makeRegionlockType(int &type, const char *name) {
1325  if (type != 0) return;
1326  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1327  b->blackbox_Init = shared_init;
1328  b->blackbox_destroy = rlock_destroy;
1329  b->blackbox_Copy = shared_copy;
1330  b->blackbox_String = shared_string;
1331  b->blackbox_Assign = rlock_assign;
1332  b->blackbox_CheckAssign = shared_check_assign;
1333  type = setBlackboxStuff(b, name);
1334  installShared(type);
1335}
1336
1337#define MAX_THREADS 128
1338
1339class ThreadState {
1340public:
1341  bool active;
1342  bool running;
1343  int index;
1344  void *(*thread_func)(ThreadState *, void *);
1345  void *arg, *result;
1346  pthread_t id;
1347  pthread_t parent;
1348  Lock lock;
1349  ConditionVariable to_cond;
1350  ConditionVariable from_cond;
1351  queue<string> to_thread;
1352  queue<string> from_thread;
1353  ThreadState() : lock(), to_cond(&lock), from_cond(&lock),
1354                  to_thread(), from_thread() {
1355    active = false;
1356    running = false;
1357    index = -1;
1358  }
1359  ~ThreadState() {
1360    // We do nothing here. This is to prevent the condition
1361    // variable destructor from firing upon program exit,
1362    // which would invoke undefined behavior if the thread
1363    // is still running.
1364  }
1365};
1366
1367Lock thread_lock;
1368
1369ThreadState *thread_state;
1370
1371void setOption(int ch) {
1372  int index = feGetOptIndex(ch);
1373  feSetOptValue((feOptIndex) index, (int) 1);
1374}
1375
1376void thread_init() {
1377  master_lock.lock();
1378  thread_id = ++thread_counter;
1379  master_lock.unlock();
1380#ifdef ENABLE_THREADS
1381  onThreadInit();
1382  siInit(global_argv0);
1383#endif
1384  setOption('q');
1385  // setOption('b');
1386}
1387
1388void *thread_main(void *arg) {
1389  ThreadState *ts = (ThreadState *)arg;
1390  thread_init();
1391  return ts->thread_func(ts, ts->arg);
1392}
1393
1394void *interpreter_thread(ThreadState *ts, void *arg) {
1395  ts->lock.lock();
1396  for (;;) {
1397    bool eval = false;
1398    while (ts->to_thread.empty())
1399      ts->to_cond.wait();
1400    /* TODO */
1401    string expr = ts->to_thread.front();
1402    switch (expr[0]) {
1403      case '\0': case 'q':
1404        ts->lock.unlock();
1405        return NULL;
1406      case 'x':
1407        eval = false;
1408        break;
1409      case 'e':
1410        eval = true;
1411        break;
1412    }
1413    ts->to_thread.pop();
1414    expr = ts->to_thread.front();
1415    /* this will implicitly eval commands */
1416    leftv val = LinTree::from_string(expr);
1417    expr = LinTree::to_string(val);
1418    ts->to_thread.pop();
1419    if (eval)
1420      ts->from_thread.push(expr);
1421    ts->from_cond.signal();
1422  }
1423  ts->lock.unlock();
1424  return NULL;
1425}
1426
1427class InterpreterThread : public SharedObject {
1428private:
1429  ThreadState *ts;
1430public:
1431  InterpreterThread(ThreadState *ts_init) : SharedObject(), ts(ts_init) { }
1432  virtual ~InterpreterThread() { }
1433  ThreadState *getThreadState() { return ts; }
1434  void clearThreadState() {
1435    ts = NULL;
1436  }
1437};
1438
1439static ThreadState *newThread(void *(*thread_func)(ThreadState *, void *),
1440    void *arg, const char **error) {
1441  ThreadState *ts = NULL;
1442  if (error) *error = NULL;
1443  thread_lock.lock();
1444  for (int i=0; i<MAX_THREADS; i++) {
1445    if (!thread_state[i].active) {
1446      ts = thread_state + i;
1447      ts->index = i;
1448      ts->parent = pthread_self();
1449      ts->active = true;
1450      ts->running = true;
1451      ts->to_thread = queue<string>();
1452      ts->from_thread = queue<string>();
1453      ts->thread_func = thread_func;
1454      ts->arg = arg;
1455      ts->result = NULL;
1456      if (pthread_create(&ts->id, NULL, thread_main, ts)<0) {
1457        if (error)
1458          *error = "createThread: internal error: failed to create thread";
1459        goto fail;
1460      }
1461      goto exit;
1462    }
1463  }
1464  if (error) *error = "createThread: too many threads";
1465  fail:
1466  ts = NULL;
1467  exit:
1468  thread_lock.unlock();
1469  return ts;
1470}
1471
1472ThreadState *createThread(void *(*thread_func)(ThreadState *, void *),
1473    void *arg) {
1474  return newThread(thread_func, arg, NULL);
1475}
1476
1477void *joinThread(ThreadState *ts) {
1478  void *result;
1479  pthread_join(ts->id, NULL);
1480  result = ts->result;
1481  thread_lock.lock();
1482  ts->running = false;
1483  ts->active = false;
1484  thread_lock.unlock();
1485}
1486
1487static InterpreterThread *createInterpreterThread(const char **error) {
1488  ThreadState *ts = newThread(interpreter_thread, NULL, error);
1489  if (*error) return NULL;
1490  InterpreterThread *thread = new InterpreterThread(ts);
1491  char buf[10];
1492  sprintf(buf, "%d", ts->index);
1493  string name(buf);
1494  thread->set_name(name);
1495  thread->set_type(type_thread);
1496  return thread;
1497}
1498
1499static BOOLEAN createThread(leftv result, leftv arg) {
1500  Command cmd("createThread", result, arg);
1501  cmd.check_argc(0);
1502  const char *error;
1503  if (!have_threads)
1504    cmd.report("thread support not available");
1505  if (!cmd.ok()) return cmd.status();
1506  InterpreterThread *thread = createInterpreterThread(&error);
1507  if (error) {
1508    return cmd.abort(error);
1509  }
1510  cmd.set_result(type_thread, new_shared(thread));
1511  return cmd.status();
1512}
1513
1514static bool joinInterpreterThread(InterpreterThread *thread) {
1515  ThreadState *ts = thread->getThreadState();
1516  if (ts && ts->parent != pthread_self()) {
1517    return false;
1518  }
1519  ts->lock.lock();
1520  string quit("q");
1521  ts->to_thread.push(quit);
1522  ts->to_cond.signal();
1523  ts->lock.unlock();
1524  pthread_join(ts->id, NULL);
1525  thread_lock.lock();
1526  ts->running = false;
1527  ts->active = false;
1528  thread->clearThreadState();
1529  thread_lock.unlock();
1530  return true;
1531}
1532
1533static BOOLEAN joinThread(leftv result, leftv arg) {
1534  if (wrong_num_args("joinThread", arg, 1))
1535    return TRUE;
1536  if (arg->Typ() != type_thread) {
1537    WerrorS("joinThread: argument is not a thread");
1538    return TRUE;
1539  }
1540  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
1541  if (!joinInterpreterThread(thread)) {
1542    WerrorS("joinThread: can only be called from parent thread");
1543    return TRUE;
1544  }
1545  return FALSE;
1546}
1547
1548class ThreadPool;
1549class Trigger;
1550
1551class Job : public SharedObject {
1552public:
1553  ThreadPool *pool;
1554  long prio;
1555  size_t id;
1556  long pending_index;
1557  vector<Job *> deps;
1558  vector<Job *> notify;
1559  vector<Trigger *> triggers;
1560  vector<string> args;
1561  string result; // lintree-encoded
1562  void *data;
1563  bool fast;
1564  bool done;
1565  bool queued;
1566  bool running;
1567  bool cancelled;
1568  Job() : SharedObject(), pool(NULL), deps(), pending_index(-1), fast(false),
1569    done(false), running(false), queued(false), cancelled(false), data(NULL),
1570    result(), args(), notify(), triggers(), prio(0)
1571  { set_type(type_job); }
1572  ~Job();
1573  void addDep(Job *job) {
1574    deps.push_back(job);
1575  }
1576  void addDep(vector<Job *> &jobs);
1577  void addDep(long ndeps, Job **jobs);
1578  void addNotify(vector<Job *> &jobs);
1579  void addNotify(Job *job);
1580  virtual bool ready();
1581  virtual void execute() = 0;
1582  void run();
1583};
1584
1585struct JobCompare {
1586  bool operator()(const Job* lhs, const Job* rhs) {
1587    if (lhs->fast < rhs->fast) {
1588      return true;
1589    }
1590    if (lhs->prio < rhs->prio) {
1591      return true;
1592    }
1593    if (lhs->prio == rhs->prio) {
1594      return lhs->id > rhs->id;
1595    }
1596    return false;
1597  }
1598};
1599
1600class Trigger : public Job {
1601public:
1602  virtual bool accept(leftv arg) = 0;
1603  virtual void activate(leftv arg) = 0;
1604  Trigger() : Job() { set_type(type_trigger); fast = true; }
1605};
1606
1607bool Job::ready() {
1608  vector<Job *>::iterator it;
1609  for (it = deps.begin(); it != deps.end(); it++) {
1610    if (!(*it)->done) return false;
1611  }
1612  return true;
1613}
1614
1615Job::~Job() {
1616  vector<Job *>::iterator it;
1617  for (it = deps.begin(); it != deps.end(); it++) {
1618    releaseShared(*it);
1619  }
1620}
1621
1622typedef queue<Job *> JobQueue;
1623
1624class Scheduler;
1625
1626struct SchedInfo {
1627  Scheduler *scheduler;
1628  Job *job;
1629  int num;
1630};
1631
1632static SIMPLE_THREAD_VAR ThreadPool *currentThreadPoolRef;
1633static SIMPLE_THREAD_VAR Job *currentJobRef;
1634
1635class ThreadPool : public SharedObject {
1636public:
1637  Scheduler *scheduler;
1638  int nthreads;
1639  ThreadPool(Scheduler *sched, int n);
1640  ThreadPool(int n);
1641  ~ThreadPool();
1642  ThreadState *getThread(int i);
1643  void shutdown(bool wait);
1644  void addThread(ThreadState *thread);
1645  void attachJob(Job *job);
1646  void detachJob(Job *job);
1647  void queueJob(Job *job);
1648  void broadcastJob(Job *job);
1649  void cancelDeps(Job * job);
1650  void cancelJob(Job *job);
1651  void waitJob(Job *job);
1652  void clearThreadState();
1653};
1654
1655
1656class Scheduler : public SharedObject {
1657private:
1658  bool single_threaded;
1659  size_t jobid;
1660  int nthreads;
1661  int maxconcurrency;
1662  int running;
1663  bool shutting_down;
1664  int shutdown_counter;
1665  vector<ThreadState *> threads;
1666  vector<ThreadPool *> thread_owners;
1667  priority_queue<Job *, vector<Job *>, JobCompare> global_queue;
1668  vector<JobQueue *> thread_queues;
1669  vector<Job *> pending;
1670  ConditionVariable cond;
1671  ConditionVariable response;
1672  friend class Job;
1673public:
1674  Lock lock;
1675  Scheduler(int n) :
1676    SharedObject(), threads(), thread_owners(), global_queue(), thread_queues(),
1677    single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1678    lock(true), cond(&lock), response(&lock),
1679    shutting_down(false), shutdown_counter(0), jobid(0),
1680    maxconcurrency(n), running(0)
1681  {
1682    thread_queues.push_back(new JobQueue());
1683  }
1684  void set_maxconcurrency(int n) {
1685    maxconcurrency = n;
1686  }
1687  int get_maxconcurrency() {
1688    return maxconcurrency;
1689  }
1690  int threadpool_size(ThreadPool *pool) {
1691    int n;
1692    for (int i = 0; i <thread_owners.size(); i++) {
1693      if (thread_owners[i] == pool)
1694        n++;
1695    }
1696    return n;
1697  }
1698  virtual ~Scheduler() {
1699    for (int i = 0; i < thread_queues.size(); i++) {
1700      JobQueue *q = thread_queues[i];
1701      while (!q->empty()) {
1702        Job *job = q->front();
1703        q->pop();
1704        releaseShared(job);
1705      }
1706    }
1707    thread_queues.clear();
1708    threads.clear();
1709  }
1710  ThreadState *getThread(int i) { return threads[i]; }
1711  void shutdown(bool wait) {
1712    if (single_threaded) {
1713      SchedInfo *info = new SchedInfo();
1714      info->num = 0;
1715      info->scheduler = this;
1716      acquireShared(this);
1717      info->job = NULL;
1718      Scheduler::main(NULL, info);
1719      return;
1720    }
1721    lock.lock();
1722    if (wait) {
1723      while (!global_queue.empty()) {
1724        response.wait();
1725      }
1726    }
1727    shutting_down = true;
1728    while (shutdown_counter < nthreads) {
1729      cond.broadcast();
1730      response.wait();
1731    }
1732    lock.unlock();
1733    for (int i = 0; i <threads.size(); i++) {
1734      joinThread(threads[i]);
1735    }
1736  }
1737  void addThread(ThreadPool *owner, ThreadState *thread) {
1738    lock.lock();
1739    thread_owners.push_back(owner);
1740    threads.push_back(thread);
1741    thread_queues.push_back(new JobQueue());
1742    lock.unlock();
1743  }
1744  void attachJob(ThreadPool *pool, Job *job) {
1745    lock.lock();
1746    job->pool = pool;
1747    job->id = jobid++;
1748    acquireShared(job);
1749    if (job->ready()) {
1750      global_queue.push(job);
1751      cond.signal();
1752    }
1753    else if (job->pending_index < 0) {
1754      job->pool = pool;
1755      job->pending_index = pending.size();
1756      pending.push_back(job);
1757    }
1758    lock.unlock();
1759  }
1760  void detachJob(Job *job) {
1761    lock.lock();
1762    long i = job->pending_index;
1763    job->pending_index = -1;
1764    if (i >= 0) {
1765      job = pending.back();
1766      pending.resize(pending.size()-1);
1767      pending[i] = job;
1768      job->pending_index = i;
1769    }
1770    lock.unlock();
1771  }
1772  void queueJob(Job *job) {
1773    lock.lock();
1774    global_queue.push(job);
1775    cond.signal();
1776    lock.unlock();
1777  }
1778  void broadcastJob(ThreadPool *pool, Job *job) {
1779    lock.lock();
1780    for (int i = 0; i <thread_queues.size(); i++) {
1781      if (thread_owners[i] == pool) {
1782        acquireShared(job);
1783        thread_queues[i]->push(job);
1784      }
1785    }
1786    lock.unlock();
1787  }
1788  void cancelDeps(Job * job) {
1789    vector<Job *> &notify = job->notify;
1790    for (int i = 0; i <notify.size(); i++) {
1791      Job *next = notify[i];
1792      if (!next->cancelled) {
1793        cancelJob(next);
1794      }
1795    }
1796  }
1797  void cancelJob(Job *job) {
1798    lock.lock();
1799    if (!job->cancelled) {
1800      job->cancelled = true;
1801      if (!job->running && !job->done) {
1802        job->done = true;
1803        cancelDeps(job);
1804      }
1805    }
1806    lock.unlock();
1807  }
1808  void waitJob(Job *job) {
1809    if (single_threaded) {
1810      SchedInfo *info = new SchedInfo();
1811      info->num = 0;
1812      info->scheduler = this;
1813      acquireShared(this);
1814      info->job = job;
1815      Scheduler::main(NULL, info);
1816    } else {
1817      lock.lock();
1818      for (;;) {
1819        if (job->done || job->cancelled) {
1820          break;
1821        }
1822        response.wait();
1823      }
1824      response.signal(); // forward signal
1825      lock.unlock();
1826    }
1827  }
1828  void clearThreadState() {
1829    threads.clear();
1830  }
1831  static void notifyDeps(Scheduler *scheduler, Job *job) {
1832    vector<Job *> &notify = job->notify;
1833    job->incref(notify.size());
1834    for (int i = 0; i <notify.size(); i++) {
1835      Job *next = notify[i];
1836      if (!next->queued && next->ready() && !next->cancelled) {
1837        next->queued = true;
1838        scheduler->queueJob(next);
1839      }
1840    }
1841    vector<Trigger *> &triggers = job->triggers;
1842    leftv arg = NULL;
1843    if (triggers.size() > 0 && job->result.size() > 0)
1844      arg = LinTree::from_string(job->result);
1845    for (int i = 0; i < triggers.size(); i++) {
1846      Trigger *trigger = triggers[i];
1847      if (trigger->accept(arg)) {
1848        trigger->activate(arg);
1849        if (trigger->ready())
1850           scheduler->queueJob(trigger);
1851      }
1852    }
1853    if (arg) {
1854      arg->CleanUp();
1855      omFreeBin(arg, sleftv_bin);
1856    }
1857  }
1858  static void *main(ThreadState *ts, void *arg) {
1859    SchedInfo *info = (SchedInfo *) arg;
1860    Scheduler *scheduler = info->scheduler;
1861    ThreadPool *oldThreadPool = currentThreadPoolRef;
1862    // TODO: set current thread pool
1863    // currentThreadPoolRef = pool;
1864    Lock &lock = scheduler->lock;
1865    ConditionVariable &cond = scheduler->cond;
1866    ConditionVariable &response = scheduler->response;
1867    JobQueue *my_queue = scheduler->thread_queues[info->num];
1868    if (!scheduler->single_threaded)
1869      thread_init();
1870    lock.lock();
1871    for (;;) {
1872      if (info->job && info->job->done)
1873        break;
1874      if (scheduler->shutting_down) {
1875        scheduler->shutdown_counter++;
1876        scheduler->response.signal();
1877        break;
1878      }
1879      if (!my_queue->empty()) {
1880       Job *job = my_queue->front();
1881       my_queue->pop();
1882       if (!scheduler->global_queue.empty())
1883         cond.signal();
1884       currentJobRef = job;
1885       job->run();
1886       currentJobRef = NULL;
1887       notifyDeps(scheduler, job);
1888       releaseShared(job);
1889       scheduler->response.signal();
1890       continue;
1891      } else if (!scheduler->global_queue.empty()) {
1892       Job *job = scheduler->global_queue.top();
1893       scheduler->global_queue.pop();
1894       if (!scheduler->global_queue.empty())
1895         cond.signal();
1896       currentJobRef = job;
1897       job->run();
1898       currentJobRef = NULL;
1899       notifyDeps(scheduler, job);
1900       releaseShared(job);
1901       scheduler->response.signal();
1902       continue;
1903      } else {
1904        if (scheduler->single_threaded) {
1905          break;
1906        }
1907        cond.wait();
1908      }
1909    }
1910    // TODO: correct current thread pool
1911    // releaseShared(currentThreadPoolRef);
1912    currentThreadPoolRef = oldThreadPool;
1913    scheduler->lock.unlock();
1914    delete info;
1915    return NULL;
1916  }
1917};
1918
1919ThreadPool::ThreadPool(int n) : SharedObject(), nthreads(n) {
1920  scheduler = new Scheduler(n);
1921  acquireShared(scheduler);
1922}
1923ThreadPool::ThreadPool(Scheduler *sched, int n) : SharedObject(), nthreads(n) {
1924  scheduler = sched;
1925  acquireShared(sched);
1926}
1927ThreadPool::~ThreadPool() {
1928  releaseShared(scheduler);
1929}
1930ThreadState *ThreadPool::getThread(int i) { return scheduler->getThread(i); }
1931void ThreadPool::shutdown(bool wait) { scheduler->shutdown(wait); }
1932void ThreadPool::addThread(ThreadState *thread) {
1933  scheduler->addThread(this, thread);
1934}
1935void ThreadPool::attachJob(Job *job) {
1936  scheduler->attachJob(this, job);
1937}
1938void ThreadPool::detachJob(Job *job) {
1939  scheduler->detachJob(job);
1940}
1941void ThreadPool::queueJob(Job *job) {
1942  scheduler->queueJob(job);
1943}
1944void ThreadPool::broadcastJob(Job *job) {
1945  scheduler->broadcastJob(this, job);
1946}
1947void ThreadPool::cancelDeps(Job * job) {
1948  scheduler->cancelDeps(job);
1949}
1950void ThreadPool::cancelJob(Job *job) {
1951  scheduler->cancelJob(job);
1952}
1953void ThreadPool::waitJob(Job *job) {
1954  scheduler->waitJob(job);
1955}
1956void ThreadPool::clearThreadState() {
1957  scheduler->clearThreadState();
1958}
1959
1960void Job::addDep(vector<Job *> &jobs) {
1961  deps.insert(deps.end(), jobs.begin(), jobs.end());
1962}
1963
1964void Job::addDep(long ndeps, Job **jobs) {
1965  for (long i = 0; i < ndeps; i++) {
1966    deps.push_back(jobs[i]);
1967  }
1968}
1969
1970void Job::addNotify(vector<Job *> &jobs) {
1971  notify.insert(notify.end(), jobs.begin(), jobs.end());
1972  if (done) {
1973    Scheduler::notifyDeps(pool->scheduler, this);
1974  }
1975}
1976
1977void Job::addNotify(Job *job) {
1978  notify.push_back(job);
1979  if (done) {
1980    Scheduler::notifyDeps(pool->scheduler, this);
1981  }
1982}
1983
1984void Job::run() {
1985  if (!cancelled) {
1986    running = true;
1987    pool->scheduler->lock.unlock();
1988    pool->scheduler->running++;
1989    execute();
1990    pool->scheduler->running--;
1991    pool->scheduler->lock.lock();
1992    running = false;
1993  }
1994  done = true;
1995}
1996
1997class AccTrigger : public Trigger {
1998private:
1999  long count;
2000public:
2001  AccTrigger(long count_init): Trigger(), count(count_init) {
2002  }
2003  virtual bool ready() {
2004    if (!Trigger::ready()) return false;
2005    return args.size() >= count;
2006  }
2007  virtual bool accept(leftv arg) {
2008    return true;
2009  }
2010  virtual void activate(leftv arg) {
2011    while (arg != NULL && !ready()) {
2012      args.push_back(LinTree::to_string(arg));
2013      if (ready()) {
2014        return;
2015      }
2016      arg = arg->next;
2017    }
2018  }
2019  virtual void execute() {
2020    lists l = (lists) omAlloc0Bin(slists_bin);
2021    l->Init(args.size());
2022    for (int i = 0; i < args.size(); i++) {
2023      leftv val = LinTree::from_string(args[i]);
2024      memcpy(&l->m[i], val, sizeof(*val));
2025      omFreeBin(val, sleftv_bin);
2026    }
2027    sleftv val;
2028    memset(&val, 0, sizeof(val));
2029    val.rtyp = LIST_CMD;
2030    val.data = l;
2031    result = LinTree::to_string(&val);
2032    // val.CleanUp();
2033  }
2034};
2035
2036class CountTrigger : public Trigger {
2037private:
2038  long count;
2039public:
2040  CountTrigger(long count_init): Trigger(), count(count_init) {
2041  }
2042  virtual bool ready() {
2043    if (!Trigger::ready()) return false;
2044    return count <= 0;
2045  }
2046  virtual bool accept(leftv arg) {
2047    return arg == NULL;
2048  }
2049  virtual void activate(leftv arg) {
2050    if (!ready()) {
2051      count--;
2052    }
2053  }
2054  virtual void execute() {
2055    // do nothing
2056  }
2057};
2058
2059class SetTrigger : public Trigger {
2060private:
2061  vector<bool> set;
2062  long count;
2063public:
2064  SetTrigger(long count_init) : Trigger(), count(0),
2065    set(count_init) {
2066  }
2067  virtual bool ready() {
2068    if (!Trigger::ready()) return false;
2069    return count == set.size();
2070  }
2071  virtual bool accept(leftv arg) {
2072    return arg->Typ() == INT_CMD;
2073  }
2074  virtual void activate(leftv arg) {
2075    if (!ready()) {
2076      long value = (long) arg->Data();
2077      if (value < 0 || value >= count) return;
2078      if (set[value]) return;
2079      set[value] = true;
2080      count++;
2081    }
2082  }
2083  virtual void execute() {
2084    // do nothing
2085  }
2086};
2087
2088
2089class ProcTrigger : public Trigger {
2090private:
2091  string procname;
2092  bool success;
2093public:
2094  ProcTrigger(const char *p) : Trigger(), procname(p), success(false) {
2095  }
2096  virtual bool ready() {
2097    if (!Trigger::ready()) return false;
2098    return success;
2099  }
2100  virtual bool accept(leftv arg) {
2101    return TRUE;
2102  }
2103  virtual void activate(leftv arg) {
2104    if (!ready()) {
2105      pool->scheduler->lock.unlock();
2106      vector<leftv> argv;
2107      for (int i = 0; i < args.size(); i++) {
2108        appendArg(argv, args[i]);
2109      }
2110      int error = false;
2111      while (arg) {
2112        appendArgCopy(argv, arg);
2113        arg = arg->next;
2114      }
2115      sleftv val;
2116      if (!error)
2117        error = executeProc(val, procname.c_str(), argv);
2118      if (!error) {
2119        if (val.Typ() == NONE || (val.Typ() == INT_CMD &&
2120                                  (long) val.Data()))
2121        {
2122          success = true;
2123        }
2124        val.CleanUp();
2125      }
2126      pool->scheduler->lock.lock();
2127    }
2128  }
2129  virtual void execute() {
2130    // do nothing
2131  }
2132};
2133
2134static BOOLEAN createThreadPool(leftv result, leftv arg) {
2135  long n;
2136  Command cmd("createThreadPool", result, arg);
2137  cmd.check_argc(1, 2);
2138  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2139  if (cmd.ok()) {
2140    n = (long) cmd.arg(0);
2141    if (n < 0) cmd.report("number of threads must be non-negative");
2142    else if (n >= 256) cmd.report("number of threads too large");
2143    if (!have_threads && n != 0)
2144      cmd.report("in single-threaded mode, number of threads must be zero");
2145  }
2146  if (cmd.ok()) {
2147    ThreadPool *pool = new ThreadPool((int) n);
2148    pool->set_type(type_threadpool);
2149    for (int i = 0; i <n; i++) {
2150      const char *error;
2151      SchedInfo *info = new SchedInfo();
2152      info->scheduler = pool->scheduler;
2153      acquireShared(pool->scheduler);
2154      info->job = NULL;
2155      info->num = i;
2156      ThreadState *thread = newThread(Scheduler::main, info, &error);
2157      if (!thread) {
2158        // TODO: clean up bad pool
2159        return cmd.abort(error);
2160      }
2161      pool->addThread(thread);
2162    }
2163    cmd.set_result(type_threadpool, new_shared(pool));
2164  }
2165  return cmd.status();
2166}
2167
2168static BOOLEAN createThreadPoolSet(leftv result, leftv arg) {
2169  Command cmd("createThreadPoolSet", result, arg);
2170  cmd.check_argc(2);
2171  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2172  cmd.check_arg(1, LIST_CMD, "second argument must be a list of integers");
2173  lists l;
2174  int n;
2175  if (cmd.ok()) {
2176    l = (lists) (cmd.arg(1));
2177    n = lSize(l)+1;
2178    if (n == 0)
2179      return cmd.abort("second argument must not be empty");
2180    for (int i = 0; i < n; i++) {
2181      if (l->m[i].Typ() != INT_CMD)
2182        return cmd.abort("second argument must be a list of integers");
2183    }
2184  }
2185  lists pools = (lists) omAlloc0Bin(slists_bin);
2186  pools->Init(n);
2187  if (cmd.ok()) {
2188    long s = 0;
2189    for (int i = 0; i < n; i++) {
2190      s += (long) (l->m[i].Data());
2191    }
2192    Scheduler *sched = new Scheduler((int)s);
2193    sched->set_maxconcurrency(cmd.int_arg(0));
2194    for (int i = 0; i < n; i++) {
2195      long m = (long) (l->m[i].Data());
2196      ThreadPool *pool = new ThreadPool(sched, (int) m);
2197      pool->set_type(type_threadpool);
2198      for (int j = 0; j < m; j++) {
2199        const char *error;
2200        SchedInfo *info = new SchedInfo();
2201        info->scheduler = pool->scheduler;
2202        acquireShared(pool->scheduler);
2203        info->job = NULL;
2204        info->num = i;
2205        ThreadState *thread = newThread(Scheduler::main, info, &error);
2206        if (!thread) {
2207          // TODO: clean up bad pool
2208          return cmd.abort(error);
2209        }
2210        pool->addThread(thread);
2211      }
2212      pools->m[i].rtyp = type_threadpool;
2213      pools->m[i].data = new_shared(pool);
2214    }
2215    cmd.set_result(LIST_CMD, pools);
2216  }
2217  return cmd.status();
2218}
2219
2220ThreadPool *createThreadPool(int nthreads, int prioThreads = 0) {
2221  ThreadPool *pool = new ThreadPool((int) nthreads);
2222  pool->set_type(type_threadpool);
2223  for (int i = 0; i <nthreads; i++) {
2224    const char *error;
2225    SchedInfo *info = new SchedInfo();
2226    info->scheduler = pool->scheduler;
2227    acquireShared(pool);
2228    info->job = NULL;
2229    info->num = i;
2230    ThreadState *thread = newThread(Scheduler::main, info, &error);
2231    if (!thread) {
2232      return NULL;
2233    }
2234    pool->addThread(thread);
2235  }
2236  return pool;
2237}
2238
2239void release(ThreadPool *pool) {
2240  releaseShared(pool);
2241}
2242
2243void retain(ThreadPool *pool) {
2244  acquireShared(pool);
2245}
2246
2247ThreadPool *getCurrentThreadPool() {
2248  return currentThreadPoolRef;
2249}
2250
2251static BOOLEAN getThreadPoolWorkers(leftv result, leftv arg) {
2252  Command cmd("getThreadPoolWorkers", result, arg);
2253  cmd.check_argc(1);
2254  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2255  cmd.check_init(0, "threadpool not initialized");
2256  int r = 0;
2257  if (cmd.ok()) {
2258    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2259    Scheduler *sched = pool->scheduler;
2260    sched->lock.lock();
2261    r = sched->threadpool_size(pool);
2262    sched->lock.unlock();
2263    cmd.set_result(INT_CMD, r);
2264  }
2265  return cmd.status();
2266}
2267
2268static BOOLEAN setThreadPoolWorkers(leftv result, leftv arg) {
2269  Command cmd("setThreadPoolWorkers", result, arg);
2270  cmd.check_argc(2);
2271  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2272  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2273  cmd.check_init(0, "threadpool not initialized");
2274  if (cmd.ok()) {
2275    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2276    Scheduler *sched = pool->scheduler;
2277    // TODO: count/add threads
2278    cmd.no_result();
2279  }
2280  return cmd.status();
2281}
2282
2283static BOOLEAN getThreadPoolConcurrency(leftv result, leftv arg) {
2284  Command cmd("getThreadPoolConcurrency", result, arg);
2285  cmd.check_argc(1);
2286  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2287  cmd.check_init(0, "threadpool not initialized");
2288  if (cmd.ok()) {
2289    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2290    Scheduler *sched = pool->scheduler;
2291    sched->lock.lock();
2292    cmd.set_result(INT_CMD, sched->get_maxconcurrency());
2293    sched->lock.unlock();
2294  }
2295  return cmd.status();
2296}
2297
2298static BOOLEAN setThreadPoolConcurrency(leftv result, leftv arg) {
2299  Command cmd("setThreadPoolWorkers", result, arg);
2300  cmd.check_argc(2);
2301  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2302  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2303  cmd.check_init(0, "threadpool not initialized");
2304  if (cmd.ok()) {
2305    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2306    Scheduler *sched = pool->scheduler;
2307    sched->lock.lock();
2308    sched->set_maxconcurrency(cmd.int_arg(1));
2309    sched->lock.unlock();
2310    cmd.no_result();
2311  }
2312  return cmd.status();
2313}
2314
2315static BOOLEAN closeThreadPool(leftv result, leftv arg) {
2316  Command cmd("closeThreadPool", result, arg);
2317  cmd.check_argc(1, 2);
2318  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2319  cmd.check_init(0, "threadpool not initialized");
2320  if (cmd.nargs() > 1)
2321    cmd.check_arg(1, INT_CMD, "optional argument must be an integer");
2322  if (cmd.ok()) {
2323    ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2324    bool wait = cmd.nargs() == 2 ? (cmd.int_arg(1) != 0) : 1;
2325    pool->shutdown(wait);
2326    cmd.no_result();
2327  }
2328  return cmd.status();
2329}
2330
2331void closeThreadPool(ThreadPool *pool, bool wait) {
2332  pool->shutdown(wait);
2333}
2334
2335
2336BOOLEAN currentThreadPool(leftv result, leftv arg) {
2337  Command cmd("currentThreadPool", result, arg);
2338  cmd.check_argc(0);
2339  ThreadPool *pool = currentThreadPoolRef;
2340  if (pool) {
2341    cmd.set_result(type_threadpool, new_shared(pool));
2342  } else {
2343    cmd.report("no current threadpool");
2344  }
2345  return cmd.status();
2346}
2347
2348BOOLEAN setCurrentThreadPool(leftv result, leftv arg) {
2349  Command cmd("setCurrentThreadPool", result, arg);
2350  cmd.check_argc(1);
2351  cmd.check_init(0, "threadpool not initialized");
2352  if (cmd.ok()) {
2353    ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2354    acquireShared(pool);
2355    if (currentThreadPoolRef)
2356      releaseShared(currentThreadPoolRef);
2357    currentThreadPoolRef = pool;
2358  }
2359  return cmd.status();
2360}
2361
2362class EvalJob : public Job {
2363public:
2364  EvalJob() : Job() { }
2365  virtual void execute() {
2366    leftv val = LinTree::from_string(args[0]);
2367    result = (LinTree::to_string(val));
2368    val->CleanUp();
2369    omFreeBin(val, sleftv_bin);
2370  }
2371};
2372
2373class ExecJob : public Job {
2374public:
2375  ExecJob() : Job() { }
2376  virtual void execute() {
2377    leftv val = LinTree::from_string(args[0]);
2378    val->CleanUp();
2379    omFreeBin(val, sleftv_bin);
2380  }
2381};
2382
2383class ProcJob : public Job {
2384  string procname;
2385public:
2386  ProcJob(const char *procname_init) : Job(),
2387    procname(procname_init) {
2388    set_name(procname_init);
2389  }
2390  virtual void execute() {
2391    vector<leftv> argv;
2392    for (int i = 0; i <args.size(); i++) {
2393      appendArg(argv, args[i]);
2394    }
2395    for (int i = 0; i < deps.size(); i++) {
2396      appendArg(argv, deps[i]->result);
2397    }
2398    sleftv val;
2399    int error = executeProc(val, procname.c_str(), argv);
2400    if (!error) {
2401      result = (LinTree::to_string(&val));
2402      val.CleanUp();
2403    }
2404  }
2405};
2406
2407class KernelJob : public Job {
2408private:
2409  void (*cfunc)(leftv result, leftv arg);
2410public:
2411  KernelJob(void (*func)(leftv result, leftv arg)) : cfunc(func) { }
2412  virtual void execute() {
2413    vector<leftv> argv;
2414    for (int i = 0; i <args.size(); i++) {
2415      appendArg(argv, args[i]);
2416    }
2417    for (int i = 0; i < deps.size(); i++) {
2418      appendArg(argv, deps[i]->result);
2419    }
2420    sleftv val;
2421    memset(&val, 0, sizeof(val));
2422    if (argv.size() > 0) {
2423      leftv *tail = &argv[0]->next;
2424      for (int i = 1; i < argv.size(); i++) {
2425        *tail = argv[i];
2426        tail = &(*tail)->next;
2427      }
2428      *tail = NULL;
2429    }
2430    cfunc(&val, argv[0]);
2431    result = (LinTree::to_string(&val));
2432    val.CleanUp();
2433  }
2434};
2435
2436class RawKernelJob : public Job {
2437private:
2438  void (*cfunc)(long ndeps, Job **deps);
2439public:
2440  RawKernelJob(void (*func)(long ndeps, Job **deps)) : cfunc(func) { }
2441  virtual void execute() {
2442    long ndeps = deps.size();
2443    Job **jobs = (Job **) omAlloc0(sizeof(Job *) * ndeps);
2444    for (long i = 0; i < ndeps; i++)
2445      jobs[i] = deps[i];
2446    cfunc(ndeps, jobs);
2447    omFree(jobs);
2448  }
2449};
2450
2451static BOOLEAN createJob(leftv result, leftv arg) {
2452  Command cmd("createJob", result, arg);
2453  cmd.check_argc_min(1);
2454  cmd.check_arg(0, STRING_CMD, COMMAND,
2455    "job name must be a string or quote expression");
2456  if (cmd.ok()) {
2457    if (cmd.test_arg(0, STRING_CMD)) {
2458      ProcJob *job = new ProcJob((char *)(cmd.arg(0)));
2459      for (leftv a = arg->next; a != NULL; a = a->next) {
2460        job->args.push_back(LinTree::to_string(a));
2461      }
2462      cmd.set_result(type_job, new_shared(job));
2463    } else {
2464      cmd.check_argc(1);
2465      Job *job = new EvalJob();
2466      job->args.push_back(LinTree::to_string(arg));
2467      cmd.set_result(type_job, new_shared(job));
2468    }
2469  }
2470  return cmd.status();
2471}
2472
2473Job *createJob(void (*func)(leftv result, leftv arg)) {
2474  KernelJob *job = new KernelJob(func);
2475  return job;
2476}
2477
2478Job *createJob(void (*func)(long ndeps, Job **deps)) {
2479  RawKernelJob *job = new RawKernelJob(func);
2480  return job;
2481}
2482
2483Job *startJob(ThreadPool *pool, Job *job, leftv arg) {
2484  if (job->pool) return NULL;
2485  while (arg) {
2486    job->args.push_back(LinTree::to_string(arg));
2487    arg = arg->next;
2488  }
2489  pool->attachJob(job);
2490  return job;
2491}
2492
2493Job *startJob(ThreadPool *pool, Job *job) {
2494  return startJob(pool, job, NULL);
2495}
2496
2497Job *scheduleJob(ThreadPool *pool, Job *job, long ndeps, Job **deps) {
2498  if (job->pool) return NULL;
2499  pool->scheduler->lock.lock();
2500  bool cancelled = false;
2501  job->addDep(ndeps, deps);
2502  for (long i = 0; i < ndeps; i++) {
2503    deps[i]->addNotify(job);
2504    cancelled |= deps[i]->cancelled;
2505  }
2506  if (cancelled) {
2507    job->pool = pool;
2508    pool->cancelJob(job);
2509  }
2510  else
2511    pool->attachJob(job);
2512  pool->scheduler->lock.unlock();
2513}
2514
2515void cancelJob(Job *job) {
2516  ThreadPool *pool = job->pool;
2517  if (pool) pool->cancelJob(job);
2518}
2519
2520Job *getCurrentJob() {
2521  return currentJobRef;
2522}
2523
2524static BOOLEAN startJob(leftv result, leftv arg) {
2525  Command cmd("startJob", result, arg);
2526  cmd.check_argc_min(1);
2527  int has_pool = cmd.test_arg(0, type_threadpool);
2528  cmd.check_argc_min(1+has_pool);
2529  if (has_pool)
2530    cmd.check_init(0, "threadpool not initialized");
2531  int has_prio = cmd.test_arg(has_pool, INT_CMD);
2532  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2533  int first_arg = has_pool + has_prio;
2534  cmd.check_arg(first_arg, type_job, STRING_CMD,
2535    "job argument must be a job or string");
2536  if (cmd.ok() && cmd.argtype(first_arg) == type_job)
2537    cmd.check_init(first_arg, "job not initialized");
2538  if (!cmd.ok()) return cmd.status();
2539  ThreadPool *pool;
2540  if (has_pool)
2541    pool = cmd.shared_arg<ThreadPool>(0);
2542  else {
2543    if (!currentThreadPoolRef)
2544      return cmd.abort("no current threadpool defined");
2545    pool = currentThreadPoolRef;
2546  }
2547  Job *job;
2548  if (cmd.argtype(first_arg) == type_job) 
2549    job = *(Job **)(cmd.arg(first_arg));
2550  else
2551    job = new ProcJob((char *)(cmd.arg(first_arg)));
2552  leftv a = arg->next;
2553  if (has_pool) a = a->next;
2554  if (has_prio) a = a->next;
2555  for (; a != NULL; a = a->next) {
2556    job->args.push_back(LinTree::to_string(a));
2557  }
2558  if (job->pool)
2559    return cmd.abort("job has already been scheduled");
2560  job->prio = prio;
2561  pool->attachJob(job);
2562  cmd.set_result(type_job, new_shared(job));
2563  return cmd.status();
2564}
2565
2566static BOOLEAN waitJob(leftv result, leftv arg) {
2567  Command cmd("waitJob", result, arg);
2568  cmd.check_argc(1);
2569  cmd.check_arg(0, type_job, "argument must be a job");
2570  cmd.check_init(0, "job not initialized");
2571  if (cmd.ok()) {
2572    Job *job = *(Job **)(cmd.arg(0));
2573    ThreadPool *pool = job->pool;
2574    if (!pool) {
2575      return cmd.abort("job has not yet been started or scheduled");
2576    }
2577    pool->waitJob(job);
2578    if (job->cancelled) {
2579      return cmd.abort("job has been cancelled");
2580    }
2581    if (job->result.size() == 0)
2582      cmd.no_result();
2583    else {
2584      leftv res = LinTree::from_string(job->result);
2585      cmd.set_result(res->Typ(), res->Data());
2586    }
2587  }
2588  return cmd.status();
2589}
2590
2591void waitJob(Job *job) {
2592  assert(job->pool != NULL);
2593  job->pool->waitJob(job);
2594}
2595
2596static BOOLEAN cancelJob(leftv result, leftv arg) {
2597  Command cmd("cancelJob", result, arg);
2598  cmd.check_argc(1);
2599  cmd.check_arg(0, type_job, "argument must be a job");
2600  cmd.check_init(0, "job not initialized");
2601  if (cmd.ok()) {
2602    Job *job = cmd.shared_arg<Job>(0);
2603    ThreadPool *pool = job->pool;
2604    if (!pool) {
2605      return cmd.abort("job has not yet been started or scheduled");
2606    }
2607    pool->cancelJob(job);
2608    cmd.no_result();
2609  }
2610  return cmd.status();
2611}
2612
2613static BOOLEAN jobCancelled(leftv result, leftv arg) {
2614  Job *job;
2615  Command cmd("jobCancelled", result, arg);
2616  cmd.check_argc(0, 1);
2617  if (cmd.nargs() == 1) {
2618    cmd.check_arg(0, type_job, "argument must be a job");
2619    cmd.check_init(0, "job not initialized");
2620    job = cmd.shared_arg<Job>(0);
2621  } else {
2622    job = currentJobRef;
2623    if (!job)
2624      cmd.report("no current job");
2625  }
2626  if (cmd.ok()) {
2627    ThreadPool *pool = job->pool;
2628    if (!pool) {
2629      return cmd.abort("job has not yet been started or scheduled");
2630    }
2631    pool->scheduler->lock.lock();
2632    cmd.set_result((long) job->cancelled);
2633    pool->scheduler->lock.unlock();
2634  }
2635  return cmd.status();
2636}
2637
2638bool getJobCancelled(Job *job) {
2639  ThreadPool *pool = job->pool;
2640  if (pool) pool->scheduler->lock.lock();
2641  bool result = job->cancelled;
2642  if (pool) pool->scheduler->lock.unlock();
2643  return result;
2644}
2645
2646bool getJobCancelled() {
2647  return getJobCancelled(currentJobRef);
2648}
2649
2650void setJobData(Job *job, void *data) {
2651  ThreadPool *pool = job->pool;
2652  if (pool) pool->scheduler->lock.lock();
2653  job->data = data;
2654  if (pool) pool->scheduler->lock.unlock();
2655}
2656
2657
2658void *getJobData(Job *job) {
2659  ThreadPool *pool = job->pool;
2660  if (pool) pool->scheduler->lock.lock();
2661  void *result = job->data;
2662  if (pool) pool->scheduler->lock.unlock();
2663  return result;
2664}
2665
2666void addJobArgs(Job *job, leftv arg) {
2667  ThreadPool *pool = job->pool;
2668  if (pool) pool->scheduler->lock.lock();
2669  while (arg) {
2670    job->args.push_back(LinTree::to_string(arg));
2671    arg = arg->next;
2672  }
2673  if (pool) pool->scheduler->lock.unlock();
2674}
2675
2676leftv getJobResult(Job *job) {
2677  ThreadPool *pool = job->pool;
2678  if (pool) pool->scheduler->lock.lock();
2679  leftv result = LinTree::from_string(job->result);
2680  if (pool) pool->scheduler->lock.unlock();
2681  return result;
2682}
2683
2684const char *getJobName(Job *job) {
2685  // TODO
2686  return "";
2687}
2688
2689void setJobName(Job *job, const char *name) {
2690  // TODO
2691}
2692
2693static BOOLEAN createTrigger(leftv result, leftv arg) {
2694  Command cmd("createTrigger", result, arg);
2695  cmd.check_argc_min(1);
2696  int has_pool = cmd.test_arg(0, type_threadpool);
2697  ThreadPool *pool;
2698  if (has_pool) {
2699    cmd.check_init(0, "threadpool not initialized");
2700    pool = cmd.shared_arg<ThreadPool>(0);
2701  } else {
2702    pool = currentThreadPoolRef;
2703    if (!pool)
2704      return cmd.abort("no default threadpool");
2705  }
2706  cmd.check_argc(has_pool + 2);
2707  cmd.check_arg(has_pool + 0, STRING_CMD, "trigger subtype must be a string");
2708  const char *kind = (const char *)(cmd.arg(has_pool + 0));
2709  if (0 == strcmp(kind, "proc")) {
2710    cmd.check_arg(has_pool + 1, STRING_CMD, "proc trigger argument must be a string");
2711  } else {
2712    cmd.check_arg(has_pool + 1, INT_CMD, "trigger argument must be an integer");
2713  }
2714  if (cmd.ok()) {
2715    Trigger *trigger;
2716    long n = (long) (cmd.arg(has_pool + 1));
2717    if (n < 0)
2718      return cmd.abort("trigger argument must be a non-negative integer");
2719    if (0 == strcmp(kind, "acc")) {
2720      trigger = new AccTrigger(n);
2721    } else if (0 == strcmp(kind, "count")) {
2722      trigger = new CountTrigger(n);
2723    } else if (0 == strcmp(kind, "set")) {
2724      trigger = new SetTrigger(n);
2725    } else if (0 == strcmp(kind, "proc")) {
2726      trigger = new ProcTrigger((const char *) cmd.arg(has_pool + 1));
2727    } else {
2728      return cmd.abort("unknown trigger subtype");
2729    }
2730    pool->attachJob(trigger);
2731    cmd.set_result(type_trigger, new_shared(trigger));
2732  }
2733  return cmd.status();
2734}
2735
2736static BOOLEAN updateTrigger(leftv result, leftv arg) {
2737  Command cmd("updateTrigger", result, arg);
2738  cmd.check_argc_min(1);
2739  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2740  cmd.check_init(0, "trigger not initialized");
2741  if (cmd.ok()) {
2742    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2743    trigger->pool->scheduler->lock.lock();
2744    if (!trigger->accept(arg->next))
2745      cmd.report("incompatible argument type(s) for this trigger");
2746    else {
2747      trigger->activate(arg->next);
2748      if (trigger->ready()) {
2749        trigger->run();
2750        Scheduler::notifyDeps(trigger->pool->scheduler, trigger);
2751      }
2752    }
2753    trigger->pool->scheduler->lock.unlock();
2754  }
2755  return cmd.status();
2756}
2757
2758static BOOLEAN chainTrigger(leftv result, leftv arg) {
2759  Command cmd("chainTrigger", result, arg);
2760  cmd.check_argc(2);
2761  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2762  cmd.check_arg(1, type_trigger, type_job,
2763    "second argument must be a trigger or job");
2764  cmd.check_init(0, "trigger not initialized");
2765  cmd.check_init(1, "trigger/job not initialized");
2766  if (cmd.ok()) {
2767    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2768    Job *job = cmd.shared_arg<Job>(1);
2769    if (trigger->pool != job->pool)
2770      return cmd.abort("arguments use different threadpools");
2771    ThreadPool *pool = trigger->pool;
2772    pool->scheduler->lock.lock();
2773    job->triggers.push_back(trigger);
2774    pool->scheduler->lock.unlock();
2775  }
2776  return cmd.status();
2777}
2778
2779static BOOLEAN testTrigger(leftv result, leftv arg) {
2780  Command cmd("testTrigger", result, arg);
2781  cmd.check_argc(1);
2782  cmd.check_arg(0, type_trigger, "argument must be a trigger");
2783  cmd.check_init(0, "trigger not initialized");
2784  if (cmd.ok()) {
2785    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2786    ThreadPool *pool = trigger->pool;
2787    pool->scheduler->lock.lock();
2788    cmd.set_result((long)trigger->ready());
2789    pool->scheduler->lock.unlock();
2790  }
2791  return cmd.status();
2792}
2793
2794
2795static BOOLEAN scheduleJob(leftv result, leftv arg) {
2796  vector<Job *> jobs;
2797  vector<Job *> deps;
2798  Command cmd("scheduleJob", result, arg);
2799  cmd.check_argc_min(1);
2800  int has_pool = cmd.test_arg(0, type_threadpool);
2801  if (has_pool) {
2802    cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2803    cmd.check_init(0, "threadpool not initialized");
2804  }
2805  cmd.check_argc_min(has_pool+1);
2806  int has_prio = cmd.test_arg(has_pool, INT_CMD);
2807  ThreadPool *pool;
2808  if (has_pool)
2809    pool = cmd.shared_arg<ThreadPool>(0);
2810  else {
2811    if (!currentThreadPoolRef)
2812      return cmd.abort("no current threadpool defined");
2813    pool = currentThreadPoolRef;
2814  }
2815  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2816  int first_arg = has_pool + has_prio;
2817  if (cmd.test_arg(first_arg, type_job)) {
2818    jobs.push_back(*(Job **)(cmd.arg(first_arg)));
2819  } else if (cmd.test_arg(first_arg, STRING_CMD)) {
2820    jobs.push_back(new ProcJob((char *)(cmd.arg(first_arg))));
2821  } else if (cmd.test_arg(first_arg, LIST_CMD)) {
2822    lists l = (lists) (cmd.arg(first_arg));
2823    int n = lSize(l);
2824    for (int i = 0; i < n; i++) {
2825      if (l->m[i].Typ() != type_job)
2826        return cmd.abort("job argument must be a job, string, or list of jobs");
2827    }
2828    for (int i = 0; i < n; i++) {
2829      Job *job = *(Job **) (l->m[i].Data());
2830      if (!job)
2831        return cmd.abort("job not initialized");
2832      jobs.push_back(job);
2833    }
2834  } else {
2835    return cmd.abort("job argument must be a job, string, or list of jobs");
2836  }
2837  bool error = false;
2838  leftv a = arg->next;
2839  if (has_pool) a = a->next;
2840  if (has_prio) a = a->next;
2841  for (; !error && a; a = a->next) {
2842    if (a->Typ() == type_job || a->Typ() == type_trigger) {
2843      deps.push_back(*(Job **)(a->Data()));
2844    } else if (a->Typ() == LIST_CMD) {
2845      lists l = (lists) a->Data();
2846      int n = lSize(l);
2847      for (int i = 0; i < n; i++) {
2848        if (l->m[i].Typ() == type_job || l->m[i].Typ() == type_trigger) {
2849          deps.push_back(*(Job **)(l->m[i].Data()));
2850        } else {
2851          error = true;
2852          break;
2853        }
2854      }
2855    }
2856  }
2857  if (error) {
2858    return cmd.abort("illegal dependency");
2859  }
2860  for (int i = 0; i < jobs.size(); i++) {
2861    Job *job = jobs[i];
2862    if (job->pool) {
2863      return cmd.abort("job has already been scheduled");
2864    }
2865    job->prio = prio;
2866  }
2867  for (int i = 0; i < deps.size(); i++) {
2868    Job *job = deps[i];
2869    if (!job->pool) {
2870      return cmd.abort("dependency has not yet been scheduled");
2871    }
2872    if (job->pool != pool) {
2873      return cmd.abort("dependency has been scheduled on a different threadpool");
2874    }
2875  }
2876  pool->scheduler->lock.lock();
2877  bool cancelled = false;
2878  for (int i = 0; i < jobs.size(); i++) {
2879    jobs[i]->addDep(deps);
2880  }
2881  for (int i = 0; i < deps.size(); i++) {
2882    deps[i]->addNotify(jobs);
2883    cancelled |= deps[i]->cancelled;
2884  }
2885  for (int i = 0; i < jobs.size(); i++) {
2886    if (cancelled) {
2887      jobs[i]->pool = pool;
2888      pool->cancelJob(jobs[i]);
2889    }
2890    else
2891      pool->attachJob(jobs[i]);
2892  }
2893  pool->scheduler->lock.unlock();
2894  if (jobs.size() > 0)
2895    cmd.set_result(type_job, new_shared(jobs[0]));
2896  return cmd.status();
2897}
2898
2899BOOLEAN currentJob(leftv result, leftv arg) {
2900  Command cmd("currentJob", result, arg);
2901  cmd.check_argc(0);
2902  Job *job = currentJobRef;
2903  if (job) {
2904    cmd.set_result(type_job, new_shared(job));
2905  } else {
2906    cmd.report("no current job");
2907  }
2908  return cmd.status();
2909}
2910
2911
2912BOOLEAN threadID(leftv result, leftv arg) {
2913  int i;
2914  if (wrong_num_args("threadID", arg, 0))
2915    return TRUE;
2916  result->rtyp = INT_CMD;
2917  result->data = (char *)thread_id;
2918  return FALSE;
2919}
2920
2921BOOLEAN mainThread(leftv result, leftv arg) {
2922  int i;
2923  if (wrong_num_args("mainThread", arg, 0))
2924    return TRUE;
2925  result->rtyp = INT_CMD;
2926  result->data = (char *)(long)(thread_id == 0L);
2927  return FALSE;
2928}
2929
2930BOOLEAN threadEval(leftv result, leftv arg) {
2931  if (wrong_num_args("threadEval", arg, 2))
2932    return TRUE;
2933  if (arg->Typ() != type_thread) {
2934    WerrorS("threadEval: argument is not a thread");
2935    return TRUE;
2936  }
2937  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2938  string expr = LinTree::to_string(arg->next);
2939  ThreadState *ts = thread->getThreadState();
2940  if (ts && ts->parent != pthread_self()) {
2941    WerrorS("threadEval: can only be called from parent thread");
2942    return TRUE;
2943  }
2944  if (ts) ts->lock.lock();
2945  if (!ts || !ts->running || !ts->active) {
2946    WerrorS("threadEval: thread is no longer running");
2947    if (ts) ts->lock.unlock();
2948    return TRUE;
2949  }
2950  ts->to_thread.push("e");
2951  ts->to_thread.push(expr);
2952  ts->to_cond.signal();
2953  ts->lock.unlock();
2954  result->rtyp = NONE;
2955  return FALSE;
2956}
2957
2958BOOLEAN threadExec(leftv result, leftv arg) {
2959  if (wrong_num_args("threadExec", arg, 2))
2960    return TRUE;
2961  if (arg->Typ() != type_thread) {
2962    WerrorS("threadExec: argument is not a thread");
2963    return TRUE;
2964  }
2965  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2966  string expr = LinTree::to_string(arg->next);
2967  ThreadState *ts = thread->getThreadState();
2968  if (ts && ts->parent != pthread_self()) {
2969    WerrorS("threadExec: can only be called from parent thread");
2970    return TRUE;
2971  }
2972  if (ts) ts->lock.lock();
2973  if (!ts || !ts->running || !ts->active) {
2974    WerrorS("threadExec: thread is no longer running");
2975    if (ts) ts->lock.unlock();
2976    return TRUE;
2977  }
2978  ts->to_thread.push("x");
2979  ts->to_thread.push(expr);
2980  ts->to_cond.signal();
2981  ts->lock.unlock();
2982  result->rtyp = NONE;
2983  return FALSE;
2984}
2985
2986BOOLEAN threadPoolExec(leftv result, leftv arg) {
2987  Command cmd("threadPoolExec", result, arg);
2988  ThreadPool *pool;
2989  cmd.check_argc(1, 2);
2990  int has_pool = cmd.nargs() == 2;
2991  if (has_pool) {
2992    cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2993    cmd.check_init(0, "threadpool not initialized");
2994    pool = cmd.shared_arg<ThreadPool>(0);
2995  } else {
2996    pool = currentThreadPoolRef;
2997    if (!pool)
2998      return cmd.abort("no current threadpool");
2999  }
3000  if (cmd.ok()) {
3001    string expr = LinTree::to_string(has_pool ? arg->next : arg);
3002    Job* job = new ExecJob();
3003    job->args.push_back(expr);
3004    job->pool = pool;
3005    pool->broadcastJob(job);
3006  }
3007  return cmd.status();
3008}
3009
3010BOOLEAN threadResult(leftv result, leftv arg) {
3011  if (wrong_num_args("threadResult", arg, 1))
3012    return TRUE;
3013  if (arg->Typ() != type_thread) {
3014    WerrorS("threadResult: argument is not a thread");
3015    return TRUE;
3016  }
3017  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
3018  ThreadState *ts = thread->getThreadState();
3019  if (ts && ts->parent != pthread_self()) {
3020    WerrorS("threadResult: can only be called from parent thread");
3021    return TRUE;
3022  }
3023  if (ts) ts->lock.lock();
3024  if (!ts || !ts->running || !ts->active) {
3025    WerrorS("threadResult: thread is no longer running");
3026    if (ts) ts->lock.unlock();
3027    return TRUE;
3028  }
3029  while (ts->from_thread.empty()) {
3030    ts->from_cond.wait();
3031  }
3032  string expr = ts->from_thread.front();
3033  ts->from_thread.pop();
3034  ts->lock.unlock();
3035  leftv val = LinTree::from_string(expr);
3036  result->rtyp = val->Typ();
3037  result->data = val->Data();
3038  return FALSE;
3039}
3040
3041BOOLEAN setSharedName(leftv result, leftv arg) {
3042  Command cmd("setSharedName", result, arg);
3043  cmd.check_argc(2);
3044  int type = cmd.argtype(0);
3045  cmd.check_init(0, "first argument is not initialized");
3046  if (type != type_job && type != type_trigger && type != type_threadpool) {
3047    cmd.report("first argument must be a job, trigger, or threadpool");
3048  }
3049  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
3050  if (cmd.ok()) {
3051    SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3052    name_lock.lock();
3053    obj->set_name((char *) cmd.arg(1));
3054    name_lock.unlock();
3055  }
3056  return cmd.status();
3057}
3058
3059BOOLEAN getSharedName(leftv result, leftv arg) {
3060  Command cmd("getSharedName", result, arg);
3061  cmd.check_argc(1);
3062  int type = cmd.argtype(0);
3063  cmd.check_init(0, "first argument is not initialized");
3064  if (type != type_job && type != type_trigger && type != type_threadpool) {
3065    cmd.report("first argument must be a job, trigger, or threadpool");
3066  }
3067  if (cmd.ok()) {
3068    SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3069    name_lock.lock();
3070    cmd.set_result(obj->get_name().c_str());
3071    name_lock.unlock();
3072  }
3073  return cmd.status();
3074}
3075
3076}
3077
3078using namespace LibThread;
3079
3080
3081extern "C" int SI_MOD_INIT(systhreads)(SModulFunctions *fn)
3082{
3083  const char *libname = currPack->libname;
3084  if (!libname) libname = "";
3085  master_lock.lock();
3086  if (!thread_state)
3087    thread_state = new ThreadState[MAX_THREADS];
3088  makeSharedType(type_atomic_table, "atomic_table");
3089  makeSharedType(type_atomic_list, "atomic_list");
3090  makeSharedType(type_shared_table, "shared_table");
3091  makeSharedType(type_shared_list, "shared_list");
3092  makeSharedType(type_channel, "channel");
3093  makeSharedType(type_syncvar, "syncvar");
3094  makeSharedType(type_region, "region");
3095  makeSharedType(type_thread, "thread");
3096  makeSharedType(type_threadpool, "threadpool");
3097  makeSharedType(type_job, "job");
3098  makeSharedType(type_trigger, "trigger");
3099  makeRegionlockType(type_regionlock, "regionlock");
3100
3101  fn->iiAddCproc(libname, "putTable", FALSE, putTable);
3102  fn->iiAddCproc(libname, "getTable", FALSE, getTable);
3103  fn->iiAddCproc(libname, "inTable", FALSE, inTable);
3104  fn->iiAddCproc(libname, "putList", FALSE, putList);
3105  fn->iiAddCproc(libname, "getList", FALSE, getList);
3106  fn->iiAddCproc(libname, "lockRegion", FALSE, lockRegion);
3107  fn->iiAddCproc(libname, "regionLock", FALSE, regionLock);
3108  fn->iiAddCproc(libname, "unlockRegion", FALSE, unlockRegion);
3109  fn->iiAddCproc(libname, "sendChannel", FALSE, sendChannel);
3110  fn->iiAddCproc(libname, "receiveChannel", FALSE, receiveChannel);
3111  fn->iiAddCproc(libname, "statChannel", FALSE, statChannel);
3112  fn->iiAddCproc(libname, "writeSyncVar", FALSE, writeSyncVar);
3113  fn->iiAddCproc(libname, "updateSyncVar", FALSE, updateSyncVar);
3114  fn->iiAddCproc(libname, "readSyncVar", FALSE, readSyncVar);
3115  fn->iiAddCproc(libname, "statSyncVar", FALSE, statSyncVar);
3116
3117  fn->iiAddCproc(libname, "makeAtomicTable", FALSE, makeAtomicTable);
3118  fn->iiAddCproc(libname, "makeAtomicList", FALSE, makeAtomicList);
3119  fn->iiAddCproc(libname, "makeSharedTable", FALSE, makeSharedTable);
3120  fn->iiAddCproc(libname, "makeSharedList", FALSE, makeSharedList);
3121  fn->iiAddCproc(libname, "makeChannel", FALSE, makeChannel);
3122  fn->iiAddCproc(libname, "makeSyncVar", FALSE, makeSyncVar);
3123  fn->iiAddCproc(libname, "makeRegion", FALSE, makeRegion);
3124  fn->iiAddCproc(libname, "findSharedObject", FALSE, findSharedObject);
3125  fn->iiAddCproc(libname, "bindSharedObject", FALSE, bindSharedObject);
3126  fn->iiAddCproc(libname, "typeSharedObject", FALSE, typeSharedObject);
3127
3128  fn->iiAddCproc(libname, "createThread", FALSE, createThread);
3129  fn->iiAddCproc(libname, "joinThread", FALSE, joinThread);
3130  fn->iiAddCproc(libname, "createThreadPool", FALSE, createThreadPool);
3131  fn->iiAddCproc(libname, "createThreadPoolSet", FALSE, createThreadPoolSet);
3132#if 0
3133  fn->iiAddCproc(libname, "adjoinThreadPool", FALSE, adjoinThreadPool);
3134  fn->iiAddCproc(libname, "getAdjoinedThreadPools", FALSE, getAdjoinedThreadPools);
3135#endif
3136  fn->iiAddCproc(libname, "closeThreadPool", FALSE, closeThreadPool);
3137  fn->iiAddCproc(libname, "getThreadPoolWorkers", FALSE, getThreadPoolWorkers);
3138  fn->iiAddCproc(libname, "setThreadPoolWorkers", FALSE, setThreadPoolWorkers);
3139  fn->iiAddCproc(libname, "getThreadPoolConcurrency", FALSE, getThreadPoolConcurrency);
3140  fn->iiAddCproc(libname, "setThreadPoolConcurrency", FALSE, setThreadPoolConcurrency);
3141  fn->iiAddCproc(libname, "currentThreadPool", FALSE, currentThreadPool);
3142  fn->iiAddCproc(libname, "setCurrentThreadPool", FALSE, setCurrentThreadPool);
3143  fn->iiAddCproc(libname, "threadPoolExec", FALSE, threadPoolExec);
3144  fn->iiAddCproc(libname, "threadID", FALSE, threadID);
3145  fn->iiAddCproc(libname, "mainThread", FALSE, mainThread);
3146  fn->iiAddCproc(libname, "threadEval", FALSE, threadEval);
3147  fn->iiAddCproc(libname, "threadExec", FALSE, threadExec);
3148  fn->iiAddCproc(libname, "threadResult", FALSE, threadResult);
3149  fn->iiAddCproc(libname, "createJob", FALSE, createJob);
3150  fn->iiAddCproc(libname, "currentJob", FALSE, currentJob);
3151  fn->iiAddCproc(libname, "setSharedName", FALSE, setSharedName);
3152  fn->iiAddCproc(libname, "getSharedName", FALSE, getSharedName);
3153  fn->iiAddCproc(libname, "startJob", FALSE, startJob);
3154  fn->iiAddCproc(libname, "waitJob", FALSE, waitJob);
3155  fn->iiAddCproc(libname, "cancelJob", FALSE, cancelJob);
3156  fn->iiAddCproc(libname, "jobCancelled", FALSE, jobCancelled);
3157  fn->iiAddCproc(libname, "scheduleJob", FALSE, scheduleJob);
3158  fn->iiAddCproc(libname, "scheduleJobs", FALSE, scheduleJob);
3159  fn->iiAddCproc(libname, "createTrigger", FALSE, createTrigger);
3160  fn->iiAddCproc(libname, "updateTrigger", FALSE, updateTrigger);
3161  fn->iiAddCproc(libname, "testTrigger", FALSE, testTrigger);
3162  fn->iiAddCproc(libname, "chainTrigger", FALSE, chainTrigger);
3163
3164  LinTree::init();
3165  master_lock.unlock();
3166
3167  return MAX_TOK;
3168}
Note: See TracBrowser for help on using the repository browser.