source: git/Singular/LIB/tasks.lib

spielwiese
Last change on this file was ac408e, checked in by Hans Schoenemann <hannes@…>, 6 weeks ago
new select/poll call for waitfirst/waitall
  • Property mode set to 100644
File size: 42.6 KB
Line 
1//////////////////////////////////////////////////////////////////////
2version="version tasks.lib 4.1.2.0 Feb_2019 "; // $Id$
3category="General purpose";
4info="
5LIBRARY:   tasks.lib  A parallel framework based on tasks
6
7AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de
8
9OVERVIEW:
10This library provides a parallel framework based on tasks. It introduces a new
11Singular type @code{task}; an object of this type is a command (given by a
12string) applied to a list of arguments. Tasks can be computed in parallel via
13the procedures in this library and they can even be started recursively, i.e.
14from within other tasks.
15
16tasks.lib respects the limits for computational resources defined
17in @ref{resources_lib}, i.e., all tasks within the same Singular session will
18not use more computational resources than provided via resources.lib, even if
19tasks are started recursively.
20
21The Singular library @ref{parallel_lib} provides implementations of several
22parallel 'skeletons' based on tasks.lib.
23
24KEYWORDS:  parallelization; distributed computing; task
25
26SEE ALSO:  resources_lib, parallel_lib
27
28PROCEDURES:
29  createTask();    create a task
30  killTask();      kill a task
31  copyTask();      copy a task
32  compareTasks();  compare two tasks
33  printTask();     print a task
34  startTasks();    start tasks
35  stopTask();      stop a task
36  waitTasks();     wait for a certain number of tasks
37  waitAllTasks();  wait for all tasks
38  pollTask();      poll a task
39  getCommand();    get the command of a task
40  getArguments();  get the arguments of a task
41  getResult();     get the result of a task
42  getState();      get the state of a task
43";
44
45/*
46RATIONALE FOR DEVELOPERS
47
48The Singular type 'task'
49------------------------
50tasks.lib introduces a Singular type 'task' which makes use of a pointer-like
51model in order to avoid unnecessary copying of data. 'task' is defined as a
52newstruct whose only member is 'int index'. This index points to an entry in
53the lib-internal list 'tasks'. The elements of this list are of the type
54'internal_task' which is defined as a newstruct with the following members:
55int id         - the internal ID
56string command - the command
57list arguments - the arguments
58def result     - the result
59string state   - the current state, see 'The life cycle of a task'
60list links     - control handles, see 'Links'
61int linkID     - the ID of the control handles
62
63
64The life cycle of a task
65------------------------
66'uninitialized' --> 'created' --> 'started' --> 'completed'
67                                     | ^
68                                     v |
69                                  'stopped'
70
71The state of a task t is 'uninitialized' iff
72(t.index == 0) or (typeof(tasks[t.index]) != "internal_task").
73
74A task can be reset to 'uninitialized' by killTask() at any time.
75
76
77Assigned members for 'internal_task'
78------------------------------------
79For each state, the following members of an internal_task must be assigned:
80
81created:       command arguments        state
82started:    id command arguments        state links linkID
83stopped:       command arguments        state
84completed:     command arguments result state
85
86All other members should be wiped out.
87
88Local supervisors
89-----------------
90A call of 'startTasks(t(1..n));' for tasks t(1), ..., t(n) creates a child
91process which plays the role of a supervisor for these tasks. The computation
92of the tasks is done in child processes of the supervisor.
93
94The supervisor assigns an internal state to each task which is represented by
95an integer. The meaning of these integers and their relation to the global
96state of each task is as follows:
97
98internal state | meaning           | corresponding global state
99---------------|-------------------|---------------------------
100             0 | waiting           | started
101             1 | started           | started
102             2 | (result) computed | started
103             3 | (result) sent     | completed
104            -1 | stopped           | stopped
105
106Links
107-----
108The ssi link between the main process and the supervisor is named 'l(pid)'
109where pid is the PID of the main process. The links between the supervisor and
110its child processes are named 'll(pid)(1)', 'll(pid)(2)', ... where pid is the
111PID of the supervisor. The link between the child processes of the supervisor
112and the main process is named 'L(pid)' where pid is the PID of the main
113process. This link is only for sending the results to the main process and must
114not be used in the other direction!
115
116For any task t whose state is 'started', tasks[t.index].links is
117list(L(pid), l(pid)) where pid is the PID of the main process.
118
119Communication model
120-------------------
121stopTask() <--> supervisor
122    0, id -->
123
124waitTasks() <--> supervisor
125(demanded_task is an intvec containing the IDs of the tasks which are being
126waited for; ndemanded is the number of tasks that is being waited for.)
127    1, demanded_tasks, ndemanded -->
128    [receiving results]
129    1, 0:2, -1 -->
130    results_sent <--
131    [receiving remaining results]
132
133pollTask() <--> supervisor
134    2, id -->
135    state <--
136    [receive result if state == 2 (computed)]
137
138startTasks_child() <--> startTasks_grandchild()
139    [compute the result]
140    1, id <--
141    [wait until the result is requested]
142    1 -->
143    [send the result]
144    2 <--
145
146sending and receiving results:
147main process <--> supervisor <--> startTasks_grandchild()
148    [request the result, see above]
149    index, result  (main process <-- startTasks_grandchild())
150    3, id          (main process --> supervisor)
151*/
152
153LIB "resources.lib";
154
155static proc mod_init()
156{
157    /* initialize the semaphores */
158    if (!defined(Resources)) {
159        LIB "resources.lib";
160    }
161    // the number of processor cores
162    int sem_cores = Resources::sem_cores;
163    exportto(Tasks, sem_cores);
164    // the number of leaves in the parallel tree (not strict)
165    int sem_leaves = semaphore(system("--cpus")+10);
166    exportto(Tasks, sem_leaves);
167    // the number of processes waiting for sem_cores with low priority
168    int sem_queue = semaphore(2);
169    exportto(Tasks, sem_queue);
170
171    /* define the Singular type 'task' */
172    newstruct("task", "int index");
173    newstruct("internal_task", "int id, string command, list arguments,"
174        +"def result, string state, list links, int linkID");
175    system("install", "task", "=", createTask, 1);
176    system("install", "task", "==", compareTasks, 2);
177    system("install", "task", "print", printTask, 1);
178
179    /* define (lib-)global variables */
180    list tasks;     // the lib-internal list of tasks
181    exportto(Tasks, tasks);
182    int ntasks;     // the current maximal index in 'tasks'
183    exportto(Tasks, ntasks);
184    int nlinkIDs;   // the current maximal linkID
185    exportto(Tasks, nlinkIDs);
186}
187
188proc createTask(alias string command, alias list arguments)
189"USAGE:   createTask(command, arguments), command string, arguments list
190RETURN:   a task with the given command and arguments whose state is 'created'.
191NOTE:     't = command, arguments;' is a shortcut for
192          't = createTask(command, arguments);'.
193SEE ALSO: startTasks, getCommand, getArguments, getState, killTask, copyTask,
194          compareTasks, printTask
195EXAMPLE:  example createTask; shows an example"
196{
197    internal_task T;
198    ntasks++;
199    tasks[ntasks] = T;
200    tasks[ntasks].command = command;
201    tasks[ntasks].arguments = arguments;
202    tasks[ntasks].state = "created";
203    task t;
204    t.index = ntasks;
205    return(t);
206}
207example
208{
209    "EXAMPLE:";
210    echo = 2;
211    ring R = 0, (x,y), dp;
212    ideal I = x9y2+x10, x2y7-y8;
213    task t = createTask("std", list(I));
214    // This is the same as:
215    // task t = "std", list(I);
216    t;
217    killTask(t);
218}
219
220proc killTask(task t)
221"USAGE:   killTask(t), t task
222RETURN:   nothing. If the state of t is 'started', then t is stopped first. The
223          internal data structures of t are erased and its state is set to
224          'uninitialized'.
225NOTE:     'killTask(t);' is not the same as 'kill t;'. The latter command does
226          not erase the internal data structures of t. Hence killTask() should
227          be called for any no longer needed task in order to free memory.
228SEE ALSO: stopTask, getState, createTask, printTask
229EXAMPLE:  example killTask; shows an example"
230{
231    if (t.index == 0) {
232        return();
233    }
234    if (typeof(tasks[t.index]) != "internal_task") {
235        return();
236    }
237    if (tasks[t.index].state == "started") {
238        stopTask(t);
239    }
240    tasks[t.index] = def(0);
241}
242example
243{
244    "EXAMPLE:";
245    echo = 2;
246    ring R = 0, (x,y), dp;
247    ideal I = x9y2+x10, x2y7-y8;
248    task t = "std", list(I);
249    startTasks(t);
250    t;
251    killTask(t);
252    t;
253    getState(t);
254}
255
256proc copyTask(task t)
257"USAGE:   copyTask(t), t task
258RETURN:   a copy of t.
259NOTE:     'task t1 = copyTask(t2);' is not the same as 'task t1 = t2;'. After
260          the latter command, t1 points to the same object as t2; any changes
261          to t2 will also effect t1. In contrast to this, copyTask() creates a
262          new independent task.
263       @* A task whose state is 'started' cannot be copied.
264SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask,
265          compareTasks, printTask
266EXAMPLE:  example copyTask; shows an example"
267{
268    task t_copy;
269    if (t.index == 0) {
270        return(t_copy);
271    }
272    if (typeof(tasks[t.index]) != "internal_task") {
273        return(t_copy);
274    }
275    if (tasks[t.index].state == "started") {
276        ERROR("cannot copy a task whose state is 'started'");
277    }
278    ntasks++;
279    tasks[ntasks] = tasks[t.index];
280    t_copy.index = ntasks;
281    return(t_copy);
282}
283example
284{
285    "EXAMPLE:";
286    echo = 2;
287    ring R = 0, (x,y), dp;
288    ideal I = x9y2+x10, x2y7-y8;
289    task t1 = "std", list(I);
290    startTasks(t1);
291    waitAllTasks(t1);
292    task t2 = copyTask(t1);
293    killTask(t1);
294    t2;   // t2 survived
295    getResult(t2);
296    killTask(t2);
297}
298
299proc compareTasks(task t1, task t2)
300"USAGE:   compareTasks(t1, t2), t1, t2 tasks
301RETURN:   1, if t1 and t2 coincide;
302          0, otherwise.
303NOTE:     The arguments and the results of t1 and t2 are not compared.
304       @* 't1 == t2' is a shortcut for 'compareTasks(t1, t2)'.
305SEE ALSO: getCommand, getArguments, getResult, getState, copyTask, printTask
306EXAMPLE:  example compareTasks; shows an example"
307{
308    if (tasks[t1.index].id != tasks[t2.index].id) {
309        return(0);
310    }
311    if (tasks[t1.index].command != tasks[t2.index].command) {
312        return(0);
313    }
314    if (tasks[t1.index].state != tasks[t2.index].state) {
315        return(0);
316    }
317    if (tasks[t1.index].linkID != tasks[t2.index].linkID) {
318        return(0);
319    }
320    return(1);
321}
322example
323{
324    "EXAMPLE:";
325    echo = 2;
326    ring R = 0, (x,y), dp;
327    ideal I = x9y2+x10, x2y7-y8;
328    task t1 = "std", list(I);
329    task t2 = "std", list(I);
330    compareTasks(t1, t2);
331    startTasks(t1);
332    waitAllTasks(t1);
333    t1 == t2;   // the same as compareTasks(t1, t2);
334    killTask(t1);
335    killTask(t2);
336    // The arguments and the result are not compared!
337    ideal J = x;
338    task t3 = "std", list(I);
339    task t4 = "std", list(J);
340    t3 == t4;
341    killTask(t3);
342    killTask(t4);
343}
344
345proc printTask(task t)
346"USAGE:   printTask(t), t task
347RETURN:   nothing. Prints information about t.
348NOTE:     'print(t);' and 't;' are shortcuts for 'printTask(t)'.
349SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask
350EXAMPLE:  example printTask; shows an example"
351{
352    if (t.index == 0) {
353        "An uninitialized task";
354        return();
355    }
356    if (typeof(tasks[t.index]) != "internal_task") {
357        "An uninitialized task";
358        return();
359    }
360    "A task with the following properties:"+newline
361        +"command:          "+tasks[t.index].command+newline
362        +"no. of arguments: "+string(size(tasks[t.index].arguments))+newline
363        +"state:            "+tasks[t.index].state;
364}
365example
366{
367    "EXAMPLE:";
368    echo = 2;
369    ring R = 0, (x,y), dp;
370    ideal I = x9y2+x10, x2y7-y8;
371    task t;
372    printTask(t);
373    t = "std", list(I);
374    t;   // the same as printTask(t);
375    startTasks(t);
376    waitAllTasks(t);
377    t;
378    killTask(t);
379}
380
381proc startTasks(list #)
382"USAGE:   startTasks(t1, t2, ...), t1, t2, ... tasks
383RETURN:   nothing. Starts the tasks t1, t2, ... and sets their states to
384          'started'.
385NOTE:     A task whose state is neither 'created' nor 'stopped' cannot be
386          started.
387       @* If startTasks() is applied to a task whose state is 'stopped', then
388          the computation of this task will be restarted from the beginning.
389       @* Tasks can be started from within other tasks. A started task should
390          not be accessed from within any task other than the one within which
391          it was started.
392       @* For each task, the start of its computation is subject to the
393          internal scheduling.
394SEE ALSO: stopTask, waitTasks, pollTask, getState, createTask, printTask
395EXAMPLE:  example startTasks; shows an example"
396{
397    int nargs = size(#);
398    if (nargs == 0) {
399        ERROR("missing argument");
400    }
401    int i;
402    for (i = nargs; i > 0; i--) {
403        if (typeof(#[i]) != "task") {
404            ERROR("argument not of type 'task' (argument no. "+string(i)+")");
405        }
406        if (#[i].index == 0) {
407            ERROR("cannot start an uninitialized task (task no. "
408                +string(i)+")");
409        }
410        if (typeof(tasks[#[i].index]) != "internal_task") {
411            ERROR("cannot start an uninitialized task (task no. "
412                +string(i)+")");
413        }
414        if (tasks[#[i].index].state != "created"
415            && tasks[#[i].index].state != "stopped") {
416            ERROR("cannot start a task whose state is not"+newline
417                +"'created' or 'stopped'");
418        }
419    }
420    for (i = nargs; i > 0; i--) {
421        tasks[#[i].index].id = i;   // has to be set before forking
422        tasks[#[i].index].state = "started";
423    }
424    int pid = system("pid");
425    link l(pid) = "ssi:fork";
426    open(l(pid));
427    write(l(pid), quote(startTasks_child(#, eval(pid))));
428    int port = read(l(pid));
429    link L(pid) = "ssi:connect localhost:"+string(port);
430    open(L(pid));
431    nlinkIDs++;
432    for (i = nargs; i > 0; i--) {
433        tasks[#[i].index].links = list(L(pid), l(pid));
434        tasks[#[i].index].linkID = nlinkIDs;
435    }
436}
437example
438{
439    "EXAMPLE:";
440    echo = 2;
441    ring R = 0, (x,y), dp;
442    ideal I = x9y2+x10, x2y7-y8;
443    task t1 = "std", list(I);
444    task t2 = "slimgb", list(I);
445    startTasks(t1, t2);
446    waitAllTasks(t1, t2);
447    getResult(t1);
448    getResult(t2);
449    killTask(t1);
450    killTask(t2);
451}
452
453/* This procedure is started within the child after forking. */
454static proc startTasks_child(list localtasks, int pid_parent)
455{
456    int port = system("reserve", 1);
457    write(l(pid_parent), port);
458    link L(pid_parent) = system("reservedLink");
459    export(L(pid_parent));
460
461    int sem_write = semaphore(1);
462    int pid = system("pid");
463
464    int nlocaltasks = size(localtasks);
465    intvec state = 0:nlocaltasks;
466        // the internal state of each localtask (see rationale)
467    int nwaiting = nlocaltasks;
468        // the number of local tasks with internal state 0 (waiting)
469    int nfinished;
470        // the number of local tasks with internal state 3 (result sent) or
471        // -1 (stopped)
472    intvec queue = 1..nlocaltasks;
473    int next = 1;
474
475    list links;
476    links[nlocaltasks+1] = l(pid_parent);
477    intvec assignment = 0:nlocaltasks;
478        // the task with id = i is running in link no. assignment[i]
479    int nlinks;
480
481    // data sent by other processes
482    int code;
483    int id;
484    if (!defined(demanded_tasks)) {
485        intvec demanded_tasks;
486        int demanded_tasks_index = 1;
487        exportto(Tasks, demanded_tasks);
488        exportto(Tasks, demanded_tasks_index);
489    }
490    else {
491        demanded_tasks = 0;
492        demanded_tasks_index = 1;
493    }
494    int ndemanded = -1;
495
496    // internal counts
497    int granted_leaves;
498    int results_sent;
499
500    // auxiliary variables
501    intvec waiting_tasks;
502    int wait;
503    int deadlock;
504    int tmp;
505    int i;
506    int j;
507
508    while (nwaiting > 0) {
509        wait = 0;
510        if (nlinks == 0) {
511            wait = -1;
512            granted_leaves++;
513            while (-wait < nwaiting) {
514                if (system("semaphore", "try_acquire", sem_leaves) == 1) {
515                    wait--;
516                }
517                else {
518                    break;
519                }
520            }
521        }
522        while (wait == 0) {
523            wait = waitfirst(links, 500);
524            if (wait == 0) {
525                while (-wait < nwaiting) {
526                    if (system("semaphore", "try_acquire", sem_leaves) == 1) {
527                        wait--;
528                    }
529                    else {
530                        break;
531                    }
532                }
533            }
534        }
535        if (wait < 0) {   // open (-wait) new links
536            while (wait < 0) {
537                wait++;
538                nlinks++;
539                link ll(pid)(nlinks) = "ssi:fork";
540                open(ll(pid)(nlinks));
541                links[nlinks] = ll(pid)(nlinks);
542                write(links[nlinks],
543                    quote(startTasks_grandchild(
544                    eval(localtasks[queue[next]].index), eval(pid_parent),
545                    eval(pid), eval(nlinks), eval(sem_write))));
546                assignment[queue[next]] = nlinks;
547                state[queue[next]] = 1;
548                nwaiting--;
549                next++;
550            }
551            // wait == 0
552        }
553        if (wait > 0 && wait <= nlocaltasks) {
554            code = read(links[wait]);
555            if (code == 1) {   // result computed
556                id = read(links[wait]);
557                state[id] = 2;
558                if (ndemanded > 0 && removeDemanded(id)) {
559                    write(links[wait], 1);
560                    ndemanded--;
561                    results_sent++;
562                }
563            }
564            // code == 2: startTasks_grandchild() ended, do nothing
565        }
566        if (wait == nlocaltasks+1) {
567            code = read(l(pid_parent));
568            if (code == 0) {   // stopTask
569                id = read(l(pid_parent));
570                if (state[id] == 0) {   // waiting
571                    queue = give_priority(queue, intvec(id));
572                    next++;
573                }
574                if (state[id] == 1 || state[id] == 2) {  // started or computed
575                    close(links[assignment[id]]);
576                    open(links[assignment[id]]);
577                    write(links[assignment[id]],
578                        quote(startTasks_grandchild(
579                        eval(localtasks[queue[next]].index), eval(pid_parent),
580                        eval(pid), eval(assignment[id]), eval(sem_write))));
581                    assignment[queue[next]] = assignment[id];
582                    assignment[id] = 0;
583                    state[queue[next]] = 1;
584                    next++;
585                }
586                // state[id] == -1 (stopped) or state[id] == 3 (sent)
587                // should not happen
588                nwaiting--;
589                nfinished++;
590                state[id] = -1;
591            }
592            if (code == 1) {   // waitTasks
593                demanded_tasks = read(l(pid_parent));
594                demanded_tasks_index = size(demanded_tasks);
595                ndemanded = read(l(pid_parent));
596                if (ndemanded > demanded_tasks_index) {
597                    ndemanded = demanded_tasks_index;
598                }
599                if (demanded_tasks == 0 && ndemanded == -1) {
600                    write(l(pid_parent), results_sent);
601                    continue;
602                }
603                else {
604                    results_sent = 0;
605                }
606                demanded_tasks = demanded_tasks[demanded_tasks_index..1];
607                deadlock = 0;
608                waiting_tasks = 0:demanded_tasks_index;
609                j = 0;
610                for (i = demanded_tasks_index; i > 0; i--) {
611                    id = demanded_tasks[i];
612                    if (state[id] == 0) {   // waiting
613                        j++;
614                        waiting_tasks[j] = id;
615                        deadlock = 1;
616                    }
617                }
618                if (j > 0) {
619                    waiting_tasks = waiting_tasks[1..j];
620                    queue = queue[next..size(queue)];
621                    next = 1;
622                    queue = give_priority(queue, waiting_tasks);
623                    waiting_tasks = 0;
624                }
625                for (i = demanded_tasks_index; i > 0; i--) {
626                    id = demanded_tasks[i];
627                    if (state[id] == 1) {   // started
628                        deadlock = 0;
629                    }
630                    if (state[id] == 2) {   // computed
631                        write(links[assignment[id]], 1);
632                        tmp = removeDemanded(id);
633                        ndemanded--;
634                        results_sent++;
635                        deadlock = 0;
636                    }
637                }
638                if (deadlock) {
639                    granted_leaves++;
640                    nlinks++;
641                    link ll(pid)(nlinks) = "ssi:fork";
642                    open(ll(pid)(nlinks));
643                    links[nlinks] = ll(pid)(nlinks);
644                    write(links[nlinks],
645                        quote(startTasks_grandchild(
646                        eval(localtasks[queue[next]].index), eval(pid_parent),
647                        eval(pid), eval(nlinks), eval(sem_write))));
648                    assignment[queue[next]] = nlinks;
649                    state[queue[next]] = 1;
650                    nwaiting--;
651                    next++;
652                }
653            }
654            if (code == 2) {   // pollTask
655                id = read(l(pid_parent));
656                if (state[id] == 0) {   // waiting
657                    queue = queue[next..size(queue)];
658                    next = 1;
659                    queue = give_priority(queue, intvec(id));
660                }
661                if (state[id] == 2) {   // computed
662                    write(links[assignment[id]], 1);
663                }
664                write(l(pid_parent), state[id]);
665            }
666            if (code == 3) {   // got result
667                id = read(l(pid_parent));
668                write(links[assignment[id]],
669                    quote(startTasks_grandchild(
670                    eval(localtasks[queue[next]].index), eval(pid_parent),
671                    eval(pid), eval(assignment[id]), eval(sem_write))));
672                assignment[queue[next]] = assignment[id];
673                assignment[id] = 0;
674                state[queue[next]] = 1;
675                state[id] = 3;
676                nwaiting--;
677                nfinished++;
678                next++;
679            }
680        }
681    }
682    while (nfinished < nlocaltasks || ndemanded != -1) {
683        wait = waitfirst(links);
684        if (wait <= nlocaltasks) {
685            code = read(links[wait]);
686            if (code == 1) {   // result computed
687                id = read(links[wait]);
688                state[id] = 2;
689                if (ndemanded > 0 && removeDemanded(id)) {
690                    write(links[wait], 1);
691                    ndemanded--;
692                    results_sent++;
693                }
694            }
695            // code == 2: startTasks_grandchild() ended, do nothing
696        }
697        if (wait == nlocaltasks+1) {
698            code = read(l(pid_parent));
699            if (code == 0) {   // stopTask
700                id = read(l(pid_parent));
701                if (state[id] == 1 || state[id] == 2) {  // started or computed
702                    close(links[assignment[id]]);
703                    if (nlinks > granted_leaves) {
704                        tmp = system("semaphore", "release", sem_leaves);
705                    }
706                    links[assignment[id]] = def(0);
707                    nlinks--;
708                    assignment[id] = 0;
709                    nfinished++;
710                }
711                // else: nothing to do
712                state[id] = -1;
713            }
714            if (code == 1) {   // waitTasks
715                demanded_tasks = read(l(pid_parent));
716                demanded_tasks_index = size(demanded_tasks);
717                ndemanded = read(l(pid_parent));
718                if (ndemanded > demanded_tasks_index) {
719                    ndemanded = demanded_tasks_index;
720                }
721                if (demanded_tasks == 0 && ndemanded == -1) {
722                    write(l(pid_parent), results_sent);
723                    continue;
724                }
725                else {
726                    results_sent = 0;
727                }
728                demanded_tasks = demanded_tasks[demanded_tasks_index..1];
729                for (i = demanded_tasks_index; i > 0; i--) {
730                    id = demanded_tasks[i];
731                    if (state[id] == 2) {   // computed
732                        write(links[assignment[id]], 1);
733                        tmp = removeDemanded(id);
734                        ndemanded--;
735                        results_sent++;
736                    }
737                }
738            }
739            if (code == 2) {   // pollTask
740                id = read(l(pid_parent));
741                if (state[id] == 2) {   // computed
742                    write(links[assignment[id]], 1);
743                }
744                write(l(pid_parent), state[id]);
745            }
746            if (code == 3) {   // got result
747                id = read(l(pid_parent));
748                if(typeof(links[assignment[id]])=="link")
749                {
750                  close(links[assignment[id]]);
751                }
752                if (nlinks > granted_leaves)
753                {
754                  tmp = system("semaphore", "release", sem_leaves);
755                }
756                links[assignment[id]] = def(0);
757                nlinks--;
758                assignment[id] = 0;
759                state[id] = 3;
760                nfinished++;
761            }
762        }
763    }
764}
765
766/* This procedure has to be started within the grandchildren after forking. */
767static proc startTasks_grandchild(int index, int pid_grandparent,
768    int pid_parent, int link_no, int sem_write)
769{
770    def result;
771    int tmp = system("semaphore", "acquire", sem_queue);
772    tmp = system("semaphore", "acquire", sem_cores);
773    tmp = system("semaphore", "release", sem_queue);
774    execute("result = "+tasks[index].command+"("
775        +argsToString("tasks[index].arguments", size(tasks[index].arguments))
776        +");");
777    tmp = system("semaphore", "release", sem_cores);
778    write(ll(pid_parent)(link_no), 1);
779    write(ll(pid_parent)(link_no), tasks[index].id);
780    tmp = read(ll(pid_parent)(link_no));
781    tmp = system("semaphore", "acquire", sem_write);
782    write(L(pid_grandparent), index);
783    write(L(pid_grandparent), result);
784    tmp = system("semaphore", "release", sem_write);
785    return(2);
786}
787
788/* Remove id from demanded_tasks and return 1, if id is an element of
789 * demanded_tasks; return 0, otherwise. Note:
790 * - demanded_tasks and demanded_tasks_index are (lib-)global objects
791 *   exported in startTasks_child().
792 * - demanded_tasks_index is used to avoid copying. It is defined to be
793 *   the greatest integer with demanded_tasks[demanded_tasks_index] != 0
794 *   and demanded_tasks[demanded_tasks_index+1] == 0 (if defined).
795 */
796static proc removeDemanded(alias int id)
797{
798    if (demanded_tasks[demanded_tasks_index] == id) {
799        demanded_tasks[demanded_tasks_index] = 0;
800        demanded_tasks_index--;
801        return(1);
802    }
803    int i;
804    for (i = demanded_tasks_index-1; i > 0; i--) {
805        if (demanded_tasks[i] == id) {
806            demanded_tasks[i..demanded_tasks_index]
807                = demanded_tasks[(i+1)..demanded_tasks_index], 0;
808            demanded_tasks_index--;
809            return(1);
810        }
811    }
812    return(0);
813}
814
815/* Move the elements in 'preferred' to the beginning of 'queue'. We may assume
816 * that
817 * - 'preferred' is a subset of 'queue';
818 * - the elements of 'preferred' are distinct and non-zero;
819 * - the elements of 'queue' are distinct and non-zero.
820 * For performance reasons, we may also assume that 'queue' and 'preferred' are
821 * more or less ordered in most cases. Note that queue has the format
822 * '0, indices, 0'.
823 */
824static proc give_priority(intvec queue, intvec preferred)
825{
826    int size_queue = size(queue);
827    int size_preferred = size(preferred);
828    if (size_queue == size_preferred) {
829        return(queue);
830    }
831    int index = size_queue;
832    int i;
833    int j;
834    for (i = size_preferred; i > 0; i--) {
835        for (j = size_queue; j > 0; j--) {
836            if (queue[index] == preferred[i]) {
837                queue[index] = 0;
838                break;
839            }
840            index--;
841            if (index == 0) {
842                index = size_queue;
843            }
844        }
845    }
846    intvec not_preferred = 0:(size_queue-size_preferred);
847    index = 1;
848    for (i = 1; i <= size_queue; i++) {
849        if (queue[i]) {
850            not_preferred[index] = queue[i];
851            index++;
852        }
853    }
854    queue = preferred, not_preferred;
855    return(queue);
856}
857
858proc stopTask(task t)
859"USAGE:   stopTask(t), t task
860RETURN:   nothing. Stops the t and sets its state to 'stopped'.
861NOTE:     A task whose state is not 'started' cannot be stopped.
862       @* Intermediate results are discarded when a task is stopped.
863       @* killTask() should be called for any no longer needed task.
864SEE ALSO: startTasks, waitTasks, pollTask, getState, killTask, printTask
865EXAMPLE:  example stopTask; shows an example"
866{
867    if (t.index == 0) {
868        ERROR("cannot stop an uninitialized task");
869    }
870    if (typeof(tasks[t.index]) != "internal_task") {
871        ERROR("cannot stop an uninitialized task");
872    }
873    if (tasks[t.index].state != "started") {
874        ERROR("cannot stop a task whose state is not 'started'");
875    }
876    write(tasks[t.index].links[2], 0);
877    write(tasks[t.index].links[2], tasks[t.index].id);
878    tasks[t.index].id = 0;
879    tasks[t.index].links = list();
880    tasks[t.index].linkID = 0;
881    tasks[t.index].state = "stopped";
882}
883example
884{
885    "EXAMPLE:";
886    echo = 2;
887    ring R = 0, (x,y), dp;
888    ideal I = x9y2+x10, x2y7-y8;
889    task t = "std", list(I);
890    startTasks(t);
891    stopTask(t);
892    t;
893    killTask(t);
894}
895
896proc waitTasks(list T, int N, list #)
897"USAGE:   waitTasks(T, N[, timeout]), T list of tasks, N int, timeout int
898RETURN:   an ordered list of the indices of those tasks which have been
899          successfully completed. The state of these tasks is set to
900          'completed'.
901       @* The procedure waits for N tasks to complete.
902       @* An optional timeout in ms can be provided. Default is 0 which
903          disables the timeout.
904NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
905          waited for.
906       @* The result of any completed task can be accessed via @ref{getResult}.
907       @* The returned list may contain more than N entries if the computation
908          of some tasks has already finished and/or if several tasks finish
909          \"at the same time\". It may contain less than N entries in
910          the case of timeout or errors occurring.
911       @* Polling is guaranteed, i.e. the index of any task t for which
912          'pollTask(t);' would return 1 will appear in the returned list.
913SEE ALSO: startTasks, pollTask, getResult, getState, printTask
914EXAMPLE:  example waitTasks; shows an example"
915{
916    /* initialize the timer */
917    int oldtimerresolution = system("--ticks-per-sec");
918    system("--ticks-per-sec", 1000);
919    int starting_time = rtimer;
920
921    /* read optional parameters */
922    int timeout;
923    if (size(#) > 0) {
924        if (size(#) > 1 || typeof(#[1]) != "int") {
925            ERROR("wrong optional parameter");
926        }
927        timeout = #[1];
928    }
929
930    /* check for errors */
931    if (timeout < 0) {
932        ERROR("negative timeout");
933    }
934    int nargs = size(T);
935    if (nargs == 0) {
936        ERROR("missing task");
937    }
938    if (N < 1 || N > nargs) {
939        ERROR("wrong number of tasks to wait for");
940    }
941    int i;
942    for (i = nargs; i > 0; i--) {
943        if (typeof(T[i]) != "task") {
944            ERROR("element not of type 'task' (element no. "+string(i)+")");
945        }
946        if (T[i].index == 0) {
947            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
948                +")");
949        }
950        if (typeof(tasks[T[i].index]) != "internal_task") {
951            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
952                +")");
953        }
954        if (tasks[T[i].index].state != "started"
955            && tasks[T[i].index].state != "completed") {
956            ERROR("cannot wait for a task whose state is not"+newline
957                +"'started' or 'completed' (task no. "+string(i)+")");
958        }
959    }
960
961    /* sort the tasks */
962    int ncompleted;
963    list requests;
964    list links;
965    int sorted_in;
966    int j;
967    for (i = 1; i <= nargs; i++) {
968        if (tasks[T[i].index].state == "completed") {
969            ncompleted++;
970        }
971        else {   // tasks[T[i].index].state == "started"
972            sorted_in = 0;
973            for (j = size(requests); j > 0; j--) {
974                if (requests[j][1] == tasks[T[i].index].linkID) {
975                    requests[j][2][size(requests[j][2])+1] =
976                        tasks[T[i].index].id;
977                    sorted_in = 1;
978                    break;
979                }
980            }
981            if (!sorted_in) {
982                requests[size(requests)+1] = list(tasks[T[i].index].linkID,
983                    intvec(tasks[T[i].index].id),
984                    tasks[T[i].index].links[2]);
985                links[size(links)+1] = tasks[T[i].index].links[1];
986            }
987        }
988    }
989
990    /* send the requests */
991    for (j = size(requests); j > 0; j--) {
992        write(requests[j][3], 1);
993        write(requests[j][3], requests[j][2]);
994        write(requests[j][3], N-ncompleted);
995    }
996
997    /* wait for the results */
998    int wait;
999    int index;
1000    int results_got;
1001    int remaining_time;
1002    int tmp;
1003    while (ncompleted < N) {
1004        wait = waitfirst(links, 0);
1005        if (wait == 0) {
1006            if (timeout == 0) {
1007                tmp = system("semaphore", "release", sem_cores);
1008                wait = waitfirst(links);
1009                tmp = system("semaphore", "acquire", sem_cores);
1010            }
1011            else {
1012                remaining_time = timeout-(rtimer-starting_time);
1013                if (remaining_time < 0) {
1014                    break;
1015                }
1016                else {
1017                    tmp = system("semaphore", "release", sem_cores);
1018                    wait = waitfirst(links, remaining_time);
1019                    tmp = system("semaphore", "acquire", sem_cores);
1020                }
1021            }
1022        }
1023        if (wait < 1) {
1024            break;
1025        }
1026        index = read(links[wait]);
1027        tasks[index].result = read(links[wait]);
1028        write(tasks[index].links[2], 3);
1029        write(tasks[index].links[2], tasks[index].id);
1030        tasks[index].id = 0;
1031        tasks[index].links = list();
1032        tasks[index].linkID = 0;
1033        tasks[index].state = "completed";
1034        ncompleted++;
1035        results_got++;
1036    }
1037    if (wait == -1) {
1038        ERROR("error in waitfirst()");
1039    }
1040
1041    /* end communication process */
1042    for (j = size(requests); j > 0; j--) {
1043        write(requests[j][3], 1);
1044        write(requests[j][3], 0);
1045        write(requests[j][3], -1);
1046    }
1047    int results_sent;
1048    for (j = size(requests); j > 0; j--) {
1049        results_sent = results_sent + read(requests[j][3]);
1050    }
1051    while (results_sent > results_got) {
1052        wait = waitfirst(links);
1053        if (wait == -1) {
1054            ERROR("error in waitfirst()");
1055        }
1056        index = read(links[wait]);
1057        tasks[index].result = read(links[wait]);
1058        write(tasks[index].links[2], 3);
1059        write(tasks[index].links[2], tasks[index].id);
1060        tasks[index].id = 0;
1061        tasks[index].links = list();
1062        tasks[index].linkID = 0;
1063        tasks[index].state = "completed";
1064        results_got++;
1065    }
1066
1067    /* list completed tasks */
1068    list completed;
1069    completed[nargs+1] = 0;
1070    j = 0;
1071    for (i = 1; i <= nargs; i++) {
1072        if (tasks[T[i].index].state == "completed") {
1073            j++;
1074            completed[j] = i;
1075        }
1076    }
1077    completed[nargs+1] = def(0);
1078
1079    /* return the result */
1080    system("--ticks-per-sec", oldtimerresolution);
1081    return(completed);
1082}
1083example
1084{
1085    "EXAMPLE:";
1086    echo = 2;
1087    ring R = 0, (x,y), dp;
1088    ideal I = x9y2+x10, x2y7-y8;
1089    task t1 = "std", list(I);
1090    task t2 = "slimgb", list(I);
1091    startTasks(t1, t2);
1092    waitTasks(list(t1, t2), 2);   // wait for both tasks
1093    getResult(t1);
1094    getResult(t2);
1095    killTask(t1);
1096    killTask(t2);
1097}
1098
1099proc waitAllTasks(list #)
1100"USAGE:   waitAllTasks(t1, t2, ...), t1, t2, ... tasks
1101RETURN:   nothing. Waits for all the tasks t1, t2, ... to complete. The state
1102          of the tasks is set to 'completed'.
1103NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
1104          waited for.
1105       @* The result of any completed task can be accessed via @ref{getResult}.
1106       @* 'waitAllTasks(t1, t2, ...);' is a shortcut for
1107          'waitTasks(list(t1, t2, ...), size(list(t1, t2, ...)));'. Since
1108          returning a list of the indices of the completed tasks does not make
1109          sense in this case, nothing is returned.
1110SEE ALSO: waitTasks, startTasks, pollTask, getResult, getState, printTask
1111EXAMPLE:  example waitAllTasks; shows an example"
1112{
1113    list tmp = waitTasks(#, size(#));
1114}
1115example
1116{
1117    "EXAMPLE:";
1118    echo = 2;
1119    ring R = 0, (x,y), dp;
1120    ideal I = x9y2+x10, x2y7-y8;
1121    task t1 = "std", list(I);
1122    task t2 = "slimgb", list(I);
1123    startTasks(t1, t2);
1124    waitAllTasks(t1, t2);   // the same as 'waitTasks(list(t1, t2), 2);',
1125                            // but without return value
1126    getResult(t1);
1127    getResult(t2);
1128    killTask(t1);
1129    killTask(t2);
1130}
1131
1132proc pollTask(task t)
1133"USAGE:   pollTask(t), t task
1134RETURN:   1, if the computation of the task t has successfully finished;
1135          0, otherwise.
1136       @* The state of any task whose computation has successfully finished is
1137          set to 'completed'.
1138NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
1139          polled.
1140       @* The result of any completed task can be accessed via @ref{getResult}.
1141       @* pollTask() should return immediately. However, receiving the result
1142          of the task may take some time.
1143SEE ALSO: startTasks, waitTasks, getResult, getState, printTask
1144EXAMPLE:  example pollTask; shows an example"
1145{
1146    if (t.index == 0) {
1147        ERROR("cannot poll an uninitialized task");
1148    }
1149    if (typeof(tasks[t.index]) != "internal_task") {
1150        ERROR("cannot poll an uninitialized task");
1151    }
1152    if (tasks[t.index].state != "started"
1153        && tasks[t.index].state != "completed") {
1154        ERROR("cannot poll a task whose state is not"+newline
1155            +"'started' or 'completed'");
1156    }
1157    if (tasks[t.index].state == "completed") {
1158        return(1);
1159    }
1160    // tasks[t.index].state == "started"
1161    write(tasks[t.index].links[2], 2);
1162    write(tasks[t.index].links[2], tasks[t.index].id);
1163    int state = read(tasks[t.index].links[2]);
1164    if (state == 0 || state == 1) {   // waiting or started
1165        return(0);
1166    }
1167    if (state == 2) {   // computed
1168        int index = read(tasks[t.index].links[1]);   // index == t.index
1169        tasks[t.index].result = read(tasks[t.index].links[1]);
1170        write(tasks[t.index].links[2], 3);
1171        write(tasks[t.index].links[2], tasks[t.index].id);
1172        tasks[t.index].id = 0;
1173        tasks[t.index].links = list();
1174        tasks[t.index].linkID = 0;
1175        tasks[t.index].state = "completed";
1176        return(1);
1177    }
1178    // state == -1 (stopped) or state == 3 (sent) should not happen
1179}
1180example
1181{
1182    "EXAMPLE:";
1183    echo = 2;
1184    ring R = 0, (x,y), dp;
1185    ideal I = x9y2+x10, x2y7-y8;
1186    task t = "std", list(I);
1187    startTasks(t);
1188    waitAllTasks(t);
1189    pollTask(t);   // task already completed
1190    t;
1191    getResult(t);
1192    killTask(t);
1193}
1194
1195proc getCommand(task t)
1196"USAGE:   getCommand(t), t task
1197RETURN:   a string, the command of t.
1198NOTE:     This command cannot be applied to tasks whose state is
1199          'uninitialized'.
1200SEE ALSO: getArguments, getResult, getState, createTask, printTask
1201EXAMPLE:  example getCommand; shows an example"
1202{
1203    if (t.index == 0) {
1204        ERROR("cannot get command of an uninitialized task");
1205    }
1206    if (typeof(tasks[t.index]) != "internal_task") {
1207        ERROR("cannot get command of an uninitialized task");
1208    }
1209    return(tasks[t.index].command);
1210}
1211example
1212{
1213    "EXAMPLE:";
1214    echo = 2;
1215    ring R = 0, (x,y), dp;
1216    ideal I = x9y2+x10, x2y7-y8;
1217    task t = "std", list(I);
1218    getCommand(t);
1219    killTask(t);
1220}
1221
1222proc getArguments(task t)
1223"USAGE:   getArguments(t), t task
1224RETURN:   a list, the arguments of t.
1225NOTE:     This command cannot be applied to tasks whose state is
1226          'uninitialized'.
1227SEE ALSO: getCommand, getResult, getState, createTask, printTask
1228EXAMPLE:  example getArguments; shows an example"
1229{
1230    if (t.index == 0) {
1231        ERROR("cannot get arguments of an uninitialized task");
1232    }
1233    if (typeof(tasks[t.index]) != "internal_task") {
1234        ERROR("cannot get arguments of an uninitialized task");
1235    }
1236    return(tasks[t.index].arguments);
1237}
1238example
1239{
1240    "EXAMPLE:";
1241    echo = 2;
1242    ring R = 0, (x,y), dp;
1243    ideal I = x9y2+x10, x2y7-y8;
1244    task t = "std", list(I);
1245    getArguments(t);
1246    killTask(t);
1247}
1248
1249proc getResult(task t)
1250"USAGE:   getResult(t), t task
1251RETURN:   the result of t.
1252NOTE:     This command cannot be applied to tasks whose state is not
1253          'completed'.
1254SEE ALSO: getCommand, getArguments, getState, waitTasks, pollTask, printTask
1255EXAMPLE:  example getResult; shows an example"
1256{
1257    if (t.index == 0) {
1258        ERROR("cannot get result of an uninitialized task");
1259    }
1260    if (typeof(tasks[t.index]) != "internal_task") {
1261        ERROR("cannot get result of an uninitialized task");
1262    }
1263    if (tasks[t.index].state != "completed") {
1264        ERROR("cannot get result of a task which is not completed");
1265    }
1266    return(tasks[t.index].result);
1267}
1268example
1269{
1270    "EXAMPLE:";
1271    echo = 2;
1272    ring R = 0, (x,y), dp;
1273    ideal I = x9y2+x10, x2y7-y8;
1274    task t = "std", list(I);
1275    startTasks(t);
1276    waitAllTasks(t);
1277    getResult(t);
1278    killTask(t);
1279}
1280
1281proc getState(task t)
1282"USAGE:   getState(t), t task
1283RETURN:   a string, the state of t.
1284SEE ALSO: getCommand, getArguments, getResult, printTask, createTask,
1285          startTasks, stopTask, waitTasks, pollTask, killTask
1286EXAMPLE:  example getState; shows an example"
1287{
1288    if (t.index == 0) {
1289        return("uninitialized");
1290    }
1291    if (typeof(tasks[t.index]) != "internal_task") {
1292        return("uninitialized");
1293    }
1294    return(tasks[t.index].state);
1295}
1296example
1297{
1298    "EXAMPLE:";
1299    echo = 2;
1300    ring R = 0, (x,y), dp;
1301    ideal I = x9y2+x10, x2y7-y8;
1302    task t = "std", list(I);
1303    getState(t);
1304    startTasks(t);
1305    getState(t);
1306    waitAllTasks(t);
1307    getState(t);
1308    killTask(t);
1309    getState(t);
1310}
1311
1312/ * construct the string "name[1], name[2], name[3], ..., name[length]" */
1313static proc argsToString(string name, int length)
1314{
1315    string output;
1316    if (length > 0) {
1317        output = name+"[1]";
1318    }
1319    int i;
1320    for (i = 2; i <= length; i++) {
1321        output = output+", "+name+"["+string(i)+"]";
1322    }
1323    return(output);
1324}
Note: See TracBrowser for help on using the repository browser.