source: git/Singular/dyn_modules/systhreads/shared.cc @ b2e2b0

spielwiese
Last change on this file since b2e2b0 was b2e2b0, checked in by Max Horn <max@…>, 4 years ago
Fix a bunch of warnings
  • Property mode set to 100644
File size: 82.3 KB
Line 
1#include "threadconf.h"
2#include <iostream>
3#include "kernel/mod2.h"
4#include "Singular/ipid.h"
5#include "Singular/ipshell.h"
6#include "Singular/links/silink.h"
7#include "Singular/lists.h"
8#include "Singular/blackbox.h"
9#include "Singular/feOpt.h"
10#include "Singular/libsingular.h"
11#include <cstring>
12#include <string>
13#include <errno.h>
14#include <stdio.h>
15#include <vector>
16#include <map>
17#include <iterator>
18#include <queue>
19#include <assert.h>
20#include "thread.h"
21#include "lintree.h"
22
23#include "singthreads.h"
24
25using namespace std;
26
27#ifdef ENABLE_THREADS
28extern char *global_argv0;
29#endif
30
31extern "C" void pSingular_initialize_thread();
32
33namespace LibThread {
34
35#ifdef ENABLE_THREADS
36const int have_threads = 1;
37#else
38const int have_threads = 0;
39#endif
40
41class Command {
42private:
43  const char *name;
44  const char *error;
45  leftv result;
46  leftv *args;
47  int argc;
48public:
49  Command(const char *n, leftv r, leftv a)
50  {
51    name = n;
52    result = r;
53    error = NULL;
54    argc = 0;
55    for (leftv t = a; t != NULL; t = t->next) {
56      argc++;
57    }
58    args = (leftv *) omAlloc0(sizeof(leftv) * argc);
59    int i = 0;
60    for (leftv t = a; t != NULL; t = t->next) {
61      args[i++] = t;
62    }
63    result->rtyp = NONE;
64    result->data = NULL;
65  }
66  ~Command() {
67    omFree(args);
68  }
69  void check_argc(int n) {
70    if (error) return;
71    if (argc != n) error = "wrong number of arguments";
72  }
73  void check_argc(int lo, int hi) {
74    if (error) return;
75    if (argc < lo || argc > hi) error = "wrong number of arguments";
76  }
77  void check_argc_min(int n) {
78    if (error) return;
79    if (argc < n) error = "wrong number of arguments";
80  }
81  void check_arg(int i, int type, const char *err) {
82    if (error) return;
83    if (args[i]->Typ() != type) error = err;
84  }
85  void check_init(int i, const char *err) {
86    if (error) return;
87    leftv arg = args[i];
88    if (arg->Data() == NULL || *(void **)(arg->Data()) == NULL)
89      error = err;
90  }
91  void check_arg(int i, int type, int type2, const char *err) {
92    if (error) return;
93    if (args[i]->Typ() != type && args[i]->Typ() != type2) error = err;
94  }
95  int argtype(int i) {
96    return args[i]->Typ();
97  }
98  int nargs() {
99    return argc;
100  }
101  void *arg(int i) {
102    return args[i]->Data();
103  }
104  template <typename T>
105  T *shared_arg(int i) {
106    return *(T **)(arg(i));
107  }
108  long int_arg(int i) {
109    return (long)(args[i]->Data());
110  }
111  void report(const char *err) {
112    error = err;
113  }
114  // intentionally not bool, so we can also do
115  // q = p + test_arg(p, type);
116  int test_arg(int i, int type) {
117    if (i >= argc) return 0;
118    return args[i]->Typ() == type;
119  }
120  void set_result(long n) {
121    result->rtyp = INT_CMD;
122    result->data = (char *)n;
123  }
124  void set_result(const char *s) {
125    result->rtyp = STRING_CMD;
126    result->data = omStrDup(s);
127  }
128  void set_result(int type, void *p) {
129    result->rtyp = type;
130    result->data = (char *) p;
131  }
132  void set_result(int type, long n) {
133    result->rtyp = type;
134    result->data = (char *) n;
135  }
136  void no_result() {
137    result->rtyp = NONE;
138  }
139  bool ok() {
140    return error == NULL;
141  }
142  BOOLEAN status() {
143    if (error) {
144      Werror("%s: %s", name, error);
145    }
146    return error != NULL;
147  }
148  BOOLEAN abort(const char *err) {
149    report(err);
150    return status();
151  }
152};
153
154class SharedObject {
155private:
156  Lock lock;
157  long refcount;
158  int type;
159  std::string name;
160public:
161  SharedObject(): lock(), refcount(0) { }
162  virtual ~SharedObject() { }
163  void set_type(int type_init) { type = type_init; }
164  int get_type() { return type; }
165  void set_name(std::string &name_init) { name = name_init; }
166  void set_name(const char *s) {
167    name = std::string(s);
168  }
169  std::string &get_name() { return name; }
170  void incref(int by = 1) {
171    lock.lock();
172    refcount += 1;
173    lock.unlock();
174  }
175  long decref() {
176    int result;
177    lock.lock();
178    result = --refcount;
179    lock.unlock();
180    return result;
181  }
182  long getref() {
183    return refcount;
184  }
185  virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2) {
186    return TRUE;
187  }
188  virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
189    return TRUE;
190  }
191};
192
193void acquireShared(SharedObject *obj) {
194  obj->incref();
195}
196
197void releaseShared(SharedObject *obj) {
198  if (obj->decref() == 0) {
199    // delete obj;
200  }
201}
202
203typedef std::map<std::string, SharedObject *> SharedObjectTable;
204
205class Region : public SharedObject {
206private:
207  Lock region_lock;
208public:
209  SharedObjectTable objects;
210  Region() : SharedObject(), region_lock(), objects() { }
211  virtual ~Region() { }
212  Lock *get_lock() { return &region_lock; }
213  void lock() {
214    if (!region_lock.is_locked())
215      region_lock.lock();
216  }
217  void unlock() {
218    if (region_lock.is_locked())
219      region_lock.unlock();
220  }
221  int is_locked() {
222    return region_lock.is_locked();
223  }
224};
225
226Lock global_objects_lock;
227SharedObjectTable global_objects;
228Lock master_lock(true);
229Lock name_lock(true);
230VAR long thread_id;
231long thread_counter;
232
233int type_region;
234int type_regionlock;
235int type_channel;
236int type_syncvar;
237int type_atomic_table;
238int type_shared_table;
239int type_atomic_list;
240int type_shared_list;
241int type_thread;
242int type_threadpool;
243int type_job;
244int type_trigger;
245
246typedef SharedObject *SharedObjectPtr;
247typedef SharedObjectPtr (*SharedConstructor)();
248
249SharedObject *makeSharedObject(SharedObjectTable &table,
250  Lock *lock, int type, string &name, SharedConstructor scons)
251{
252  int was_locked = lock->is_locked();
253  SharedObject *result = NULL;
254  if (!was_locked)
255    lock->lock();
256  if (table.count(name)) {
257    result = table[name];
258    if (result->get_type() != type)
259      result = NULL;
260  } else {
261    result = scons();
262    result->set_type(type);
263    result->set_name(name);
264    table.insert(pair<string,SharedObject *>(name, result));
265  }
266  if (!was_locked)
267    lock->unlock();
268  return result;
269}
270
271SharedObject *findSharedObject(SharedObjectTable &table,
272  Lock *lock, string &name)
273{
274  int was_locked = lock->is_locked();
275  SharedObject *result = NULL;
276  if (!was_locked)
277    lock->lock();
278  if (table.count(name)) {
279    result = table[name];
280  }
281  if (!was_locked)
282    lock->unlock();
283  return result;
284}
285
286class Transactional: public SharedObject {
287private:
288  Region *region;
289  Lock *lock;
290protected:
291  int tx_begin() {
292    if (!region)
293      lock->lock();
294    else {
295      if (!lock->is_locked()) {
296        return 0;
297      }
298    }
299    return 1;
300  }
301  void tx_end() {
302    if (!region)
303      lock->unlock();
304  }
305public:
306  Transactional() :
307      SharedObject(), region(NULL), lock(NULL) {
308  }
309  void set_region(Region *region_init) {
310    region = region_init;
311    if (region_init) {
312      lock = region_init->get_lock();
313    } else {
314      lock = new Lock();
315    }
316  }
317  virtual ~Transactional() { if (!region && lock) delete lock; }
318};
319
320class TxTable: public Transactional {
321private:
322  std::map<string, string> entries;
323public:
324  TxTable() : Transactional(), entries() { }
325  virtual ~TxTable() { }
326  int put(string &key, string &value) {
327    int result = 0;
328    if (!tx_begin()) return -1;
329    if (entries.count(key)) {
330      entries[key] = value;
331    } else {
332      entries.insert(pair<string, string>(key, value));
333      result = 1;
334    }
335    tx_end();
336    return result;
337  }
338  int get(string &key, string &value) {
339    int result = 0;
340    if (!tx_begin()) return -1;
341    if (entries.count(key)) {
342      value = entries[key];
343      result = 1;
344    }
345    tx_end();
346    return result;
347  }
348  int check(string &key) {
349    int result;
350    if (!tx_begin()) return -1;
351    result = entries.count(key);
352    tx_end();
353    return result;
354  }
355};
356
357class TxList: public Transactional {
358private:
359  vector<string> entries;
360public:
361  TxList() : Transactional(), entries() { }
362  virtual ~TxList() { }
363  int put(size_t index, string &value) {
364    int result = -1;
365    if (!tx_begin()) return -1;
366    if (index >= 1 && index <= entries.size()) {
367      entries[index-1] = value;
368      result = 1;
369    } else {
370      entries.resize(index+1);
371      entries[index-1] = value;
372      result = 0;
373    }
374    tx_end();
375    return result;
376  }
377  int get(size_t index, string &value) {
378    int result = 0;
379    if (!tx_begin()) return -1;
380    if (index >= 1 && index <= entries.size()) {
381      result = (entries[index-1].size() != 0);
382      if (result)
383        value = entries[index-1];
384    }
385    tx_end();
386    return result;
387  }
388  long size() {
389    long result;
390    if (!tx_begin()) return -1;
391    result = (long) entries.size();
392    tx_end();
393    return result;
394  }
395};
396
397class SingularChannel : public SharedObject {
398private:
399  queue<string> q;
400  Lock lock;
401  ConditionVariable cond;
402public:
403  SingularChannel(): SharedObject(), lock(), cond(&lock) { }
404  virtual ~SingularChannel() { }
405  void send(string item) {
406    lock.lock();
407    q.push(item);
408    cond.signal();
409    lock.unlock();
410  }
411  string receive() {
412    lock.lock();
413    while (q.empty()) {
414      cond.wait();
415    }
416    string result = q.front();
417    q.pop();
418    if (!q.empty())
419      cond.signal();
420    lock.unlock();
421    return result;
422  }
423  long count() {
424    lock.lock();
425    long result = q.size();
426    lock.unlock();
427    return result;
428  }
429};
430
431class SingularSyncVar : public SharedObject {
432private:
433  string value;
434  int init;
435  Lock lock;
436  ConditionVariable cond;
437public:
438  SingularSyncVar(): SharedObject(), init(0), lock(), cond(&lock) { }
439  virtual ~SingularSyncVar() { }
440  void acquire() {
441    lock.lock();
442  }
443  void release() {
444    lock.unlock();
445  }
446  void wait_init() {
447    while (!init)
448      cond.wait();
449  }
450  leftv get() {
451    if (value.size() == 0) return NULL;
452    return LinTree::from_string(value);
453  }
454  void update(leftv val) {
455    value = LinTree::to_string(val);
456    init = 1;
457    cond.broadcast();
458  }
459  int write(string item) {
460    int result = 0;
461    lock.lock();
462    if (!init) {
463      value = item;
464      init = 1;
465      cond.broadcast();
466      result = 1;
467    }
468    lock.unlock();
469    return result;
470  }
471  string read() {
472    lock.lock();
473    while (!init)
474      cond.wait();
475    string result = value;
476    lock.unlock();
477    return result;
478  }
479  int check() {
480    lock.lock();
481    int result = init;
482    lock.unlock();
483    return result;
484  }
485};
486
487void *shared_init(blackbox *b) {
488  return omAlloc0(sizeof(SharedObject *));
489}
490
491void *new_shared(SharedObject *obj) {
492  acquireShared(obj);
493  void *result = omAlloc0(sizeof(SharedObject *));
494  *(SharedObject **)result = obj;
495  return result;
496}
497
498void shared_destroy(blackbox *b, void *d) {
499  SharedObject *obj = *(SharedObject **)d;
500  if (obj) {
501    releaseShared(*(SharedObject **)d);
502    *(SharedObject **)d = NULL;
503  }
504}
505
506void rlock_destroy(blackbox *b, void *d) {
507  SharedObject *obj = *(SharedObject **)d;
508  ((Region *) obj)->unlock();
509  if (obj) {
510    releaseShared(*(SharedObject **)d);
511    *(SharedObject **)d = NULL;
512  }
513}
514
515void *shared_copy(blackbox *b, void *d) {
516  SharedObject *obj = *(SharedObject **)d;
517  void *result = shared_init(b);
518  *(SharedObject **)result = obj;
519  if (obj)
520    acquireShared(obj);
521  return result;
522}
523
524BOOLEAN shared_assign(leftv l, leftv r) {
525  if (r->Typ() == l->Typ()) {
526    if (l->rtyp == IDHDL) {
527      omFree(IDDATA((idhdl)l->data));
528      IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
529    } else {
530      leftv ll=l->LData();
531      if (ll==NULL)
532      {
533        return TRUE; // out of array bounds or similiar
534      }
535      if (ll->data) {
536        shared_destroy(NULL, ll->data);
537        omFree(ll->data);
538      }
539      ll->data = shared_copy(NULL,r->Data());
540    }
541  } else {
542    Werror("assign %s(%d) = %s(%d)",
543        Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
544    return TRUE;
545  }
546  return FALSE;
547}
548
549BOOLEAN rlock_assign(leftv l, leftv r) {
550  if (r->Typ() == l->Typ()) {
551    if (l->rtyp == IDHDL) {
552      omFree(IDDATA((idhdl)l->data));
553      IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
554    } else {
555      leftv ll=l->LData();
556      if (ll==NULL)
557      {
558        return TRUE; // out of array bounds or similiar
559      }
560      rlock_destroy(NULL, ll->data);
561      omFree(ll->data);
562      ll->data = shared_copy(NULL,r->Data());
563    }
564  } else {
565    Werror("assign %s(%d) = %s(%d)",
566        Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
567    return TRUE;
568  }
569  return FALSE;
570}
571
572
573BOOLEAN shared_check_assign(blackbox *b, leftv l, leftv r) {
574  int lt = l->Typ();
575  int rt = r->Typ();
576  if (lt != DEF_CMD && lt != rt) {
577    const char *rn=Tok2Cmdname(rt);
578    const char *ln=Tok2Cmdname(lt);
579    Werror("cannot assign %s (%d) to %s (%d)\n", rn, rt, ln, lt);
580    return TRUE;
581  }
582  return FALSE;
583}
584
585BOOLEAN shared_op2(int op, leftv res, leftv a1, leftv a2) {
586  SharedObject *obj = *(SharedObject **)a1->Data();
587  return obj->op2(op, res, a1, a2);
588}
589
590BOOLEAN shared_op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
591  SharedObject *obj = *(SharedObject **)a1->Data();
592  return obj->op3(op, res, a1, a2, a3);
593}
594
595char *shared_string(blackbox *b, void *d) {
596  char buf[80];
597  SharedObject *obj = *(SharedObject **)d;
598  if (!obj)
599    return omStrDup("<uninitialized shared object>");
600  int type = obj->get_type();
601  string &name = obj->get_name();
602  const char *type_name = "unknown";
603  if (type == type_channel)
604    type_name = "channel";
605  else if (type == type_atomic_table)
606    type_name = "atomic_table";
607  else if (type == type_shared_table)
608    type_name = "shared_table";
609  else if (type == type_atomic_list)
610    type_name = "atomic_list";
611  else if (type == type_shared_list)
612    type_name = "shared_list";
613  else if (type == type_syncvar)
614    type_name = "syncvar";
615  else if (type == type_region)
616    type_name = "region";
617  else if (type == type_regionlock)
618    type_name = "regionlock";
619  else if (type == type_thread) {
620    sprintf(buf, "<thread #%s>", name.c_str());
621    return omStrDup(buf);
622  }
623  else if (type == type_threadpool) {
624    if (name.size() > 0) {
625      name_lock.lock();
626      sprintf(buf, "<threadpool \"%.40s\"@%p>", name.c_str(), obj);
627      name_lock.unlock();
628    } else
629      sprintf(buf, "<threadpool @%p>", obj);
630    return omStrDup(buf);
631  }
632  else if (type == type_job) {
633    if (name.size() > 0) {
634      name_lock.lock();
635      sprintf(buf, "<job \"%.40s\"@%p>", name.c_str(), obj);
636      name_lock.unlock();
637    } else
638      sprintf(buf, "<job @%p>", obj);
639    return omStrDup(buf);
640  }
641  else if (type == type_trigger) {
642    if (name.size() > 0) {
643      name_lock.lock();
644      sprintf(buf, "<trigger \"%.40s\"@%p>", name.c_str(), obj);
645      name_lock.unlock();
646    } else
647      sprintf(buf, "<trigger @%p>", obj);
648    return omStrDup(buf);
649  } else {
650    sprintf(buf, "<unknown type %d>", type);
651    return omStrDup(buf);
652  }
653  sprintf(buf, "<%s \"%.40s\">", type_name, name.c_str());
654  return omStrDup(buf);
655}
656
657char *rlock_string(blackbox *b, void *d) {
658  char buf[80];
659  SharedObject *obj = *(SharedObject **)d;
660  if (!obj)
661    return omStrDup("<uninitialized region lock>");
662  sprintf(buf, "<region lock \"%.40s\">", obj->get_name().c_str());
663  return omStrDup(buf);
664}
665
666void report(const char *fmt, const char *name) {
667  char buf[80];
668  sprintf(buf, fmt, name);
669  WerrorS(buf);
670}
671
672int wrong_num_args(const char *name, leftv arg, int n) {
673  for (int i=1; i<=n; i++) {
674    if (!arg) {
675      report("%s: too few arguments", name);
676      return TRUE;
677    }
678    arg = arg->next;
679  }
680  if (arg) {
681    report("%s: too many arguments", name);
682    return TRUE;
683  }
684  return FALSE;
685}
686
687int not_a_uri(const char *name, leftv arg) {
688  if (arg->Typ() != STRING_CMD) {
689    report("%s: not a valid URI", name);
690    return TRUE;
691  }
692  return FALSE;
693}
694
695int not_a_region(const char *name, leftv arg) {
696  if (arg->Typ() != type_region || !arg->Data()) {
697    report("%s: not a region", name);
698    return TRUE;
699  }
700  return FALSE;
701}
702
703
704char *str(leftv arg) {
705  return (char *)(arg->Data());
706}
707
708SharedObject *consTable() {
709  return new TxTable();
710}
711
712SharedObject *consList() {
713  return new TxList();
714}
715
716SharedObject *consChannel() {
717  return new SingularChannel();
718}
719
720SharedObject *consSyncVar() {
721  return new SingularSyncVar();
722}
723
724SharedObject *consRegion() {
725  return new Region();
726}
727
728static void appendArg(vector<leftv> &argv, string &s) {
729  if (s.size() == 0) return;
730  leftv val = LinTree::from_string(s);
731  if (val->Typ() == NONE) {
732    omFreeBin(val, sleftv_bin);
733    return;
734  }
735  argv.push_back(val);
736}
737
738static void appendArg(vector<leftv> &argv, leftv arg) {
739  argv.push_back(arg);
740}
741
742static void appendArgCopy(vector<leftv> &argv, leftv arg) {
743  leftv val = (leftv) omAlloc0Bin(sleftv_bin);
744  val->Copy(arg);
745  argv.push_back(val);
746}
747
748
749static BOOLEAN executeProc(sleftv &result,
750  const char *procname, const vector<leftv> &argv)
751{
752  leftv procnode = (leftv) omAlloc0Bin(sleftv_bin);
753  procnode->name = omStrDup(procname);
754  procnode->req_packhdl = basePack;
755  int error = procnode->Eval();
756  if (error) {
757    Werror("procedure \"%s\" not found", procname);
758    omFreeBin(procnode, sleftv_bin);
759    return TRUE;
760  }
761  memset(&result, 0, sizeof(result));
762  leftv *tail = &procnode->next;
763  for (int i = 0; i < argv.size(); i++) {
764    *tail = argv[i];
765    tail = &(*tail)->next;
766  }
767  *tail = NULL;
768  error = iiExprArithM(&result, procnode, '(');
769  procnode->CleanUp();
770  omFreeBin(procnode, sleftv_bin);
771  if (error) {
772    Werror("procedure call of \"%s\" failed", procname);
773    return TRUE;
774  }
775  return FALSE;
776}
777
778BOOLEAN makeAtomicTable(leftv result, leftv arg) {
779  if (wrong_num_args("makeAtomicTable", arg, 1))
780    return TRUE;
781  if (not_a_uri("makeAtomicTable", arg))
782    return TRUE;
783  string uri = str(arg);
784  SharedObject *obj = makeSharedObject(global_objects,
785    &global_objects_lock, type_atomic_table, uri, consTable);
786  ((TxTable *) obj)->set_region(NULL);
787  result->rtyp = type_atomic_table;
788  result->data = new_shared(obj);
789  return FALSE;
790}
791
792BOOLEAN makeAtomicList(leftv result, leftv arg) {
793  if (wrong_num_args("makeAtomicList", arg, 1))
794    return TRUE;
795  if (not_a_uri("makeAtomicList", arg))
796    return TRUE;
797  string uri = str(arg);
798  SharedObject *obj = makeSharedObject(global_objects,
799    &global_objects_lock, type_atomic_list, uri, consList);
800  ((TxList *) obj)->set_region(NULL);
801  result->rtyp = type_atomic_list;
802  result->data = new_shared(obj);
803  return FALSE;
804}
805
806BOOLEAN makeSharedTable(leftv result, leftv arg) {
807  if (wrong_num_args("makeSharedTable", arg, 2))
808    return TRUE;
809  if (not_a_region("makeSharedTable", arg))
810    return TRUE;
811  if (not_a_uri("makeSharedTable", arg->next))
812    return TRUE;
813  Region *region = *(Region **) arg->Data();
814  fflush(stdout);
815  string s = str(arg->next);
816  SharedObject *obj = makeSharedObject(region->objects,
817    region->get_lock(), type_shared_table, s, consTable);
818  ((TxTable *) obj)->set_region(region);
819  result->rtyp = type_shared_table;
820  result->data = new_shared(obj);
821  return FALSE;
822}
823
824BOOLEAN makeSharedList(leftv result, leftv arg) {
825  if (wrong_num_args("makeSharedList", arg, 2))
826    return TRUE;
827  if (not_a_region("makeSharedList", arg))
828    return TRUE;
829  if (not_a_uri("makeSharedList", arg->next))
830    return TRUE;
831  Region *region = *(Region **) arg->Data();
832  string s = str(arg->next);
833  SharedObject *obj = makeSharedObject(region->objects,
834    region->get_lock(), type_shared_list, s, consList);
835  ((TxList *) obj)->set_region(region);
836  result->rtyp = type_shared_list;
837  result->data = new_shared(obj);
838  return FALSE;
839}
840
841BOOLEAN makeChannel(leftv result, leftv arg) {
842  if (wrong_num_args("makeChannel", arg, 1))
843    return TRUE;
844  if (not_a_uri("makeChannel", arg))
845    return TRUE;
846  string uri = str(arg);
847  SharedObject *obj = makeSharedObject(global_objects,
848    &global_objects_lock, type_channel, uri, consChannel);
849  result->rtyp = type_channel;
850  result->data = new_shared(obj);
851  return FALSE;
852}
853
854BOOLEAN makeSyncVar(leftv result, leftv arg) {
855  if (wrong_num_args("makeSyncVar", arg, 1))
856    return TRUE;
857  if (not_a_uri("makeSyncVar", arg))
858    return TRUE;
859  string uri = str(arg);
860  SharedObject *obj = makeSharedObject(global_objects,
861    &global_objects_lock, type_syncvar, uri, consSyncVar);
862  result->rtyp = type_syncvar;
863  result->data = new_shared(obj);
864  return FALSE;
865}
866
867BOOLEAN makeRegion(leftv result, leftv arg) {
868  if (wrong_num_args("makeRegion", arg, 1))
869    return TRUE;
870  if (not_a_uri("makeRegion", arg))
871    return TRUE;
872  string uri = str(arg);
873  SharedObject *obj = makeSharedObject(global_objects,
874    &global_objects_lock, type_region, uri, consRegion);
875  result->rtyp = type_region;
876  result->data = new_shared(obj);
877  return FALSE;
878}
879
880BOOLEAN findSharedObject(leftv result, leftv arg) {
881  if (wrong_num_args("findSharedObject", arg, 1))
882    return TRUE;
883  if (not_a_uri("findSharedObject", arg))
884    return TRUE;
885  string uri = str(arg);
886  SharedObject *obj = findSharedObject(global_objects,
887    &global_objects_lock, uri);
888  result->rtyp = INT_CMD;
889  result->data = (char *)(long)(obj != NULL);
890  return FALSE;
891}
892
893BOOLEAN typeSharedObject(leftv result, leftv arg) {
894  if (wrong_num_args("findSharedObject", arg, 1))
895    return TRUE;
896  if (not_a_uri("findSharedObject", arg))
897    return TRUE;
898  string uri = str(arg);
899  SharedObject *obj = findSharedObject(global_objects,
900    &global_objects_lock, uri);
901  int type = obj ? obj->get_type() : -1;
902  const char *type_name = "undefined";
903  if (type == type_channel)
904    type_name = "channel";
905  else if (type == type_atomic_table)
906    type_name = "atomic_table";
907  else if (type == type_shared_table)
908    type_name = "shared_table";
909  else if (type == type_atomic_list)
910    type_name = "atomic_list";
911  else if (type == type_shared_list)
912    type_name = "shared_list";
913  else if (type == type_syncvar)
914    type_name = "syncvar";
915  else if (type == type_region)
916    type_name = "region";
917  else if (type == type_regionlock)
918    type_name = "regionlock";
919  result->rtyp = STRING_CMD;
920  result->data = (char *)(omStrDup(type_name));
921  return FALSE;
922}
923
924BOOLEAN bindSharedObject(leftv result, leftv arg) {
925  if (wrong_num_args("bindSharedObject", arg, 1))
926    return TRUE;
927  if (not_a_uri("bindSharedObject", arg))
928    return TRUE;
929  string uri = str(arg);
930  SharedObject *obj = findSharedObject(global_objects,
931    &global_objects_lock, uri);
932  if (!obj) {
933    WerrorS("bindSharedObject: cannot find object");
934    return TRUE;
935  }
936  result->rtyp = obj->get_type();
937  result->data = new_shared(obj);
938  return FALSE;
939}
940
941BOOLEAN getTable(leftv result, leftv arg) {
942  if (wrong_num_args("getTable", arg, 2))
943    return TRUE;
944  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
945    WerrorS("getTable: not a valid table");
946    return TRUE;
947  }
948  if (arg->next->Typ() != STRING_CMD) {
949    WerrorS("getTable: not a valid table key");
950    return TRUE;
951  }
952  TxTable *table = *(TxTable **) arg->Data();
953  if (!table) {
954    WerrorS("getTable: table has not been initialized");
955    return TRUE;
956  }
957  string key = (char *)(arg->next->Data());
958  string value;
959  int success = table->get(key, value);
960  if (success < 0) {
961    WerrorS("getTable: region not acquired");
962    return TRUE;
963  }
964  if (success == 0) {
965    WerrorS("getTable: key not found");
966    return TRUE;
967  }
968  leftv tmp = LinTree::from_string(value);
969  result->rtyp = tmp->Typ();
970  result->data = tmp->Data();
971  return FALSE;
972}
973
974BOOLEAN inTable(leftv result, leftv arg) {
975  if (wrong_num_args("inTable", arg, 2))
976    return TRUE;
977  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
978    WerrorS("inTable: not a valid table");
979    return TRUE;
980  }
981  if (arg->next->Typ() != STRING_CMD) {
982    WerrorS("inTable: not a valid table key");
983    return TRUE;
984  }
985  TxTable *table = *(TxTable **) arg->Data();
986  if (!table) {
987    WerrorS("inTable: table has not been initialized");
988    return TRUE;
989  }
990  string key = (char *)(arg->next->Data());
991  int success = table->check(key);
992  if (success < 0) {
993    WerrorS("inTable: region not acquired");
994    return TRUE;
995  }
996  result->rtyp = INT_CMD;
997  result->data = (char *)(long)(success);
998  return FALSE;
999}
1000
1001BOOLEAN putTable(leftv result, leftv arg) {
1002  if (wrong_num_args("putTable", arg, 3))
1003    return TRUE;
1004  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
1005    WerrorS("putTable: not a valid table");
1006    return TRUE;
1007  }
1008  if (arg->next->Typ() != STRING_CMD) {
1009    WerrorS("putTable: not a valid table key");
1010    return TRUE;
1011  }
1012  TxTable *table = *(TxTable **) arg->Data();
1013  if (!table) {
1014    WerrorS("putTable: table has not been initialized");
1015    return TRUE;
1016  }
1017  string key = (char *)(arg->next->Data());
1018  string value = LinTree::to_string(arg->next->next);
1019  int success = table->put(key, value);
1020  if (success < 0) {
1021    WerrorS("putTable: region not acquired");
1022    return TRUE;
1023  }
1024  result->rtyp = NONE;
1025  return FALSE;
1026}
1027
1028BOOLEAN getList(leftv result, leftv arg) {
1029  if (wrong_num_args("getList", arg, 2))
1030    return TRUE;
1031  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1032    WerrorS("getList: not a valid list (atomic or shared)");
1033    return TRUE;
1034  }
1035  if (arg->next->Typ() != INT_CMD) {
1036    WerrorS("getList: index must be an integer");
1037    return TRUE;
1038  }
1039  TxList *list = *(TxList **) arg->Data();
1040  if (!list) {
1041    WerrorS("getList: list has not been initialized");
1042    return TRUE;
1043  }
1044  long index = (long)(arg->next->Data());
1045  string value;
1046  int success = list->get(index, value);
1047  if (success < 0) {
1048    WerrorS("getList: region not acquired");
1049    return TRUE;
1050  }
1051  if (success == 0) {
1052    WerrorS("getList: no value at position");
1053    return TRUE;
1054  }
1055  leftv tmp = LinTree::from_string(value);
1056  result->rtyp = tmp->Typ();
1057  result->data = tmp->Data();
1058  return FALSE;
1059}
1060
1061BOOLEAN putList(leftv result, leftv arg) {
1062  if (wrong_num_args("putList", arg, 3))
1063    return TRUE;
1064  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1065    WerrorS("putList: not a valid list (shared or atomic)");
1066    return TRUE;
1067  }
1068  if (arg->next->Typ() != INT_CMD) {
1069    WerrorS("putList: index must be an integer");
1070    return TRUE;
1071  }
1072  TxList *list = *(TxList **) arg->Data();
1073  if (!list) {
1074    WerrorS("putList: list has not been initialized");
1075    return TRUE;
1076  }
1077  long index = (long)(arg->next->Data());
1078  string value = LinTree::to_string(arg->next->next);
1079  int success = list->put(index, value);
1080  if (success < 0) {
1081    WerrorS("putList: region not acquired");
1082    return TRUE;
1083  }
1084  result->rtyp = NONE;
1085  return FALSE;
1086}
1087
1088BOOLEAN lockRegion(leftv result, leftv arg) {
1089  if (wrong_num_args("lockRegion", arg, 1))
1090    return TRUE;
1091  if (not_a_region("lockRegion", arg))
1092    return TRUE;
1093  Region *region = *(Region **)arg->Data();
1094  if (region->is_locked()) {
1095    WerrorS("lockRegion: region is already locked");
1096    return TRUE;
1097  }
1098  region->lock();
1099  result->rtyp = NONE;
1100  return FALSE;
1101}
1102
1103BOOLEAN regionLock(leftv result, leftv arg) {
1104  if (wrong_num_args("lockRegion", arg, 1))
1105    return TRUE;
1106  if (not_a_region("lockRegion", arg))
1107    return TRUE;
1108  Region *region = *(Region **)arg->Data();
1109  if (region->is_locked()) {
1110    WerrorS("lockRegion: region is already locked");
1111    return TRUE;
1112  }
1113  region->lock();
1114  result->rtyp = type_regionlock;
1115  result->data = new_shared(region);
1116  return FALSE;
1117}
1118
1119
1120BOOLEAN unlockRegion(leftv result, leftv arg) {
1121  if (wrong_num_args("unlockRegion", arg, 1))
1122    return TRUE;
1123  if (not_a_region("unlockRegion", arg))
1124    return TRUE;
1125  Region *region = *(Region **)arg->Data();
1126  if (!region->is_locked()) {
1127    WerrorS("unlockRegion: region is not locked");
1128    return TRUE;
1129  }
1130  region->unlock();
1131  result->rtyp = NONE;
1132  return FALSE;
1133}
1134
1135BOOLEAN sendChannel(leftv result, leftv arg) {
1136  if (wrong_num_args("sendChannel", arg, 2))
1137    return TRUE;
1138  if (arg->Typ() != type_channel) {
1139    WerrorS("sendChannel: argument is not a channel");
1140    return TRUE;
1141  }
1142  SingularChannel *channel = *(SingularChannel **)arg->Data();
1143  if (!channel) {
1144    WerrorS("sendChannel: channel has not been initialized");
1145    return TRUE;
1146  }
1147  channel->send(LinTree::to_string(arg->next));
1148  result->rtyp = NONE;
1149  return FALSE;
1150}
1151
1152BOOLEAN receiveChannel(leftv result, leftv arg) {
1153  if (wrong_num_args("receiveChannel", arg, 1))
1154    return TRUE;
1155  if (arg->Typ() != type_channel) {
1156    WerrorS("receiveChannel: argument is not a channel");
1157    return TRUE;
1158  }
1159  SingularChannel *channel = *(SingularChannel **)arg->Data();
1160  if (!channel) {
1161    WerrorS("receiveChannel: channel has not been initialized");
1162    return TRUE;
1163  }
1164  string item = channel->receive();
1165  leftv val = LinTree::from_string(item);
1166  result->rtyp = val->Typ();
1167  result->data = val->Data();
1168  return FALSE;
1169}
1170
1171BOOLEAN statChannel(leftv result, leftv arg) {
1172  if (wrong_num_args("statChannel", arg, 1))
1173    return TRUE;
1174  if (arg->Typ() != type_channel) {
1175    WerrorS("statChannel: argument is not a channel");
1176    return TRUE;
1177  }
1178  SingularChannel *channel = *(SingularChannel **)arg->Data();
1179  if (!channel) {
1180    WerrorS("receiveChannel: channel has not been initialized");
1181    return TRUE;
1182  }
1183  long n = channel->count();
1184  result->rtyp = INT_CMD;
1185  result->data = (char *)n;
1186  return FALSE;
1187}
1188
1189BOOLEAN writeSyncVar(leftv result, leftv arg) {
1190  if (wrong_num_args("writeSyncVar", arg, 2))
1191    return TRUE;
1192  if (arg->Typ() != type_syncvar) {
1193    WerrorS("writeSyncVar: argument is not a syncvar");
1194    return TRUE;
1195  }
1196  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1197  if (!syncvar) {
1198    WerrorS("writeSyncVar: syncvar has not been initialized");
1199    return TRUE;
1200  }
1201  if (!syncvar->write(LinTree::to_string(arg->next))) {
1202    WerrorS("writeSyncVar: variable already has a value");
1203    return TRUE;
1204  }
1205  result->rtyp = NONE;
1206  return FALSE;
1207}
1208
1209BOOLEAN updateSyncVar(leftv result, leftv arg) {
1210  Command cmd("updateSyncVar", result, arg);
1211  cmd.check_argc_min(2);
1212  cmd.check_arg(0, type_syncvar, "first argument must be a syncvar");
1213  cmd.check_init(0, "syncvar has not been initialized");
1214  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
1215  if (cmd.ok()) {
1216    SingularSyncVar *syncvar = cmd.shared_arg<SingularSyncVar>(0);
1217    char *procname = (char *) cmd.arg(1);
1218    arg = arg->next->next;
1219    syncvar->acquire();
1220    syncvar->wait_init();
1221    vector<leftv> argv;
1222    appendArg(argv, syncvar->get());
1223    while (arg) {
1224      appendArgCopy(argv, arg);
1225      arg = arg->next;
1226    }
1227    int error = executeProc(*result, procname, argv);
1228    if (!error) {
1229      syncvar->update(result);
1230    }
1231    syncvar->release();
1232    return error;
1233  }
1234  return cmd.status();
1235}
1236
1237
1238BOOLEAN readSyncVar(leftv result, leftv arg) {
1239  if (wrong_num_args("readSyncVar", arg, 1))
1240    return TRUE;
1241  if (arg->Typ() != type_syncvar) {
1242    WerrorS("readSyncVar: argument is not a syncvar");
1243    return TRUE;
1244  }
1245  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1246  if (!syncvar) {
1247    WerrorS("readSyncVar: syncvar has not been initialized");
1248    return TRUE;
1249  }
1250  string item = syncvar->read();
1251  leftv val = LinTree::from_string(item);
1252  result->rtyp = val->Typ();
1253  result->data = val->Data();
1254  return FALSE;
1255}
1256
1257BOOLEAN statSyncVar(leftv result, leftv arg) {
1258  if (wrong_num_args("statSyncVar", arg, 1))
1259    return TRUE;
1260  if (arg->Typ() != type_syncvar) {
1261    WerrorS("statSyncVar: argument is not a syncvar");
1262    return TRUE;
1263  }
1264  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1265  if (!syncvar) {
1266    WerrorS("statSyncVar: syncvar has not been initialized");
1267    return TRUE;
1268  }
1269  int init = syncvar->check();
1270  result->rtyp = INT_CMD;
1271  result->data = (char *)(long) init;
1272  return FALSE;
1273}
1274
1275void encode_shared(LinTree::LinTree &lintree, leftv val) {
1276  SharedObject *obj = *(SharedObject **)(val->Data());
1277  acquireShared(obj);
1278  lintree.put(obj);
1279}
1280
1281leftv decode_shared(LinTree::LinTree &lintree) {
1282  int type = lintree.get_prev<int>();
1283  SharedObject *obj = lintree.get<SharedObject *>();
1284  leftv result = (leftv) omAlloc0Bin(sleftv_bin);
1285  result->rtyp = type;
1286  result->data = (void *)new_shared(obj);
1287  return result;
1288}
1289
1290void ref_shared(LinTree::LinTree &lintree, int by) {
1291  SharedObject *obj = lintree.get<SharedObject *>();
1292  while (by > 0) {
1293    obj->incref();
1294    by--;
1295  }
1296  while (by < 0) {
1297    obj->decref();
1298    by++;
1299  }
1300}
1301
1302void installShared(int type) {
1303  LinTree::install(type, encode_shared, decode_shared, ref_shared);
1304}
1305
1306void makeSharedType(int &type, const char *name) {
1307  if (type != 0) return;
1308  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1309  b->blackbox_Init = shared_init;
1310  b->blackbox_destroy = shared_destroy;
1311  b->blackbox_Copy = shared_copy;
1312  b->blackbox_String = shared_string;
1313  b->blackbox_Assign = shared_assign;
1314  b->blackbox_CheckAssign = shared_check_assign;
1315  // b->blackbox_Op2 = shared_op2;
1316  // b->blackbox_Op3 = shared_op3;
1317  type = setBlackboxStuff(b, name);
1318  installShared(type);
1319}
1320
1321void makeRegionlockType(int &type, const char *name) {
1322  if (type != 0) return;
1323  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1324  b->blackbox_Init = shared_init;
1325  b->blackbox_destroy = rlock_destroy;
1326  b->blackbox_Copy = shared_copy;
1327  b->blackbox_String = shared_string;
1328  b->blackbox_Assign = rlock_assign;
1329  b->blackbox_CheckAssign = shared_check_assign;
1330  type = setBlackboxStuff(b, name);
1331  installShared(type);
1332}
1333
1334#define MAX_THREADS 128
1335
1336class ThreadState {
1337public:
1338  bool active;
1339  bool running;
1340  int index;
1341  void *(*thread_func)(ThreadState *, void *);
1342  void *arg, *result;
1343  pthread_t id;
1344  pthread_t parent;
1345  Lock lock;
1346  ConditionVariable to_cond;
1347  ConditionVariable from_cond;
1348  queue<string> to_thread;
1349  queue<string> from_thread;
1350  ThreadState() : lock(), to_cond(&lock), from_cond(&lock),
1351                  to_thread(), from_thread() {
1352    active = false;
1353    running = false;
1354    index = -1;
1355  }
1356  ~ThreadState() {
1357    // We do nothing here. This is to prevent the condition
1358    // variable destructor from firing upon program exit,
1359    // which would invoke undefined behavior if the thread
1360    // is still running.
1361  }
1362};
1363
1364Lock thread_lock;
1365
1366ThreadState *thread_state;
1367
1368void setOption(int ch) {
1369  int index = feGetOptIndex(ch);
1370  feSetOptValue((feOptIndex) index, (int) 1);
1371}
1372
1373void thread_init() {
1374  master_lock.lock();
1375  thread_id = ++thread_counter;
1376  master_lock.unlock();
1377#ifdef ENABLE_THREADS
1378  pSingular_initialize_thread();
1379  siInit(global_argv0);
1380#endif
1381  setOption('q');
1382  // setOption('b');
1383}
1384
1385void *thread_main(void *arg) {
1386  ThreadState *ts = (ThreadState *)arg;
1387  thread_init();
1388  return ts->thread_func(ts, ts->arg);
1389}
1390
1391void *interpreter_thread(ThreadState *ts, void *arg) {
1392  ts->lock.lock();
1393  for (;;) {
1394    bool eval = false;
1395    while (ts->to_thread.empty())
1396      ts->to_cond.wait();
1397    /* TODO */
1398    string expr = ts->to_thread.front();
1399    switch (expr[0]) {
1400      case '\0': case 'q':
1401        ts->lock.unlock();
1402        return NULL;
1403      case 'x':
1404        eval = false;
1405        break;
1406      case 'e':
1407        eval = true;
1408        break;
1409    }
1410    ts->to_thread.pop();
1411    expr = ts->to_thread.front();
1412    /* this will implicitly eval commands */
1413    leftv val = LinTree::from_string(expr);
1414    expr = LinTree::to_string(val);
1415    ts->to_thread.pop();
1416    if (eval)
1417      ts->from_thread.push(expr);
1418    ts->from_cond.signal();
1419  }
1420  ts->lock.unlock();
1421  return NULL;
1422}
1423
1424class InterpreterThread : public SharedObject {
1425private:
1426  ThreadState *ts;
1427public:
1428  InterpreterThread(ThreadState *ts_init) : SharedObject(), ts(ts_init) { }
1429  virtual ~InterpreterThread() { }
1430  ThreadState *getThreadState() { return ts; }
1431  void clearThreadState() {
1432    ts = NULL;
1433  }
1434};
1435
1436static ThreadState *newThread(void *(*thread_func)(ThreadState *, void *),
1437    void *arg, const char **error) {
1438  ThreadState *ts = NULL;
1439  if (error) *error = NULL;
1440  thread_lock.lock();
1441  for (int i=0; i<MAX_THREADS; i++) {
1442    if (!thread_state[i].active) {
1443      ts = thread_state + i;
1444      ts->index = i;
1445      ts->parent = pthread_self();
1446      ts->active = true;
1447      ts->running = true;
1448      ts->to_thread = queue<string>();
1449      ts->from_thread = queue<string>();
1450      ts->thread_func = thread_func;
1451      ts->arg = arg;
1452      ts->result = NULL;
1453      if (pthread_create(&ts->id, NULL, thread_main, ts)<0) {
1454        if (error)
1455          *error = "createThread: internal error: failed to create thread";
1456        goto fail;
1457      }
1458      goto exit;
1459    }
1460  }
1461  if (error) *error = "createThread: too many threads";
1462  fail:
1463  ts = NULL;
1464  exit:
1465  thread_lock.unlock();
1466  return ts;
1467}
1468
1469ThreadState *createThread(void *(*thread_func)(ThreadState *, void *),
1470    void *arg) {
1471  return newThread(thread_func, arg, NULL);
1472}
1473
1474void *joinThread(ThreadState *ts) {
1475  void *result;
1476  pthread_join(ts->id, NULL);
1477  result = ts->result;
1478  thread_lock.lock();
1479  ts->running = false;
1480  ts->active = false;
1481  thread_lock.unlock();
1482  return result;
1483}
1484
1485static InterpreterThread *createInterpreterThread(const char **error) {
1486  ThreadState *ts = newThread(interpreter_thread, NULL, error);
1487  if (*error) return NULL;
1488  InterpreterThread *thread = new InterpreterThread(ts);
1489  char buf[10];
1490  sprintf(buf, "%d", ts->index);
1491  string name(buf);
1492  thread->set_name(name);
1493  thread->set_type(type_thread);
1494  return thread;
1495}
1496
1497static BOOLEAN createThread(leftv result, leftv arg) {
1498  Command cmd("createThread", result, arg);
1499  cmd.check_argc(0);
1500  const char *error;
1501  if (!have_threads)
1502    cmd.report("thread support not available");
1503  if (!cmd.ok()) return cmd.status();
1504  InterpreterThread *thread = createInterpreterThread(&error);
1505  if (error) {
1506    return cmd.abort(error);
1507  }
1508  cmd.set_result(type_thread, new_shared(thread));
1509  return cmd.status();
1510}
1511
1512static bool joinInterpreterThread(InterpreterThread *thread) {
1513  ThreadState *ts = thread->getThreadState();
1514  if (ts && ts->parent != pthread_self()) {
1515    return false;
1516  }
1517  ts->lock.lock();
1518  string quit("q");
1519  ts->to_thread.push(quit);
1520  ts->to_cond.signal();
1521  ts->lock.unlock();
1522  pthread_join(ts->id, NULL);
1523  thread_lock.lock();
1524  ts->running = false;
1525  ts->active = false;
1526  thread->clearThreadState();
1527  thread_lock.unlock();
1528  return true;
1529}
1530
1531static BOOLEAN joinThread(leftv result, leftv arg) {
1532  if (wrong_num_args("joinThread", arg, 1))
1533    return TRUE;
1534  if (arg->Typ() != type_thread) {
1535    WerrorS("joinThread: argument is not a thread");
1536    return TRUE;
1537  }
1538  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
1539  if (!joinInterpreterThread(thread)) {
1540    WerrorS("joinThread: can only be called from parent thread");
1541    return TRUE;
1542  }
1543  return FALSE;
1544}
1545
1546class ThreadPool;
1547class Trigger;
1548
1549class Job : public SharedObject {
1550public:
1551  ThreadPool *pool;
1552  long prio;
1553  size_t id;
1554  long pending_index;
1555  vector<Job *> deps;
1556  vector<Job *> notify;
1557  vector<Trigger *> triggers;
1558  vector<string> args;
1559  string result; // lintree-encoded
1560  void *data;
1561  bool fast;
1562  bool done;
1563  bool queued;
1564  bool running;
1565  bool cancelled;
1566  Job() : SharedObject(), pool(NULL), deps(), pending_index(-1), fast(false),
1567    done(false), running(false), queued(false), cancelled(false), data(NULL),
1568    result(), args(), notify(), triggers(), prio(0)
1569  { set_type(type_job); }
1570  ~Job();
1571  void addDep(Job *job) {
1572    deps.push_back(job);
1573  }
1574  void addDep(vector<Job *> &jobs);
1575  void addDep(long ndeps, Job **jobs);
1576  void addNotify(vector<Job *> &jobs);
1577  void addNotify(Job *job);
1578  virtual bool ready();
1579  virtual void execute() = 0;
1580  void run();
1581};
1582
1583struct JobCompare {
1584  bool operator()(const Job* lhs, const Job* rhs) {
1585    if (lhs->fast < rhs->fast) {
1586      return true;
1587    }
1588    if (lhs->prio < rhs->prio) {
1589      return true;
1590    }
1591    if (lhs->prio == rhs->prio) {
1592      return lhs->id > rhs->id;
1593    }
1594    return false;
1595  }
1596};
1597
1598class Trigger : public Job {
1599public:
1600  virtual bool accept(leftv arg) = 0;
1601  virtual void activate(leftv arg) = 0;
1602  Trigger() : Job() { set_type(type_trigger); fast = true; }
1603};
1604
1605bool Job::ready() {
1606  vector<Job *>::iterator it;
1607  for (it = deps.begin(); it != deps.end(); it++) {
1608    if (!(*it)->done) return false;
1609  }
1610  return true;
1611}
1612
1613Job::~Job() {
1614  vector<Job *>::iterator it;
1615  for (it = deps.begin(); it != deps.end(); it++) {
1616    releaseShared(*it);
1617  }
1618}
1619
1620typedef queue<Job *> JobQueue;
1621
1622class Scheduler;
1623
1624struct SchedInfo {
1625  Scheduler *scheduler;
1626  Job *job;
1627  int num;
1628};
1629
1630STATIC_VAR ThreadPool *currentThreadPoolRef;
1631STATIC_VAR Job *currentJobRef;
1632
1633class ThreadPool : public SharedObject {
1634public:
1635  Scheduler *scheduler;
1636  int nthreads;
1637  ThreadPool(Scheduler *sched, int n);
1638  ThreadPool(int n);
1639  ~ThreadPool();
1640  ThreadState *getThread(int i);
1641  void shutdown(bool wait);
1642  void addThread(ThreadState *thread);
1643  void attachJob(Job *job);
1644  void detachJob(Job *job);
1645  void queueJob(Job *job);
1646  void broadcastJob(Job *job);
1647  void cancelDeps(Job * job);
1648  void cancelJob(Job *job);
1649  void waitJob(Job *job);
1650  void clearThreadState();
1651};
1652
1653
1654class Scheduler : public SharedObject {
1655private:
1656  bool single_threaded;
1657  size_t jobid;
1658  int nthreads;
1659  int maxconcurrency;
1660  int running;
1661  bool shutting_down;
1662  int shutdown_counter;
1663  vector<ThreadState *> threads;
1664  vector<ThreadPool *> thread_owners;
1665  priority_queue<Job *, vector<Job *>, JobCompare> global_queue;
1666  vector<JobQueue *> thread_queues;
1667  vector<Job *> pending;
1668  ConditionVariable cond;
1669  ConditionVariable response;
1670  friend class Job;
1671public:
1672  Lock lock;
1673  Scheduler(int n) :
1674    SharedObject(), threads(), thread_owners(), global_queue(), thread_queues(),
1675    single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1676    lock(true), cond(&lock), response(&lock),
1677    shutting_down(false), shutdown_counter(0), jobid(0),
1678    maxconcurrency(n), running(0)
1679  {
1680    thread_queues.push_back(new JobQueue());
1681  }
1682  void set_maxconcurrency(int n) {
1683    maxconcurrency = n;
1684  }
1685  int get_maxconcurrency() {
1686    return maxconcurrency;
1687  }
1688  int threadpool_size(ThreadPool *pool) {
1689    int n;
1690    for (int i = 0; i <thread_owners.size(); i++) {
1691      if (thread_owners[i] == pool)
1692        n++;
1693    }
1694    return n;
1695  }
1696  virtual ~Scheduler() {
1697    for (int i = 0; i < thread_queues.size(); i++) {
1698      JobQueue *q = thread_queues[i];
1699      while (!q->empty()) {
1700        Job *job = q->front();
1701        q->pop();
1702        releaseShared(job);
1703      }
1704    }
1705    thread_queues.clear();
1706    threads.clear();
1707  }
1708  ThreadState *getThread(int i) { return threads[i]; }
1709  void shutdown(bool wait) {
1710    if (single_threaded) {
1711      SchedInfo *info = new SchedInfo();
1712      info->num = 0;
1713      info->scheduler = this;
1714      acquireShared(this);
1715      info->job = NULL;
1716      Scheduler::main(NULL, info);
1717      return;
1718    }
1719    lock.lock();
1720    if (wait) {
1721      while (!global_queue.empty()) {
1722        response.wait();
1723      }
1724    }
1725    shutting_down = true;
1726    while (shutdown_counter < nthreads) {
1727      cond.broadcast();
1728      response.wait();
1729    }
1730    lock.unlock();
1731    for (int i = 0; i <threads.size(); i++) {
1732      joinThread(threads[i]);
1733    }
1734  }
1735  void addThread(ThreadPool *owner, ThreadState *thread) {
1736    lock.lock();
1737    thread_owners.push_back(owner);
1738    threads.push_back(thread);
1739    thread_queues.push_back(new JobQueue());
1740    lock.unlock();
1741  }
1742  void attachJob(ThreadPool *pool, Job *job) {
1743    lock.lock();
1744    job->pool = pool;
1745    job->id = jobid++;
1746    acquireShared(job);
1747    if (job->ready()) {
1748      global_queue.push(job);
1749      cond.signal();
1750    }
1751    else if (job->pending_index < 0) {
1752      job->pool = pool;
1753      job->pending_index = pending.size();
1754      pending.push_back(job);
1755    }
1756    lock.unlock();
1757  }
1758  void detachJob(Job *job) {
1759    lock.lock();
1760    long i = job->pending_index;
1761    job->pending_index = -1;
1762    if (i >= 0) {
1763      job = pending.back();
1764      pending.resize(pending.size()-1);
1765      pending[i] = job;
1766      job->pending_index = i;
1767    }
1768    lock.unlock();
1769  }
1770  void queueJob(Job *job) {
1771    lock.lock();
1772    global_queue.push(job);
1773    cond.signal();
1774    lock.unlock();
1775  }
1776  void broadcastJob(ThreadPool *pool, Job *job) {
1777    lock.lock();
1778    for (int i = 0; i <thread_queues.size(); i++) {
1779      if (thread_owners[i] == pool) {
1780        acquireShared(job);
1781        thread_queues[i]->push(job);
1782      }
1783    }
1784    lock.unlock();
1785  }
1786  void cancelDeps(Job * job) {
1787    vector<Job *> &notify = job->notify;
1788    for (int i = 0; i <notify.size(); i++) {
1789      Job *next = notify[i];
1790      if (!next->cancelled) {
1791        cancelJob(next);
1792      }
1793    }
1794  }
1795  void cancelJob(Job *job) {
1796    lock.lock();
1797    if (!job->cancelled) {
1798      job->cancelled = true;
1799      if (!job->running && !job->done) {
1800        job->done = true;
1801        cancelDeps(job);
1802      }
1803    }
1804    lock.unlock();
1805  }
1806  void waitJob(Job *job) {
1807    if (single_threaded) {
1808      SchedInfo *info = new SchedInfo();
1809      info->num = 0;
1810      info->scheduler = this;
1811      acquireShared(this);
1812      info->job = job;
1813      Scheduler::main(NULL, info);
1814    } else {
1815      lock.lock();
1816      for (;;) {
1817        if (job->done || job->cancelled) {
1818          break;
1819        }
1820        response.wait();
1821      }
1822      response.signal(); // forward signal
1823      lock.unlock();
1824    }
1825  }
1826  void clearThreadState() {
1827    threads.clear();
1828  }
1829  static void notifyDeps(Scheduler *scheduler, Job *job) {
1830    vector<Job *> &notify = job->notify;
1831    job->incref(notify.size());
1832    for (int i = 0; i <notify.size(); i++) {
1833      Job *next = notify[i];
1834      if (!next->queued && next->ready() && !next->cancelled) {
1835        next->queued = true;
1836        scheduler->queueJob(next);
1837      }
1838    }
1839    vector<Trigger *> &triggers = job->triggers;
1840    leftv arg = NULL;
1841    if (triggers.size() > 0 && job->result.size() > 0)
1842      arg = LinTree::from_string(job->result);
1843    for (int i = 0; i < triggers.size(); i++) {
1844      Trigger *trigger = triggers[i];
1845      if (trigger->accept(arg)) {
1846        trigger->activate(arg);
1847        if (trigger->ready())
1848           scheduler->queueJob(trigger);
1849      }
1850    }
1851    if (arg) {
1852      arg->CleanUp();
1853      omFreeBin(arg, sleftv_bin);
1854    }
1855  }
1856  static void *main(ThreadState *ts, void *arg) {
1857    SchedInfo *info = (SchedInfo *) arg;
1858    Scheduler *scheduler = info->scheduler;
1859    ThreadPool *oldThreadPool = currentThreadPoolRef;
1860    // TODO: set current thread pool
1861    // currentThreadPoolRef = pool;
1862    Lock &lock = scheduler->lock;
1863    ConditionVariable &cond = scheduler->cond;
1864    ConditionVariable &response = scheduler->response;
1865    JobQueue *my_queue = scheduler->thread_queues[info->num];
1866    if (!scheduler->single_threaded)
1867      thread_init();
1868    lock.lock();
1869    for (;;) {
1870      if (info->job && info->job->done)
1871        break;
1872      if (scheduler->shutting_down) {
1873        scheduler->shutdown_counter++;
1874        scheduler->response.signal();
1875        break;
1876      }
1877      if (!my_queue->empty()) {
1878       Job *job = my_queue->front();
1879       my_queue->pop();
1880       if (!scheduler->global_queue.empty())
1881         cond.signal();
1882       currentJobRef = job;
1883       job->run();
1884       currentJobRef = NULL;
1885       notifyDeps(scheduler, job);
1886       releaseShared(job);
1887       scheduler->response.signal();
1888       continue;
1889      } else if (!scheduler->global_queue.empty()) {
1890       Job *job = scheduler->global_queue.top();
1891       scheduler->global_queue.pop();
1892       if (!scheduler->global_queue.empty())
1893         cond.signal();
1894       currentJobRef = job;
1895       job->run();
1896       currentJobRef = NULL;
1897       notifyDeps(scheduler, job);
1898       releaseShared(job);
1899       scheduler->response.signal();
1900       continue;
1901      } else {
1902        if (scheduler->single_threaded) {
1903          break;
1904        }
1905        cond.wait();
1906      }
1907    }
1908    // TODO: correct current thread pool
1909    // releaseShared(currentThreadPoolRef);
1910    currentThreadPoolRef = oldThreadPool;
1911    scheduler->lock.unlock();
1912    delete info;
1913    return NULL;
1914  }
1915};
1916
1917ThreadPool::ThreadPool(int n) : SharedObject(), nthreads(n) {
1918  scheduler = new Scheduler(n);
1919  acquireShared(scheduler);
1920}
1921ThreadPool::ThreadPool(Scheduler *sched, int n) : SharedObject(), nthreads(n) {
1922  scheduler = sched;
1923  acquireShared(sched);
1924}
1925ThreadPool::~ThreadPool() {
1926  releaseShared(scheduler);
1927}
1928ThreadState *ThreadPool::getThread(int i) { return scheduler->getThread(i); }
1929void ThreadPool::shutdown(bool wait) { scheduler->shutdown(wait); }
1930void ThreadPool::addThread(ThreadState *thread) {
1931  scheduler->addThread(this, thread);
1932}
1933void ThreadPool::attachJob(Job *job) {
1934  scheduler->attachJob(this, job);
1935}
1936void ThreadPool::detachJob(Job *job) {
1937  scheduler->detachJob(job);
1938}
1939void ThreadPool::queueJob(Job *job) {
1940  scheduler->queueJob(job);
1941}
1942void ThreadPool::broadcastJob(Job *job) {
1943  scheduler->broadcastJob(this, job);
1944}
1945void ThreadPool::cancelDeps(Job * job) {
1946  scheduler->cancelDeps(job);
1947}
1948void ThreadPool::cancelJob(Job *job) {
1949  scheduler->cancelJob(job);
1950}
1951void ThreadPool::waitJob(Job *job) {
1952  scheduler->waitJob(job);
1953}
1954void ThreadPool::clearThreadState() {
1955  scheduler->clearThreadState();
1956}
1957
1958void Job::addDep(vector<Job *> &jobs) {
1959  deps.insert(deps.end(), jobs.begin(), jobs.end());
1960}
1961
1962void Job::addDep(long ndeps, Job **jobs) {
1963  for (long i = 0; i < ndeps; i++) {
1964    deps.push_back(jobs[i]);
1965  }
1966}
1967
1968void Job::addNotify(vector<Job *> &jobs) {
1969  notify.insert(notify.end(), jobs.begin(), jobs.end());
1970  if (done) {
1971    Scheduler::notifyDeps(pool->scheduler, this);
1972  }
1973}
1974
1975void Job::addNotify(Job *job) {
1976  notify.push_back(job);
1977  if (done) {
1978    Scheduler::notifyDeps(pool->scheduler, this);
1979  }
1980}
1981
1982void Job::run() {
1983  if (!cancelled) {
1984    running = true;
1985    pool->scheduler->lock.unlock();
1986    pool->scheduler->running++;
1987    execute();
1988    pool->scheduler->running--;
1989    pool->scheduler->lock.lock();
1990    running = false;
1991  }
1992  done = true;
1993}
1994
1995class AccTrigger : public Trigger {
1996private:
1997  long count;
1998public:
1999  AccTrigger(long count_init): Trigger(), count(count_init) {
2000  }
2001  virtual bool ready() {
2002    if (!Trigger::ready()) return false;
2003    return args.size() >= count;
2004  }
2005  virtual bool accept(leftv arg) {
2006    return true;
2007  }
2008  virtual void activate(leftv arg) {
2009    while (arg != NULL && !ready()) {
2010      args.push_back(LinTree::to_string(arg));
2011      if (ready()) {
2012        return;
2013      }
2014      arg = arg->next;
2015    }
2016  }
2017  virtual void execute() {
2018    lists l = (lists) omAlloc0Bin(slists_bin);
2019    l->Init(args.size());
2020    for (int i = 0; i < args.size(); i++) {
2021      leftv val = LinTree::from_string(args[i]);
2022      memcpy(&l->m[i], val, sizeof(*val));
2023      omFreeBin(val, sleftv_bin);
2024    }
2025    sleftv val;
2026    memset(&val, 0, sizeof(val));
2027    val.rtyp = LIST_CMD;
2028    val.data = l;
2029    result = LinTree::to_string(&val);
2030    // val.CleanUp();
2031  }
2032};
2033
2034class CountTrigger : public Trigger {
2035private:
2036  long count;
2037public:
2038  CountTrigger(long count_init): Trigger(), count(count_init) {
2039  }
2040  virtual bool ready() {
2041    if (!Trigger::ready()) return false;
2042    return count <= 0;
2043  }
2044  virtual bool accept(leftv arg) {
2045    return arg == NULL;
2046  }
2047  virtual void activate(leftv arg) {
2048    if (!ready()) {
2049      count--;
2050    }
2051  }
2052  virtual void execute() {
2053    // do nothing
2054  }
2055};
2056
2057class SetTrigger : public Trigger {
2058private:
2059  vector<bool> set;
2060  long count;
2061public:
2062  SetTrigger(long count_init) : Trigger(), count(0),
2063    set(count_init) {
2064  }
2065  virtual bool ready() {
2066    if (!Trigger::ready()) return false;
2067    return count == set.size();
2068  }
2069  virtual bool accept(leftv arg) {
2070    return arg->Typ() == INT_CMD;
2071  }
2072  virtual void activate(leftv arg) {
2073    if (!ready()) {
2074      long value = (long) arg->Data();
2075      if (value < 0 || value >= count) return;
2076      if (set[value]) return;
2077      set[value] = true;
2078      count++;
2079    }
2080  }
2081  virtual void execute() {
2082    // do nothing
2083  }
2084};
2085
2086
2087class ProcTrigger : public Trigger {
2088private:
2089  string procname;
2090  bool success;
2091public:
2092  ProcTrigger(const char *p) : Trigger(), procname(p), success(false) {
2093  }
2094  virtual bool ready() {
2095    if (!Trigger::ready()) return false;
2096    return success;
2097  }
2098  virtual bool accept(leftv arg) {
2099    return TRUE;
2100  }
2101  virtual void activate(leftv arg) {
2102    if (!ready()) {
2103      pool->scheduler->lock.unlock();
2104      vector<leftv> argv;
2105      for (int i = 0; i < args.size(); i++) {
2106        appendArg(argv, args[i]);
2107      }
2108      int error = false;
2109      while (arg) {
2110        appendArgCopy(argv, arg);
2111        arg = arg->next;
2112      }
2113      sleftv val;
2114      if (!error)
2115        error = executeProc(val, procname.c_str(), argv);
2116      if (!error) {
2117        if (val.Typ() == NONE || (val.Typ() == INT_CMD &&
2118                                  (long) val.Data()))
2119        {
2120          success = true;
2121        }
2122        val.CleanUp();
2123      }
2124      pool->scheduler->lock.lock();
2125    }
2126  }
2127  virtual void execute() {
2128    // do nothing
2129  }
2130};
2131
2132static BOOLEAN createThreadPool(leftv result, leftv arg) {
2133  long n;
2134  Command cmd("createThreadPool", result, arg);
2135  cmd.check_argc(1, 2);
2136  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2137  if (cmd.ok()) {
2138    n = (long) cmd.arg(0);
2139    if (n < 0) cmd.report("number of threads must be non-negative");
2140    else if (n >= 256) cmd.report("number of threads too large");
2141    if (!have_threads && n != 0)
2142      cmd.report("in single-threaded mode, number of threads must be zero");
2143  }
2144  if (cmd.ok()) {
2145    ThreadPool *pool = new ThreadPool((int) n);
2146    pool->set_type(type_threadpool);
2147    for (int i = 0; i <n; i++) {
2148      const char *error;
2149      SchedInfo *info = new SchedInfo();
2150      info->scheduler = pool->scheduler;
2151      acquireShared(pool->scheduler);
2152      info->job = NULL;
2153      info->num = i;
2154      ThreadState *thread = newThread(Scheduler::main, info, &error);
2155      if (!thread) {
2156        // TODO: clean up bad pool
2157        return cmd.abort(error);
2158      }
2159      pool->addThread(thread);
2160    }
2161    cmd.set_result(type_threadpool, new_shared(pool));
2162  }
2163  return cmd.status();
2164}
2165
2166static BOOLEAN createThreadPoolSet(leftv result, leftv arg) {
2167  Command cmd("createThreadPoolSet", result, arg);
2168  cmd.check_argc(2);
2169  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2170  cmd.check_arg(1, LIST_CMD, "second argument must be a list of integers");
2171  lists l;
2172  int n;
2173  if (cmd.ok()) {
2174    l = (lists) (cmd.arg(1));
2175    n = lSize(l)+1;
2176    if (n == 0)
2177      return cmd.abort("second argument must not be empty");
2178    for (int i = 0; i < n; i++) {
2179      if (l->m[i].Typ() != INT_CMD)
2180        return cmd.abort("second argument must be a list of integers");
2181    }
2182  }
2183  lists pools = (lists) omAlloc0Bin(slists_bin);
2184  pools->Init(n);
2185  if (cmd.ok()) {
2186    long s = 0;
2187    for (int i = 0; i < n; i++) {
2188      s += (long) (l->m[i].Data());
2189    }
2190    Scheduler *sched = new Scheduler((int)s);
2191    sched->set_maxconcurrency(cmd.int_arg(0));
2192    for (int i = 0; i < n; i++) {
2193      long m = (long) (l->m[i].Data());
2194      ThreadPool *pool = new ThreadPool(sched, (int) m);
2195      pool->set_type(type_threadpool);
2196      for (int j = 0; j < m; j++) {
2197        const char *error;
2198        SchedInfo *info = new SchedInfo();
2199        info->scheduler = pool->scheduler;
2200        acquireShared(pool->scheduler);
2201        info->job = NULL;
2202        info->num = i;
2203        ThreadState *thread = newThread(Scheduler::main, info, &error);
2204        if (!thread) {
2205          // TODO: clean up bad pool
2206          return cmd.abort(error);
2207        }
2208        pool->addThread(thread);
2209      }
2210      pools->m[i].rtyp = type_threadpool;
2211      pools->m[i].data = new_shared(pool);
2212    }
2213    cmd.set_result(LIST_CMD, pools);
2214  }
2215  return cmd.status();
2216}
2217
2218ThreadPool *createThreadPool(int nthreads, int prioThreads = 0) {
2219  ThreadPool *pool = new ThreadPool((int) nthreads);
2220  pool->set_type(type_threadpool);
2221  for (int i = 0; i <nthreads; i++) {
2222    const char *error;
2223    SchedInfo *info = new SchedInfo();
2224    info->scheduler = pool->scheduler;
2225    acquireShared(pool);
2226    info->job = NULL;
2227    info->num = i;
2228    ThreadState *thread = newThread(Scheduler::main, info, &error);
2229    if (!thread) {
2230      return NULL;
2231    }
2232    pool->addThread(thread);
2233  }
2234  return pool;
2235}
2236
2237void release(ThreadPool *pool) {
2238  releaseShared(pool);
2239}
2240
2241void retain(ThreadPool *pool) {
2242  acquireShared(pool);
2243}
2244
2245ThreadPool *getCurrentThreadPool() {
2246  return currentThreadPoolRef;
2247}
2248
2249static BOOLEAN getThreadPoolWorkers(leftv result, leftv arg) {
2250  Command cmd("getThreadPoolWorkers", result, arg);
2251  cmd.check_argc(1);
2252  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2253  cmd.check_init(0, "threadpool not initialized");
2254  int r = 0;
2255  if (cmd.ok()) {
2256    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2257    Scheduler *sched = pool->scheduler;
2258    sched->lock.lock();
2259    r = sched->threadpool_size(pool);
2260    sched->lock.unlock();
2261    cmd.set_result(INT_CMD, r);
2262  }
2263  return cmd.status();
2264}
2265
2266static BOOLEAN setThreadPoolWorkers(leftv result, leftv arg) {
2267  Command cmd("setThreadPoolWorkers", result, arg);
2268  cmd.check_argc(2);
2269  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2270  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2271  cmd.check_init(0, "threadpool not initialized");
2272  if (cmd.ok()) {
2273    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2274    Scheduler *sched = pool->scheduler;
2275    // TODO: count/add threads
2276    cmd.no_result();
2277  }
2278  return cmd.status();
2279}
2280
2281static BOOLEAN getThreadPoolConcurrency(leftv result, leftv arg) {
2282  Command cmd("getThreadPoolConcurrency", result, arg);
2283  cmd.check_argc(1);
2284  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2285  cmd.check_init(0, "threadpool not initialized");
2286  if (cmd.ok()) {
2287    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2288    Scheduler *sched = pool->scheduler;
2289    sched->lock.lock();
2290    cmd.set_result(INT_CMD, sched->get_maxconcurrency());
2291    sched->lock.unlock();
2292  }
2293  return cmd.status();
2294}
2295
2296static BOOLEAN setThreadPoolConcurrency(leftv result, leftv arg) {
2297  Command cmd("setThreadPoolWorkers", result, arg);
2298  cmd.check_argc(2);
2299  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2300  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2301  cmd.check_init(0, "threadpool not initialized");
2302  if (cmd.ok()) {
2303    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2304    Scheduler *sched = pool->scheduler;
2305    sched->lock.lock();
2306    sched->set_maxconcurrency(cmd.int_arg(1));
2307    sched->lock.unlock();
2308    cmd.no_result();
2309  }
2310  return cmd.status();
2311}
2312
2313static BOOLEAN closeThreadPool(leftv result, leftv arg) {
2314  Command cmd("closeThreadPool", result, arg);
2315  cmd.check_argc(1, 2);
2316  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2317  cmd.check_init(0, "threadpool not initialized");
2318  if (cmd.nargs() > 1)
2319    cmd.check_arg(1, INT_CMD, "optional argument must be an integer");
2320  if (cmd.ok()) {
2321    ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2322    bool wait = cmd.nargs() == 2 ? (cmd.int_arg(1) != 0) : 1;
2323    pool->shutdown(wait);
2324    cmd.no_result();
2325  }
2326  return cmd.status();
2327}
2328
2329void closeThreadPool(ThreadPool *pool, bool wait) {
2330  pool->shutdown(wait);
2331}
2332
2333
2334BOOLEAN currentThreadPool(leftv result, leftv arg) {
2335  Command cmd("currentThreadPool", result, arg);
2336  cmd.check_argc(0);
2337  ThreadPool *pool = currentThreadPoolRef;
2338  if (pool) {
2339    cmd.set_result(type_threadpool, new_shared(pool));
2340  } else {
2341    cmd.report("no current threadpool");
2342  }
2343  return cmd.status();
2344}
2345
2346BOOLEAN setCurrentThreadPool(leftv result, leftv arg) {
2347  Command cmd("setCurrentThreadPool", result, arg);
2348  cmd.check_argc(1);
2349  cmd.check_init(0, "threadpool not initialized");
2350  if (cmd.ok()) {
2351    ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2352    acquireShared(pool);
2353    if (currentThreadPoolRef)
2354      releaseShared(currentThreadPoolRef);
2355    currentThreadPoolRef = pool;
2356  }
2357  return cmd.status();
2358}
2359
2360class EvalJob : public Job {
2361public:
2362  EvalJob() : Job() { }
2363  virtual void execute() {
2364    leftv val = LinTree::from_string(args[0]);
2365    result = (LinTree::to_string(val));
2366    val->CleanUp();
2367    omFreeBin(val, sleftv_bin);
2368  }
2369};
2370
2371class ExecJob : public Job {
2372public:
2373  ExecJob() : Job() { }
2374  virtual void execute() {
2375    leftv val = LinTree::from_string(args[0]);
2376    val->CleanUp();
2377    omFreeBin(val, sleftv_bin);
2378  }
2379};
2380
2381class ProcJob : public Job {
2382  string procname;
2383public:
2384  ProcJob(const char *procname_init) : Job(),
2385    procname(procname_init) {
2386    set_name(procname_init);
2387  }
2388  virtual void execute() {
2389    vector<leftv> argv;
2390    for (int i = 0; i <args.size(); i++) {
2391      appendArg(argv, args[i]);
2392    }
2393    for (int i = 0; i < deps.size(); i++) {
2394      appendArg(argv, deps[i]->result);
2395    }
2396    sleftv val;
2397    int error = executeProc(val, procname.c_str(), argv);
2398    if (!error) {
2399      result = (LinTree::to_string(&val));
2400      val.CleanUp();
2401    }
2402  }
2403};
2404
2405class KernelJob : public Job {
2406private:
2407  void (*cfunc)(leftv result, leftv arg);
2408public:
2409  KernelJob(void (*func)(leftv result, leftv arg)) : cfunc(func) { }
2410  virtual void execute() {
2411    vector<leftv> argv;
2412    for (int i = 0; i <args.size(); i++) {
2413      appendArg(argv, args[i]);
2414    }
2415    for (int i = 0; i < deps.size(); i++) {
2416      appendArg(argv, deps[i]->result);
2417    }
2418    sleftv val;
2419    memset(&val, 0, sizeof(val));
2420    if (argv.size() > 0) {
2421      leftv *tail = &argv[0]->next;
2422      for (int i = 1; i < argv.size(); i++) {
2423        *tail = argv[i];
2424        tail = &(*tail)->next;
2425      }
2426      *tail = NULL;
2427    }
2428    cfunc(&val, argv[0]);
2429    result = (LinTree::to_string(&val));
2430    val.CleanUp();
2431  }
2432};
2433
2434class RawKernelJob : public Job {
2435private:
2436  void (*cfunc)(long ndeps, Job **deps);
2437public:
2438  RawKernelJob(void (*func)(long ndeps, Job **deps)) : cfunc(func) { }
2439  virtual void execute() {
2440    long ndeps = deps.size();
2441    Job **jobs = (Job **) omAlloc0(sizeof(Job *) * ndeps);
2442    for (long i = 0; i < ndeps; i++)
2443      jobs[i] = deps[i];
2444    cfunc(ndeps, jobs);
2445    omFree(jobs);
2446  }
2447};
2448
2449static BOOLEAN createJob(leftv result, leftv arg) {
2450  Command cmd("createJob", result, arg);
2451  cmd.check_argc_min(1);
2452  cmd.check_arg(0, STRING_CMD, COMMAND,
2453    "job name must be a string or quote expression");
2454  if (cmd.ok()) {
2455    if (cmd.test_arg(0, STRING_CMD)) {
2456      ProcJob *job = new ProcJob((char *)(cmd.arg(0)));
2457      for (leftv a = arg->next; a != NULL; a = a->next) {
2458        job->args.push_back(LinTree::to_string(a));
2459      }
2460      cmd.set_result(type_job, new_shared(job));
2461    } else {
2462      cmd.check_argc(1);
2463      Job *job = new EvalJob();
2464      job->args.push_back(LinTree::to_string(arg));
2465      cmd.set_result(type_job, new_shared(job));
2466    }
2467  }
2468  return cmd.status();
2469}
2470
2471Job *createJob(void (*func)(leftv result, leftv arg)) {
2472  KernelJob *job = new KernelJob(func);
2473  return job;
2474}
2475
2476Job *createJob(void (*func)(long ndeps, Job **deps)) {
2477  RawKernelJob *job = new RawKernelJob(func);
2478  return job;
2479}
2480
2481Job *startJob(ThreadPool *pool, Job *job, leftv arg) {
2482  if (job->pool) return NULL;
2483  while (arg) {
2484    job->args.push_back(LinTree::to_string(arg));
2485    arg = arg->next;
2486  }
2487  pool->attachJob(job);
2488  return job;
2489}
2490
2491Job *startJob(ThreadPool *pool, Job *job) {
2492  return startJob(pool, job, NULL);
2493}
2494
2495// Job *scheduleJob(ThreadPool *pool, Job *job, long ndeps, Job **deps) {
2496//   if (job->pool) return NULL;
2497//   pool->scheduler->lock.lock();
2498//   bool cancelled = false;
2499//   job->addDep(ndeps, deps);
2500//   for (long i = 0; i < ndeps; i++) {
2501//     deps[i]->addNotify(job);
2502//     cancelled |= deps[i]->cancelled;
2503//   }
2504//   if (cancelled) {
2505//     job->pool = pool;
2506//     pool->cancelJob(job);
2507//   }
2508//   else
2509//     pool->attachJob(job);
2510//   pool->scheduler->lock.unlock();
2511//   return FIXME: missing/unclear what this is supposed to be
2512// }
2513
2514void cancelJob(Job *job) {
2515  ThreadPool *pool = job->pool;
2516  if (pool) pool->cancelJob(job);
2517}
2518
2519Job *getCurrentJob() {
2520  return currentJobRef;
2521}
2522
2523static BOOLEAN startJob(leftv result, leftv arg) {
2524  Command cmd("startJob", result, arg);
2525  cmd.check_argc_min(1);
2526  int has_pool = cmd.test_arg(0, type_threadpool);
2527  cmd.check_argc_min(1+has_pool);
2528  if (has_pool)
2529    cmd.check_init(0, "threadpool not initialized");
2530  int has_prio = cmd.test_arg(has_pool, INT_CMD);
2531  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2532  int first_arg = has_pool + has_prio;
2533  cmd.check_arg(first_arg, type_job, STRING_CMD,
2534    "job argument must be a job or string");
2535  if (cmd.ok() && cmd.argtype(first_arg) == type_job)
2536    cmd.check_init(first_arg, "job not initialized");
2537  if (!cmd.ok()) return cmd.status();
2538  ThreadPool *pool;
2539  if (has_pool)
2540    pool = cmd.shared_arg<ThreadPool>(0);
2541  else {
2542    if (!currentThreadPoolRef)
2543      return cmd.abort("no current threadpool defined");
2544    pool = currentThreadPoolRef;
2545  }
2546  Job *job;
2547  if (cmd.argtype(first_arg) == type_job) 
2548    job = *(Job **)(cmd.arg(first_arg));
2549  else
2550    job = new ProcJob((char *)(cmd.arg(first_arg)));
2551  leftv a = arg->next;
2552  if (has_pool) a = a->next;
2553  if (has_prio) a = a->next;
2554  for (; a != NULL; a = a->next) {
2555    job->args.push_back(LinTree::to_string(a));
2556  }
2557  if (job->pool)
2558    return cmd.abort("job has already been scheduled");
2559  job->prio = prio;
2560  pool->attachJob(job);
2561  cmd.set_result(type_job, new_shared(job));
2562  return cmd.status();
2563}
2564
2565static BOOLEAN waitJob(leftv result, leftv arg) {
2566  Command cmd("waitJob", result, arg);
2567  cmd.check_argc(1);
2568  cmd.check_arg(0, type_job, "argument must be a job");
2569  cmd.check_init(0, "job not initialized");
2570  if (cmd.ok()) {
2571    Job *job = *(Job **)(cmd.arg(0));
2572    ThreadPool *pool = job->pool;
2573    if (!pool) {
2574      return cmd.abort("job has not yet been started or scheduled");
2575    }
2576    pool->waitJob(job);
2577    if (job->cancelled) {
2578      return cmd.abort("job has been cancelled");
2579    }
2580    if (job->result.size() == 0)
2581      cmd.no_result();
2582    else {
2583      leftv res = LinTree::from_string(job->result);
2584      cmd.set_result(res->Typ(), res->Data());
2585    }
2586  }
2587  return cmd.status();
2588}
2589
2590void waitJob(Job *job) {
2591  assert(job->pool != NULL);
2592  job->pool->waitJob(job);
2593}
2594
2595static BOOLEAN cancelJob(leftv result, leftv arg) {
2596  Command cmd("cancelJob", result, arg);
2597  cmd.check_argc(1);
2598  cmd.check_arg(0, type_job, "argument must be a job");
2599  cmd.check_init(0, "job not initialized");
2600  if (cmd.ok()) {
2601    Job *job = cmd.shared_arg<Job>(0);
2602    ThreadPool *pool = job->pool;
2603    if (!pool) {
2604      return cmd.abort("job has not yet been started or scheduled");
2605    }
2606    pool->cancelJob(job);
2607    cmd.no_result();
2608  }
2609  return cmd.status();
2610}
2611
2612static BOOLEAN jobCancelled(leftv result, leftv arg) {
2613  Job *job;
2614  Command cmd("jobCancelled", result, arg);
2615  cmd.check_argc(0, 1);
2616  if (cmd.nargs() == 1) {
2617    cmd.check_arg(0, type_job, "argument must be a job");
2618    cmd.check_init(0, "job not initialized");
2619    job = cmd.shared_arg<Job>(0);
2620  } else {
2621    job = currentJobRef;
2622    if (!job)
2623      cmd.report("no current job");
2624  }
2625  if (cmd.ok()) {
2626    ThreadPool *pool = job->pool;
2627    if (!pool) {
2628      return cmd.abort("job has not yet been started or scheduled");
2629    }
2630    pool->scheduler->lock.lock();
2631    cmd.set_result((long) job->cancelled);
2632    pool->scheduler->lock.unlock();
2633  }
2634  return cmd.status();
2635}
2636
2637bool getJobCancelled(Job *job) {
2638  ThreadPool *pool = job->pool;
2639  if (pool) pool->scheduler->lock.lock();
2640  bool result = job->cancelled;
2641  if (pool) pool->scheduler->lock.unlock();
2642  return result;
2643}
2644
2645bool getJobCancelled() {
2646  return getJobCancelled(currentJobRef);
2647}
2648
2649void setJobData(Job *job, void *data) {
2650  ThreadPool *pool = job->pool;
2651  if (pool) pool->scheduler->lock.lock();
2652  job->data = data;
2653  if (pool) pool->scheduler->lock.unlock();
2654}
2655
2656
2657void *getJobData(Job *job) {
2658  ThreadPool *pool = job->pool;
2659  if (pool) pool->scheduler->lock.lock();
2660  void *result = job->data;
2661  if (pool) pool->scheduler->lock.unlock();
2662  return result;
2663}
2664
2665void addJobArgs(Job *job, leftv arg) {
2666  ThreadPool *pool = job->pool;
2667  if (pool) pool->scheduler->lock.lock();
2668  while (arg) {
2669    job->args.push_back(LinTree::to_string(arg));
2670    arg = arg->next;
2671  }
2672  if (pool) pool->scheduler->lock.unlock();
2673}
2674
2675leftv getJobResult(Job *job) {
2676  ThreadPool *pool = job->pool;
2677  if (pool) pool->scheduler->lock.lock();
2678  leftv result = LinTree::from_string(job->result);
2679  if (pool) pool->scheduler->lock.unlock();
2680  return result;
2681}
2682
2683const char *getJobName(Job *job) {
2684  // TODO
2685  return "";
2686}
2687
2688void setJobName(Job *job, const char *name) {
2689  // TODO
2690}
2691
2692static BOOLEAN createTrigger(leftv result, leftv arg) {
2693  Command cmd("createTrigger", result, arg);
2694  cmd.check_argc_min(1);
2695  int has_pool = cmd.test_arg(0, type_threadpool);
2696  ThreadPool *pool;
2697  if (has_pool) {
2698    cmd.check_init(0, "threadpool not initialized");
2699    pool = cmd.shared_arg<ThreadPool>(0);
2700  } else {
2701    pool = currentThreadPoolRef;
2702    if (!pool)
2703      return cmd.abort("no default threadpool");
2704  }
2705  cmd.check_argc(has_pool + 2);
2706  cmd.check_arg(has_pool + 0, STRING_CMD, "trigger subtype must be a string");
2707  const char *kind = (const char *)(cmd.arg(has_pool + 0));
2708  if (0 == strcmp(kind, "proc")) {
2709    cmd.check_arg(has_pool + 1, STRING_CMD, "proc trigger argument must be a string");
2710  } else {
2711    cmd.check_arg(has_pool + 1, INT_CMD, "trigger argument must be an integer");
2712  }
2713  if (cmd.ok()) {
2714    Trigger *trigger;
2715    long n = (long) (cmd.arg(has_pool + 1));
2716    if (n < 0)
2717      return cmd.abort("trigger argument must be a non-negative integer");
2718    if (0 == strcmp(kind, "acc")) {
2719      trigger = new AccTrigger(n);
2720    } else if (0 == strcmp(kind, "count")) {
2721      trigger = new CountTrigger(n);
2722    } else if (0 == strcmp(kind, "set")) {
2723      trigger = new SetTrigger(n);
2724    } else if (0 == strcmp(kind, "proc")) {
2725      trigger = new ProcTrigger((const char *) cmd.arg(has_pool + 1));
2726    } else {
2727      return cmd.abort("unknown trigger subtype");
2728    }
2729    pool->attachJob(trigger);
2730    cmd.set_result(type_trigger, new_shared(trigger));
2731  }
2732  return cmd.status();
2733}
2734
2735static BOOLEAN updateTrigger(leftv result, leftv arg) {
2736  Command cmd("updateTrigger", result, arg);
2737  cmd.check_argc_min(1);
2738  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2739  cmd.check_init(0, "trigger not initialized");
2740  if (cmd.ok()) {
2741    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2742    trigger->pool->scheduler->lock.lock();
2743    if (!trigger->accept(arg->next))
2744      cmd.report("incompatible argument type(s) for this trigger");
2745    else {
2746      trigger->activate(arg->next);
2747      if (trigger->ready()) {
2748        trigger->run();
2749        Scheduler::notifyDeps(trigger->pool->scheduler, trigger);
2750      }
2751    }
2752    trigger->pool->scheduler->lock.unlock();
2753  }
2754  return cmd.status();
2755}
2756
2757static BOOLEAN chainTrigger(leftv result, leftv arg) {
2758  Command cmd("chainTrigger", result, arg);
2759  cmd.check_argc(2);
2760  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2761  cmd.check_arg(1, type_trigger, type_job,
2762    "second argument must be a trigger or job");
2763  cmd.check_init(0, "trigger not initialized");
2764  cmd.check_init(1, "trigger/job not initialized");
2765  if (cmd.ok()) {
2766    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2767    Job *job = cmd.shared_arg<Job>(1);
2768    if (trigger->pool != job->pool)
2769      return cmd.abort("arguments use different threadpools");
2770    ThreadPool *pool = trigger->pool;
2771    pool->scheduler->lock.lock();
2772    job->triggers.push_back(trigger);
2773    pool->scheduler->lock.unlock();
2774  }
2775  return cmd.status();
2776}
2777
2778static BOOLEAN testTrigger(leftv result, leftv arg) {
2779  Command cmd("testTrigger", result, arg);
2780  cmd.check_argc(1);
2781  cmd.check_arg(0, type_trigger, "argument must be a trigger");
2782  cmd.check_init(0, "trigger not initialized");
2783  if (cmd.ok()) {
2784    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2785    ThreadPool *pool = trigger->pool;
2786    pool->scheduler->lock.lock();
2787    cmd.set_result((long)trigger->ready());
2788    pool->scheduler->lock.unlock();
2789  }
2790  return cmd.status();
2791}
2792
2793
2794static BOOLEAN scheduleJob(leftv result, leftv arg) {
2795  vector<Job *> jobs;
2796  vector<Job *> deps;
2797  Command cmd("scheduleJob", result, arg);
2798  cmd.check_argc_min(1);
2799  int has_pool = cmd.test_arg(0, type_threadpool);
2800  if (has_pool) {
2801    cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2802    cmd.check_init(0, "threadpool not initialized");
2803  }
2804  cmd.check_argc_min(has_pool+1);
2805  int has_prio = cmd.test_arg(has_pool, INT_CMD);
2806  ThreadPool *pool;
2807  if (has_pool)
2808    pool = cmd.shared_arg<ThreadPool>(0);
2809  else {
2810    if (!currentThreadPoolRef)
2811      return cmd.abort("no current threadpool defined");
2812    pool = currentThreadPoolRef;
2813  }
2814  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2815  int first_arg = has_pool + has_prio;
2816  if (cmd.test_arg(first_arg, type_job)) {
2817    jobs.push_back(*(Job **)(cmd.arg(first_arg)));
2818  } else if (cmd.test_arg(first_arg, STRING_CMD)) {
2819    jobs.push_back(new ProcJob((char *)(cmd.arg(first_arg))));
2820  } else if (cmd.test_arg(first_arg, LIST_CMD)) {
2821    lists l = (lists) (cmd.arg(first_arg));
2822    int n = lSize(l);
2823    for (int i = 0; i < n; i++) {
2824      if (l->m[i].Typ() != type_job)
2825        return cmd.abort("job argument must be a job, string, or list of jobs");
2826    }
2827    for (int i = 0; i < n; i++) {
2828      Job *job = *(Job **) (l->m[i].Data());
2829      if (!job)
2830        return cmd.abort("job not initialized");
2831      jobs.push_back(job);
2832    }
2833  } else {
2834    return cmd.abort("job argument must be a job, string, or list of jobs");
2835  }
2836  bool error = false;
2837  leftv a = arg->next;
2838  if (has_pool) a = a->next;
2839  if (has_prio) a = a->next;
2840  for (; !error && a; a = a->next) {
2841    if (a->Typ() == type_job || a->Typ() == type_trigger) {
2842      deps.push_back(*(Job **)(a->Data()));
2843    } else if (a->Typ() == LIST_CMD) {
2844      lists l = (lists) a->Data();
2845      int n = lSize(l);
2846      for (int i = 0; i < n; i++) {
2847        if (l->m[i].Typ() == type_job || l->m[i].Typ() == type_trigger) {
2848          deps.push_back(*(Job **)(l->m[i].Data()));
2849        } else {
2850          error = true;
2851          break;
2852        }
2853      }
2854    }
2855  }
2856  if (error) {
2857    return cmd.abort("illegal dependency");
2858  }
2859  for (int i = 0; i < jobs.size(); i++) {
2860    Job *job = jobs[i];
2861    if (job->pool) {
2862      return cmd.abort("job has already been scheduled");
2863    }
2864    job->prio = prio;
2865  }
2866  for (int i = 0; i < deps.size(); i++) {
2867    Job *job = deps[i];
2868    if (!job->pool) {
2869      return cmd.abort("dependency has not yet been scheduled");
2870    }
2871    if (job->pool != pool) {
2872      return cmd.abort("dependency has been scheduled on a different threadpool");
2873    }
2874  }
2875  pool->scheduler->lock.lock();
2876  bool cancelled = false;
2877  for (int i = 0; i < jobs.size(); i++) {
2878    jobs[i]->addDep(deps);
2879  }
2880  for (int i = 0; i < deps.size(); i++) {
2881    deps[i]->addNotify(jobs);
2882    cancelled |= deps[i]->cancelled;
2883  }
2884  for (int i = 0; i < jobs.size(); i++) {
2885    if (cancelled) {
2886      jobs[i]->pool = pool;
2887      pool->cancelJob(jobs[i]);
2888    }
2889    else
2890      pool->attachJob(jobs[i]);
2891  }
2892  pool->scheduler->lock.unlock();
2893  if (jobs.size() > 0)
2894    cmd.set_result(type_job, new_shared(jobs[0]));
2895  return cmd.status();
2896}
2897
2898BOOLEAN currentJob(leftv result, leftv arg) {
2899  Command cmd("currentJob", result, arg);
2900  cmd.check_argc(0);
2901  Job *job = currentJobRef;
2902  if (job) {
2903    cmd.set_result(type_job, new_shared(job));
2904  } else {
2905    cmd.report("no current job");
2906  }
2907  return cmd.status();
2908}
2909
2910
2911BOOLEAN threadID(leftv result, leftv arg) {
2912  int i;
2913  if (wrong_num_args("threadID", arg, 0))
2914    return TRUE;
2915  result->rtyp = INT_CMD;
2916  result->data = (char *)thread_id;
2917  return FALSE;
2918}
2919
2920BOOLEAN mainThread(leftv result, leftv arg) {
2921  int i;
2922  if (wrong_num_args("mainThread", arg, 0))
2923    return TRUE;
2924  result->rtyp = INT_CMD;
2925  result->data = (char *)(long)(thread_id == 0L);
2926  return FALSE;
2927}
2928
2929BOOLEAN threadEval(leftv result, leftv arg) {
2930  if (wrong_num_args("threadEval", arg, 2))
2931    return TRUE;
2932  if (arg->Typ() != type_thread) {
2933    WerrorS("threadEval: argument is not a thread");
2934    return TRUE;
2935  }
2936  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2937  string expr = LinTree::to_string(arg->next);
2938  ThreadState *ts = thread->getThreadState();
2939  if (ts && ts->parent != pthread_self()) {
2940    WerrorS("threadEval: can only be called from parent thread");
2941    return TRUE;
2942  }
2943  if (ts) ts->lock.lock();
2944  if (!ts || !ts->running || !ts->active) {
2945    WerrorS("threadEval: thread is no longer running");
2946    if (ts) ts->lock.unlock();
2947    return TRUE;
2948  }
2949  ts->to_thread.push("e");
2950  ts->to_thread.push(expr);
2951  ts->to_cond.signal();
2952  ts->lock.unlock();
2953  result->rtyp = NONE;
2954  return FALSE;
2955}
2956
2957BOOLEAN threadExec(leftv result, leftv arg) {
2958  if (wrong_num_args("threadExec", arg, 2))
2959    return TRUE;
2960  if (arg->Typ() != type_thread) {
2961    WerrorS("threadExec: argument is not a thread");
2962    return TRUE;
2963  }
2964  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2965  string expr = LinTree::to_string(arg->next);
2966  ThreadState *ts = thread->getThreadState();
2967  if (ts && ts->parent != pthread_self()) {
2968    WerrorS("threadExec: can only be called from parent thread");
2969    return TRUE;
2970  }
2971  if (ts) ts->lock.lock();
2972  if (!ts || !ts->running || !ts->active) {
2973    WerrorS("threadExec: thread is no longer running");
2974    if (ts) ts->lock.unlock();
2975    return TRUE;
2976  }
2977  ts->to_thread.push("x");
2978  ts->to_thread.push(expr);
2979  ts->to_cond.signal();
2980  ts->lock.unlock();
2981  result->rtyp = NONE;
2982  return FALSE;
2983}
2984
2985BOOLEAN threadPoolExec(leftv result, leftv arg) {
2986  Command cmd("threadPoolExec", result, arg);
2987  ThreadPool *pool;
2988  cmd.check_argc(1, 2);
2989  int has_pool = cmd.nargs() == 2;
2990  if (has_pool) {
2991    cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2992    cmd.check_init(0, "threadpool not initialized");
2993    pool = cmd.shared_arg<ThreadPool>(0);
2994  } else {
2995    pool = currentThreadPoolRef;
2996    if (!pool)
2997      return cmd.abort("no current threadpool");
2998  }
2999  if (cmd.ok()) {
3000    string expr = LinTree::to_string(has_pool ? arg->next : arg);
3001    Job* job = new ExecJob();
3002    job->args.push_back(expr);
3003    job->pool = pool;
3004    pool->broadcastJob(job);
3005  }
3006  return cmd.status();
3007}
3008
3009BOOLEAN threadResult(leftv result, leftv arg) {
3010  if (wrong_num_args("threadResult", arg, 1))
3011    return TRUE;
3012  if (arg->Typ() != type_thread) {
3013    WerrorS("threadResult: argument is not a thread");
3014    return TRUE;
3015  }
3016  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
3017  ThreadState *ts = thread->getThreadState();
3018  if (ts && ts->parent != pthread_self()) {
3019    WerrorS("threadResult: can only be called from parent thread");
3020    return TRUE;
3021  }
3022  if (ts) ts->lock.lock();
3023  if (!ts || !ts->running || !ts->active) {
3024    WerrorS("threadResult: thread is no longer running");
3025    if (ts) ts->lock.unlock();
3026    return TRUE;
3027  }
3028  while (ts->from_thread.empty()) {
3029    ts->from_cond.wait();
3030  }
3031  string expr = ts->from_thread.front();
3032  ts->from_thread.pop();
3033  ts->lock.unlock();
3034  leftv val = LinTree::from_string(expr);
3035  result->rtyp = val->Typ();
3036  result->data = val->Data();
3037  return FALSE;
3038}
3039
3040BOOLEAN setSharedName(leftv result, leftv arg) {
3041  Command cmd("setSharedName", result, arg);
3042  cmd.check_argc(2);
3043  int type = cmd.argtype(0);
3044  cmd.check_init(0, "first argument is not initialized");
3045  if (type != type_job && type != type_trigger && type != type_threadpool) {
3046    cmd.report("first argument must be a job, trigger, or threadpool");
3047  }
3048  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
3049  if (cmd.ok()) {
3050    SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3051    name_lock.lock();
3052    obj->set_name((char *) cmd.arg(1));
3053    name_lock.unlock();
3054  }
3055  return cmd.status();
3056}
3057
3058BOOLEAN getSharedName(leftv result, leftv arg) {
3059  Command cmd("getSharedName", result, arg);
3060  cmd.check_argc(1);
3061  int type = cmd.argtype(0);
3062  cmd.check_init(0, "first argument is not initialized");
3063  if (type != type_job && type != type_trigger && type != type_threadpool) {
3064    cmd.report("first argument must be a job, trigger, or threadpool");
3065  }
3066  if (cmd.ok()) {
3067    SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3068    name_lock.lock();
3069    cmd.set_result(obj->get_name().c_str());
3070    name_lock.unlock();
3071  }
3072  return cmd.status();
3073}
3074
3075}
3076
3077using namespace LibThread;
3078
3079
3080extern "C" int SI_MOD_INIT(systhreads)(SModulFunctions *fn)
3081{
3082  const char *libname = currPack->libname;
3083  if (!libname) libname = "";
3084  master_lock.lock();
3085  if (!thread_state)
3086    thread_state = new ThreadState[MAX_THREADS];
3087  makeSharedType(type_atomic_table, "atomic_table");
3088  makeSharedType(type_atomic_list, "atomic_list");
3089  makeSharedType(type_shared_table, "shared_table");
3090  makeSharedType(type_shared_list, "shared_list");
3091  makeSharedType(type_channel, "channel");
3092  makeSharedType(type_syncvar, "syncvar");
3093  makeSharedType(type_region, "region");
3094  makeSharedType(type_thread, "thread");
3095  makeSharedType(type_threadpool, "threadpool");
3096  makeSharedType(type_job, "job");
3097  makeSharedType(type_trigger, "trigger");
3098  makeRegionlockType(type_regionlock, "regionlock");
3099
3100  fn->iiAddCproc(libname, "putTable", FALSE, putTable);
3101  fn->iiAddCproc(libname, "getTable", FALSE, getTable);
3102  fn->iiAddCproc(libname, "inTable", FALSE, inTable);
3103  fn->iiAddCproc(libname, "putList", FALSE, putList);
3104  fn->iiAddCproc(libname, "getList", FALSE, getList);
3105  fn->iiAddCproc(libname, "lockRegion", FALSE, lockRegion);
3106  fn->iiAddCproc(libname, "regionLock", FALSE, regionLock);
3107  fn->iiAddCproc(libname, "unlockRegion", FALSE, unlockRegion);
3108  fn->iiAddCproc(libname, "sendChannel", FALSE, sendChannel);
3109  fn->iiAddCproc(libname, "receiveChannel", FALSE, receiveChannel);
3110  fn->iiAddCproc(libname, "statChannel", FALSE, statChannel);
3111  fn->iiAddCproc(libname, "writeSyncVar", FALSE, writeSyncVar);
3112  fn->iiAddCproc(libname, "updateSyncVar", FALSE, updateSyncVar);
3113  fn->iiAddCproc(libname, "readSyncVar", FALSE, readSyncVar);
3114  fn->iiAddCproc(libname, "statSyncVar", FALSE, statSyncVar);
3115
3116  fn->iiAddCproc(libname, "makeAtomicTable", FALSE, makeAtomicTable);
3117  fn->iiAddCproc(libname, "makeAtomicList", FALSE, makeAtomicList);
3118  fn->iiAddCproc(libname, "makeSharedTable", FALSE, makeSharedTable);
3119  fn->iiAddCproc(libname, "makeSharedList", FALSE, makeSharedList);
3120  fn->iiAddCproc(libname, "makeChannel", FALSE, makeChannel);
3121  fn->iiAddCproc(libname, "makeSyncVar", FALSE, makeSyncVar);
3122  fn->iiAddCproc(libname, "makeRegion", FALSE, makeRegion);
3123  fn->iiAddCproc(libname, "findSharedObject", FALSE, findSharedObject);
3124  fn->iiAddCproc(libname, "bindSharedObject", FALSE, bindSharedObject);
3125  fn->iiAddCproc(libname, "typeSharedObject", FALSE, typeSharedObject);
3126
3127  fn->iiAddCproc(libname, "createThread", FALSE, createThread);
3128  fn->iiAddCproc(libname, "joinThread", FALSE, joinThread);
3129  fn->iiAddCproc(libname, "createThreadPool", FALSE, createThreadPool);
3130  fn->iiAddCproc(libname, "createThreadPoolSet", FALSE, createThreadPoolSet);
3131#if 0
3132  fn->iiAddCproc(libname, "adjoinThreadPool", FALSE, adjoinThreadPool);
3133  fn->iiAddCproc(libname, "getAdjoinedThreadPools", FALSE, getAdjoinedThreadPools);
3134#endif
3135  fn->iiAddCproc(libname, "closeThreadPool", FALSE, closeThreadPool);
3136  fn->iiAddCproc(libname, "getThreadPoolWorkers", FALSE, getThreadPoolWorkers);
3137  fn->iiAddCproc(libname, "setThreadPoolWorkers", FALSE, setThreadPoolWorkers);
3138  fn->iiAddCproc(libname, "getThreadPoolConcurrency", FALSE, getThreadPoolConcurrency);
3139  fn->iiAddCproc(libname, "setThreadPoolConcurrency", FALSE, setThreadPoolConcurrency);
3140  fn->iiAddCproc(libname, "currentThreadPool", FALSE, currentThreadPool);
3141  fn->iiAddCproc(libname, "setCurrentThreadPool", FALSE, setCurrentThreadPool);
3142  fn->iiAddCproc(libname, "threadPoolExec", FALSE, threadPoolExec);
3143  fn->iiAddCproc(libname, "threadID", FALSE, threadID);
3144  fn->iiAddCproc(libname, "mainThread", FALSE, mainThread);
3145  fn->iiAddCproc(libname, "threadEval", FALSE, threadEval);
3146  fn->iiAddCproc(libname, "threadExec", FALSE, threadExec);
3147  fn->iiAddCproc(libname, "threadResult", FALSE, threadResult);
3148  fn->iiAddCproc(libname, "createJob", FALSE, createJob);
3149  fn->iiAddCproc(libname, "currentJob", FALSE, currentJob);
3150  fn->iiAddCproc(libname, "setSharedName", FALSE, setSharedName);
3151  fn->iiAddCproc(libname, "getSharedName", FALSE, getSharedName);
3152  fn->iiAddCproc(libname, "startJob", FALSE, startJob);
3153  fn->iiAddCproc(libname, "waitJob", FALSE, waitJob);
3154  fn->iiAddCproc(libname, "cancelJob", FALSE, cancelJob);
3155  fn->iiAddCproc(libname, "jobCancelled", FALSE, jobCancelled);
3156  fn->iiAddCproc(libname, "scheduleJob", FALSE, scheduleJob);
3157  fn->iiAddCproc(libname, "scheduleJobs", FALSE, scheduleJob);
3158  fn->iiAddCproc(libname, "createTrigger", FALSE, createTrigger);
3159  fn->iiAddCproc(libname, "updateTrigger", FALSE, updateTrigger);
3160  fn->iiAddCproc(libname, "testTrigger", FALSE, testTrigger);
3161  fn->iiAddCproc(libname, "chainTrigger", FALSE, chainTrigger);
3162
3163  LinTree::init();
3164  master_lock.unlock();
3165
3166  return MAX_TOK;
3167}
Note: See TracBrowser for help on using the repository browser.