source: git/Singular/dyn_modules/systhreads/shared.cc @ a6895f

fieker-DuValspielwiese
Last change on this file since a6895f was a6895f, checked in by Hans Schoenemann <hannes@…>, 5 years ago
Merge pull request #919 from rbehrends/threadlib Fixed systhreads build with ppcc.
  • Property mode set to 100644
File size: 82.2 KB
Line 
1#include "threadconf.h"
2#include <iostream>
3#include "kernel/mod2.h"
4#include "Singular/ipid.h"
5#include "Singular/ipshell.h"
6#include "Singular/links/silink.h"
7#include "Singular/lists.h"
8#include "Singular/blackbox.h"
9#include "Singular/feOpt.h"
10#include "Singular/libsingular.h"
11#include <cstring>
12#include <string>
13#include <errno.h>
14#include <stdio.h>
15#include <vector>
16#include <map>
17#include <iterator>
18#include <queue>
19#include <assert.h>
20#include "thread.h"
21#include "lintree.h"
22
23#include "singthreads.h"
24
25using namespace std;
26
27#ifdef ENABLE_THREADS
28extern char *global_argv0;
29#endif
30
31namespace LibThread {
32
33#ifdef ENABLE_THREADS
34const int have_threads = 1;
35#else
36const int have_threads = 0;
37#endif
38
39class Command {
40private:
41  const char *name;
42  const char *error;
43  leftv result;
44  leftv *args;
45  int argc;
46public:
47  Command(const char *n, leftv r, leftv a)
48  {
49    name = n;
50    result = r;
51    error = NULL;
52    argc = 0;
53    for (leftv t = a; t != NULL; t = t->next) {
54      argc++;
55    }
56    args = (leftv *) omAlloc0(sizeof(leftv) * argc);
57    int i = 0;
58    for (leftv t = a; t != NULL; t = t->next) {
59      args[i++] = t;
60    }
61    result->rtyp = NONE;
62    result->data = NULL;
63  }
64  ~Command() {
65    omFree(args);
66  }
67  void check_argc(int n) {
68    if (error) return;
69    if (argc != n) error = "wrong number of arguments";
70  }
71  void check_argc(int lo, int hi) {
72    if (error) return;
73    if (argc < lo || argc > hi) error = "wrong number of arguments";
74  }
75  void check_argc_min(int n) {
76    if (error) return;
77    if (argc < n) error = "wrong number of arguments";
78  }
79  void check_arg(int i, int type, const char *err) {
80    if (error) return;
81    if (args[i]->Typ() != type) error = err;
82  }
83  void check_init(int i, const char *err) {
84    if (error) return;
85    leftv arg = args[i];
86    if (arg->Data() == NULL || *(void **)(arg->Data()) == NULL)
87      error = err;
88  }
89  void check_arg(int i, int type, int type2, const char *err) {
90    if (error) return;
91    if (args[i]->Typ() != type && args[i]->Typ() != type2) error = err;
92  }
93  int argtype(int i) {
94    return args[i]->Typ();
95  }
96  int nargs() {
97    return argc;
98  }
99  void *arg(int i) {
100    return args[i]->Data();
101  }
102  template <typename T>
103  T *shared_arg(int i) {
104    return *(T **)(arg(i));
105  }
106  long int_arg(int i) {
107    return (long)(args[i]->Data());
108  }
109  void report(const char *err) {
110    error = err;
111  }
112  // intentionally not bool, so we can also do
113  // q = p + test_arg(p, type);
114  int test_arg(int i, int type) {
115    if (i >= argc) return 0;
116    return args[i]->Typ() == type;
117  }
118  void set_result(long n) {
119    result->rtyp = INT_CMD;
120    result->data = (char *)n;
121  }
122  void set_result(const char *s) {
123    result->rtyp = STRING_CMD;
124    result->data = omStrDup(s);
125  }
126  void set_result(int type, void *p) {
127    result->rtyp = type;
128    result->data = (char *) p;
129  }
130  void set_result(int type, long n) {
131    result->rtyp = type;
132    result->data = (char *) n;
133  }
134  void no_result() {
135    result->rtyp = NONE;
136  }
137  bool ok() {
138    return error == NULL;
139  }
140  BOOLEAN status() {
141    if (error) {
142      Werror("%s: %s", name, error);
143    }
144    return error != NULL;
145  }
146  BOOLEAN abort(const char *err) {
147    report(err);
148    return status();
149  }
150};
151
152class SharedObject {
153private:
154  Lock lock;
155  long refcount;
156  int type;
157  std::string name;
158public:
159  SharedObject(): lock(), refcount(0) { }
160  virtual ~SharedObject() { }
161  void set_type(int type_init) { type = type_init; }
162  int get_type() { return type; }
163  void set_name(std::string &name_init) { name = name_init; }
164  void set_name(const char *s) {
165    name = std::string(s);
166  }
167  std::string &get_name() { return name; }
168  void incref(int by = 1) {
169    lock.lock();
170    refcount += 1;
171    lock.unlock();
172  }
173  long decref() {
174    int result;
175    lock.lock();
176    result = --refcount;
177    lock.unlock();
178    return result;
179  }
180  long getref() {
181    return refcount;
182  }
183  virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2) {
184    return TRUE;
185  }
186  virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
187    return TRUE;
188  }
189};
190
191void acquireShared(SharedObject *obj) {
192  obj->incref();
193}
194
195void releaseShared(SharedObject *obj) {
196  if (obj->decref() == 0) {
197    // delete obj;
198  }
199}
200
201typedef std::map<std::string, SharedObject *> SharedObjectTable;
202
203class Region : public SharedObject {
204private:
205  Lock region_lock;
206public:
207  SharedObjectTable objects;
208  Region() : SharedObject(), region_lock(), objects() { }
209  virtual ~Region() { }
210  Lock *get_lock() { return &region_lock; }
211  void lock() {
212    if (!region_lock.is_locked())
213      region_lock.lock();
214  }
215  void unlock() {
216    if (region_lock.is_locked())
217      region_lock.unlock();
218  }
219  int is_locked() {
220    return region_lock.is_locked();
221  }
222};
223
224Lock global_objects_lock;
225SharedObjectTable global_objects;
226Lock master_lock(true);
227Lock name_lock(true);
228VAR long thread_id;
229long thread_counter;
230
231int type_region;
232int type_regionlock;
233int type_channel;
234int type_syncvar;
235int type_atomic_table;
236int type_shared_table;
237int type_atomic_list;
238int type_shared_list;
239int type_thread;
240int type_threadpool;
241int type_job;
242int type_trigger;
243
244typedef SharedObject *SharedObjectPtr;
245typedef SharedObjectPtr (*SharedConstructor)();
246
247SharedObject *makeSharedObject(SharedObjectTable &table,
248  Lock *lock, int type, string &name, SharedConstructor scons)
249{
250  int was_locked = lock->is_locked();
251  SharedObject *result = NULL;
252  if (!was_locked)
253    lock->lock();
254  if (table.count(name)) {
255    result = table[name];
256    if (result->get_type() != type)
257      result = NULL;
258  } else {
259    result = scons();
260    result->set_type(type);
261    result->set_name(name);
262    table.insert(pair<string,SharedObject *>(name, result));
263  }
264  if (!was_locked)
265    lock->unlock();
266  return result;
267}
268
269SharedObject *findSharedObject(SharedObjectTable &table,
270  Lock *lock, string &name)
271{
272  int was_locked = lock->is_locked();
273  SharedObject *result = NULL;
274  if (!was_locked)
275    lock->lock();
276  if (table.count(name)) {
277    result = table[name];
278  }
279  if (!was_locked)
280    lock->unlock();
281  return result;
282}
283
284class Transactional: public SharedObject {
285private:
286  Region *region;
287  Lock *lock;
288protected:
289  int tx_begin() {
290    if (!region)
291      lock->lock();
292    else {
293      if (!lock->is_locked()) {
294        return 0;
295      }
296    }
297    return 1;
298  }
299  void tx_end() {
300    if (!region)
301      lock->unlock();
302  }
303public:
304  Transactional() :
305      SharedObject(), region(NULL), lock(NULL) {
306  }
307  void set_region(Region *region_init) {
308    region = region_init;
309    if (region_init) {
310      lock = region_init->get_lock();
311    } else {
312      lock = new Lock();
313    }
314  }
315  virtual ~Transactional() { if (!region && lock) delete lock; }
316};
317
318class TxTable: public Transactional {
319private:
320  std::map<string, string> entries;
321public:
322  TxTable() : Transactional(), entries() { }
323  virtual ~TxTable() { }
324  int put(string &key, string &value) {
325    int result = 0;
326    if (!tx_begin()) return -1;
327    if (entries.count(key)) {
328      entries[key] = value;
329    } else {
330      entries.insert(pair<string, string>(key, value));
331      result = 1;
332    }
333    tx_end();
334    return result;
335  }
336  int get(string &key, string &value) {
337    int result = 0;
338    if (!tx_begin()) return -1;
339    if (entries.count(key)) {
340      value = entries[key];
341      result = 1;
342    }
343    tx_end();
344    return result;
345  }
346  int check(string &key) {
347    int result;
348    if (!tx_begin()) return -1;
349    result = entries.count(key);
350    tx_end();
351    return result;
352  }
353};
354
355class TxList: public Transactional {
356private:
357  vector<string> entries;
358public:
359  TxList() : Transactional(), entries() { }
360  virtual ~TxList() { }
361  int put(size_t index, string &value) {
362    int result = -1;
363    if (!tx_begin()) return -1;
364    if (index >= 1 && index <= entries.size()) {
365      entries[index-1] = value;
366      result = 1;
367    } else {
368      entries.resize(index+1);
369      entries[index-1] = value;
370      result = 0;
371    }
372    tx_end();
373    return result;
374  }
375  int get(size_t index, string &value) {
376    int result = 0;
377    if (!tx_begin()) return -1;
378    if (index >= 1 && index <= entries.size()) {
379      result = (entries[index-1].size() != 0);
380      if (result)
381        value = entries[index-1];
382    }
383    tx_end();
384    return result;
385  }
386  long size() {
387    long result;
388    if (!tx_begin()) return -1;
389    result = (long) entries.size();
390    tx_end();
391    return result;
392  }
393};
394
395class SingularChannel : public SharedObject {
396private:
397  queue<string> q;
398  Lock lock;
399  ConditionVariable cond;
400public:
401  SingularChannel(): SharedObject(), lock(), cond(&lock) { }
402  virtual ~SingularChannel() { }
403  void send(string item) {
404    lock.lock();
405    q.push(item);
406    cond.signal();
407    lock.unlock();
408  }
409  string receive() {
410    lock.lock();
411    while (q.empty()) {
412      cond.wait();
413    }
414    string result = q.front();
415    q.pop();
416    if (!q.empty())
417      cond.signal();
418    lock.unlock();
419    return result;
420  }
421  long count() {
422    lock.lock();
423    long result = q.size();
424    lock.unlock();
425    return result;
426  }
427};
428
429class SingularSyncVar : public SharedObject {
430private:
431  string value;
432  int init;
433  Lock lock;
434  ConditionVariable cond;
435public:
436  SingularSyncVar(): SharedObject(), init(0), lock(), cond(&lock) { }
437  virtual ~SingularSyncVar() { }
438  void acquire() {
439    lock.lock();
440  }
441  void release() {
442    lock.unlock();
443  }
444  void wait_init() {
445    while (!init)
446      cond.wait();
447  }
448  leftv get() {
449    if (value.size() == 0) return NULL;
450    return LinTree::from_string(value);
451  }
452  void update(leftv val) {
453    value = LinTree::to_string(val);
454    init = 1;
455    cond.broadcast();
456  }
457  int write(string item) {
458    int result = 0;
459    lock.lock();
460    if (!init) {
461      value = item;
462      init = 1;
463      cond.broadcast();
464      result = 1;
465    }
466    lock.unlock();
467    return result;
468  }
469  string read() {
470    lock.lock();
471    while (!init)
472      cond.wait();
473    string result = value;
474    lock.unlock();
475    return result;
476  }
477  int check() {
478    lock.lock();
479    int result = init;
480    lock.unlock();
481    return result;
482  }
483};
484
485void *shared_init(blackbox *b) {
486  return omAlloc0(sizeof(SharedObject *));
487}
488
489void *new_shared(SharedObject *obj) {
490  acquireShared(obj);
491  void *result = omAlloc0(sizeof(SharedObject *));
492  *(SharedObject **)result = obj;
493  return result;
494}
495
496void shared_destroy(blackbox *b, void *d) {
497  SharedObject *obj = *(SharedObject **)d;
498  if (obj) {
499    releaseShared(*(SharedObject **)d);
500    *(SharedObject **)d = NULL;
501  }
502}
503
504void rlock_destroy(blackbox *b, void *d) {
505  SharedObject *obj = *(SharedObject **)d;
506  ((Region *) obj)->unlock();
507  if (obj) {
508    releaseShared(*(SharedObject **)d);
509    *(SharedObject **)d = NULL;
510  }
511}
512
513void *shared_copy(blackbox *b, void *d) {
514  SharedObject *obj = *(SharedObject **)d;
515  void *result = shared_init(b);
516  *(SharedObject **)result = obj;
517  if (obj)
518    acquireShared(obj);
519  return result;
520}
521
522BOOLEAN shared_assign(leftv l, leftv r) {
523  if (r->Typ() == l->Typ()) {
524    if (l->rtyp == IDHDL) {
525      omFree(IDDATA((idhdl)l->data));
526      IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
527    } else {
528      leftv ll=l->LData();
529      if (ll==NULL)
530      {
531        return TRUE; // out of array bounds or similiar
532      }
533      if (ll->data) {
534        shared_destroy(NULL, ll->data);
535        omFree(ll->data);
536      }
537      ll->data = shared_copy(NULL,r->Data());
538    }
539  } else {
540    Werror("assign %s(%d) = %s(%d)",
541        Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
542    return TRUE;
543  }
544  return FALSE;
545}
546
547BOOLEAN rlock_assign(leftv l, leftv r) {
548  if (r->Typ() == l->Typ()) {
549    if (l->rtyp == IDHDL) {
550      omFree(IDDATA((idhdl)l->data));
551      IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
552    } else {
553      leftv ll=l->LData();
554      if (ll==NULL)
555      {
556        return TRUE; // out of array bounds or similiar
557      }
558      rlock_destroy(NULL, ll->data);
559      omFree(ll->data);
560      ll->data = shared_copy(NULL,r->Data());
561    }
562  } else {
563    Werror("assign %s(%d) = %s(%d)",
564        Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
565    return TRUE;
566  }
567  return FALSE;
568}
569
570
571BOOLEAN shared_check_assign(blackbox *b, leftv l, leftv r) {
572  int lt = l->Typ();
573  int rt = r->Typ();
574  if (lt != DEF_CMD && lt != rt) {
575    const char *rn=Tok2Cmdname(rt);
576    const char *ln=Tok2Cmdname(lt);
577    Werror("cannot assign %s (%d) to %s (%d)\n", rn, rt, ln, lt);
578    return TRUE;
579  }
580  return FALSE;
581}
582
583BOOLEAN shared_op2(int op, leftv res, leftv a1, leftv a2) {
584  SharedObject *obj = *(SharedObject **)a1->Data();
585  return obj->op2(op, res, a1, a2);
586}
587
588BOOLEAN shared_op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
589  SharedObject *obj = *(SharedObject **)a1->Data();
590  return obj->op3(op, res, a1, a2, a3);
591}
592
593char *shared_string(blackbox *b, void *d) {
594  char buf[80];
595  SharedObject *obj = *(SharedObject **)d;
596  if (!obj)
597    return omStrDup("<uninitialized shared object>");
598  int type = obj->get_type();
599  string &name = obj->get_name();
600  const char *type_name = "unknown";
601  if (type == type_channel)
602    type_name = "channel";
603  else if (type == type_atomic_table)
604    type_name = "atomic_table";
605  else if (type == type_shared_table)
606    type_name = "shared_table";
607  else if (type == type_atomic_list)
608    type_name = "atomic_list";
609  else if (type == type_shared_list)
610    type_name = "shared_list";
611  else if (type == type_syncvar)
612    type_name = "syncvar";
613  else if (type == type_region)
614    type_name = "region";
615  else if (type == type_regionlock)
616    type_name = "regionlock";
617  else if (type == type_thread) {
618    sprintf(buf, "<thread #%s>", name.c_str());
619    return omStrDup(buf);
620  }
621  else if (type == type_threadpool) {
622    if (name.size() > 0) {
623      name_lock.lock();
624      sprintf(buf, "<threadpool \"%.40s\"@%p>", name.c_str(), obj);
625      name_lock.unlock();
626    } else
627      sprintf(buf, "<threadpool @%p>", obj);
628    return omStrDup(buf);
629  }
630  else if (type == type_job) {
631    if (name.size() > 0) {
632      name_lock.lock();
633      sprintf(buf, "<job \"%.40s\"@%p>", name.c_str(), obj);
634      name_lock.unlock();
635    } else
636      sprintf(buf, "<job @%p>", obj);
637    return omStrDup(buf);
638  }
639  else if (type == type_trigger) {
640    if (name.size() > 0) {
641      name_lock.lock();
642      sprintf(buf, "<trigger \"%.40s\"@%p>", name.c_str(), obj);
643      name_lock.unlock();
644    } else
645      sprintf(buf, "<trigger @%p>", obj);
646    return omStrDup(buf);
647  } else {
648    sprintf(buf, "<unknown type %d>", type);
649    return omStrDup(buf);
650  }
651  sprintf(buf, "<%s \"%.40s\">", type_name, name.c_str());
652  return omStrDup(buf);
653}
654
655char *rlock_string(blackbox *b, void *d) {
656  char buf[80];
657  SharedObject *obj = *(SharedObject **)d;
658  if (!obj)
659    return omStrDup("<uninitialized region lock>");
660  sprintf(buf, "<region lock \"%.40s\">", obj->get_name().c_str());
661  return omStrDup(buf);
662}
663
664void report(const char *fmt, const char *name) {
665  char buf[80];
666  sprintf(buf, fmt, name);
667  WerrorS(buf);
668}
669
670int wrong_num_args(const char *name, leftv arg, int n) {
671  for (int i=1; i<=n; i++) {
672    if (!arg) {
673      report("%s: too few arguments", name);
674      return TRUE;
675    }
676    arg = arg->next;
677  }
678  if (arg) {
679    report("%s: too many arguments", name);
680    return TRUE;
681  }
682  return FALSE;
683}
684
685int not_a_uri(const char *name, leftv arg) {
686  if (arg->Typ() != STRING_CMD) {
687    report("%s: not a valid URI", name);
688    return TRUE;
689  }
690  return FALSE;
691}
692
693int not_a_region(const char *name, leftv arg) {
694  if (arg->Typ() != type_region || !arg->Data()) {
695    report("%s: not a region", name);
696    return TRUE;
697  }
698  return FALSE;
699}
700
701
702char *str(leftv arg) {
703  return (char *)(arg->Data());
704}
705
706SharedObject *consTable() {
707  return new TxTable();
708}
709
710SharedObject *consList() {
711  return new TxList();
712}
713
714SharedObject *consChannel() {
715  return new SingularChannel();
716}
717
718SharedObject *consSyncVar() {
719  return new SingularSyncVar();
720}
721
722SharedObject *consRegion() {
723  return new Region();
724}
725
726static void appendArg(vector<leftv> &argv, string &s) {
727  if (s.size() == 0) return;
728  leftv val = LinTree::from_string(s);
729  if (val->Typ() == NONE) {
730    omFreeBin(val, sleftv_bin);
731    return;
732  }
733  argv.push_back(val);
734}
735
736static void appendArg(vector<leftv> &argv, leftv arg) {
737  argv.push_back(arg);
738}
739
740static void appendArgCopy(vector<leftv> &argv, leftv arg) {
741  leftv val = (leftv) omAlloc0Bin(sleftv_bin);
742  val->Copy(arg);
743  argv.push_back(val);
744}
745
746
747static BOOLEAN executeProc(sleftv &result,
748  const char *procname, const vector<leftv> &argv)
749{
750  leftv procnode = (leftv) omAlloc0Bin(sleftv_bin);
751  procnode->name = omStrDup(procname);
752  procnode->req_packhdl = basePack;
753  int error = procnode->Eval();
754  if (error) {
755    Werror("procedure \"%s\" not found", procname);
756    omFreeBin(procnode, sleftv_bin);
757    return TRUE;
758  }
759  memset(&result, 0, sizeof(result));
760  leftv *tail = &procnode->next;
761  for (int i = 0; i < argv.size(); i++) {
762    *tail = argv[i];
763    tail = &(*tail)->next;
764  }
765  *tail = NULL;
766  error = iiExprArithM(&result, procnode, '(');
767  procnode->CleanUp();
768  omFreeBin(procnode, sleftv_bin);
769  if (error) {
770    Werror("procedure call of \"%s\" failed", procname);
771    return TRUE;
772  }
773  return FALSE;
774}
775
776BOOLEAN makeAtomicTable(leftv result, leftv arg) {
777  if (wrong_num_args("makeAtomicTable", arg, 1))
778    return TRUE;
779  if (not_a_uri("makeAtomicTable", arg))
780    return TRUE;
781  string uri = str(arg);
782  SharedObject *obj = makeSharedObject(global_objects,
783    &global_objects_lock, type_atomic_table, uri, consTable);
784  ((TxTable *) obj)->set_region(NULL);
785  result->rtyp = type_atomic_table;
786  result->data = new_shared(obj);
787  return FALSE;
788}
789
790BOOLEAN makeAtomicList(leftv result, leftv arg) {
791  if (wrong_num_args("makeAtomicList", arg, 1))
792    return TRUE;
793  if (not_a_uri("makeAtomicList", arg))
794    return TRUE;
795  string uri = str(arg);
796  SharedObject *obj = makeSharedObject(global_objects,
797    &global_objects_lock, type_atomic_list, uri, consList);
798  ((TxList *) obj)->set_region(NULL);
799  result->rtyp = type_atomic_list;
800  result->data = new_shared(obj);
801  return FALSE;
802}
803
804BOOLEAN makeSharedTable(leftv result, leftv arg) {
805  if (wrong_num_args("makeSharedTable", arg, 2))
806    return TRUE;
807  if (not_a_region("makeSharedTable", arg))
808    return TRUE;
809  if (not_a_uri("makeSharedTable", arg->next))
810    return TRUE;
811  Region *region = *(Region **) arg->Data();
812  fflush(stdout);
813  string s = str(arg->next);
814  SharedObject *obj = makeSharedObject(region->objects,
815    region->get_lock(), type_shared_table, s, consTable);
816  ((TxTable *) obj)->set_region(region);
817  result->rtyp = type_shared_table;
818  result->data = new_shared(obj);
819  return FALSE;
820}
821
822BOOLEAN makeSharedList(leftv result, leftv arg) {
823  if (wrong_num_args("makeSharedList", arg, 2))
824    return TRUE;
825  if (not_a_region("makeSharedList", arg))
826    return TRUE;
827  if (not_a_uri("makeSharedList", arg->next))
828    return TRUE;
829  Region *region = *(Region **) arg->Data();
830  string s = str(arg->next);
831  SharedObject *obj = makeSharedObject(region->objects,
832    region->get_lock(), type_shared_list, s, consList);
833  ((TxList *) obj)->set_region(region);
834  result->rtyp = type_shared_list;
835  result->data = new_shared(obj);
836  return FALSE;
837}
838
839BOOLEAN makeChannel(leftv result, leftv arg) {
840  if (wrong_num_args("makeChannel", arg, 1))
841    return TRUE;
842  if (not_a_uri("makeChannel", arg))
843    return TRUE;
844  string uri = str(arg);
845  SharedObject *obj = makeSharedObject(global_objects,
846    &global_objects_lock, type_channel, uri, consChannel);
847  result->rtyp = type_channel;
848  result->data = new_shared(obj);
849  return FALSE;
850}
851
852BOOLEAN makeSyncVar(leftv result, leftv arg) {
853  if (wrong_num_args("makeSyncVar", arg, 1))
854    return TRUE;
855  if (not_a_uri("makeSyncVar", arg))
856    return TRUE;
857  string uri = str(arg);
858  SharedObject *obj = makeSharedObject(global_objects,
859    &global_objects_lock, type_syncvar, uri, consSyncVar);
860  result->rtyp = type_syncvar;
861  result->data = new_shared(obj);
862  return FALSE;
863}
864
865BOOLEAN makeRegion(leftv result, leftv arg) {
866  if (wrong_num_args("makeRegion", arg, 1))
867    return TRUE;
868  if (not_a_uri("makeRegion", arg))
869    return TRUE;
870  string uri = str(arg);
871  SharedObject *obj = makeSharedObject(global_objects,
872    &global_objects_lock, type_region, uri, consRegion);
873  result->rtyp = type_region;
874  result->data = new_shared(obj);
875  return FALSE;
876}
877
878BOOLEAN findSharedObject(leftv result, leftv arg) {
879  if (wrong_num_args("findSharedObject", arg, 1))
880    return TRUE;
881  if (not_a_uri("findSharedObject", arg))
882    return TRUE;
883  string uri = str(arg);
884  SharedObject *obj = findSharedObject(global_objects,
885    &global_objects_lock, uri);
886  result->rtyp = INT_CMD;
887  result->data = (char *)(long)(obj != NULL);
888  return FALSE;
889}
890
891BOOLEAN typeSharedObject(leftv result, leftv arg) {
892  if (wrong_num_args("findSharedObject", arg, 1))
893    return TRUE;
894  if (not_a_uri("findSharedObject", arg))
895    return TRUE;
896  string uri = str(arg);
897  SharedObject *obj = findSharedObject(global_objects,
898    &global_objects_lock, uri);
899  int type = obj ? obj->get_type() : -1;
900  const char *type_name = "undefined";
901  if (type == type_channel)
902    type_name = "channel";
903  else if (type == type_atomic_table)
904    type_name = "atomic_table";
905  else if (type == type_shared_table)
906    type_name = "shared_table";
907  else if (type == type_atomic_list)
908    type_name = "atomic_list";
909  else if (type == type_shared_list)
910    type_name = "shared_list";
911  else if (type == type_syncvar)
912    type_name = "syncvar";
913  else if (type == type_region)
914    type_name = "region";
915  else if (type == type_regionlock)
916    type_name = "regionlock";
917  result->rtyp = STRING_CMD;
918  result->data = (char *)(omStrDup(type_name));
919  return FALSE;
920}
921
922BOOLEAN bindSharedObject(leftv result, leftv arg) {
923  if (wrong_num_args("bindSharedObject", arg, 1))
924    return TRUE;
925  if (not_a_uri("bindSharedObject", arg))
926    return TRUE;
927  string uri = str(arg);
928  SharedObject *obj = findSharedObject(global_objects,
929    &global_objects_lock, uri);
930  if (!obj) {
931    WerrorS("bindSharedObject: cannot find object");
932    return TRUE;
933  }
934  result->rtyp = obj->get_type();
935  result->data = new_shared(obj);
936  return FALSE;
937}
938
939BOOLEAN getTable(leftv result, leftv arg) {
940  if (wrong_num_args("getTable", arg, 2))
941    return TRUE;
942  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
943    WerrorS("getTable: not a valid table");
944    return TRUE;
945  }
946  if (arg->next->Typ() != STRING_CMD) {
947    WerrorS("getTable: not a valid table key");
948    return TRUE;
949  }
950  TxTable *table = *(TxTable **) arg->Data();
951  if (!table) {
952    WerrorS("getTable: table has not been initialized");
953    return TRUE;
954  }
955  string key = (char *)(arg->next->Data());
956  string value;
957  int success = table->get(key, value);
958  if (success < 0) {
959    WerrorS("getTable: region not acquired");
960    return TRUE;
961  }
962  if (success == 0) {
963    WerrorS("getTable: key not found");
964    return TRUE;
965  }
966  leftv tmp = LinTree::from_string(value);
967  result->rtyp = tmp->Typ();
968  result->data = tmp->Data();
969  return FALSE;
970}
971
972BOOLEAN inTable(leftv result, leftv arg) {
973  if (wrong_num_args("inTable", arg, 2))
974    return TRUE;
975  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
976    WerrorS("inTable: not a valid table");
977    return TRUE;
978  }
979  if (arg->next->Typ() != STRING_CMD) {
980    WerrorS("inTable: not a valid table key");
981    return TRUE;
982  }
983  TxTable *table = *(TxTable **) arg->Data();
984  if (!table) {
985    WerrorS("inTable: table has not been initialized");
986    return TRUE;
987  }
988  string key = (char *)(arg->next->Data());
989  int success = table->check(key);
990  if (success < 0) {
991    WerrorS("inTable: region not acquired");
992    return TRUE;
993  }
994  result->rtyp = INT_CMD;
995  result->data = (char *)(long)(success);
996  return FALSE;
997}
998
999BOOLEAN putTable(leftv result, leftv arg) {
1000  if (wrong_num_args("putTable", arg, 3))
1001    return TRUE;
1002  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
1003    WerrorS("putTable: not a valid table");
1004    return TRUE;
1005  }
1006  if (arg->next->Typ() != STRING_CMD) {
1007    WerrorS("putTable: not a valid table key");
1008    return TRUE;
1009  }
1010  TxTable *table = *(TxTable **) arg->Data();
1011  if (!table) {
1012    WerrorS("putTable: table has not been initialized");
1013    return TRUE;
1014  }
1015  string key = (char *)(arg->next->Data());
1016  string value = LinTree::to_string(arg->next->next);
1017  int success = table->put(key, value);
1018  if (success < 0) {
1019    WerrorS("putTable: region not acquired");
1020    return TRUE;
1021  }
1022  result->rtyp = NONE;
1023  return FALSE;
1024}
1025
1026BOOLEAN getList(leftv result, leftv arg) {
1027  if (wrong_num_args("getList", arg, 2))
1028    return TRUE;
1029  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1030    WerrorS("getList: not a valid list (atomic or shared)");
1031    return TRUE;
1032  }
1033  if (arg->next->Typ() != INT_CMD) {
1034    WerrorS("getList: index must be an integer");
1035    return TRUE;
1036  }
1037  TxList *list = *(TxList **) arg->Data();
1038  if (!list) {
1039    WerrorS("getList: list has not been initialized");
1040    return TRUE;
1041  }
1042  long index = (long)(arg->next->Data());
1043  string value;
1044  int success = list->get(index, value);
1045  if (success < 0) {
1046    WerrorS("getList: region not acquired");
1047    return TRUE;
1048  }
1049  if (success == 0) {
1050    WerrorS("getList: no value at position");
1051    return TRUE;
1052  }
1053  leftv tmp = LinTree::from_string(value);
1054  result->rtyp = tmp->Typ();
1055  result->data = tmp->Data();
1056  return FALSE;
1057}
1058
1059BOOLEAN putList(leftv result, leftv arg) {
1060  if (wrong_num_args("putList", arg, 3))
1061    return TRUE;
1062  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1063    WerrorS("putList: not a valid list (shared or atomic)");
1064    return TRUE;
1065  }
1066  if (arg->next->Typ() != INT_CMD) {
1067    WerrorS("putList: index must be an integer");
1068    return TRUE;
1069  }
1070  TxList *list = *(TxList **) arg->Data();
1071  if (!list) {
1072    WerrorS("putList: list has not been initialized");
1073    return TRUE;
1074  }
1075  long index = (long)(arg->next->Data());
1076  string value = LinTree::to_string(arg->next->next);
1077  int success = list->put(index, value);
1078  if (success < 0) {
1079    WerrorS("putList: region not acquired");
1080    return TRUE;
1081  }
1082  result->rtyp = NONE;
1083  return FALSE;
1084}
1085
1086BOOLEAN lockRegion(leftv result, leftv arg) {
1087  if (wrong_num_args("lockRegion", arg, 1))
1088    return TRUE;
1089  if (not_a_region("lockRegion", arg))
1090    return TRUE;
1091  Region *region = *(Region **)arg->Data();
1092  if (region->is_locked()) {
1093    WerrorS("lockRegion: region is already locked");
1094    return TRUE;
1095  }
1096  region->lock();
1097  result->rtyp = NONE;
1098  return FALSE;
1099}
1100
1101BOOLEAN regionLock(leftv result, leftv arg) {
1102  if (wrong_num_args("lockRegion", arg, 1))
1103    return TRUE;
1104  if (not_a_region("lockRegion", arg))
1105    return TRUE;
1106  Region *region = *(Region **)arg->Data();
1107  if (region->is_locked()) {
1108    WerrorS("lockRegion: region is already locked");
1109    return TRUE;
1110  }
1111  region->lock();
1112  result->rtyp = type_regionlock;
1113  result->data = new_shared(region);
1114  return FALSE;
1115}
1116
1117
1118BOOLEAN unlockRegion(leftv result, leftv arg) {
1119  if (wrong_num_args("unlockRegion", arg, 1))
1120    return TRUE;
1121  if (not_a_region("unlockRegion", arg))
1122    return TRUE;
1123  Region *region = *(Region **)arg->Data();
1124  if (!region->is_locked()) {
1125    WerrorS("unlockRegion: region is not locked");
1126    return TRUE;
1127  }
1128  region->unlock();
1129  result->rtyp = NONE;
1130  return FALSE;
1131}
1132
1133BOOLEAN sendChannel(leftv result, leftv arg) {
1134  if (wrong_num_args("sendChannel", arg, 2))
1135    return TRUE;
1136  if (arg->Typ() != type_channel) {
1137    WerrorS("sendChannel: argument is not a channel");
1138    return TRUE;
1139  }
1140  SingularChannel *channel = *(SingularChannel **)arg->Data();
1141  if (!channel) {
1142    WerrorS("sendChannel: channel has not been initialized");
1143    return TRUE;
1144  }
1145  channel->send(LinTree::to_string(arg->next));
1146  result->rtyp = NONE;
1147  return FALSE;
1148}
1149
1150BOOLEAN receiveChannel(leftv result, leftv arg) {
1151  if (wrong_num_args("receiveChannel", arg, 1))
1152    return TRUE;
1153  if (arg->Typ() != type_channel) {
1154    WerrorS("receiveChannel: argument is not a channel");
1155    return TRUE;
1156  }
1157  SingularChannel *channel = *(SingularChannel **)arg->Data();
1158  if (!channel) {
1159    WerrorS("receiveChannel: channel has not been initialized");
1160    return TRUE;
1161  }
1162  string item = channel->receive();
1163  leftv val = LinTree::from_string(item);
1164  result->rtyp = val->Typ();
1165  result->data = val->Data();
1166  return FALSE;
1167}
1168
1169BOOLEAN statChannel(leftv result, leftv arg) {
1170  if (wrong_num_args("statChannel", arg, 1))
1171    return TRUE;
1172  if (arg->Typ() != type_channel) {
1173    WerrorS("statChannel: argument is not a channel");
1174    return TRUE;
1175  }
1176  SingularChannel *channel = *(SingularChannel **)arg->Data();
1177  if (!channel) {
1178    WerrorS("receiveChannel: channel has not been initialized");
1179    return TRUE;
1180  }
1181  long n = channel->count();
1182  result->rtyp = INT_CMD;
1183  result->data = (char *)n;
1184  return FALSE;
1185}
1186
1187BOOLEAN writeSyncVar(leftv result, leftv arg) {
1188  if (wrong_num_args("writeSyncVar", arg, 2))
1189    return TRUE;
1190  if (arg->Typ() != type_syncvar) {
1191    WerrorS("writeSyncVar: argument is not a syncvar");
1192    return TRUE;
1193  }
1194  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1195  if (!syncvar) {
1196    WerrorS("writeSyncVar: syncvar has not been initialized");
1197    return TRUE;
1198  }
1199  if (!syncvar->write(LinTree::to_string(arg->next))) {
1200    WerrorS("writeSyncVar: variable already has a value");
1201    return TRUE;
1202  }
1203  result->rtyp = NONE;
1204  return FALSE;
1205}
1206
1207BOOLEAN updateSyncVar(leftv result, leftv arg) {
1208  Command cmd("updateSyncVar", result, arg);
1209  cmd.check_argc_min(2);
1210  cmd.check_arg(0, type_syncvar, "first argument must be a syncvar");
1211  cmd.check_init(0, "syncvar has not been initialized");
1212  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
1213  if (cmd.ok()) {
1214    SingularSyncVar *syncvar = cmd.shared_arg<SingularSyncVar>(0);
1215    char *procname = (char *) cmd.arg(1);
1216    arg = arg->next->next;
1217    syncvar->acquire();
1218    syncvar->wait_init();
1219    vector<leftv> argv;
1220    appendArg(argv, syncvar->get());
1221    while (arg) {
1222      appendArgCopy(argv, arg);
1223      arg = arg->next;
1224    }
1225    int error = executeProc(*result, procname, argv);
1226    if (!error) {
1227      syncvar->update(result);
1228    }
1229    syncvar->release();
1230    return error;
1231  }
1232  return cmd.status();
1233}
1234
1235
1236BOOLEAN readSyncVar(leftv result, leftv arg) {
1237  if (wrong_num_args("readSyncVar", arg, 1))
1238    return TRUE;
1239  if (arg->Typ() != type_syncvar) {
1240    WerrorS("readSyncVar: argument is not a syncvar");
1241    return TRUE;
1242  }
1243  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1244  if (!syncvar) {
1245    WerrorS("readSyncVar: syncvar has not been initialized");
1246    return TRUE;
1247  }
1248  string item = syncvar->read();
1249  leftv val = LinTree::from_string(item);
1250  result->rtyp = val->Typ();
1251  result->data = val->Data();
1252  return FALSE;
1253}
1254
1255BOOLEAN statSyncVar(leftv result, leftv arg) {
1256  if (wrong_num_args("statSyncVar", arg, 1))
1257    return TRUE;
1258  if (arg->Typ() != type_syncvar) {
1259    WerrorS("statSyncVar: argument is not a syncvar");
1260    return TRUE;
1261  }
1262  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1263  if (!syncvar) {
1264    WerrorS("statSyncVar: syncvar has not been initialized");
1265    return TRUE;
1266  }
1267  int init = syncvar->check();
1268  result->rtyp = INT_CMD;
1269  result->data = (char *)(long) init;
1270  return FALSE;
1271}
1272
1273void encode_shared(LinTree::LinTree &lintree, leftv val) {
1274  SharedObject *obj = *(SharedObject **)(val->Data());
1275  acquireShared(obj);
1276  lintree.put(obj);
1277}
1278
1279leftv decode_shared(LinTree::LinTree &lintree) {
1280  int type = lintree.get_prev<int>();
1281  SharedObject *obj = lintree.get<SharedObject *>();
1282  leftv result = (leftv) omAlloc0Bin(sleftv_bin);
1283  result->rtyp = type;
1284  result->data = (void *)new_shared(obj);
1285  return result;
1286}
1287
1288void ref_shared(LinTree::LinTree &lintree, int by) {
1289  SharedObject *obj = lintree.get<SharedObject *>();
1290  while (by > 0) {
1291    obj->incref();
1292    by--;
1293  }
1294  while (by < 0) {
1295    obj->decref();
1296    by++;
1297  }
1298}
1299
1300void installShared(int type) {
1301  LinTree::install(type, encode_shared, decode_shared, ref_shared);
1302}
1303
1304void makeSharedType(int &type, const char *name) {
1305  if (type != 0) return;
1306  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1307  b->blackbox_Init = shared_init;
1308  b->blackbox_destroy = shared_destroy;
1309  b->blackbox_Copy = shared_copy;
1310  b->blackbox_String = shared_string;
1311  b->blackbox_Assign = shared_assign;
1312  b->blackbox_CheckAssign = shared_check_assign;
1313  // b->blackbox_Op2 = shared_op2;
1314  // b->blackbox_Op3 = shared_op3;
1315  type = setBlackboxStuff(b, name);
1316  installShared(type);
1317}
1318
1319void makeRegionlockType(int &type, const char *name) {
1320  if (type != 0) return;
1321  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1322  b->blackbox_Init = shared_init;
1323  b->blackbox_destroy = rlock_destroy;
1324  b->blackbox_Copy = shared_copy;
1325  b->blackbox_String = shared_string;
1326  b->blackbox_Assign = rlock_assign;
1327  b->blackbox_CheckAssign = shared_check_assign;
1328  type = setBlackboxStuff(b, name);
1329  installShared(type);
1330}
1331
1332#define MAX_THREADS 128
1333
1334class ThreadState {
1335public:
1336  bool active;
1337  bool running;
1338  int index;
1339  void *(*thread_func)(ThreadState *, void *);
1340  void *arg, *result;
1341  pthread_t id;
1342  pthread_t parent;
1343  Lock lock;
1344  ConditionVariable to_cond;
1345  ConditionVariable from_cond;
1346  queue<string> to_thread;
1347  queue<string> from_thread;
1348  ThreadState() : lock(), to_cond(&lock), from_cond(&lock),
1349                  to_thread(), from_thread() {
1350    active = false;
1351    running = false;
1352    index = -1;
1353  }
1354  ~ThreadState() {
1355    // We do nothing here. This is to prevent the condition
1356    // variable destructor from firing upon program exit,
1357    // which would invoke undefined behavior if the thread
1358    // is still running.
1359  }
1360};
1361
1362Lock thread_lock;
1363
1364ThreadState *thread_state;
1365
1366void setOption(int ch) {
1367  int index = feGetOptIndex(ch);
1368  feSetOptValue((feOptIndex) index, (int) 1);
1369}
1370
1371void thread_init() {
1372  master_lock.lock();
1373  thread_id = ++thread_counter;
1374  master_lock.unlock();
1375#ifdef ENABLE_THREADS
1376  extern void pSingular_initialize_thread();
1377  pSingular_initialize_thread();
1378  siInit(global_argv0);
1379#endif
1380  setOption('q');
1381  // setOption('b');
1382}
1383
1384void *thread_main(void *arg) {
1385  ThreadState *ts = (ThreadState *)arg;
1386  thread_init();
1387  return ts->thread_func(ts, ts->arg);
1388}
1389
1390void *interpreter_thread(ThreadState *ts, void *arg) {
1391  ts->lock.lock();
1392  for (;;) {
1393    bool eval = false;
1394    while (ts->to_thread.empty())
1395      ts->to_cond.wait();
1396    /* TODO */
1397    string expr = ts->to_thread.front();
1398    switch (expr[0]) {
1399      case '\0': case 'q':
1400        ts->lock.unlock();
1401        return NULL;
1402      case 'x':
1403        eval = false;
1404        break;
1405      case 'e':
1406        eval = true;
1407        break;
1408    }
1409    ts->to_thread.pop();
1410    expr = ts->to_thread.front();
1411    /* this will implicitly eval commands */
1412    leftv val = LinTree::from_string(expr);
1413    expr = LinTree::to_string(val);
1414    ts->to_thread.pop();
1415    if (eval)
1416      ts->from_thread.push(expr);
1417    ts->from_cond.signal();
1418  }
1419  ts->lock.unlock();
1420  return NULL;
1421}
1422
1423class InterpreterThread : public SharedObject {
1424private:
1425  ThreadState *ts;
1426public:
1427  InterpreterThread(ThreadState *ts_init) : SharedObject(), ts(ts_init) { }
1428  virtual ~InterpreterThread() { }
1429  ThreadState *getThreadState() { return ts; }
1430  void clearThreadState() {
1431    ts = NULL;
1432  }
1433};
1434
1435static ThreadState *newThread(void *(*thread_func)(ThreadState *, void *),
1436    void *arg, const char **error) {
1437  ThreadState *ts = NULL;
1438  if (error) *error = NULL;
1439  thread_lock.lock();
1440  for (int i=0; i<MAX_THREADS; i++) {
1441    if (!thread_state[i].active) {
1442      ts = thread_state + i;
1443      ts->index = i;
1444      ts->parent = pthread_self();
1445      ts->active = true;
1446      ts->running = true;
1447      ts->to_thread = queue<string>();
1448      ts->from_thread = queue<string>();
1449      ts->thread_func = thread_func;
1450      ts->arg = arg;
1451      ts->result = NULL;
1452      if (pthread_create(&ts->id, NULL, thread_main, ts)<0) {
1453        if (error)
1454          *error = "createThread: internal error: failed to create thread";
1455        goto fail;
1456      }
1457      goto exit;
1458    }
1459  }
1460  if (error) *error = "createThread: too many threads";
1461  fail:
1462  ts = NULL;
1463  exit:
1464  thread_lock.unlock();
1465  return ts;
1466}
1467
1468ThreadState *createThread(void *(*thread_func)(ThreadState *, void *),
1469    void *arg) {
1470  return newThread(thread_func, arg, NULL);
1471}
1472
1473void *joinThread(ThreadState *ts) {
1474  void *result;
1475  pthread_join(ts->id, NULL);
1476  result = ts->result;
1477  thread_lock.lock();
1478  ts->running = false;
1479  ts->active = false;
1480  thread_lock.unlock();
1481}
1482
1483static InterpreterThread *createInterpreterThread(const char **error) {
1484  ThreadState *ts = newThread(interpreter_thread, NULL, error);
1485  if (*error) return NULL;
1486  InterpreterThread *thread = new InterpreterThread(ts);
1487  char buf[10];
1488  sprintf(buf, "%d", ts->index);
1489  string name(buf);
1490  thread->set_name(name);
1491  thread->set_type(type_thread);
1492  return thread;
1493}
1494
1495static BOOLEAN createThread(leftv result, leftv arg) {
1496  Command cmd("createThread", result, arg);
1497  cmd.check_argc(0);
1498  const char *error;
1499  if (!have_threads)
1500    cmd.report("thread support not available");
1501  if (!cmd.ok()) return cmd.status();
1502  InterpreterThread *thread = createInterpreterThread(&error);
1503  if (error) {
1504    return cmd.abort(error);
1505  }
1506  cmd.set_result(type_thread, new_shared(thread));
1507  return cmd.status();
1508}
1509
1510static bool joinInterpreterThread(InterpreterThread *thread) {
1511  ThreadState *ts = thread->getThreadState();
1512  if (ts && ts->parent != pthread_self()) {
1513    return false;
1514  }
1515  ts->lock.lock();
1516  string quit("q");
1517  ts->to_thread.push(quit);
1518  ts->to_cond.signal();
1519  ts->lock.unlock();
1520  pthread_join(ts->id, NULL);
1521  thread_lock.lock();
1522  ts->running = false;
1523  ts->active = false;
1524  thread->clearThreadState();
1525  thread_lock.unlock();
1526  return true;
1527}
1528
1529static BOOLEAN joinThread(leftv result, leftv arg) {
1530  if (wrong_num_args("joinThread", arg, 1))
1531    return TRUE;
1532  if (arg->Typ() != type_thread) {
1533    WerrorS("joinThread: argument is not a thread");
1534    return TRUE;
1535  }
1536  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
1537  if (!joinInterpreterThread(thread)) {
1538    WerrorS("joinThread: can only be called from parent thread");
1539    return TRUE;
1540  }
1541  return FALSE;
1542}
1543
1544class ThreadPool;
1545class Trigger;
1546
1547class Job : public SharedObject {
1548public:
1549  ThreadPool *pool;
1550  long prio;
1551  size_t id;
1552  long pending_index;
1553  vector<Job *> deps;
1554  vector<Job *> notify;
1555  vector<Trigger *> triggers;
1556  vector<string> args;
1557  string result; // lintree-encoded
1558  void *data;
1559  bool fast;
1560  bool done;
1561  bool queued;
1562  bool running;
1563  bool cancelled;
1564  Job() : SharedObject(), pool(NULL), deps(), pending_index(-1), fast(false),
1565    done(false), running(false), queued(false), cancelled(false), data(NULL),
1566    result(), args(), notify(), triggers(), prio(0)
1567  { set_type(type_job); }
1568  ~Job();
1569  void addDep(Job *job) {
1570    deps.push_back(job);
1571  }
1572  void addDep(vector<Job *> &jobs);
1573  void addDep(long ndeps, Job **jobs);
1574  void addNotify(vector<Job *> &jobs);
1575  void addNotify(Job *job);
1576  virtual bool ready();
1577  virtual void execute() = 0;
1578  void run();
1579};
1580
1581struct JobCompare {
1582  bool operator()(const Job* lhs, const Job* rhs) {
1583    if (lhs->fast < rhs->fast) {
1584      return true;
1585    }
1586    if (lhs->prio < rhs->prio) {
1587      return true;
1588    }
1589    if (lhs->prio == rhs->prio) {
1590      return lhs->id > rhs->id;
1591    }
1592    return false;
1593  }
1594};
1595
1596class Trigger : public Job {
1597public:
1598  virtual bool accept(leftv arg) = 0;
1599  virtual void activate(leftv arg) = 0;
1600  Trigger() : Job() { set_type(type_trigger); fast = true; }
1601};
1602
1603bool Job::ready() {
1604  vector<Job *>::iterator it;
1605  for (it = deps.begin(); it != deps.end(); it++) {
1606    if (!(*it)->done) return false;
1607  }
1608  return true;
1609}
1610
1611Job::~Job() {
1612  vector<Job *>::iterator it;
1613  for (it = deps.begin(); it != deps.end(); it++) {
1614    releaseShared(*it);
1615  }
1616}
1617
1618typedef queue<Job *> JobQueue;
1619
1620class Scheduler;
1621
1622struct SchedInfo {
1623  Scheduler *scheduler;
1624  Job *job;
1625  int num;
1626};
1627
1628STATIC_VAR ThreadPool *currentThreadPoolRef;
1629STATIC_VAR Job *currentJobRef;
1630
1631class ThreadPool : public SharedObject {
1632public:
1633  Scheduler *scheduler;
1634  int nthreads;
1635  ThreadPool(Scheduler *sched, int n);
1636  ThreadPool(int n);
1637  ~ThreadPool();
1638  ThreadState *getThread(int i);
1639  void shutdown(bool wait);
1640  void addThread(ThreadState *thread);
1641  void attachJob(Job *job);
1642  void detachJob(Job *job);
1643  void queueJob(Job *job);
1644  void broadcastJob(Job *job);
1645  void cancelDeps(Job * job);
1646  void cancelJob(Job *job);
1647  void waitJob(Job *job);
1648  void clearThreadState();
1649};
1650
1651
1652class Scheduler : public SharedObject {
1653private:
1654  bool single_threaded;
1655  size_t jobid;
1656  int nthreads;
1657  int maxconcurrency;
1658  int running;
1659  bool shutting_down;
1660  int shutdown_counter;
1661  vector<ThreadState *> threads;
1662  vector<ThreadPool *> thread_owners;
1663  priority_queue<Job *, vector<Job *>, JobCompare> global_queue;
1664  vector<JobQueue *> thread_queues;
1665  vector<Job *> pending;
1666  ConditionVariable cond;
1667  ConditionVariable response;
1668  friend class Job;
1669public:
1670  Lock lock;
1671  Scheduler(int n) :
1672    SharedObject(), threads(), thread_owners(), global_queue(), thread_queues(),
1673    single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1674    lock(true), cond(&lock), response(&lock),
1675    shutting_down(false), shutdown_counter(0), jobid(0),
1676    maxconcurrency(n), running(0)
1677  {
1678    thread_queues.push_back(new JobQueue());
1679  }
1680  void set_maxconcurrency(int n) {
1681    maxconcurrency = n;
1682  }
1683  int get_maxconcurrency() {
1684    return maxconcurrency;
1685  }
1686  int threadpool_size(ThreadPool *pool) {
1687    int n;
1688    for (int i = 0; i <thread_owners.size(); i++) {
1689      if (thread_owners[i] == pool)
1690        n++;
1691    }
1692    return n;
1693  }
1694  virtual ~Scheduler() {
1695    for (int i = 0; i < thread_queues.size(); i++) {
1696      JobQueue *q = thread_queues[i];
1697      while (!q->empty()) {
1698        Job *job = q->front();
1699        q->pop();
1700        releaseShared(job);
1701      }
1702    }
1703    thread_queues.clear();
1704    threads.clear();
1705  }
1706  ThreadState *getThread(int i) { return threads[i]; }
1707  void shutdown(bool wait) {
1708    if (single_threaded) {
1709      SchedInfo *info = new SchedInfo();
1710      info->num = 0;
1711      info->scheduler = this;
1712      acquireShared(this);
1713      info->job = NULL;
1714      Scheduler::main(NULL, info);
1715      return;
1716    }
1717    lock.lock();
1718    if (wait) {
1719      while (!global_queue.empty()) {
1720        response.wait();
1721      }
1722    }
1723    shutting_down = true;
1724    while (shutdown_counter < nthreads) {
1725      cond.broadcast();
1726      response.wait();
1727    }
1728    lock.unlock();
1729    for (int i = 0; i <threads.size(); i++) {
1730      joinThread(threads[i]);
1731    }
1732  }
1733  void addThread(ThreadPool *owner, ThreadState *thread) {
1734    lock.lock();
1735    thread_owners.push_back(owner);
1736    threads.push_back(thread);
1737    thread_queues.push_back(new JobQueue());
1738    lock.unlock();
1739  }
1740  void attachJob(ThreadPool *pool, Job *job) {
1741    lock.lock();
1742    job->pool = pool;
1743    job->id = jobid++;
1744    acquireShared(job);
1745    if (job->ready()) {
1746      global_queue.push(job);
1747      cond.signal();
1748    }
1749    else if (job->pending_index < 0) {
1750      job->pool = pool;
1751      job->pending_index = pending.size();
1752      pending.push_back(job);
1753    }
1754    lock.unlock();
1755  }
1756  void detachJob(Job *job) {
1757    lock.lock();
1758    long i = job->pending_index;
1759    job->pending_index = -1;
1760    if (i >= 0) {
1761      job = pending.back();
1762      pending.resize(pending.size()-1);
1763      pending[i] = job;
1764      job->pending_index = i;
1765    }
1766    lock.unlock();
1767  }
1768  void queueJob(Job *job) {
1769    lock.lock();
1770    global_queue.push(job);
1771    cond.signal();
1772    lock.unlock();
1773  }
1774  void broadcastJob(ThreadPool *pool, Job *job) {
1775    lock.lock();
1776    for (int i = 0; i <thread_queues.size(); i++) {
1777      if (thread_owners[i] == pool) {
1778        acquireShared(job);
1779        thread_queues[i]->push(job);
1780      }
1781    }
1782    lock.unlock();
1783  }
1784  void cancelDeps(Job * job) {
1785    vector<Job *> &notify = job->notify;
1786    for (int i = 0; i <notify.size(); i++) {
1787      Job *next = notify[i];
1788      if (!next->cancelled) {
1789        cancelJob(next);
1790      }
1791    }
1792  }
1793  void cancelJob(Job *job) {
1794    lock.lock();
1795    if (!job->cancelled) {
1796      job->cancelled = true;
1797      if (!job->running && !job->done) {
1798        job->done = true;
1799        cancelDeps(job);
1800      }
1801    }
1802    lock.unlock();
1803  }
1804  void waitJob(Job *job) {
1805    if (single_threaded) {
1806      SchedInfo *info = new SchedInfo();
1807      info->num = 0;
1808      info->scheduler = this;
1809      acquireShared(this);
1810      info->job = job;
1811      Scheduler::main(NULL, info);
1812    } else {
1813      lock.lock();
1814      for (;;) {
1815        if (job->done || job->cancelled) {
1816          break;
1817        }
1818        response.wait();
1819      }
1820      response.signal(); // forward signal
1821      lock.unlock();
1822    }
1823  }
1824  void clearThreadState() {
1825    threads.clear();
1826  }
1827  static void notifyDeps(Scheduler *scheduler, Job *job) {
1828    vector<Job *> &notify = job->notify;
1829    job->incref(notify.size());
1830    for (int i = 0; i <notify.size(); i++) {
1831      Job *next = notify[i];
1832      if (!next->queued && next->ready() && !next->cancelled) {
1833        next->queued = true;
1834        scheduler->queueJob(next);
1835      }
1836    }
1837    vector<Trigger *> &triggers = job->triggers;
1838    leftv arg = NULL;
1839    if (triggers.size() > 0 && job->result.size() > 0)
1840      arg = LinTree::from_string(job->result);
1841    for (int i = 0; i < triggers.size(); i++) {
1842      Trigger *trigger = triggers[i];
1843      if (trigger->accept(arg)) {
1844        trigger->activate(arg);
1845        if (trigger->ready())
1846           scheduler->queueJob(trigger);
1847      }
1848    }
1849    if (arg) {
1850      arg->CleanUp();
1851      omFreeBin(arg, sleftv_bin);
1852    }
1853  }
1854  static void *main(ThreadState *ts, void *arg) {
1855    SchedInfo *info = (SchedInfo *) arg;
1856    Scheduler *scheduler = info->scheduler;
1857    ThreadPool *oldThreadPool = currentThreadPoolRef;
1858    // TODO: set current thread pool
1859    // currentThreadPoolRef = pool;
1860    Lock &lock = scheduler->lock;
1861    ConditionVariable &cond = scheduler->cond;
1862    ConditionVariable &response = scheduler->response;
1863    JobQueue *my_queue = scheduler->thread_queues[info->num];
1864    if (!scheduler->single_threaded)
1865      thread_init();
1866    lock.lock();
1867    for (;;) {
1868      if (info->job && info->job->done)
1869        break;
1870      if (scheduler->shutting_down) {
1871        scheduler->shutdown_counter++;
1872        scheduler->response.signal();
1873        break;
1874      }
1875      if (!my_queue->empty()) {
1876       Job *job = my_queue->front();
1877       my_queue->pop();
1878       if (!scheduler->global_queue.empty())
1879         cond.signal();
1880       currentJobRef = job;
1881       job->run();
1882       currentJobRef = NULL;
1883       notifyDeps(scheduler, job);
1884       releaseShared(job);
1885       scheduler->response.signal();
1886       continue;
1887      } else if (!scheduler->global_queue.empty()) {
1888       Job *job = scheduler->global_queue.top();
1889       scheduler->global_queue.pop();
1890       if (!scheduler->global_queue.empty())
1891         cond.signal();
1892       currentJobRef = job;
1893       job->run();
1894       currentJobRef = NULL;
1895       notifyDeps(scheduler, job);
1896       releaseShared(job);
1897       scheduler->response.signal();
1898       continue;
1899      } else {
1900        if (scheduler->single_threaded) {
1901          break;
1902        }
1903        cond.wait();
1904      }
1905    }
1906    // TODO: correct current thread pool
1907    // releaseShared(currentThreadPoolRef);
1908    currentThreadPoolRef = oldThreadPool;
1909    scheduler->lock.unlock();
1910    delete info;
1911    return NULL;
1912  }
1913};
1914
1915ThreadPool::ThreadPool(int n) : SharedObject(), nthreads(n) {
1916  scheduler = new Scheduler(n);
1917  acquireShared(scheduler);
1918}
1919ThreadPool::ThreadPool(Scheduler *sched, int n) : SharedObject(), nthreads(n) {
1920  scheduler = sched;
1921  acquireShared(sched);
1922}
1923ThreadPool::~ThreadPool() {
1924  releaseShared(scheduler);
1925}
1926ThreadState *ThreadPool::getThread(int i) { return scheduler->getThread(i); }
1927void ThreadPool::shutdown(bool wait) { scheduler->shutdown(wait); }
1928void ThreadPool::addThread(ThreadState *thread) {
1929  scheduler->addThread(this, thread);
1930}
1931void ThreadPool::attachJob(Job *job) {
1932  scheduler->attachJob(this, job);
1933}
1934void ThreadPool::detachJob(Job *job) {
1935  scheduler->detachJob(job);
1936}
1937void ThreadPool::queueJob(Job *job) {
1938  scheduler->queueJob(job);
1939}
1940void ThreadPool::broadcastJob(Job *job) {
1941  scheduler->broadcastJob(this, job);
1942}
1943void ThreadPool::cancelDeps(Job * job) {
1944  scheduler->cancelDeps(job);
1945}
1946void ThreadPool::cancelJob(Job *job) {
1947  scheduler->cancelJob(job);
1948}
1949void ThreadPool::waitJob(Job *job) {
1950  scheduler->waitJob(job);
1951}
1952void ThreadPool::clearThreadState() {
1953  scheduler->clearThreadState();
1954}
1955
1956void Job::addDep(vector<Job *> &jobs) {
1957  deps.insert(deps.end(), jobs.begin(), jobs.end());
1958}
1959
1960void Job::addDep(long ndeps, Job **jobs) {
1961  for (long i = 0; i < ndeps; i++) {
1962    deps.push_back(jobs[i]);
1963  }
1964}
1965
1966void Job::addNotify(vector<Job *> &jobs) {
1967  notify.insert(notify.end(), jobs.begin(), jobs.end());
1968  if (done) {
1969    Scheduler::notifyDeps(pool->scheduler, this);
1970  }
1971}
1972
1973void Job::addNotify(Job *job) {
1974  notify.push_back(job);
1975  if (done) {
1976    Scheduler::notifyDeps(pool->scheduler, this);
1977  }
1978}
1979
1980void Job::run() {
1981  if (!cancelled) {
1982    running = true;
1983    pool->scheduler->lock.unlock();
1984    pool->scheduler->running++;
1985    execute();
1986    pool->scheduler->running--;
1987    pool->scheduler->lock.lock();
1988    running = false;
1989  }
1990  done = true;
1991}
1992
1993class AccTrigger : public Trigger {
1994private:
1995  long count;
1996public:
1997  AccTrigger(long count_init): Trigger(), count(count_init) {
1998  }
1999  virtual bool ready() {
2000    if (!Trigger::ready()) return false;
2001    return args.size() >= count;
2002  }
2003  virtual bool accept(leftv arg) {
2004    return true;
2005  }
2006  virtual void activate(leftv arg) {
2007    while (arg != NULL && !ready()) {
2008      args.push_back(LinTree::to_string(arg));
2009      if (ready()) {
2010        return;
2011      }
2012      arg = arg->next;
2013    }
2014  }
2015  virtual void execute() {
2016    lists l = (lists) omAlloc0Bin(slists_bin);
2017    l->Init(args.size());
2018    for (int i = 0; i < args.size(); i++) {
2019      leftv val = LinTree::from_string(args[i]);
2020      memcpy(&l->m[i], val, sizeof(*val));
2021      omFreeBin(val, sleftv_bin);
2022    }
2023    sleftv val;
2024    memset(&val, 0, sizeof(val));
2025    val.rtyp = LIST_CMD;
2026    val.data = l;
2027    result = LinTree::to_string(&val);
2028    // val.CleanUp();
2029  }
2030};
2031
2032class CountTrigger : public Trigger {
2033private:
2034  long count;
2035public:
2036  CountTrigger(long count_init): Trigger(), count(count_init) {
2037  }
2038  virtual bool ready() {
2039    if (!Trigger::ready()) return false;
2040    return count <= 0;
2041  }
2042  virtual bool accept(leftv arg) {
2043    return arg == NULL;
2044  }
2045  virtual void activate(leftv arg) {
2046    if (!ready()) {
2047      count--;
2048    }
2049  }
2050  virtual void execute() {
2051    // do nothing
2052  }
2053};
2054
2055class SetTrigger : public Trigger {
2056private:
2057  vector<bool> set;
2058  long count;
2059public:
2060  SetTrigger(long count_init) : Trigger(), count(0),
2061    set(count_init) {
2062  }
2063  virtual bool ready() {
2064    if (!Trigger::ready()) return false;
2065    return count == set.size();
2066  }
2067  virtual bool accept(leftv arg) {
2068    return arg->Typ() == INT_CMD;
2069  }
2070  virtual void activate(leftv arg) {
2071    if (!ready()) {
2072      long value = (long) arg->Data();
2073      if (value < 0 || value >= count) return;
2074      if (set[value]) return;
2075      set[value] = true;
2076      count++;
2077    }
2078  }
2079  virtual void execute() {
2080    // do nothing
2081  }
2082};
2083
2084
2085class ProcTrigger : public Trigger {
2086private:
2087  string procname;
2088  bool success;
2089public:
2090  ProcTrigger(const char *p) : Trigger(), procname(p), success(false) {
2091  }
2092  virtual bool ready() {
2093    if (!Trigger::ready()) return false;
2094    return success;
2095  }
2096  virtual bool accept(leftv arg) {
2097    return TRUE;
2098  }
2099  virtual void activate(leftv arg) {
2100    if (!ready()) {
2101      pool->scheduler->lock.unlock();
2102      vector<leftv> argv;
2103      for (int i = 0; i < args.size(); i++) {
2104        appendArg(argv, args[i]);
2105      }
2106      int error = false;
2107      while (arg) {
2108        appendArgCopy(argv, arg);
2109        arg = arg->next;
2110      }
2111      sleftv val;
2112      if (!error)
2113        error = executeProc(val, procname.c_str(), argv);
2114      if (!error) {
2115        if (val.Typ() == NONE || (val.Typ() == INT_CMD &&
2116                                  (long) val.Data()))
2117        {
2118          success = true;
2119        }
2120        val.CleanUp();
2121      }
2122      pool->scheduler->lock.lock();
2123    }
2124  }
2125  virtual void execute() {
2126    // do nothing
2127  }
2128};
2129
2130static BOOLEAN createThreadPool(leftv result, leftv arg) {
2131  long n;
2132  Command cmd("createThreadPool", result, arg);
2133  cmd.check_argc(1, 2);
2134  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2135  if (cmd.ok()) {
2136    n = (long) cmd.arg(0);
2137    if (n < 0) cmd.report("number of threads must be non-negative");
2138    else if (n >= 256) cmd.report("number of threads too large");
2139    if (!have_threads && n != 0)
2140      cmd.report("in single-threaded mode, number of threads must be zero");
2141  }
2142  if (cmd.ok()) {
2143    ThreadPool *pool = new ThreadPool((int) n);
2144    pool->set_type(type_threadpool);
2145    for (int i = 0; i <n; i++) {
2146      const char *error;
2147      SchedInfo *info = new SchedInfo();
2148      info->scheduler = pool->scheduler;
2149      acquireShared(pool->scheduler);
2150      info->job = NULL;
2151      info->num = i;
2152      ThreadState *thread = newThread(Scheduler::main, info, &error);
2153      if (!thread) {
2154        // TODO: clean up bad pool
2155        return cmd.abort(error);
2156      }
2157      pool->addThread(thread);
2158    }
2159    cmd.set_result(type_threadpool, new_shared(pool));
2160  }
2161  return cmd.status();
2162}
2163
2164static BOOLEAN createThreadPoolSet(leftv result, leftv arg) {
2165  Command cmd("createThreadPoolSet", result, arg);
2166  cmd.check_argc(2);
2167  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2168  cmd.check_arg(1, LIST_CMD, "second argument must be a list of integers");
2169  lists l;
2170  int n;
2171  if (cmd.ok()) {
2172    l = (lists) (cmd.arg(1));
2173    n = lSize(l)+1;
2174    if (n == 0)
2175      return cmd.abort("second argument must not be empty");
2176    for (int i = 0; i < n; i++) {
2177      if (l->m[i].Typ() != INT_CMD)
2178        return cmd.abort("second argument must be a list of integers");
2179    }
2180  }
2181  lists pools = (lists) omAlloc0Bin(slists_bin);
2182  pools->Init(n);
2183  if (cmd.ok()) {
2184    long s = 0;
2185    for (int i = 0; i < n; i++) {
2186      s += (long) (l->m[i].Data());
2187    }
2188    Scheduler *sched = new Scheduler((int)s);
2189    sched->set_maxconcurrency(cmd.int_arg(0));
2190    for (int i = 0; i < n; i++) {
2191      long m = (long) (l->m[i].Data());
2192      ThreadPool *pool = new ThreadPool(sched, (int) m);
2193      pool->set_type(type_threadpool);
2194      for (int j = 0; j < m; j++) {
2195        const char *error;
2196        SchedInfo *info = new SchedInfo();
2197        info->scheduler = pool->scheduler;
2198        acquireShared(pool->scheduler);
2199        info->job = NULL;
2200        info->num = i;
2201        ThreadState *thread = newThread(Scheduler::main, info, &error);
2202        if (!thread) {
2203          // TODO: clean up bad pool
2204          return cmd.abort(error);
2205        }
2206        pool->addThread(thread);
2207      }
2208      pools->m[i].rtyp = type_threadpool;
2209      pools->m[i].data = new_shared(pool);
2210    }
2211    cmd.set_result(LIST_CMD, pools);
2212  }
2213  return cmd.status();
2214}
2215
2216ThreadPool *createThreadPool(int nthreads, int prioThreads = 0) {
2217  ThreadPool *pool = new ThreadPool((int) nthreads);
2218  pool->set_type(type_threadpool);
2219  for (int i = 0; i <nthreads; i++) {
2220    const char *error;
2221    SchedInfo *info = new SchedInfo();
2222    info->scheduler = pool->scheduler;
2223    acquireShared(pool);
2224    info->job = NULL;
2225    info->num = i;
2226    ThreadState *thread = newThread(Scheduler::main, info, &error);
2227    if (!thread) {
2228      return NULL;
2229    }
2230    pool->addThread(thread);
2231  }
2232  return pool;
2233}
2234
2235void release(ThreadPool *pool) {
2236  releaseShared(pool);
2237}
2238
2239void retain(ThreadPool *pool) {
2240  acquireShared(pool);
2241}
2242
2243ThreadPool *getCurrentThreadPool() {
2244  return currentThreadPoolRef;
2245}
2246
2247static BOOLEAN getThreadPoolWorkers(leftv result, leftv arg) {
2248  Command cmd("getThreadPoolWorkers", result, arg);
2249  cmd.check_argc(1);
2250  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2251  cmd.check_init(0, "threadpool not initialized");
2252  int r = 0;
2253  if (cmd.ok()) {
2254    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2255    Scheduler *sched = pool->scheduler;
2256    sched->lock.lock();
2257    r = sched->threadpool_size(pool);
2258    sched->lock.unlock();
2259    cmd.set_result(INT_CMD, r);
2260  }
2261  return cmd.status();
2262}
2263
2264static BOOLEAN setThreadPoolWorkers(leftv result, leftv arg) {
2265  Command cmd("setThreadPoolWorkers", result, arg);
2266  cmd.check_argc(2);
2267  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2268  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2269  cmd.check_init(0, "threadpool not initialized");
2270  if (cmd.ok()) {
2271    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2272    Scheduler *sched = pool->scheduler;
2273    // TODO: count/add threads
2274    cmd.no_result();
2275  }
2276  return cmd.status();
2277}
2278
2279static BOOLEAN getThreadPoolConcurrency(leftv result, leftv arg) {
2280  Command cmd("getThreadPoolConcurrency", result, arg);
2281  cmd.check_argc(1);
2282  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2283  cmd.check_init(0, "threadpool not initialized");
2284  if (cmd.ok()) {
2285    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2286    Scheduler *sched = pool->scheduler;
2287    sched->lock.lock();
2288    cmd.set_result(INT_CMD, sched->get_maxconcurrency());
2289    sched->lock.unlock();
2290  }
2291  return cmd.status();
2292}
2293
2294static BOOLEAN setThreadPoolConcurrency(leftv result, leftv arg) {
2295  Command cmd("setThreadPoolWorkers", result, arg);
2296  cmd.check_argc(2);
2297  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2298  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2299  cmd.check_init(0, "threadpool not initialized");
2300  if (cmd.ok()) {
2301    ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2302    Scheduler *sched = pool->scheduler;
2303    sched->lock.lock();
2304    sched->set_maxconcurrency(cmd.int_arg(1));
2305    sched->lock.unlock();
2306    cmd.no_result();
2307  }
2308  return cmd.status();
2309}
2310
2311static BOOLEAN closeThreadPool(leftv result, leftv arg) {
2312  Command cmd("closeThreadPool", result, arg);
2313  cmd.check_argc(1, 2);
2314  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2315  cmd.check_init(0, "threadpool not initialized");
2316  if (cmd.nargs() > 1)
2317    cmd.check_arg(1, INT_CMD, "optional argument must be an integer");
2318  if (cmd.ok()) {
2319    ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2320    bool wait = cmd.nargs() == 2 ? (cmd.int_arg(1) != 0) : 1;
2321    pool->shutdown(wait);
2322    cmd.no_result();
2323  }
2324  return cmd.status();
2325}
2326
2327void closeThreadPool(ThreadPool *pool, bool wait) {
2328  pool->shutdown(wait);
2329}
2330
2331
2332BOOLEAN currentThreadPool(leftv result, leftv arg) {
2333  Command cmd("currentThreadPool", result, arg);
2334  cmd.check_argc(0);
2335  ThreadPool *pool = currentThreadPoolRef;
2336  if (pool) {
2337    cmd.set_result(type_threadpool, new_shared(pool));
2338  } else {
2339    cmd.report("no current threadpool");
2340  }
2341  return cmd.status();
2342}
2343
2344BOOLEAN setCurrentThreadPool(leftv result, leftv arg) {
2345  Command cmd("setCurrentThreadPool", result, arg);
2346  cmd.check_argc(1);
2347  cmd.check_init(0, "threadpool not initialized");
2348  if (cmd.ok()) {
2349    ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2350    acquireShared(pool);
2351    if (currentThreadPoolRef)
2352      releaseShared(currentThreadPoolRef);
2353    currentThreadPoolRef = pool;
2354  }
2355  return cmd.status();
2356}
2357
2358class EvalJob : public Job {
2359public:
2360  EvalJob() : Job() { }
2361  virtual void execute() {
2362    leftv val = LinTree::from_string(args[0]);
2363    result = (LinTree::to_string(val));
2364    val->CleanUp();
2365    omFreeBin(val, sleftv_bin);
2366  }
2367};
2368
2369class ExecJob : public Job {
2370public:
2371  ExecJob() : Job() { }
2372  virtual void execute() {
2373    leftv val = LinTree::from_string(args[0]);
2374    val->CleanUp();
2375    omFreeBin(val, sleftv_bin);
2376  }
2377};
2378
2379class ProcJob : public Job {
2380  string procname;
2381public:
2382  ProcJob(const char *procname_init) : Job(),
2383    procname(procname_init) {
2384    set_name(procname_init);
2385  }
2386  virtual void execute() {
2387    vector<leftv> argv;
2388    for (int i = 0; i <args.size(); i++) {
2389      appendArg(argv, args[i]);
2390    }
2391    for (int i = 0; i < deps.size(); i++) {
2392      appendArg(argv, deps[i]->result);
2393    }
2394    sleftv val;
2395    int error = executeProc(val, procname.c_str(), argv);
2396    if (!error) {
2397      result = (LinTree::to_string(&val));
2398      val.CleanUp();
2399    }
2400  }
2401};
2402
2403class KernelJob : public Job {
2404private:
2405  void (*cfunc)(leftv result, leftv arg);
2406public:
2407  KernelJob(void (*func)(leftv result, leftv arg)) : cfunc(func) { }
2408  virtual void execute() {
2409    vector<leftv> argv;
2410    for (int i = 0; i <args.size(); i++) {
2411      appendArg(argv, args[i]);
2412    }
2413    for (int i = 0; i < deps.size(); i++) {
2414      appendArg(argv, deps[i]->result);
2415    }
2416    sleftv val;
2417    memset(&val, 0, sizeof(val));
2418    if (argv.size() > 0) {
2419      leftv *tail = &argv[0]->next;
2420      for (int i = 1; i < argv.size(); i++) {
2421        *tail = argv[i];
2422        tail = &(*tail)->next;
2423      }
2424      *tail = NULL;
2425    }
2426    cfunc(&val, argv[0]);
2427    result = (LinTree::to_string(&val));
2428    val.CleanUp();
2429  }
2430};
2431
2432class RawKernelJob : public Job {
2433private:
2434  void (*cfunc)(long ndeps, Job **deps);
2435public:
2436  RawKernelJob(void (*func)(long ndeps, Job **deps)) : cfunc(func) { }
2437  virtual void execute() {
2438    long ndeps = deps.size();
2439    Job **jobs = (Job **) omAlloc0(sizeof(Job *) * ndeps);
2440    for (long i = 0; i < ndeps; i++)
2441      jobs[i] = deps[i];
2442    cfunc(ndeps, jobs);
2443    omFree(jobs);
2444  }
2445};
2446
2447static BOOLEAN createJob(leftv result, leftv arg) {
2448  Command cmd("createJob", result, arg);
2449  cmd.check_argc_min(1);
2450  cmd.check_arg(0, STRING_CMD, COMMAND,
2451    "job name must be a string or quote expression");
2452  if (cmd.ok()) {
2453    if (cmd.test_arg(0, STRING_CMD)) {
2454      ProcJob *job = new ProcJob((char *)(cmd.arg(0)));
2455      for (leftv a = arg->next; a != NULL; a = a->next) {
2456        job->args.push_back(LinTree::to_string(a));
2457      }
2458      cmd.set_result(type_job, new_shared(job));
2459    } else {
2460      cmd.check_argc(1);
2461      Job *job = new EvalJob();
2462      job->args.push_back(LinTree::to_string(arg));
2463      cmd.set_result(type_job, new_shared(job));
2464    }
2465  }
2466  return cmd.status();
2467}
2468
2469Job *createJob(void (*func)(leftv result, leftv arg)) {
2470  KernelJob *job = new KernelJob(func);
2471  return job;
2472}
2473
2474Job *createJob(void (*func)(long ndeps, Job **deps)) {
2475  RawKernelJob *job = new RawKernelJob(func);
2476  return job;
2477}
2478
2479Job *startJob(ThreadPool *pool, Job *job, leftv arg) {
2480  if (job->pool) return NULL;
2481  while (arg) {
2482    job->args.push_back(LinTree::to_string(arg));
2483    arg = arg->next;
2484  }
2485  pool->attachJob(job);
2486  return job;
2487}
2488
2489Job *startJob(ThreadPool *pool, Job *job) {
2490  return startJob(pool, job, NULL);
2491}
2492
2493Job *scheduleJob(ThreadPool *pool, Job *job, long ndeps, Job **deps) {
2494  if (job->pool) return NULL;
2495  pool->scheduler->lock.lock();
2496  bool cancelled = false;
2497  job->addDep(ndeps, deps);
2498  for (long i = 0; i < ndeps; i++) {
2499    deps[i]->addNotify(job);
2500    cancelled |= deps[i]->cancelled;
2501  }
2502  if (cancelled) {
2503    job->pool = pool;
2504    pool->cancelJob(job);
2505  }
2506  else
2507    pool->attachJob(job);
2508  pool->scheduler->lock.unlock();
2509}
2510
2511void cancelJob(Job *job) {
2512  ThreadPool *pool = job->pool;
2513  if (pool) pool->cancelJob(job);
2514}
2515
2516Job *getCurrentJob() {
2517  return currentJobRef;
2518}
2519
2520static BOOLEAN startJob(leftv result, leftv arg) {
2521  Command cmd("startJob", result, arg);
2522  cmd.check_argc_min(1);
2523  int has_pool = cmd.test_arg(0, type_threadpool);
2524  cmd.check_argc_min(1+has_pool);
2525  if (has_pool)
2526    cmd.check_init(0, "threadpool not initialized");
2527  int has_prio = cmd.test_arg(has_pool, INT_CMD);
2528  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2529  int first_arg = has_pool + has_prio;
2530  cmd.check_arg(first_arg, type_job, STRING_CMD,
2531    "job argument must be a job or string");
2532  if (cmd.ok() && cmd.argtype(first_arg) == type_job)
2533    cmd.check_init(first_arg, "job not initialized");
2534  if (!cmd.ok()) return cmd.status();
2535  ThreadPool *pool;
2536  if (has_pool)
2537    pool = cmd.shared_arg<ThreadPool>(0);
2538  else {
2539    if (!currentThreadPoolRef)
2540      return cmd.abort("no current threadpool defined");
2541    pool = currentThreadPoolRef;
2542  }
2543  Job *job;
2544  if (cmd.argtype(first_arg) == type_job) 
2545    job = *(Job **)(cmd.arg(first_arg));
2546  else
2547    job = new ProcJob((char *)(cmd.arg(first_arg)));
2548  leftv a = arg->next;
2549  if (has_pool) a = a->next;
2550  if (has_prio) a = a->next;
2551  for (; a != NULL; a = a->next) {
2552    job->args.push_back(LinTree::to_string(a));
2553  }
2554  if (job->pool)
2555    return cmd.abort("job has already been scheduled");
2556  job->prio = prio;
2557  pool->attachJob(job);
2558  cmd.set_result(type_job, new_shared(job));
2559  return cmd.status();
2560}
2561
2562static BOOLEAN waitJob(leftv result, leftv arg) {
2563  Command cmd("waitJob", result, arg);
2564  cmd.check_argc(1);
2565  cmd.check_arg(0, type_job, "argument must be a job");
2566  cmd.check_init(0, "job not initialized");
2567  if (cmd.ok()) {
2568    Job *job = *(Job **)(cmd.arg(0));
2569    ThreadPool *pool = job->pool;
2570    if (!pool) {
2571      return cmd.abort("job has not yet been started or scheduled");
2572    }
2573    pool->waitJob(job);
2574    if (job->cancelled) {
2575      return cmd.abort("job has been cancelled");
2576    }
2577    if (job->result.size() == 0)
2578      cmd.no_result();
2579    else {
2580      leftv res = LinTree::from_string(job->result);
2581      cmd.set_result(res->Typ(), res->Data());
2582    }
2583  }
2584  return cmd.status();
2585}
2586
2587void waitJob(Job *job) {
2588  assert(job->pool != NULL);
2589  job->pool->waitJob(job);
2590}
2591
2592static BOOLEAN cancelJob(leftv result, leftv arg) {
2593  Command cmd("cancelJob", result, arg);
2594  cmd.check_argc(1);
2595  cmd.check_arg(0, type_job, "argument must be a job");
2596  cmd.check_init(0, "job not initialized");
2597  if (cmd.ok()) {
2598    Job *job = cmd.shared_arg<Job>(0);
2599    ThreadPool *pool = job->pool;
2600    if (!pool) {
2601      return cmd.abort("job has not yet been started or scheduled");
2602    }
2603    pool->cancelJob(job);
2604    cmd.no_result();
2605  }
2606  return cmd.status();
2607}
2608
2609static BOOLEAN jobCancelled(leftv result, leftv arg) {
2610  Job *job;
2611  Command cmd("jobCancelled", result, arg);
2612  cmd.check_argc(0, 1);
2613  if (cmd.nargs() == 1) {
2614    cmd.check_arg(0, type_job, "argument must be a job");
2615    cmd.check_init(0, "job not initialized");
2616    job = cmd.shared_arg<Job>(0);
2617  } else {
2618    job = currentJobRef;
2619    if (!job)
2620      cmd.report("no current job");
2621  }
2622  if (cmd.ok()) {
2623    ThreadPool *pool = job->pool;
2624    if (!pool) {
2625      return cmd.abort("job has not yet been started or scheduled");
2626    }
2627    pool->scheduler->lock.lock();
2628    cmd.set_result((long) job->cancelled);
2629    pool->scheduler->lock.unlock();
2630  }
2631  return cmd.status();
2632}
2633
2634bool getJobCancelled(Job *job) {
2635  ThreadPool *pool = job->pool;
2636  if (pool) pool->scheduler->lock.lock();
2637  bool result = job->cancelled;
2638  if (pool) pool->scheduler->lock.unlock();
2639  return result;
2640}
2641
2642bool getJobCancelled() {
2643  return getJobCancelled(currentJobRef);
2644}
2645
2646void setJobData(Job *job, void *data) {
2647  ThreadPool *pool = job->pool;
2648  if (pool) pool->scheduler->lock.lock();
2649  job->data = data;
2650  if (pool) pool->scheduler->lock.unlock();
2651}
2652
2653
2654void *getJobData(Job *job) {
2655  ThreadPool *pool = job->pool;
2656  if (pool) pool->scheduler->lock.lock();
2657  void *result = job->data;
2658  if (pool) pool->scheduler->lock.unlock();
2659  return result;
2660}
2661
2662void addJobArgs(Job *job, leftv arg) {
2663  ThreadPool *pool = job->pool;
2664  if (pool) pool->scheduler->lock.lock();
2665  while (arg) {
2666    job->args.push_back(LinTree::to_string(arg));
2667    arg = arg->next;
2668  }
2669  if (pool) pool->scheduler->lock.unlock();
2670}
2671
2672leftv getJobResult(Job *job) {
2673  ThreadPool *pool = job->pool;
2674  if (pool) pool->scheduler->lock.lock();
2675  leftv result = LinTree::from_string(job->result);
2676  if (pool) pool->scheduler->lock.unlock();
2677  return result;
2678}
2679
2680const char *getJobName(Job *job) {
2681  // TODO
2682  return "";
2683}
2684
2685void setJobName(Job *job, const char *name) {
2686  // TODO
2687}
2688
2689static BOOLEAN createTrigger(leftv result, leftv arg) {
2690  Command cmd("createTrigger", result, arg);
2691  cmd.check_argc_min(1);
2692  int has_pool = cmd.test_arg(0, type_threadpool);
2693  ThreadPool *pool;
2694  if (has_pool) {
2695    cmd.check_init(0, "threadpool not initialized");
2696    pool = cmd.shared_arg<ThreadPool>(0);
2697  } else {
2698    pool = currentThreadPoolRef;
2699    if (!pool)
2700      return cmd.abort("no default threadpool");
2701  }
2702  cmd.check_argc(has_pool + 2);
2703  cmd.check_arg(has_pool + 0, STRING_CMD, "trigger subtype must be a string");
2704  const char *kind = (const char *)(cmd.arg(has_pool + 0));
2705  if (0 == strcmp(kind, "proc")) {
2706    cmd.check_arg(has_pool + 1, STRING_CMD, "proc trigger argument must be a string");
2707  } else {
2708    cmd.check_arg(has_pool + 1, INT_CMD, "trigger argument must be an integer");
2709  }
2710  if (cmd.ok()) {
2711    Trigger *trigger;
2712    long n = (long) (cmd.arg(has_pool + 1));
2713    if (n < 0)
2714      return cmd.abort("trigger argument must be a non-negative integer");
2715    if (0 == strcmp(kind, "acc")) {
2716      trigger = new AccTrigger(n);
2717    } else if (0 == strcmp(kind, "count")) {
2718      trigger = new CountTrigger(n);
2719    } else if (0 == strcmp(kind, "set")) {
2720      trigger = new SetTrigger(n);
2721    } else if (0 == strcmp(kind, "proc")) {
2722      trigger = new ProcTrigger((const char *) cmd.arg(has_pool + 1));
2723    } else {
2724      return cmd.abort("unknown trigger subtype");
2725    }
2726    pool->attachJob(trigger);
2727    cmd.set_result(type_trigger, new_shared(trigger));
2728  }
2729  return cmd.status();
2730}
2731
2732static BOOLEAN updateTrigger(leftv result, leftv arg) {
2733  Command cmd("updateTrigger", result, arg);
2734  cmd.check_argc_min(1);
2735  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2736  cmd.check_init(0, "trigger not initialized");
2737  if (cmd.ok()) {
2738    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2739    trigger->pool->scheduler->lock.lock();
2740    if (!trigger->accept(arg->next))
2741      cmd.report("incompatible argument type(s) for this trigger");
2742    else {
2743      trigger->activate(arg->next);
2744      if (trigger->ready()) {
2745        trigger->run();
2746        Scheduler::notifyDeps(trigger->pool->scheduler, trigger);
2747      }
2748    }
2749    trigger->pool->scheduler->lock.unlock();
2750  }
2751  return cmd.status();
2752}
2753
2754static BOOLEAN chainTrigger(leftv result, leftv arg) {
2755  Command cmd("chainTrigger", result, arg);
2756  cmd.check_argc(2);
2757  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2758  cmd.check_arg(1, type_trigger, type_job,
2759    "second argument must be a trigger or job");
2760  cmd.check_init(0, "trigger not initialized");
2761  cmd.check_init(1, "trigger/job not initialized");
2762  if (cmd.ok()) {
2763    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2764    Job *job = cmd.shared_arg<Job>(1);
2765    if (trigger->pool != job->pool)
2766      return cmd.abort("arguments use different threadpools");
2767    ThreadPool *pool = trigger->pool;
2768    pool->scheduler->lock.lock();
2769    job->triggers.push_back(trigger);
2770    pool->scheduler->lock.unlock();
2771  }
2772  return cmd.status();
2773}
2774
2775static BOOLEAN testTrigger(leftv result, leftv arg) {
2776  Command cmd("testTrigger", result, arg);
2777  cmd.check_argc(1);
2778  cmd.check_arg(0, type_trigger, "argument must be a trigger");
2779  cmd.check_init(0, "trigger not initialized");
2780  if (cmd.ok()) {
2781    Trigger *trigger = cmd.shared_arg<Trigger>(0);
2782    ThreadPool *pool = trigger->pool;
2783    pool->scheduler->lock.lock();
2784    cmd.set_result((long)trigger->ready());
2785    pool->scheduler->lock.unlock();
2786  }
2787  return cmd.status();
2788}
2789
2790
2791static BOOLEAN scheduleJob(leftv result, leftv arg) {
2792  vector<Job *> jobs;
2793  vector<Job *> deps;
2794  Command cmd("scheduleJob", result, arg);
2795  cmd.check_argc_min(1);
2796  int has_pool = cmd.test_arg(0, type_threadpool);
2797  if (has_pool) {
2798    cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2799    cmd.check_init(0, "threadpool not initialized");
2800  }
2801  cmd.check_argc_min(has_pool+1);
2802  int has_prio = cmd.test_arg(has_pool, INT_CMD);
2803  ThreadPool *pool;
2804  if (has_pool)
2805    pool = cmd.shared_arg<ThreadPool>(0);
2806  else {
2807    if (!currentThreadPoolRef)
2808      return cmd.abort("no current threadpool defined");
2809    pool = currentThreadPoolRef;
2810  }
2811  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2812  int first_arg = has_pool + has_prio;
2813  if (cmd.test_arg(first_arg, type_job)) {
2814    jobs.push_back(*(Job **)(cmd.arg(first_arg)));
2815  } else if (cmd.test_arg(first_arg, STRING_CMD)) {
2816    jobs.push_back(new ProcJob((char *)(cmd.arg(first_arg))));
2817  } else if (cmd.test_arg(first_arg, LIST_CMD)) {
2818    lists l = (lists) (cmd.arg(first_arg));
2819    int n = lSize(l);
2820    for (int i = 0; i < n; i++) {
2821      if (l->m[i].Typ() != type_job)
2822        return cmd.abort("job argument must be a job, string, or list of jobs");
2823    }
2824    for (int i = 0; i < n; i++) {
2825      Job *job = *(Job **) (l->m[i].Data());
2826      if (!job)
2827        return cmd.abort("job not initialized");
2828      jobs.push_back(job);
2829    }
2830  } else {
2831    return cmd.abort("job argument must be a job, string, or list of jobs");
2832  }
2833  bool error = false;
2834  leftv a = arg->next;
2835  if (has_pool) a = a->next;
2836  if (has_prio) a = a->next;
2837  for (; !error && a; a = a->next) {
2838    if (a->Typ() == type_job || a->Typ() == type_trigger) {
2839      deps.push_back(*(Job **)(a->Data()));
2840    } else if (a->Typ() == LIST_CMD) {
2841      lists l = (lists) a->Data();
2842      int n = lSize(l);
2843      for (int i = 0; i < n; i++) {
2844        if (l->m[i].Typ() == type_job || l->m[i].Typ() == type_trigger) {
2845          deps.push_back(*(Job **)(l->m[i].Data()));
2846        } else {
2847          error = true;
2848          break;
2849        }
2850      }
2851    }
2852  }
2853  if (error) {
2854    return cmd.abort("illegal dependency");
2855  }
2856  for (int i = 0; i < jobs.size(); i++) {
2857    Job *job = jobs[i];
2858    if (job->pool) {
2859      return cmd.abort("job has already been scheduled");
2860    }
2861    job->prio = prio;
2862  }
2863  for (int i = 0; i < deps.size(); i++) {
2864    Job *job = deps[i];
2865    if (!job->pool) {
2866      return cmd.abort("dependency has not yet been scheduled");
2867    }
2868    if (job->pool != pool) {
2869      return cmd.abort("dependency has been scheduled on a different threadpool");
2870    }
2871  }
2872  pool->scheduler->lock.lock();
2873  bool cancelled = false;
2874  for (int i = 0; i < jobs.size(); i++) {
2875    jobs[i]->addDep(deps);
2876  }
2877  for (int i = 0; i < deps.size(); i++) {
2878    deps[i]->addNotify(jobs);
2879    cancelled |= deps[i]->cancelled;
2880  }
2881  for (int i = 0; i < jobs.size(); i++) {
2882    if (cancelled) {
2883      jobs[i]->pool = pool;
2884      pool->cancelJob(jobs[i]);
2885    }
2886    else
2887      pool->attachJob(jobs[i]);
2888  }
2889  pool->scheduler->lock.unlock();
2890  if (jobs.size() > 0)
2891    cmd.set_result(type_job, new_shared(jobs[0]));
2892  return cmd.status();
2893}
2894
2895BOOLEAN currentJob(leftv result, leftv arg) {
2896  Command cmd("currentJob", result, arg);
2897  cmd.check_argc(0);
2898  Job *job = currentJobRef;
2899  if (job) {
2900    cmd.set_result(type_job, new_shared(job));
2901  } else {
2902    cmd.report("no current job");
2903  }
2904  return cmd.status();
2905}
2906
2907
2908BOOLEAN threadID(leftv result, leftv arg) {
2909  int i;
2910  if (wrong_num_args("threadID", arg, 0))
2911    return TRUE;
2912  result->rtyp = INT_CMD;
2913  result->data = (char *)thread_id;
2914  return FALSE;
2915}
2916
2917BOOLEAN mainThread(leftv result, leftv arg) {
2918  int i;
2919  if (wrong_num_args("mainThread", arg, 0))
2920    return TRUE;
2921  result->rtyp = INT_CMD;
2922  result->data = (char *)(long)(thread_id == 0L);
2923  return FALSE;
2924}
2925
2926BOOLEAN threadEval(leftv result, leftv arg) {
2927  if (wrong_num_args("threadEval", arg, 2))
2928    return TRUE;
2929  if (arg->Typ() != type_thread) {
2930    WerrorS("threadEval: argument is not a thread");
2931    return TRUE;
2932  }
2933  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2934  string expr = LinTree::to_string(arg->next);
2935  ThreadState *ts = thread->getThreadState();
2936  if (ts && ts->parent != pthread_self()) {
2937    WerrorS("threadEval: can only be called from parent thread");
2938    return TRUE;
2939  }
2940  if (ts) ts->lock.lock();
2941  if (!ts || !ts->running || !ts->active) {
2942    WerrorS("threadEval: thread is no longer running");
2943    if (ts) ts->lock.unlock();
2944    return TRUE;
2945  }
2946  ts->to_thread.push("e");
2947  ts->to_thread.push(expr);
2948  ts->to_cond.signal();
2949  ts->lock.unlock();
2950  result->rtyp = NONE;
2951  return FALSE;
2952}
2953
2954BOOLEAN threadExec(leftv result, leftv arg) {
2955  if (wrong_num_args("threadExec", arg, 2))
2956    return TRUE;
2957  if (arg->Typ() != type_thread) {
2958    WerrorS("threadExec: argument is not a thread");
2959    return TRUE;
2960  }
2961  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2962  string expr = LinTree::to_string(arg->next);
2963  ThreadState *ts = thread->getThreadState();
2964  if (ts && ts->parent != pthread_self()) {
2965    WerrorS("threadExec: can only be called from parent thread");
2966    return TRUE;
2967  }
2968  if (ts) ts->lock.lock();
2969  if (!ts || !ts->running || !ts->active) {
2970    WerrorS("threadExec: thread is no longer running");
2971    if (ts) ts->lock.unlock();
2972    return TRUE;
2973  }
2974  ts->to_thread.push("x");
2975  ts->to_thread.push(expr);
2976  ts->to_cond.signal();
2977  ts->lock.unlock();
2978  result->rtyp = NONE;
2979  return FALSE;
2980}
2981
2982BOOLEAN threadPoolExec(leftv result, leftv arg) {
2983  Command cmd("threadPoolExec", result, arg);
2984  ThreadPool *pool;
2985  cmd.check_argc(1, 2);
2986  int has_pool = cmd.nargs() == 2;
2987  if (has_pool) {
2988    cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2989    cmd.check_init(0, "threadpool not initialized");
2990    pool = cmd.shared_arg<ThreadPool>(0);
2991  } else {
2992    pool = currentThreadPoolRef;
2993    if (!pool)
2994      return cmd.abort("no current threadpool");
2995  }
2996  if (cmd.ok()) {
2997    string expr = LinTree::to_string(has_pool ? arg->next : arg);
2998    Job* job = new ExecJob();
2999    job->args.push_back(expr);
3000    job->pool = pool;
3001    pool->broadcastJob(job);
3002  }
3003  return cmd.status();
3004}
3005
3006BOOLEAN threadResult(leftv result, leftv arg) {
3007  if (wrong_num_args("threadResult", arg, 1))
3008    return TRUE;
3009  if (arg->Typ() != type_thread) {
3010    WerrorS("threadResult: argument is not a thread");
3011    return TRUE;
3012  }
3013  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
3014  ThreadState *ts = thread->getThreadState();
3015  if (ts && ts->parent != pthread_self()) {
3016    WerrorS("threadResult: can only be called from parent thread");
3017    return TRUE;
3018  }
3019  if (ts) ts->lock.lock();
3020  if (!ts || !ts->running || !ts->active) {
3021    WerrorS("threadResult: thread is no longer running");
3022    if (ts) ts->lock.unlock();
3023    return TRUE;
3024  }
3025  while (ts->from_thread.empty()) {
3026    ts->from_cond.wait();
3027  }
3028  string expr = ts->from_thread.front();
3029  ts->from_thread.pop();
3030  ts->lock.unlock();
3031  leftv val = LinTree::from_string(expr);
3032  result->rtyp = val->Typ();
3033  result->data = val->Data();
3034  return FALSE;
3035}
3036
3037BOOLEAN setSharedName(leftv result, leftv arg) {
3038  Command cmd("setSharedName", result, arg);
3039  cmd.check_argc(2);
3040  int type = cmd.argtype(0);
3041  cmd.check_init(0, "first argument is not initialized");
3042  if (type != type_job && type != type_trigger && type != type_threadpool) {
3043    cmd.report("first argument must be a job, trigger, or threadpool");
3044  }
3045  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
3046  if (cmd.ok()) {
3047    SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3048    name_lock.lock();
3049    obj->set_name((char *) cmd.arg(1));
3050    name_lock.unlock();
3051  }
3052  return cmd.status();
3053}
3054
3055BOOLEAN getSharedName(leftv result, leftv arg) {
3056  Command cmd("getSharedName", result, arg);
3057  cmd.check_argc(1);
3058  int type = cmd.argtype(0);
3059  cmd.check_init(0, "first argument is not initialized");
3060  if (type != type_job && type != type_trigger && type != type_threadpool) {
3061    cmd.report("first argument must be a job, trigger, or threadpool");
3062  }
3063  if (cmd.ok()) {
3064    SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3065    name_lock.lock();
3066    cmd.set_result(obj->get_name().c_str());
3067    name_lock.unlock();
3068  }
3069  return cmd.status();
3070}
3071
3072}
3073
3074using namespace LibThread;
3075
3076
3077extern "C" int SI_MOD_INIT(systhreads)(SModulFunctions *fn)
3078{
3079  const char *libname = currPack->libname;
3080  if (!libname) libname = "";
3081  master_lock.lock();
3082  if (!thread_state)
3083    thread_state = new ThreadState[MAX_THREADS];
3084  makeSharedType(type_atomic_table, "atomic_table");
3085  makeSharedType(type_atomic_list, "atomic_list");
3086  makeSharedType(type_shared_table, "shared_table");
3087  makeSharedType(type_shared_list, "shared_list");
3088  makeSharedType(type_channel, "channel");
3089  makeSharedType(type_syncvar, "syncvar");
3090  makeSharedType(type_region, "region");
3091  makeSharedType(type_thread, "thread");
3092  makeSharedType(type_threadpool, "threadpool");
3093  makeSharedType(type_job, "job");
3094  makeSharedType(type_trigger, "trigger");
3095  makeRegionlockType(type_regionlock, "regionlock");
3096
3097  fn->iiAddCproc(libname, "putTable", FALSE, putTable);
3098  fn->iiAddCproc(libname, "getTable", FALSE, getTable);
3099  fn->iiAddCproc(libname, "inTable", FALSE, inTable);
3100  fn->iiAddCproc(libname, "putList", FALSE, putList);
3101  fn->iiAddCproc(libname, "getList", FALSE, getList);
3102  fn->iiAddCproc(libname, "lockRegion", FALSE, lockRegion);
3103  fn->iiAddCproc(libname, "regionLock", FALSE, regionLock);
3104  fn->iiAddCproc(libname, "unlockRegion", FALSE, unlockRegion);
3105  fn->iiAddCproc(libname, "sendChannel", FALSE, sendChannel);
3106  fn->iiAddCproc(libname, "receiveChannel", FALSE, receiveChannel);
3107  fn->iiAddCproc(libname, "statChannel", FALSE, statChannel);
3108  fn->iiAddCproc(libname, "writeSyncVar", FALSE, writeSyncVar);
3109  fn->iiAddCproc(libname, "updateSyncVar", FALSE, updateSyncVar);
3110  fn->iiAddCproc(libname, "readSyncVar", FALSE, readSyncVar);
3111  fn->iiAddCproc(libname, "statSyncVar", FALSE, statSyncVar);
3112
3113  fn->iiAddCproc(libname, "makeAtomicTable", FALSE, makeAtomicTable);
3114  fn->iiAddCproc(libname, "makeAtomicList", FALSE, makeAtomicList);
3115  fn->iiAddCproc(libname, "makeSharedTable", FALSE, makeSharedTable);
3116  fn->iiAddCproc(libname, "makeSharedList", FALSE, makeSharedList);
3117  fn->iiAddCproc(libname, "makeChannel", FALSE, makeChannel);
3118  fn->iiAddCproc(libname, "makeSyncVar", FALSE, makeSyncVar);
3119  fn->iiAddCproc(libname, "makeRegion", FALSE, makeRegion);
3120  fn->iiAddCproc(libname, "findSharedObject", FALSE, findSharedObject);
3121  fn->iiAddCproc(libname, "bindSharedObject", FALSE, bindSharedObject);
3122  fn->iiAddCproc(libname, "typeSharedObject", FALSE, typeSharedObject);
3123
3124  fn->iiAddCproc(libname, "createThread", FALSE, createThread);
3125  fn->iiAddCproc(libname, "joinThread", FALSE, joinThread);
3126  fn->iiAddCproc(libname, "createThreadPool", FALSE, createThreadPool);
3127  fn->iiAddCproc(libname, "createThreadPoolSet", FALSE, createThreadPoolSet);
3128#if 0
3129  fn->iiAddCproc(libname, "adjoinThreadPool", FALSE, adjoinThreadPool);
3130  fn->iiAddCproc(libname, "getAdjoinedThreadPools", FALSE, getAdjoinedThreadPools);
3131#endif
3132  fn->iiAddCproc(libname, "closeThreadPool", FALSE, closeThreadPool);
3133  fn->iiAddCproc(libname, "getThreadPoolWorkers", FALSE, getThreadPoolWorkers);
3134  fn->iiAddCproc(libname, "setThreadPoolWorkers", FALSE, setThreadPoolWorkers);
3135  fn->iiAddCproc(libname, "getThreadPoolConcurrency", FALSE, getThreadPoolConcurrency);
3136  fn->iiAddCproc(libname, "setThreadPoolConcurrency", FALSE, setThreadPoolConcurrency);
3137  fn->iiAddCproc(libname, "currentThreadPool", FALSE, currentThreadPool);
3138  fn->iiAddCproc(libname, "setCurrentThreadPool", FALSE, setCurrentThreadPool);
3139  fn->iiAddCproc(libname, "threadPoolExec", FALSE, threadPoolExec);
3140  fn->iiAddCproc(libname, "threadID", FALSE, threadID);
3141  fn->iiAddCproc(libname, "mainThread", FALSE, mainThread);
3142  fn->iiAddCproc(libname, "threadEval", FALSE, threadEval);
3143  fn->iiAddCproc(libname, "threadExec", FALSE, threadExec);
3144  fn->iiAddCproc(libname, "threadResult", FALSE, threadResult);
3145  fn->iiAddCproc(libname, "createJob", FALSE, createJob);
3146  fn->iiAddCproc(libname, "currentJob", FALSE, currentJob);
3147  fn->iiAddCproc(libname, "setSharedName", FALSE, setSharedName);
3148  fn->iiAddCproc(libname, "getSharedName", FALSE, getSharedName);
3149  fn->iiAddCproc(libname, "startJob", FALSE, startJob);
3150  fn->iiAddCproc(libname, "waitJob", FALSE, waitJob);
3151  fn->iiAddCproc(libname, "cancelJob", FALSE, cancelJob);
3152  fn->iiAddCproc(libname, "jobCancelled", FALSE, jobCancelled);
3153  fn->iiAddCproc(libname, "scheduleJob", FALSE, scheduleJob);
3154  fn->iiAddCproc(libname, "scheduleJobs", FALSE, scheduleJob);
3155  fn->iiAddCproc(libname, "createTrigger", FALSE, createTrigger);
3156  fn->iiAddCproc(libname, "updateTrigger", FALSE, updateTrigger);
3157  fn->iiAddCproc(libname, "testTrigger", FALSE, testTrigger);
3158  fn->iiAddCproc(libname, "chainTrigger", FALSE, chainTrigger);
3159
3160  LinTree::init();
3161  master_lock.unlock();
3162
3163  return MAX_TOK;
3164}
Note: See TracBrowser for help on using the repository browser.