source: git/Singular/LIB/tasks.lib @ 857ee3

Last change on this file since 857ee3 was 857ee3, checked in by Andreas Steenpass <steenpass@…>, 10 years ago
add: resources.lib, tasks.lib (cherry picked from commit 1b24bf8a8157af4cce127446caff60fe84edd35f) Signed-off-by: Andreas Steenpass <steenpass@mathematik.uni-kl.de>
File size: 40.7 KB
1//////////////////////////////////////////////////////////////////////
2version="version tasks.lib 4.0.0.0 Dec_2013 "; // $Id$
3category="General purpose";
4info="
5LIBRARY:   tasks.lib  A parallel framework based on tasks
6
7AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de
8
9OVERVIEW:
10This library provides a parallel framework based on tasks. It introduces a new
11Singular type @code{task}; an object of this type is a command (given by a
12string) applied to a list of arguments. Tasks can be computed in parallel via
13the procedures in this library and they can even be started recursively, i.e.
14from within other tasks.
15
16tasks.lib respects the limits for computational resources defined
17in @ref{resources.lib}, i.e., all tasks within the same Singular session will
18not use more computational resources than provided via resources.lib, even if
19tasks are started recursively.
20
21The Singular library @ref{parallel.lib} provides implementations of several
22parallel 'skeletons' based on tasks.lib.
23
24KEYWORDS:  parallelization; distributed computing; task
25
26SEE ALSO:  resources_lib, parallel_lib
27
28PROCEDURES:
29  createTask();    create a task
30  killTask();      kill a task
31  copyTask();      copy a task
32  compareTasks();  compare two tasks
33  printTask();     print a task
34  startTasks();    start tasks
35  stopTask();      stop a task
36  waitTasks();     wait for a certain number of tasks
37  waitAllTasks();  wait for all tasks
38  pollTask();      poll a task
39  getCommand();    get the command of a task
40  getArguments();  get the arguments of a task
41  getResult();     get the result of a task
42  getState();      get the state of a task
43";
44
45/*
46RATIONALE FOR DEVELOPERS
47
48The Singular type 'task'
49------------------------
50tasks.lib introduces a Singular type 'task' which makes use of a pointer-like
51model in order to avoid unnecessary copying of data. 'task' is defined as a
52newstruct whose only member is 'int index'. This index points to an entry in
53the lib-internal list 'tasks'. The elements of this list are of the type
54'internal_task' which is defined as a newstruct with the following members:
55int id         - the internal ID
56string command - the command
57list arguments - the arguments
58def result     - the result
59string state   - the current state, see 'The life cycle of a task'
60list links     - control handles, see 'Links'
61int linkID     - the ID of the control handles
62
63
64The life cycle of a task
65------------------------
66'uninitialized' --> 'created' --> 'started' --> 'completed'
67                                     | ^
68                                     v |
69                                  'stopped'
70
71The state of a task t is 'uninitialized' iff
72(t.index == 0) or (typeof(tasks[t.index]) != "internal_task").
73
74A task can be reset to 'uninitialized' by killTask() at any time.
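
Illustration (not part of the original rationale): the following session is a
minimal sketch of one possible walk through the life cycle above. It uses only
procedures listed in the PROCEDURES section; the ring and the ideal are taken
from the examples in this library. The states given in the comments follow the
documentation of the respective procedures and are not recorded output.

    LIB "tasks.lib";
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t;                  // 'uninitialized' (t.index == 0)
    t = "std", list(I);      // 'created'
    startTasks(t);           // 'started'
    stopTask(t);             // 'stopped', intermediate results are discarded
    startTasks(t);           // 'started' again, computed from the beginning
    waitAllTasks(t);         // 'completed'
    getResult(t);            // the result of std(I)
    killTask(t);             // back to 'uninitialized'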
75
76
77Assigned members for 'internal_task'
78------------------------------------
79For each state, the following members of an internal_task must be assigned:
80
81created:       command arguments        state
82started:    id command arguments        state links linkID
83stopped:       command arguments        state
84completed:     command arguments result state
85
86All other members should be wiped out.
87
88Local supervisors
89-----------------
90A call of 'startTasks(t(1..n));' for tasks t(1), ..., t(n) creates a child
91process which plays the role of a supervisor for these tasks. The computation
92of the tasks is done in child processes of the supervisor.
93
94The supervisor assigns an internal state to each task which is represented by
95an integer. The meaning of these integers and their relation to the global
96state of each task is as follows:
97
98internal state | meaning           | corresponding global state
99---------------|-------------------|---------------------------
100             0 | waiting           | started
101             1 | started           | started
102             2 | (result) computed | started
103             3 | (result) sent     | completed
104            -1 | stopped           | stopped
105
106Links
107-----
108The ssi link between the main process and the supervisor is named 'l(pid)'
109where pid is the PID of the main process. The links between the supervisor and
110its child processes are named 'll(pid)(1)', 'll(pid)(2)', ... where pid is the
111PID of the supervisor. The link between the child processes of the supervisor
112and the main process is named 'L(pid)' where pid is the PID of the main
113process. This link is only for sending the results to the main process and must
114not be used in the other direction!
115
116For any task t whose state is 'started', tasks[t.index].links is
117list(L(pid), l(pid)) where pid is the PID of the main process.
118
119Communication model
120-------------------
121stopTask() <--> supervisor
122    0, id -->
123
124waitTasks() <--> supervisor
125(demanded_tasks is an intvec containing the IDs of the tasks which are being
126waited for; ndemanded is the number of tasks that are being waited for.)
127    1, demanded_tasks, ndemanded -->
128    [receiving results]
129    1, 0:2, -1 -->
130    results_sent <--
131    [receiving remaining results]
132
133pollTask() <--> supervisor
134    2, id -->
135    state <--
136    [receive result if state == 2 (computed)]
137
138startTasks_child() <--> startTasks_grandchild()
139    [compute the result]
140    1, id <--
141    [wait until the result is requested]
142    1 -->
143    [send the result]
144    2 <--
145
146sending and receiving results:
147main process <--> supervisor <--> startTasks_grandchild()
148    [request the result, see above]
149    index, result  (main process <-- startTasks_grandchild())
150    3, id          (main process --> supervisor)
151*/
152
153LIB "resources.lib";
154
155static proc mod_init()
156{
157    /* initialize the semaphores */
158    if (!defined(Resources)) {
159        LIB "resources.lib";
160    }
161    // the number of processor cores
162    int sem_cores = Resources::sem_cores;
163    exportto(Tasks, sem_cores);
164    // the number of leaves in the parallel tree (not strict)
165    int sem_leaves = semaphore(system("cpu")+10);
166    exportto(Tasks, sem_leaves);
167    // the number of processes waiting for sem_cores with low priority
168    int sem_queue = semaphore(2);
169    exportto(Tasks, sem_queue);
170
171    /* define the Singular type 'task' */
172    newstruct("task", "int index");
173    newstruct("internal_task", "int id, string command, list arguments,"
174        +"def result, string state, list links, int linkID");
175    system("install", "task", "=", createTask, 1);
176    system("install", "task", "==", compareTasks, 2);
177    system("install", "task", "print", printTask, 1);
178
179    /* define (lib-)global variables */
180    list tasks;     // the lib-internal list of tasks
181    exportto(Tasks, tasks);
182    int ntasks;     // the current maximal index in 'tasks'
183    exportto(Tasks, ntasks);
184    int nlinkIDs;   // the current maximal linkID
185    exportto(Tasks, nlinkIDs);
186}
187
188proc createTask(alias string command, alias list arguments)
189"USAGE:   createTask(command, arguments), command string, arguments list
190RETURN:   a task with the given command and arguments whose state is 'created'.
191NOTE:     't = command, arguments;' is a shortcut for
192          't = createTask(command, arguments);'.
193SEE ALSO: startTasks, getCommand, getArguments, getState, killTask, copyTask,
194          compareTasks, printTask
195EXAMPLE:  example createTask; shows an example"
196{
197    internal_task T;
198    ntasks++;
199    tasks[ntasks] = T;
200    tasks[ntasks].command = command;
201    tasks[ntasks].arguments = arguments;
202    tasks[ntasks].state = "created";
203    task t;
204    t.index = ntasks;
205    return(t);
206}
207example
208{
209    "EXAMPLE:";
210    echo = 2;
211    ring R = 0, (x,y), dp;
212    ideal I = x9y2+x10, x2y7-y8;
213    task t = createTask("std", list(I));
214    // This is the same as:
215    // task t = "std", list(I);
216    t;
217    killTask(t);
218}
219
220proc killTask(task t)
221"USAGE:   killTask(t), t task
222RETURN:   nothing. If the state of t is 'started', then t is stopped first. The
223          internal data structures of t are erased and its state is set to
224          'uninitialized'.
225NOTE:     'killTask(t);' is not the same as 'kill t;'. The latter command does
226          not erase the internal data structures of t. Hence killTask() should
227          be called for any task that is no longer needed, to free memory.
228SEE ALSO: stopTask, getState, createTask, printTask
229EXAMPLE:  example killTask; shows an example"
230{
231    if (t.index == 0) {
232        return();
233    }
234    if (typeof(tasks[t.index]) != "internal_task") {
235        return();
236    }
237    if (tasks[t.index].state == "started") {
238        stopTask(t);
239    }
240    tasks[t.index] = def(0);
241}
242example
243{
244    "EXAMPLE:";
245    echo = 2;
246    ring R = 0, (x,y), dp;
247    ideal I = x9y2+x10, x2y7-y8;
248    task t = "std", list(I);
249    startTasks(t);
250    t;
251    killTask(t);
252    t;
253    getState(t);
254}
255
256proc copyTask(task t)
257"USAGE:   copyTask(t), t task
258RETURN:   a copy of t.
259NOTE:     'task t1 = copyTask(t2);' is not the same as 'task t1 = t2;'. After
260          the latter command, t1 points to the same object as t2; any changes
261          to t2 will also affect t1. In contrast to this, copyTask() creates a
262          new independent task.
263       @* A task whose state is 'started' cannot be copied.
264SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask,
265          compareTasks, printTask
266EXAMPLE:  example copyTask; shows an example"
267{
268    task t_copy;
269    if (t.index == 0) {
270        return(t_copy);
271    }
272    if (typeof(tasks[t.index]) != "internal_task") {
273        return(t_copy);
274    }
275    if (tasks[t.index].state == "started") {
276        ERROR("cannot copy a task whose state is 'started'");
277    }
278    ntasks++;
279    tasks[ntasks] = tasks[t.index];
280    t_copy.index = ntasks;
281    return(t_copy);
282}
283example
284{
285    "EXAMPLE:";
286    echo = 2;
287    ring R = 0, (x,y), dp;
288    ideal I = x9y2+x10, x2y7-y8;
289    task t1 = "std", list(I);
290    startTasks(t1);
291    waitAllTasks(t1);
292    task t2 = copyTask(t1);
293    killTask(t1);
294    t2;   // t2 survived
295    getResult(t2);
296    killTask(t2);
297}
298
299proc compareTasks(task t1, task t2)
300"USAGE:   compareTasks(t1, t2), t1, t2 tasks
301RETURN:   1, if t1 and t2 coincide;
302          0, otherwise.
303NOTE:     The arguments and the results of t1 and t2 are not compared.
304       @* 't1 == t2' is a shortcut for 'compareTasks(t1, t2)'.
305SEE ALSO: getCommand, getArguments, getResult, getState, copyTask, printTask
306EXAMPLE:  example compareTasks; shows an example"
307{
308    if (tasks[t1.index].id != tasks[t2.index].id) {
309        return(0);
310    }
311    if (tasks[t1.index].command != tasks[t2.index].command) {
312        return(0);
313    }
314    if (tasks[t1.index].state != tasks[t2.index].state) {
315        return(0);
316    }
317    if (tasks[t1.index].linkID != tasks[t2.index].linkID) {
318        return(0);
319    }
320    return(1);
321}
322example
323{
324    "EXAMPLE:";
325    echo = 2;
326    ring R = 0, (x,y), dp;
327    ideal I = x9y2+x10, x2y7-y8;
328    task t1 = "std", list(I);
329    task t2 = "std", list(I);
330    compareTasks(t1, t2);
331    startTasks(t1);
332    waitAllTasks(t1);
333    t1 == t2;   // the same as compareTasks(t1, t2);
334    killTask(t1);
335    killTask(t2);
336    // The arguments and the result are not compared!
337    ideal J = x;
338    task t3 = "std", list(I);
339    task t4 = "std", list(J);
340    t3 == t4;
341    killTask(t3);
342    killTask(t4);
343}
344
345proc printTask(task t)
346"USAGE:   printTask(t), t task
347RETURN:   nothing. Prints information about t.
348NOTE:     'print(t);' and 't;' are shortcuts for 'printTask(t)'.
349SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask
350EXAMPLE:  example printTask; shows an example"
351{
352    if (t.index == 0) {
353        "An uninitialized task";
354        return();
355    }
356    if (typeof(tasks[t.index]) != "internal_task") {
357        "An uninitialized task";
358        return();
359    }
360    "A task with the following properties:"+newline
361        +"command:          "+tasks[t.index].command+newline
362        +"no. of arguments: "+string(size(tasks[t.index].arguments))+newline
363        +"state:            "+tasks[t.index].state;
364}
365example
366{
367    "EXAMPLE:";
368    echo = 2;
369    ring R = 0, (x,y), dp;
370    ideal I = x9y2+x10, x2y7-y8;
371    task t;
372    printTask(t);
373    t = "std", list(I);
374    t;   // the same as printTask(t);
375    startTasks(t);
376    waitAllTasks(t);
377    t;
378    killTask(t);
379}
380
381proc startTasks(list #)
382"USAGE:   startTasks(t1, t2, ...), t1, t2, ... tasks
383RETURN:   nothing. Starts the tasks t1, t2, ... and sets their states to
384          'started'.
385NOTE:     A task whose state is neither 'created' nor 'stopped' cannot be
386          started.
387       @* If startTasks() is applied to a task whose state is 'stopped', then
388          the computation of this task will be restarted from the beginning.
389       @* Tasks can be started from within other tasks. A started task should
390          not be accessed from within any task other than the one within which
391          it was started.
392       @* For each task, the start of its computation is subject to the
393          internal scheduling.
394SEE ALSO: stopTask, waitTasks, pollTask, getState, createTask, printTask
395EXAMPLE:  example startTasks; shows an example"
396{
397    int nargs = size(#);
398    if (nargs == 0) {
399        ERROR("missing argument");
400    }
401    int i;
402    for (i = nargs; i > 0; i--) {
403        if (typeof(#[i]) != "task") {
404            ERROR("argument not of type 'task' (argument no. "+string(i)+")");
405        }
406        if (#[i].index == 0) {
407            ERROR("cannot start an uninitialized task (task no. "
408                +string(i)+")");
409        }
410        if (typeof(tasks[#[i].index]) != "internal_task") {
411            ERROR("cannot start an uninitialized task (task no. "
412                +string(i)+")");
413        }
414        if (tasks[#[i].index].state != "created"
415            && tasks[#[i].index].state != "stopped") {
416            ERROR("cannot start a task whose state is not"+newline
417                +"'created' or 'stopped'");
418        }
419    }
420    for (i = nargs; i > 0; i--) {
421        tasks[#[i].index].id = i;   // has to be set before forking
422        tasks[#[i].index].state = "started";
423    }
424    int pid = system("pid");
425    link l(pid) = "ssi:fork";
426    open(l(pid));
427    write(l(pid), quote(startTasks_child(#, eval(pid))));
428    int port = read(l(pid));
429    link L(pid) = "ssi:connect localhost:"+string(port);
430    open(L(pid));
431    nlinkIDs++;
432    for (i = nargs; i > 0; i--) {
433        tasks[#[i].index].links = list(L(pid), l(pid));
434        tasks[#[i].index].linkID = nlinkIDs;
435    }
436}
437example
438{
439    "EXAMPLE:";
440    echo = 2;
441    ring R = 0, (x,y), dp;
442    ideal I = x9y2+x10, x2y7-y8;
443    task t1 = "std", list(I);
444    task t2 = "slimgb", list(I);
445    startTasks(t1, t2);
446    waitAllTasks(t1, t2);
447    getResult(t1);
448    getResult(t2);
449    killTask(t1);
450    killTask(t2);
451}
452
453/* This procedure is started within the child after forking. */
454static proc startTasks_child(list localtasks, int pid_parent)
455{
456    int port = system("reserve", 1);
457    write(l(pid_parent), port);
458    link L(pid_parent) = system("reservedLink");
459    export(L(pid_parent));
460
461    int sem_write = semaphore(1);
462    int pid = system("pid");
463
464    int nlocaltasks = size(localtasks);
465    intvec state = 0:nlocaltasks;
466        // the internal state of each localtask (see rationale)
467    int nwaiting = nlocaltasks;
468        // the number of local tasks with internal state 0 (waiting)
469    int nfinished;
470        // the number of local tasks with internal state 3 (result sent) or
471        // -1 (stopped)
472    intvec queue = 0, 1..nlocaltasks, 0;   // zeroes for easier handling
473
474    list links;
475    links[nlocaltasks+1] = l(pid_parent);
476    intvec assignment = 0:nlocaltasks;
477        // the task with id = i is running in link no. assignment[i]
478    int nlinks;
479
480    // data sent by other processes
481    int code;
482    int id;
483    intvec demanded_tasks = 0, 0;   // zeroes for easier handling
484    int ndemanded = -1;
485
486    // internal counts
487    int granted_leaves;
488    int results_sent;
489
490    // auxiliary variables
491    int wait;
492    int deadlock;
493    int tmp;
494    int i;
495
496    while (nwaiting > 0) {
497        wait = 0;
498        if (nlinks == 0) {
499            wait = -1;
500            granted_leaves++;
501            while (-wait < nwaiting) {
502                if (system("semaphore", "try_acquire", sem_leaves) == 1) {
503                    wait--;
504                }
505                else {
506                    break;
507                }
508            }
509        }
510        while (wait == 0) {
511            wait = waitfirst(links, 500);
512            if (wait == 0) {
513                while (-wait < nwaiting) {
514                    if (system("semaphore", "try_acquire", sem_leaves) == 1) {
515                        wait--;
516                    }
517                    else {
518                        break;
519                    }
520                }
521            }
522        }
523        if (wait < 0) {   // open (-wait) new links
524            while (wait < 0) {
525                wait++;
526                nlinks++;
527                link ll(pid)(nlinks) = "ssi:fork";
528                open(ll(pid)(nlinks));
529                links[nlinks] = ll(pid)(nlinks);
530                write(links[nlinks],
531                    quote(startTasks_grandchild(
532                    eval(localtasks[queue[2]].index), eval(pid_parent),
533                    eval(pid), eval(nlinks), eval(sem_write))));
534                assignment[queue[2]] = nlinks;
535                state[queue[2]] = 1;
536                nwaiting--;
537                queue = 0, queue[3..size(queue)];
538            }
539            // wait == 0
540        }
541        if (wait > 0 && wait <= nlocaltasks) {
542            code = read(links[wait]);
543            if (code == 1) {   // result computed
544                id = read(links[wait]);
545                state[id] = 2;
546                if (ndemanded > 0 && isElement(demanded_tasks, id)) {
547                    write(links[wait], 1);
548                    demanded_tasks = removeFromIntvec(demanded_tasks, id);
549                    ndemanded--;
550                    results_sent++;
551                }
552            }
553            // code == 2: startTasks_grandchild() ended, do nothing
554        }
555        if (wait == nlocaltasks+1) {
556            code = read(l(pid_parent));
557            if (code == 0) {   // stopTask
558                id = read(l(pid_parent));
559                if (state[id] == 0) {   // waiting
560                    queue = removeFromIntvec(queue, id);
561                }
562                if (state[id] == 1 || state[id] == 2) {  // started or computed
563                    close(links[assignment[id]]);
564                    open(links[assignment[id]]);
565                    write(links[assignment[id]],
566                        quote(startTasks_grandchild(
567                        eval(localtasks[queue[2]].index), eval(pid_parent),
568                        eval(pid), eval(assignment[id]), eval(sem_write))));
569                    assignment[queue[2]] = assignment[id];
570                    assignment[id] = 0;
571                    state[queue[2]] = 1;
572                    queue = 0, queue[3..size(queue)];
573                }
574                // state[id] == -1 (stopped) or state[id] == 3 (sent)
575                // should not happen
576                nwaiting--;
577                nfinished++;
578                state[id] = -1;
579            }
580            if (code == 1) {   // waitTasks
581                demanded_tasks = read(l(pid_parent));
582                ndemanded = read(l(pid_parent));
583                if (ndemanded > size(demanded_tasks)-2) {
584                    ndemanded = size(demanded_tasks)-2;
585                }
586                if (demanded_tasks == 0:2 && ndemanded == -1) {
587                    write(l(pid_parent), results_sent);
588                }
589                else {
590                    results_sent = 0;
591                }
592                deadlock = 0;
593                for (i = size(demanded_tasks)-1; i > 1; i--) {
594                    id = demanded_tasks[i];
595                    if (state[id] == 0) {   // waiting
596                        queue = give_priority(queue, id);
597                        deadlock = 1;
598                    }
599                    if (state[id] == 2) {   // computed
600                        write(links[assignment[id]], 1);
601                        demanded_tasks = removeFromIntvec(demanded_tasks, id);
602                        ndemanded--;
603                        results_sent++;
604                    }
605                }
606                for (i = size(demanded_tasks)-1; i > 1; i--) {
607                    id = demanded_tasks[i];
608                    if (state[id] == 1) {   // started
609                        deadlock = 0;
610                    }
611                }
612                if (deadlock) {
613                    granted_leaves++;
614                    nlinks++;
615                    link ll(pid)(nlinks) = "ssi:fork";
616                    open(ll(pid)(nlinks));
617                    links[nlinks] = ll(pid)(nlinks);
618                    write(links[nlinks],
619                        quote(startTasks_grandchild(
620                        eval(localtasks[queue[2]].index), eval(pid_parent),
621                        eval(pid), eval(nlinks), eval(sem_write))));
622                    assignment[queue[2]] = nlinks;
623                    state[queue[2]] = 1;
624                    nwaiting--;
625                    queue = 0, queue[3..size(queue)];
626                }
627            }
628            if (code == 2) {   // pollTask
629                id = read(l(pid_parent));
630                if (state[id] == 0) {   // waiting
631                    queue = give_priority(queue, id);
632                }
633                if (state[id] == 2) {   // computed
634                    write(links[assignment[id]], 1);
635                }
636                write(l(pid_parent), state[id]);
637            }
638            if (code == 3) {   // got result
639                id = read(l(pid_parent));
640                write(links[assignment[id]],
641                    quote(startTasks_grandchild(
642                    eval(localtasks[queue[2]].index), eval(pid_parent),
643                    eval(pid), eval(assignment[id]), eval(sem_write))));
644                assignment[queue[2]] = assignment[id];
645                assignment[id] = 0;
646                state[queue[2]] = 1;
647                state[id] = 3;
648                nwaiting--;
649                nfinished++;
650                queue = 0, queue[3..size(queue)];
651            }
652        }
653    }
654    while (nfinished < nlocaltasks || ndemanded != -1) {
655        wait = waitfirst(links);
656        if (wait <= nlocaltasks) {
657            code = read(links[wait]);
658            if (code == 1) {   // result computed
659                id = read(links[wait]);
660                state[id] = 2;
661                if (ndemanded > 0 && isElement(demanded_tasks, id)) {
662                    write(links[wait], 1);
663                    demanded_tasks = removeFromIntvec(demanded_tasks, id);
664                    ndemanded--;
665                    results_sent++;
666                }
667            }
668            // code == 2: startTasks_grandchild() ended, do nothing
669        }
670        if (wait == nlocaltasks+1) {
671            code = read(l(pid_parent));
672            if (code == 0) {   // stopTask
673                id = read(l(pid_parent));
674                if (state[id] == 1 || state[id] == 2) {  // started or computed
675                    close(links[assignment[id]]);
676                    if (nlinks > granted_leaves) {
677                        tmp = system("semaphore", "release", sem_leaves);
678                    }
679                    links[assignment[id]] = def(0);
680                    nlinks--;
681                    assignment[id] = 0;
682                    nfinished++;
683                }
684                // else: nothing to do
685                state[id] = -1;
686            }
687            if (code == 1) {   // waitTasks
688                demanded_tasks = read(l(pid_parent));
689                ndemanded = read(l(pid_parent));
690                if (ndemanded > size(demanded_tasks)-2) {
691                    ndemanded = size(demanded_tasks)-2;
692                }
693                if (demanded_tasks == 0:2 && ndemanded == -1) {
694                    write(l(pid_parent), results_sent);
695                }
696                else {
697                    results_sent = 0;
698                }
699                for (i = size(demanded_tasks)-1; i > 1; i--) {
700                    id = demanded_tasks[i];
701                    if (state[id] == 2) {   // computed
702                        write(links[assignment[id]], 1);
703                        demanded_tasks = removeFromIntvec(demanded_tasks, id);
704                        ndemanded--;
705                        results_sent++;
706                    }
707                }
708            }
709            if (code == 2) {   // pollTask
710                id = read(l(pid_parent));
711                if (state[id] == 2) {   // computed
712                    write(links[assignment[id]], 1);
713                }
714                write(l(pid_parent), state[id]);
715            }
716            if (code == 3) {   // got result
717                id = read(l(pid_parent));
718                close(links[assignment[id]]);
719                if (nlinks > granted_leaves) {
720                    tmp = system("semaphore", "release", sem_leaves);
721                }
722                links[assignment[id]] = def(0);
723                nlinks--;
724                assignment[id] = 0;
725                state[id] = 3;
726                nfinished++;
727            }
728        }
729    }
730}
731
732/* This procedure has to be started within the grandchildren after forking. */
733static proc startTasks_grandchild(int index, int pid_grandparent,
734    int pid_parent, int link_no, int sem_write)
735{
736    def result;
737    int tmp = system("semaphore", "acquire", sem_queue);
738    tmp = system("semaphore", "acquire", sem_cores);
739    tmp = system("semaphore", "release", sem_queue);
740    execute("result = "+tasks[index].command+"("
741        +argsToString("tasks[index].arguments", size(tasks[index].arguments))
742        +");");
743    tmp = system("semaphore", "release", sem_cores);
744    write(ll(pid_parent)(link_no), 1);
745    write(ll(pid_parent)(link_no), tasks[index].id);
746    tmp = read(ll(pid_parent)(link_no));
747    tmp = system("semaphore", "acquire", sem_write);
748    write(L(pid_grandparent), index);
749    write(L(pid_grandparent), result);
750    tmp = system("semaphore", "release", sem_write);
751    return(2);
752}
753
754/* Check if n is an element of v[2..(size(v)-1)].
755 * Note that v[1] and v[size(v)] are not checked.
756 */
757static proc isElement(alias intvec v, alias int n)
758{
759    int i;
760    for (i = size(v)-1; i > 1; i--) {
761        if (v[i] == n) {
762            return(1);
763        }
764    }
765    return(0);
766}
767
768/* Remove the first occurrence (if any) of n in v[2..(size(v)-1)].
769 * Note that v[1] and v[size(v)] are not checked.
770 */
771static proc removeFromIntvec(intvec v, int n)
772{
773    int size_v = size(v);
774    int i;
775    for (i = size_v-1; i > 1; i--) {
776        if (v[i] == n) {
777            return(v[1..(i-1)], v[(i+1)..size_v]);
778        }
779    }
780    return(v);
781}
782
783/* Move the first occurrence (if any) of id in queue[2..(size(queue)-1)] to
784 * queue[2]. Note that queue[1] and queue[size(queue)] are not checked.
785 */
786static proc give_priority(intvec queue, int id)
787{
788    int size_queue = size(queue);
789    int i;
790    for (i = size_queue-1; i > 2; i--) {
791        if (queue[i] == id) {
792            return(0, id, queue[2..(i-1)], queue[(i+1)..size_queue]);
793        }
794    }
795    return(queue);
796}
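
/* Worked examples for the three helpers above (illustration only, traced by
 * hand from the code; these procedures are static and are not meant to be
 * called from outside this library). The first and the last entry of each
 * intvec are the padding zeroes used by startTasks_child().
 *
 * With v = 0,1,2,3,0:
 *   isElement(v, 3)          yields  1
 *   isElement(v, 0)          yields  0          (the padding is not checked)
 *   removeFromIntvec(v, 2)   yields  0,1,3,0
 *   give_priority(v, 3)      yields  0,3,1,2,0
 *   give_priority(v, 1)      yields  0,1,2,3,0  (1 is already at position 2)
 */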
797
798proc stopTask(task t)
799"USAGE:   stopTask(t), t task
800RETURN:   nothing. Stops the task t and sets its state to 'stopped'.
801NOTE:     A task whose state is not 'started' cannot be stopped.
802       @* Intermediate results are discarded when a task is stopped.
803       @* killTask() should be called for any task that is no longer needed.
804SEE ALSO: startTasks, waitTasks, pollTask, getState, killTask, printTask
805EXAMPLE:  example stopTask; shows an example"
806{
807    if (t.index == 0) {
808        ERROR("cannot stop an uninitialized task");
809    }
810    if (typeof(tasks[t.index]) != "internal_task") {
811        ERROR("cannot stop an uninitialized task");
812    }
813    if (tasks[t.index].state != "started") {
814        ERROR("cannot stop a task whose state is not 'started'");
815    }
816    write(tasks[t.index].links[2], 0);
817    write(tasks[t.index].links[2], tasks[t.index].id);
818    tasks[t.index].id = 0;
819    tasks[t.index].links = list();
820    tasks[t.index].linkID = 0;
821    tasks[t.index].state = "stopped";
822}
823example
824{
825    "EXAMPLE:";
826    echo = 2;
827    ring R = 0, (x,y), dp;
828    ideal I = x9y2+x10, x2y7-y8;
829    task t = "std", list(I);
830    startTasks(t);
831    stopTask(t);
832    t;
833    killTask(t);
834}
835
836proc waitTasks(list T, int N, list #)
837"USAGE:   waitTasks(T, N[, timeout]), T list of tasks, N int, timeout int
838RETURN:   an ordered list of the indices of those tasks which have been
839          successfully completed. The state of these tasks is set to
840          'completed'.
841       @* The procedure waits for N tasks to complete.
842       @* An optional timeout in ms can be provided. Default is 0 which
843          disables the timeout.
844NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
845          waited for.
846       @* The result of any completed task can be accessed via @ref{getResult}.
847       @* The returned list may contain more than N entries if the computation
848          of some tasks has already finished and/or if several tasks finish
849          \"at the same time\". It may contain fewer than N entries if a
850          timeout or an error occurs.
851       @* Polling is guaranteed, i.e. the index of any task t for which
852          'pollTask(t);' would return 1 will appear in the returned list.
853SEE ALSO: startTasks, pollTask, getResult, getState, printTask
854EXAMPLE:  example waitTasks; shows an example"
{
    /* initialize the timer */
    int oldtimerresolution = system("--ticks-per-sec");
    system("--ticks-per-sec", 1000);
    int starting_time = rtimer;

    /* read optional parameters */
    int timeout;
    if (size(#) > 0) {
        if (size(#) > 1 || typeof(#[1]) != "int") {
            ERROR("wrong optional parameter");
        }
        timeout = #[1];
    }

    /* check for errors */
    if (timeout < 0) {
        ERROR("negative timeout");
    }
    int nargs = size(T);
    if (nargs == 0) {
        ERROR("missing task");
    }
    if (N < 1 || N > nargs) {
        ERROR("wrong number of tasks to wait for");
    }
    int i;
    for (i = nargs; i > 0; i--) {
        if (typeof(T[i]) != "task") {
            ERROR("element not of type 'task' (element no. "+string(i)+")");
        }
        if (T[i].index == 0) {
            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
                +")");
        }
        if (typeof(tasks[T[i].index]) != "internal_task") {
            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
                +")");
        }
        if (tasks[T[i].index].state != "started"
            && tasks[T[i].index].state != "completed") {
            ERROR("cannot wait for a task whose state is not"+newline
                +"'started' or 'completed' (task no. "+string(i)+")");
        }
    }

    /* sort the tasks */
    int ncompleted;
    list requests;
    list links;
    int sorted_in;
    int j;
    for (i = 1; i <= nargs; i++) {
        if (tasks[T[i].index].state == "completed") {
            ncompleted++;
        }
        else {   // tasks[T[i].index].state == "started"
            sorted_in = 0;
            for (j = size(requests); j > 0; j--) {
                if (requests[j][1] == tasks[T[i].index].linkID) {
                    requests[j][2][size(requests[j][2])+1] =
                        tasks[T[i].index].id;
                    sorted_in = 1;
                    break;
                }
            }
            if (!sorted_in) {
                requests[size(requests)+1] = list(tasks[T[i].index].linkID,
                    intvec(0, tasks[T[i].index].id),
                    tasks[T[i].index].links[2]);
                links[size(links)+1] = tasks[T[i].index].links[1];
            }
        }
    }

    /* send the requests */
    for (j = size(requests); j > 0; j--) {
        requests[j][2][size(requests[j][2])+1] = 0;
        write(requests[j][3], 1);
        write(requests[j][3], requests[j][2]);
        write(requests[j][3], N-ncompleted);
    }

    /* wait for the results */
    int wait;
    int index;
    int results_got;
    int remaining_time;
    int tmp;
    while (ncompleted < N) {
        wait = waitfirst(links, 0);
        if (wait == 0) {
            if (timeout == 0) {
                tmp = system("semaphore", "release", sem_cores);
                wait = waitfirst(links);
                tmp = system("semaphore", "acquire", sem_cores);
            }
            else {
                remaining_time = timeout-(rtimer-starting_time);
                if (remaining_time < 0) {
                    break;
                }
                else {
                    tmp = system("semaphore", "release", sem_cores);
                    wait = waitfirst(links, remaining_time);
                    tmp = system("semaphore", "acquire", sem_cores);
                }
            }
        }
        if (wait < 1) {
            break;
        }
        index = read(links[wait]);
        tasks[index].result = read(links[wait]);
        write(tasks[index].links[2], 3);
        write(tasks[index].links[2], tasks[index].id);
        tasks[index].id = 0;
        tasks[index].links = list();
        tasks[index].linkID = 0;
        tasks[index].state = "completed";
        ncompleted++;
        results_got++;
    }
    if (wait == -1) {
        ERROR("error in waitfirst()");
    }

    /* end communication process */
    for (j = size(requests); j > 0; j--) {
        write(requests[j][3], 1);
        write(requests[j][3], 0:2);
        write(requests[j][3], -1);
    }
    int results_sent;
    for (j = size(requests); j > 0; j--) {
        results_sent = results_sent + read(requests[j][3]);
    }
    while (results_sent > results_got) {
        wait = waitfirst(links);
        if (wait == -1) {
            ERROR("error in waitfirst()");
        }
        index = read(links[wait]);
        tasks[index].result = read(links[wait]);
        write(tasks[index].links[2], 3);
        write(tasks[index].links[2], tasks[index].id);
        tasks[index].id = 0;
        tasks[index].links = list();
        tasks[index].linkID = 0;
        tasks[index].state = "completed";
        results_got++;
    }

    /* list completed tasks */
    list completed;
    completed[nargs+1] = 0;
    j = 0;
    for (i = 1; i <= nargs; i++) {
        if (tasks[T[i].index].state == "completed") {
            j++;
            completed[j] = i;
        }
    }
    completed[nargs+1] = def(0);

    /* return the result */
    system("--ticks-per-sec", oldtimerresolution);
    return(completed);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "slimgb", list(I);
    startTasks(t1, t2);
    waitTasks(list(t1, t2), 2);   // wait for both tasks
    getResult(t1);
    getResult(t2);
    killTask(t1);
    killTask(t2);
}
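// Usage sketch (not part of the original library): waiting for the first of
// several tasks and stopping the rest. It assumes only the documented
// behaviour of waitTasks(): the returned list holds positions in the list
// passed to it, and tasks still in state 'started' can be stopped.
//
//     ring R = 0, (x,y), dp;
//     ideal I = x9y2+x10, x2y7-y8;
//     task t1 = "std", list(I);
//     task t2 = "slimgb", list(I);
//     list T = t1, t2;
//     startTasks(t1, t2);
//     list done = waitTasks(T, 1);   // may contain more than one index
//     def G = getResult(T[done[1]]);
//     int k;
//     for (k = 1; k <= size(T); k++) {
//         if (getState(T[k]) == "started") {
//             stopTask(T[k]);   // no longer needed
//         }
//     }
//     killTask(t1);
//     killTask(t2);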

proc waitAllTasks(list #)
"USAGE:   waitAllTasks(t1, t2, ...), t1, t2, ... tasks
RETURN:   nothing. Waits for all the tasks t1, t2, ... to complete. The state
          of the tasks is set to 'completed'.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          waited for.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* 'waitAllTasks(t1, t2, ...);' is a shortcut for
          'waitTasks(list(t1, t2, ...), size(list(t1, t2, ...)));'. Since
          returning a list of the indices of the completed tasks does not make
          sense in this case, nothing is returned.
SEE ALSO: waitTasks, startTasks, pollTask, getResult, getState, printTask
EXAMPLE:  example waitAllTasks; shows an example"
{
    list tmp = waitTasks(#, size(#));
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "slimgb", list(I);
    startTasks(t1, t2);
    waitAllTasks(t1, t2);   // the same as 'waitTasks(list(t1, t2), 2);',
                            // but without return value
    getResult(t1);
    getResult(t2);
    killTask(t1);
    killTask(t2);
}
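// Usage sketch (not part of the original library): when the tasks are already
// collected in a list, the shortcut described in the NOTE of waitAllTasks()
// can be written out by calling waitTasks() directly. The list name T is
// chosen for illustration only.
//
//     ring R = 0, (x,y), dp;
//     ideal I = x9y2+x10, x2y7-y8;
//     task t1 = "std", list(I);
//     task t2 = "slimgb", list(I);
//     list T = t1, t2;
//     startTasks(t1, t2);
//     list done = waitTasks(T, size(T));   // waits like waitAllTasks(t1, t2)
//     getResult(t1);
//     getResult(t2);
//     killTask(t1);
//     killTask(t2);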

proc pollTask(task t)
"USAGE:   pollTask(t), t task
RETURN:   1, if the computation of the task t has successfully finished;
          0, otherwise.
       @* The state of any task whose computation has successfully finished is
          set to 'completed'.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          polled.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* pollTask() should return immediately. However, receiving the result
          of the task may take some time.
SEE ALSO: startTasks, waitTasks, getResult, getState, printTask
EXAMPLE:  example pollTask; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot poll an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot poll an uninitialized task");
    }
    if (tasks[t.index].state != "started"
        && tasks[t.index].state != "completed") {
        ERROR("cannot poll a task whose state is not"+newline
            +"'started' or 'completed'");
    }
    if (tasks[t.index].state == "completed") {
        return(1);
    }
    // tasks[t.index].state == "started"
    write(tasks[t.index].links[2], 2);
    write(tasks[t.index].links[2], tasks[t.index].id);
    int state = read(tasks[t.index].links[2]);
    if (state == 0 || state == 1) {   // waiting or started
        return(0);
    }
    if (state == 2) {   // computed
        int index = read(tasks[t.index].links[1]);   // index == t.index
        tasks[t.index].result = read(tasks[t.index].links[1]);
        write(tasks[t.index].links[2], 3);
        write(tasks[t.index].links[2], tasks[t.index].id);
        tasks[t.index].id = 0;
        tasks[t.index].links = list();
        tasks[t.index].linkID = 0;
        tasks[t.index].state = "completed";
        return(1);
    }
    // state == -1 (stopped) or state == 3 (sent) should not happen
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    waitAllTasks(t);
    pollTask(t);   // task already completed
    t;
    getResult(t);
    killTask(t);
}
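// Usage sketch (not part of the original library): polling lets the calling
// process do other work while a task runs. The loop body is a placeholder;
// a loop without real work in it would merely busy-wait, in which case
// waitTasks() or waitAllTasks() is likely the better fit.
//
//     ring R = 0, (x,y), dp;
//     ideal I = x9y2+x10, x2y7-y8;
//     task t = "std", list(I);
//     startTasks(t);
//     while (!pollTask(t)) {
//         // ... do some unrelated work here ...
//     }
//     getResult(t);   // state is 'completed' once pollTask(t) has returned 1
//     killTask(t);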

proc getCommand(task t)
"USAGE:   getCommand(t), t task
RETURN:   a string, the command of t.
NOTE:     This command cannot be applied to tasks whose state is
          'uninitialized'.
SEE ALSO: getArguments, getResult, getState, createTask, printTask
EXAMPLE:  example getCommand; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get command of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get command of an uninitialized task");
    }
    return(tasks[t.index].command);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getCommand(t);
    killTask(t);
}

proc getArguments(task t)
"USAGE:   getArguments(t), t task
RETURN:   a list, the arguments of t.
NOTE:     This command cannot be applied to tasks whose state is
          'uninitialized'.
SEE ALSO: getCommand, getResult, getState, createTask, printTask
EXAMPLE:  example getArguments; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get arguments of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get arguments of an uninitialized task");
    }
    return(tasks[t.index].arguments);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getArguments(t);
    killTask(t);
}

proc getResult(task t)
"USAGE:   getResult(t), t task
RETURN:   the result of t.
NOTE:     This command cannot be applied to tasks whose state is not
          'completed'.
SEE ALSO: getCommand, getArguments, getState, waitTasks, pollTask, printTask
EXAMPLE:  example getResult; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get result of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get result of an uninitialized task");
    }
    if (tasks[t.index].state != "completed") {
        ERROR("cannot get result of a task which is not completed");
    }
    return(tasks[t.index].result);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    waitAllTasks(t);
    getResult(t);
    killTask(t);
}

proc getState(task t)
"USAGE:   getState(t), t task
RETURN:   a string, the state of t.
SEE ALSO: getCommand, getArguments, getResult, printTask, createTask,
          startTasks, stopTask, waitTasks, pollTask, killTask
EXAMPLE:  example getState; shows an example"
{
    if (t.index == 0) {
        return("uninitialized");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        return("uninitialized");
    }
    return(tasks[t.index].state);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getState(t);
    startTasks(t);
    getState(t);
    waitAllTasks(t);
    getState(t);
    killTask(t);
    getState(t);
}
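// Usage sketch (not part of the original library): unlike getResult(),
// getState() reports uninitialized or unfinished tasks as a string instead of
// raising an error, so it can guard calls that would fail. The helper name
// resultOrEmpty is hypothetical.
//
//     proc resultOrEmpty(task t)
//     {
//         if (getState(t) == "completed") {
//             return(list(getResult(t)));
//         }
//         return(list());   // not completed (or uninitialized): no result
//     }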

/* construct the string "name[1], name[2], name[3], ..., name[length]" */
static proc argsToString(string name, int length)
{
    string output;
    if (length > 0) {
        output = name+"[1]";
    }
    int i;
    for (i = 2; i <= length; i++) {
        output = output+", "+name+"["+string(i)+"]";
    }
    return(output);
}
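// Worked example, derived by tracing the code of argsToString() (the argument
// values are chosen for illustration): argsToString("args", 3) yields the
// string "args[1], args[2], args[3]", and argsToString("args", 0) yields the
// empty string.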