source: git/Singular/LIB/tasks.lib @ 57d677

Last change on this file since 57d677 was 57d677, checked in by Andreas Steenpass <steenpass@…>, 10 years ago
chg: update tasks.lib Improve handling of internal data structures, especially intvecs. (cherry picked from commit 091fbfe87825a276fac0079c772304a96a7cb118) Signed-off-by: Andreas Steenpass <steenpass@mathematik.uni-kl.de>
1//////////////////////////////////////////////////////////////////////
2version="version tasks.lib 4.0.0.0 Dec_2013 "; // $Id$
3category="General purpose";
4info="
5LIBRARY:   tasks.lib  A parallel framework based on tasks
6
7AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de
8
9OVERVIEW:
10This library provides a parallel framework based on tasks. It introduces a new
11Singular type @code{task}; an object of this type is a command (given by a
12string) applied to a list of arguments. Tasks can be computed in parallel via
13the procedures in this library and they can even be started recursively, i.e.
14from within other tasks.
15
16tasks.lib respects the limits for computational resources defined
17in @ref{resources_lib}, i.e., all tasks within the same Singular session will
18not use more computational resources than provided via resources.lib, even if
19tasks are started recursively.
20
21The Singular library @ref{parallel_lib} provides implementations of several
22parallel 'skeletons' based on tasks.lib.
23
24KEYWORDS:  parallelization; distributed computing; task
25
26SEE ALSO:  resources_lib, parallel_lib
27
28PROCEDURES:
29  createTask();    create a task
30  killTask();      kill a task
31  copyTask();      copy a task
32  compareTasks();  compare two tasks
33  printTask();     print a task
34  startTasks();    start tasks
35  stopTask();      stop a task
36  waitTasks();     wait for a certain number of tasks
37  waitAllTasks();  wait for all tasks
38  pollTask();      poll a task
39  getCommand();    get the command of a task
40  getArguments();  get the arguments of a task
41  getResult();     get the result of a task
42  getState();      get the state of a task
43";
44
45/*
46RATIONALE FOR DEVELOPERS
47
48The Singular type 'task'
49------------------------
50tasks.lib introduces a Singular type 'task' which makes use of a pointer-like
51model in order to avoid unnecessary copying of data. 'task' is defined as a
52newstruct whose only member is 'int index'. This index points to an entry in
53the lib-internal list 'tasks'. The elements of this list are of the type
54'internal_task' which is defined as a newstruct with the following members:
55int id         - the internal ID
56string command - the command
57list arguments - the arguments
58def result     - the result
59string state   - the current state, see 'The life cycle of a task'
60list links     - control handles, see 'Links'
61int linkID     - the ID of the control handles
62
63
64The life cycle of a task
65------------------------
66'uninitialized' --> 'created' --> 'started' --> 'completed'
67                                     | ^
68                                     v |
69                                  'stopped'
70
71The state of a task t is 'uninitialized' iff
72(t.index == 0) or (typeof(tasks[t.index]) != "internal_task").
73
74A task can be reset to 'uninitialized' by killTask() at any time.
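
Example (a sketch only, not taken from a recorded session): one pass through
the life cycle above, using the procedures listed in the header and the ideal
from the examples below. The state named in each comment is the one expected
from the diagram and from the procedure descriptions.
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t;                // 'uninitialized' (t.index == 0)
    t = "std", list(I);    // 'created'
    startTasks(t);         // 'started'
    stopTask(t);           // 'stopped'
    startTasks(t);         // 'started', the computation restarts from scratch
    waitAllTasks(t);       // 'completed'
    killTask(t);           // 'uninitialized'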
75
76
77Assigned members for 'internal_task'
78------------------------------------
79For each state, the following members of an internal_task must be assigned:
80
81created:       command arguments        state
82started:    id command arguments        state links linkID
83stopped:       command arguments        state
84completed:     command arguments result state
85
86All other members should be wiped out.
87
88Local supervisors
89-----------------
90A call of 'startTasks(t(1..n));' for tasks t(1), ..., t(n) creates a child
91process which plays the role of a supervisor for these tasks. The computation
92of the tasks is done in child processes of the supervisor.
93
94The supervisor assigns an internal state to each task which is represented by
95an integer. The meaning of these integers and their relation to the global
96state of each task is as follows:
97
98internal state | meaning           | corresponding global state
99---------------|-------------------|---------------------------
100             0 | waiting           | started
101             1 | started           | started
102             2 | (result) computed | started
103             3 | (result) sent     | completed
104            -1 | stopped           | stopped
105
106Links
107-----
108The ssi link between the main process and the supervisor is named 'l(pid)'
109where pid is the PID of the main process. The links between the supervisor and
110its child processes are named 'll(pid)(1)', 'll(pid)(2)', ... where pid is the
111PID of the supervisor. The link between the child processes of the supervisor
112and the main process is named 'L(pid)' where pid is the PID of the main
113process. This link is only for sending the results to the main process and must
114not be used in the other direction!
115
116For any task t whose state is 'started', tasks[t.index].links is
117list(L(pid), l(pid)) where pid is the PID of the main process.
118
119Communication model
120-------------------
121stopTask() <--> supervisor
122    0, id -->
123
124waitTasks() <--> supervisor
125(demanded_tasks is an intvec containing the IDs of the tasks which are being
126waited for; ndemanded is the number of tasks that is being waited for.)
127    1, demanded_tasks, ndemanded -->
128    [receiving results]
129    1, 0:2, -1 -->
130    results_sent <--
131    [receiving remaining results]
132
133pollTask() <--> supervisor
134    2, id -->
135    state <--
136    [receive result if state == 2 (computed)]
137
138startTasks_child() <--> startTasks_grandchild()
139    [compute the result]
140    1, id <--
141    [wait until the result is requested]
142    1 -->
143    [send the result]
144    2 <--
145
146sending and receiving results:
147main process <--> supervisor <--> startTasks_grandchild()
148    [request the result, see above]
149    index, result  (main process <-- startTasks_grandchild())
150    3, id          (main process --> supervisor)
151*/
152
153LIB "resources.lib";
154
155static proc mod_init()
156{
157    /* initialize the semaphores */
158    if (!defined(Resources)) {
159        LIB "resources.lib";
160    }
161    // the number of processor cores
162    int sem_cores = Resources::sem_cores;
163    exportto(Tasks, sem_cores);
164    // the number of leaves in the parallel tree (not strict)
165    int sem_leaves = semaphore(system("cpu")+10);
166    exportto(Tasks, sem_leaves);
167    // the number of processes waiting for sem_cores with low priority
168    int sem_queue = semaphore(2);
169    exportto(Tasks, sem_queue);
170
171    /* define the Singular type 'task' */
172    newstruct("task", "int index");
173    newstruct("internal_task", "int id, string command, list arguments,"
174        +"def result, string state, list links, int linkID");
175    system("install", "task", "=", createTask, 1);
176    system("install", "task", "==", compareTasks, 2);
177    system("install", "task", "print", printTask, 1);
178
179    /* define (lib-)global variables */
180    list tasks;     // the lib-internal list of tasks
181    exportto(Tasks, tasks);
182    int ntasks;     // the current maximal index in 'tasks'
183    exportto(Tasks, ntasks);
184    int nlinkIDs;   // the current maximal linkID
185    exportto(Tasks, nlinkIDs);
186}
187
188proc createTask(alias string command, alias list arguments)
189"USAGE:   createTask(command, arguments), command string, arguments list
190RETURN:   a task with the given command and arguments whose state is 'created'.
191NOTE:     't = command, arguments;' is a shortcut for
192          't = createTask(command, arguments);'.
193SEE ALSO: startTasks, getCommand, getArguments, getState, killTask, copyTask,
194          compareTasks, printTask
195EXAMPLE:  example createTask; shows an example"
196{
197    internal_task T;
198    ntasks++;
199    tasks[ntasks] = T;
200    tasks[ntasks].command = command;
201    tasks[ntasks].arguments = arguments;
202    tasks[ntasks].state = "created";
203    task t;
204    t.index = ntasks;
205    return(t);
206}
207example
208{
209    "EXAMPLE:";
210    echo = 2;
211    ring R = 0, (x,y), dp;
212    ideal I = x9y2+x10, x2y7-y8;
213    task t = createTask("std", list(I));
214    // This is the same as:
215    // task t = "std", list(I);
216    t;
217    killTask(t);
218}
219
220proc killTask(task t)
221"USAGE:   killTask(t), t task
222RETURN:   nothing. If the state of t is 'started', then t is stopped first. The
223          internal data structures of t are erased and its state is set to
224          'uninitialized'.
225NOTE:     'killTask(t);' is not the same as 'kill t;'. The latter command does
226          not erase the internal data structures of t. Hence killTask() should
227          be called for any no longer needed task in order to free memory.
228SEE ALSO: stopTask, getState, createTask, printTask
229EXAMPLE:  example killTask; shows an example"
230{
231    if (t.index == 0) {
232        return();
233    }
234    if (typeof(tasks[t.index]) != "internal_task") {
235        return();
236    }
237    if (tasks[t.index].state == "started") {
238        stopTask(t);
239    }
240    tasks[t.index] = def(0);
241}
242example
243{
244    "EXAMPLE:";
245    echo = 2;
246    ring R = 0, (x,y), dp;
247    ideal I = x9y2+x10, x2y7-y8;
248    task t = "std", list(I);
249    startTasks(t);
250    t;
251    killTask(t);
252    t;
253    getState(t);
254}
255
256proc copyTask(task t)
257"USAGE:   copyTask(t), t task
258RETURN:   a copy of t.
259NOTE:     'task t1 = copyTask(t2);' is not the same as 'task t1 = t2;'. After
260          the latter command, t1 points to the same object as t2; any changes
261          to t2 will also affect t1. In contrast to this, copyTask() creates a
262          new independent task.
263       @* A task whose state is 'started' cannot be copied.
264SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask,
265          compareTasks, printTask
266EXAMPLE:  example copyTask; shows an example"
267{
268    task t_copy;
269    if (t.index == 0) {
270        return(t_copy);
271    }
272    if (typeof(tasks[t.index]) != "internal_task") {
273        return(t_copy);
274    }
275    if (tasks[t.index].state == "started") {
276        ERROR("cannot copy a task whose state is 'started'");
277    }
278    ntasks++;
279    tasks[ntasks] = tasks[t.index];
280    t_copy.index = ntasks;
281    return(t_copy);
282}
283example
284{
285    "EXAMPLE:";
286    echo = 2;
287    ring R = 0, (x,y), dp;
288    ideal I = x9y2+x10, x2y7-y8;
289    task t1 = "std", list(I);
290    startTasks(t1);
291    waitAllTasks(t1);
292    task t2 = copyTask(t1);
293    killTask(t1);
294    t2;   // t2 survived
295    getResult(t2);
296    killTask(t2);
297}
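
/*
A sketch contrasting plain assignment with copyTask(), following the NOTE
above and the pointer-like model from the rationale. The outcomes in the
comments are derived from the code of killTask() and printTask() in this file,
not from a recorded session.
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = t1;             // t2.index == t1.index, no new entry in 'tasks'
    task t3 = copyTask(t1);   // new entry in 'tasks' with its own index
    killTask(t1);             // erases the entry shared by t1 and t2
    t2;                       // prints "An uninitialized task"
    t3;                       // still reports the state 'created'
    killTask(t3);
*/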
298
299proc compareTasks(task t1, task t2)
300"USAGE:   compareTasks(t1, t2), t1, t2 tasks
301RETURN:   1, if t1 and t2 coincide;
302          0, otherwise.
303NOTE:     The arguments and the results of t1 and t2 are not compared.
304       @* 't1 == t2' is a shortcut for 'compareTasks(t1, t2)'.
305SEE ALSO: getCommand, getArguments, getResult, getState, copyTask, printTask
306EXAMPLE:  example compareTasks; shows an example"
307{
308    if (tasks[t1.index].id != tasks[t2.index].id) {
309        return(0);
310    }
311    if (tasks[t1.index].command != tasks[t2.index].command) {
312        return(0);
313    }
314    if (tasks[t1.index].state != tasks[t2.index].state) {
315        return(0);
316    }
317    if (tasks[t1.index].linkID != tasks[t2.index].linkID) {
318        return(0);
319    }
320    return(1);
321}
322example
323{
324    "EXAMPLE:";
325    echo = 2;
326    ring R = 0, (x,y), dp;
327    ideal I = x9y2+x10, x2y7-y8;
328    task t1 = "std", list(I);
329    task t2 = "std", list(I);
330    compareTasks(t1, t2);
331    startTasks(t1);
332    waitAllTasks(t1);
333    t1 == t2;   // the same as compareTasks(t1, t2);
334    killTask(t1);
335    killTask(t2);
336    // The arguments and the result are not compared!
337    ideal J = x;
338    task t3 = "std", list(I);
339    task t4 = "std", list(J);
340    t3 == t4;
341    killTask(t3);
342    killTask(t4);
343}
344
345proc printTask(task t)
346"USAGE:   printTask(t), t task
347RETURN:   nothing. Prints information about t.
348NOTE:     'print(t);' and 't;' are shortcuts for 'printTask(t)'.
349SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask
350EXAMPLE:  example printTask; shows an example"
351{
352    if (t.index == 0) {
353        "An uninitialized task";
354        return();
355    }
356    if (typeof(tasks[t.index]) != "internal_task") {
357        "An uninitialized task";
358        return();
359    }
360    "A task with the following properties:"+newline
361        +"command:          "+tasks[t.index].command+newline
362        +"no. of arguments: "+string(size(tasks[t.index].arguments))+newline
363        +"state:            "+tasks[t.index].state;
364}
365example
366{
367    "EXAMPLE:";
368    echo = 2;
369    ring R = 0, (x,y), dp;
370    ideal I = x9y2+x10, x2y7-y8;
371    task t;
372    printTask(t);
373    t = "std", list(I);
374    t;   // the same as printTask(t);
375    startTasks(t);
376    waitAllTasks(t);
377    t;
378    killTask(t);
379}
380
381proc startTasks(list #)
382"USAGE:   startTasks(t1, t2, ...), t1, t2, ... tasks
383RETURN:   nothing. Starts the tasks t1, t2, ... and sets their states to
384          'started'.
385NOTE:     A task whose state is neither 'created' nor 'stopped' cannot be
386          started.
387       @* If startTasks() is applied to a task whose state is 'stopped', then
388          the computation of this task will be restarted from the beginning.
389       @* Tasks can be started from within other tasks. A started task should
390          not be accessed from within any task other than the one within which
391          it was started.
392       @* For each task, the start of its computation is subject to the
393          internal scheduling.
394SEE ALSO: stopTask, waitTasks, pollTask, getState, createTask, printTask
395EXAMPLE:  example startTasks; shows an example"
396{
397    int nargs = size(#);
398    if (nargs == 0) {
399        ERROR("missing argument");
400    }
401    int i;
402    for (i = nargs; i > 0; i--) {
403        if (typeof(#[i]) != "task") {
404            ERROR("argument not of type 'task' (argument no. "+string(i)+")");
405        }
406        if (#[i].index == 0) {
407            ERROR("cannot start an uninitialized task (task no. "
408                +string(i)+")");
409        }
410        if (typeof(tasks[#[i].index]) != "internal_task") {
411            ERROR("cannot start an uninitialized task (task no. "
412                +string(i)+")");
413        }
414        if (tasks[#[i].index].state != "created"
415            && tasks[#[i].index].state != "stopped") {
416            ERROR("cannot start a task whose state is not"+newline
417                +"'created' or 'stopped'");
418        }
419    }
420    for (i = nargs; i > 0; i--) {
421        tasks[#[i].index].id = i;   // has to be set before forking
422        tasks[#[i].index].state = "started";
423    }
424    int pid = system("pid");
425    link l(pid) = "ssi:fork";
426    open(l(pid));
427    write(l(pid), quote(startTasks_child(#, eval(pid))));
428    int port = read(l(pid));
429    link L(pid) = "ssi:connect localhost:"+string(port);
430    open(L(pid));
431    nlinkIDs++;
432    for (i = nargs; i > 0; i--) {
433        tasks[#[i].index].links = list(L(pid), l(pid));
434        tasks[#[i].index].linkID = nlinkIDs;
435    }
436}
437example
438{
439    "EXAMPLE:";
440    echo = 2;
441    ring R = 0, (x,y), dp;
442    ideal I = x9y2+x10, x2y7-y8;
443    task t1 = "std", list(I);
444    task t2 = "slimgb", list(I);
445    startTasks(t1, t2);
446    waitAllTasks(t1, t2);
447    getResult(t1);
448    getResult(t2);
449    killTask(t1);
450    killTask(t2);
451}
452
453/* This procedure is started within the child after forking. */
454static proc startTasks_child(list localtasks, int pid_parent)
455{
456    int port = system("reserve", 1);
457    write(l(pid_parent), port);
458    link L(pid_parent) = system("reservedLink");
459    export(L(pid_parent));
460
461    int sem_write = semaphore(1);
462    int pid = system("pid");
463
464    int nlocaltasks = size(localtasks);
465    intvec state = 0:nlocaltasks;
466        // the internal state of each localtask (see rationale)
467    int nwaiting = nlocaltasks;
468        // the number of local tasks with internal state 0 (waiting)
469    int nfinished;
470        // the number of local tasks with internal state 3 (result sent) or
471        // -1 (stopped)
472    intvec queue = 1..nlocaltasks;
473    int next = 1;
474
475    list links;
476    links[nlocaltasks+1] = l(pid_parent);
477    intvec assignment = 0:nlocaltasks;
478        // the task with id = i is running in link no. assignment[i]
479    int nlinks;
480
481    // data sent by other processes
482    int code;
483    int id;
484    if (!defined(demanded_tasks)) {
485        intvec demanded_tasks;
486        int demanded_tasks_index = 1;
487        exportto(Tasks, demanded_tasks);
488        exportto(Tasks, demanded_tasks_index);
489    }
490    else {
491        demanded_tasks = 0;
492        demanded_tasks_index = 1;
493    }
494    int ndemanded = -1;
495
496    // internal counts
497    int granted_leaves;
498    int results_sent;
499
500    // auxiliary variables
501    intvec waiting_tasks;
502    int wait;
503    int deadlock;
504    int tmp;
505    int i;
506    int j;
507
508    while (nwaiting > 0) {
509        wait = 0;
510        if (nlinks == 0) {
511            wait = -1;
512            granted_leaves++;
513            while (-wait < nwaiting) {
514                if (system("semaphore", "try_acquire", sem_leaves) == 1) {
515                    wait--;
516                }
517                else {
518                    break;
519                }
520            }
521        }
522        while (wait == 0) {
523            wait = waitfirst(links, 500);
524            if (wait == 0) {
525                while (-wait < nwaiting) {
526                    if (system("semaphore", "try_acquire", sem_leaves) == 1) {
527                        wait--;
528                    }
529                    else {
530                        break;
531                    }
532                }
533            }
534        }
535        if (wait < 0) {   // open (-wait) new links
536            while (wait < 0) {
537                wait++;
538                nlinks++;
539                link ll(pid)(nlinks) = "ssi:fork";
540                open(ll(pid)(nlinks));
541                links[nlinks] = ll(pid)(nlinks);
542                write(links[nlinks],
543                    quote(startTasks_grandchild(
544                    eval(localtasks[queue[next]].index), eval(pid_parent),
545                    eval(pid), eval(nlinks), eval(sem_write))));
546                assignment[queue[next]] = nlinks;
547                state[queue[next]] = 1;
548                nwaiting--;
549                next++;
550            }
551            // wait == 0
552        }
553        if (wait > 0 && wait <= nlocaltasks) {
554            code = read(links[wait]);
555            if (code == 1) {   // result computed
556                id = read(links[wait]);
557                state[id] = 2;
558                if (ndemanded > 0 && removeDemanded(id)) {
559                    write(links[wait], 1);
560                    ndemanded--;
561                    results_sent++;
562                }
563            }
564            // code == 2: startTasks_grandchild() ended, do nothing
565        }
566        if (wait == nlocaltasks+1) {
567            code = read(l(pid_parent));
568            if (code == 0) {   // stopTask
569                id = read(l(pid_parent));
570                if (state[id] == 0) {   // waiting
571                    queue = give_priority(queue, intvec(id));
572                    next++;
573                }
574                if (state[id] == 1 || state[id] == 2) {  // started or computed
575                    close(links[assignment[id]]);
576                    open(links[assignment[id]]);
577                    write(links[assignment[id]],
578                        quote(startTasks_grandchild(
579                        eval(localtasks[queue[next]].index), eval(pid_parent),
580                        eval(pid), eval(assignment[id]), eval(sem_write))));
581                    assignment[queue[next]] = assignment[id];
582                    assignment[id] = 0;
583                    state[queue[next]] = 1;
584                    next++;
585                }
586                // state[id] == -1 (stopped) or state[id] == 3 (sent)
587                // should not happen
588                nwaiting--;
589                nfinished++;
590                state[id] = -1;
591            }
592            if (code == 1) {   // waitTasks
593                demanded_tasks = read(l(pid_parent));
594                demanded_tasks_index = size(demanded_tasks);
595                ndemanded = read(l(pid_parent));
596                if (ndemanded > demanded_tasks_index) {
597                    ndemanded = demanded_tasks_index;
598                }
599                if (demanded_tasks == 0 && ndemanded == -1) {
600                    write(l(pid_parent), results_sent);
601                    continue;
602                }
603                else {
604                    results_sent = 0;
605                }
606                demanded_tasks = demanded_tasks[demanded_tasks_index..1];
607                deadlock = 0;
608                waiting_tasks = 0:demanded_tasks_index;
609                j = 0;
610                for (i = demanded_tasks_index; i > 0; i--) {
611                    id = demanded_tasks[i];
612                    if (state[id] == 0) {   // waiting
613                        j++;
614                        waiting_tasks[j] = id;
615                        deadlock = 1;
616                    }
617                }
618                if (j > 0) {
619                    waiting_tasks = waiting_tasks[1..j];
620                    queue = queue[next..size(queue)];
621                    next = 1;
622                    queue = give_priority(queue, waiting_tasks);
623                    waiting_tasks = 0;
624                }
625                for (i = demanded_tasks_index; i > 0; i--) {
626                    id = demanded_tasks[i];
627                    if (state[id] == 1) {   // started
628                        deadlock = 0;
629                    }
630                    if (state[id] == 2) {   // computed
631                        write(links[assignment[id]], 1);
632                        tmp = removeDemanded(id);
633                        ndemanded--;
634                        results_sent++;
635                        deadlock = 0;
636                    }
637                }
638                if (deadlock) {
639                    granted_leaves++;
640                    nlinks++;
641                    link ll(pid)(nlinks) = "ssi:fork";
642                    open(ll(pid)(nlinks));
643                    links[nlinks] = ll(pid)(nlinks);
644                    write(links[nlinks],
645                        quote(startTasks_grandchild(
646                        eval(localtasks[queue[next]].index), eval(pid_parent),
647                        eval(pid), eval(nlinks), eval(sem_write))));
648                    assignment[queue[next]] = nlinks;
649                    state[queue[next]] = 1;
650                    nwaiting--;
651                    next++;
652                }
653            }
654            if (code == 2) {   // pollTask
655                id = read(l(pid_parent));
656                if (state[id] == 0) {   // waiting
657                    queue = queue[next..size(queue)];
658                    next = 1;
659                    queue = give_priority(queue, intvec(id));
660                }
661                if (state[id] == 2) {   // computed
662                    write(links[assignment[id]], 1);
663                }
664                write(l(pid_parent), state[id]);
665            }
666            if (code == 3) {   // got result
667                id = read(l(pid_parent));
668                write(links[assignment[id]],
669                    quote(startTasks_grandchild(
670                    eval(localtasks[queue[next]].index), eval(pid_parent),
671                    eval(pid), eval(assignment[id]), eval(sem_write))));
672                assignment[queue[next]] = assignment[id];
673                assignment[id] = 0;
674                state[queue[next]] = 1;
675                state[id] = 3;
676                nwaiting--;
677                nfinished++;
678                next++;
679            }
680        }
681    }
682    while (nfinished < nlocaltasks || ndemanded != -1) {
683        wait = waitfirst(links);
684        if (wait <= nlocaltasks) {
685            code = read(links[wait]);
686            if (code == 1) {   // result computed
687                id = read(links[wait]);
688                state[id] = 2;
689                if (ndemanded > 0 && removeDemanded(id)) {
690                    write(links[wait], 1);
691                    ndemanded--;
692                    results_sent++;
693                }
694            }
695            // code == 2: startTasks_grandchild() ended, do nothing
696        }
697        if (wait == nlocaltasks+1) {
698            code = read(l(pid_parent));
699            if (code == 0) {   // stopTask
700                id = read(l(pid_parent));
701                if (state[id] == 1 || state[id] == 2) {  // started or computed
702                    close(links[assignment[id]]);
703                    if (nlinks > granted_leaves) {
704                        tmp = system("semaphore", "release", sem_leaves);
705                    }
706                    links[assignment[id]] = def(0);
707                    nlinks--;
708                    assignment[id] = 0;
709                    nfinished++;
710                }
711                // else: nothing to do
712                state[id] = -1;
713            }
714            if (code == 1) {   // waitTasks
715                demanded_tasks = read(l(pid_parent));
716                demanded_tasks_index = size(demanded_tasks);
717                ndemanded = read(l(pid_parent));
718                if (ndemanded > demanded_tasks_index) {
719                    ndemanded = demanded_tasks_index;
720                }
721                if (demanded_tasks == 0 && ndemanded == -1) {
722                    write(l(pid_parent), results_sent);
723                    continue;
724                }
725                else {
726                    results_sent = 0;
727                }
728                demanded_tasks = demanded_tasks[demanded_tasks_index..1];
729                for (i = demanded_tasks_index; i > 0; i--) {
730                    id = demanded_tasks[i];
731                    if (state[id] == 2) {   // computed
732                        write(links[assignment[id]], 1);
733                        tmp = removeDemanded(id);
734                        ndemanded--;
735                        results_sent++;
736                    }
737                }
738            }
739            if (code == 2) {   // pollTask
740                id = read(l(pid_parent));
741                if (state[id] == 2) {   // computed
742                    write(links[assignment[id]], 1);
743                }
744                write(l(pid_parent), state[id]);
745            }
746            if (code == 3) {   // got result
747                id = read(l(pid_parent));
748                close(links[assignment[id]]);
749                if (nlinks > granted_leaves) {
750                    tmp = system("semaphore", "release", sem_leaves);
751                }
752                links[assignment[id]] = def(0);
753                nlinks--;
754                assignment[id] = 0;
755                state[id] = 3;
756                nfinished++;
757            }
758        }
759    }
760}
761
762/* This procedure has to be started within the grandchildren after forking. */
763static proc startTasks_grandchild(int index, int pid_grandparent,
764    int pid_parent, int link_no, int sem_write)
765{
766    def result;
767    int tmp = system("semaphore", "acquire", sem_queue);
768    tmp = system("semaphore", "acquire", sem_cores);
769    tmp = system("semaphore", "release", sem_queue);
770    execute("result = "+tasks[index].command+"("
771        +argsToString("tasks[index].arguments", size(tasks[index].arguments))
772        +");");
773    tmp = system("semaphore", "release", sem_cores);
774    write(ll(pid_parent)(link_no), 1);
775    write(ll(pid_parent)(link_no), tasks[index].id);
776    tmp = read(ll(pid_parent)(link_no));
777    tmp = system("semaphore", "acquire", sem_write);
778    write(L(pid_grandparent), index);
779    write(L(pid_grandparent), result);
780    tmp = system("semaphore", "release", sem_write);
781    return(2);
782}
783
784/* Remove id from demanded_tasks and return 1, if id is an element of
785 * demanded_tasks; return 0, otherwise. Note:
786 * - demanded_tasks and demanded_tasks_index are (lib-)global objects
787 *   exported in startTasks_child().
788 * - demanded_tasks_index is used to avoid copying. It is defined to be
789 *   the greatest integer with demanded_tasks[demanded_tasks_index] != 0
790 *   and demanded_tasks[demanded_tasks_index+1] == 0 (if defined).
791 */
792static proc removeDemanded(alias int id)
793{
794    if (demanded_tasks[demanded_tasks_index] == id) {
795        demanded_tasks[demanded_tasks_index] = 0;
796        demanded_tasks_index--;
797        return(1);
798    }
799    int i;
800    for (i = demanded_tasks_index-1; i > 0; i--) {
801        if (demanded_tasks[i] == id) {
802            demanded_tasks[i..demanded_tasks_index]
803                = demanded_tasks[(i+1)..demanded_tasks_index], 0;
804            demanded_tasks_index--;
805            return(1);
806        }
807    }
808    return(0);
809}
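
/*
A worked trace of removeDemanded(), as a sketch with made-up values (the IDs
5, 3, 7 and 4 are hypothetical):
    before:  demanded_tasks = 5,3,7;  demanded_tasks_index = 3;
    call with id = 3:
        demanded_tasks[3] is 7, not 3, so the shortcut for the last entry
        does not apply;
        the loop finds demanded_tasks[2] == 3 and shifts the tail one
        position to the left, padding with 0;
    after:   demanded_tasks = 5,7,0;  demanded_tasks_index = 2;  return 1.
    A subsequent call with id = 4 finds no match, changes nothing and
    returns 0.
*/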
810
811/* Move the elements in 'preferred' to the beginning of 'queue'. We may assume
812 * that
813 * - 'preferred' is a subset of 'queue';
814 * - the elements of 'preferred' are distinct and non-zero;
815 * - the elements of 'queue' are distinct and non-zero.
816 * For performance reasons, we may also assume that 'queue' and 'preferred' are
817 * more or less ordered in most cases. Note that queue has the format
818 * '0, indices, 0'.
819 */
820static proc give_priority(intvec queue, intvec preferred)
821{
822    int size_queue = size(queue);
823    int size_preferred = size(preferred);
824    if (size_queue == size_preferred) {
825        return(queue);
826    }
827    int index = size_queue;
828    int i;
829    int j;
830    for (i = size_preferred; i > 0; i--) {
831        for (j = size_queue; j > 0; j--) {
832            if (queue[index] == preferred[i]) {
833                queue[index] = 0;
834                break;
835            }
836            index--;
837            if (index == 0) {
838                index = size_queue;
839            }
840        }
841    }
842    intvec not_preferred = 0:(size_queue-size_preferred);
843    index = 1;
844    for (i = 1; i <= size_queue; i++) {
845        if (queue[i]) {
846            not_preferred[index] = queue[i];
847            index++;
848        }
849    }
850    queue = preferred, not_preferred;
851    return(queue);
852}
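
/*
A worked trace of give_priority(), as a sketch with made-up values:
    input:       queue = 1,2,3,4,5;  preferred = 4,2;
    first loop:  the entries 2 and 4 are located (searching backwards,
                 cyclically) and set to 0, giving queue = 1,0,3,0,5;
    second loop: the non-zero entries are collected, not_preferred = 1,3,5;
    result:      4,2,1,3,5
If 'queue' and 'preferred' have the same size, 'queue' is returned unchanged.
*/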
853
proc stopTask(task t)
"USAGE:   stopTask(t), t task
RETURN:   nothing. Stops the task t and sets its state to 'stopped'.
NOTE:     A task whose state is not 'started' cannot be stopped.
       @* Intermediate results are discarded when a task is stopped.
       @* killTask() should be called for any no longer needed task.
SEE ALSO: startTasks, waitTasks, pollTask, getState, killTask, printTask
EXAMPLE:  example stopTask; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot stop an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot stop an uninitialized task");
    }
    if (tasks[t.index].state != "started") {
        ERROR("cannot stop a task whose state is not 'started'");
    }
    write(tasks[t.index].links[2], 0);
    write(tasks[t.index].links[2], tasks[t.index].id);
    tasks[t.index].id = 0;
    tasks[t.index].links = list();
    tasks[t.index].linkID = 0;
    tasks[t.index].state = "stopped";
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    stopTask(t);
    t;
    killTask(t);
}

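/* Usage sketch for waitTasks() with the optional timeout. This is not one of
 * the library's own examples. It assumes that tasks.lib is loaded and reuses
 * the ideal from the examples in this file; the timeout of 1000 ms is an
 * arbitrary choice. Since the returned list may be shorter than N after a
 * timeout, the state of each task is checked before cleaning up.
 *
 *   ring R = 0, (x,y), dp;
 *   ideal I = x9y2+x10, x2y7-y8;
 *   task t1 = "std", list(I);
 *   task t2 = "slimgb", list(I);
 *   startTasks(t1, t2);
 *   list done = waitTasks(list(t1, t2), 1, 1000);   // N = 1, timeout 1000 ms
 *   done;   // indices into list(t1, t2) of the completed tasks
 *   if (getState(t1) == "started") { stopTask(t1); }
 *   if (getState(t2) == "started") { stopTask(t2); }
 *   killTask(t1);
 *   killTask(t2);
 */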
proc waitTasks(list T, int N, list #)
"USAGE:   waitTasks(T, N[, timeout]), T list of tasks, N int, timeout int
RETURN:   an ordered list of the indices of those tasks which have been
          successfully completed. The state of these tasks is set to
          'completed'.
       @* The procedure waits for N tasks to complete.
       @* An optional timeout in ms can be provided. Default is 0 which
          disables the timeout.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          waited for.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* The returned list may contain more than N entries if the computation
          of some tasks has already finished and/or if several tasks finish
          \"at the same time\". It may contain fewer than N entries in
          the case of a timeout or if errors occur.
       @* Polling is guaranteed, i.e. the index of any task t for which
          'pollTask(t);' would return 1 will appear in the returned list.
SEE ALSO: startTasks, pollTask, getResult, getState, printTask
EXAMPLE:  example waitTasks; shows an example"
{
    /* initialize the timer */
    int oldtimerresolution = system("--ticks-per-sec");
    system("--ticks-per-sec", 1000);
    int starting_time = rtimer;

    /* read optional parameters */
    int timeout;
    if (size(#) > 0) {
        if (size(#) > 1 || typeof(#[1]) != "int") {
            ERROR("wrong optional parameter");
        }
        timeout = #[1];
    }

    /* check for errors */
    if (timeout < 0) {
        ERROR("negative timeout");
    }
    int nargs = size(T);
    if (nargs == 0) {
        ERROR("missing task");
    }
    if (N < 1 || N > nargs) {
        ERROR("wrong number of tasks to wait for");
    }
    int i;
    for (i = nargs; i > 0; i--) {
        if (typeof(T[i]) != "task") {
            ERROR("element not of type 'task' (element no. "+string(i)+")");
        }
        if (T[i].index == 0) {
            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
                +")");
        }
        if (typeof(tasks[T[i].index]) != "internal_task") {
            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
                +")");
        }
        if (tasks[T[i].index].state != "started"
            && tasks[T[i].index].state != "completed") {
            ERROR("cannot wait for a task whose state is not"+newline
                +"'started' or 'completed' (task no. "+string(i)+")");
        }
    }

    /* sort the tasks */
    int ncompleted;
    list requests;
    list links;
    int sorted_in;
    int j;
    for (i = 1; i <= nargs; i++) {
        if (tasks[T[i].index].state == "completed") {
            ncompleted++;
        }
        else {   // tasks[T[i].index].state == "started"
            sorted_in = 0;
            for (j = size(requests); j > 0; j--) {
                if (requests[j][1] == tasks[T[i].index].linkID) {
                    requests[j][2][size(requests[j][2])+1] =
                        tasks[T[i].index].id;
                    sorted_in = 1;
                    break;
                }
            }
            if (!sorted_in) {
                requests[size(requests)+1] = list(tasks[T[i].index].linkID,
                    intvec(tasks[T[i].index].id),
                    tasks[T[i].index].links[2]);
                links[size(links)+1] = tasks[T[i].index].links[1];
            }
        }
    }

    /* send the requests */
    for (j = size(requests); j > 0; j--) {
        write(requests[j][3], 1);
        write(requests[j][3], requests[j][2]);
        write(requests[j][3], N-ncompleted);
    }

    /* wait for the results */
    int wait;
    int index;
    int results_got;
    int remaining_time;
    int tmp;
    while (ncompleted < N) {
        wait = waitfirst(links, 0);
        if (wait == 0) {
            if (timeout == 0) {
                tmp = system("semaphore", "release", sem_cores);
                wait = waitfirst(links);
                tmp = system("semaphore", "acquire", sem_cores);
            }
            else {
                remaining_time = timeout-(rtimer-starting_time);
                if (remaining_time < 0) {
                    break;
                }
                else {
                    tmp = system("semaphore", "release", sem_cores);
                    wait = waitfirst(links, remaining_time);
                    tmp = system("semaphore", "acquire", sem_cores);
                }
            }
        }
        if (wait < 1) {
            break;
        }
        index = read(links[wait]);
        tasks[index].result = read(links[wait]);
        write(tasks[index].links[2], 3);
        write(tasks[index].links[2], tasks[index].id);
        tasks[index].id = 0;
        tasks[index].links = list();
        tasks[index].linkID = 0;
        tasks[index].state = "completed";
        ncompleted++;
        results_got++;
    }
    if (wait == -1) {
        ERROR("error in waitfirst()");
    }

    /* end communication process */
    for (j = size(requests); j > 0; j--) {
        write(requests[j][3], 1);
        write(requests[j][3], 0);
        write(requests[j][3], -1);
    }
    int results_sent;
    for (j = size(requests); j > 0; j--) {
        results_sent = results_sent + read(requests[j][3]);
    }
    while (results_sent > results_got) {
        wait = waitfirst(links);
        if (wait == -1) {
            ERROR("error in waitfirst()");
        }
        index = read(links[wait]);
        tasks[index].result = read(links[wait]);
        write(tasks[index].links[2], 3);
        write(tasks[index].links[2], tasks[index].id);
        tasks[index].id = 0;
        tasks[index].links = list();
        tasks[index].linkID = 0;
        tasks[index].state = "completed";
        results_got++;
    }

    /* list completed tasks */
    list completed;
    completed[nargs+1] = 0;
    j = 0;
    for (i = 1; i <= nargs; i++) {
        if (tasks[T[i].index].state == "completed") {
            j++;
            completed[j] = i;
        }
    }
    completed[nargs+1] = def(0);

    /* return the result */
    system("--ticks-per-sec", oldtimerresolution);
    return(completed);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "slimgb", list(I);
    startTasks(t1, t2);
    waitTasks(list(t1, t2), 2);   // wait for both tasks
    getResult(t1);
    getResult(t2);
    killTask(t1);
    killTask(t2);
}

proc waitAllTasks(list #)
"USAGE:   waitAllTasks(t1, t2, ...), t1, t2, ... tasks
RETURN:   nothing. Waits for all the tasks t1, t2, ... to complete. The state
          of the tasks is set to 'completed'.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          waited for.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* 'waitAllTasks(t1, t2, ...);' is a shortcut for
          'waitTasks(list(t1, t2, ...), size(list(t1, t2, ...)));'. Since
          returning a list of the indices of the completed tasks does not make
          sense in this case, nothing is returned.
SEE ALSO: waitTasks, startTasks, pollTask, getResult, getState, printTask
EXAMPLE:  example waitAllTasks; shows an example"
{
    list tmp = waitTasks(#, size(#));
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "slimgb", list(I);
    startTasks(t1, t2);
    waitAllTasks(t1, t2);   // the same as 'waitTasks(list(t1, t2), 2);',
                            // but without return value
    getResult(t1);
    getResult(t2);
    killTask(t1);
    killTask(t2);
}

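/* Usage sketch for a polling loop built on pollTask(). This is not one of
 * the library's own examples. It assumes that tasks.lib is loaded and that a
 * Unix-like shell is available for 'system("sh", "sleep 1")'; the limit of 10
 * rounds is an arbitrary choice which keeps the loop from running forever if
 * the task never finishes successfully.
 *
 *   ring R = 0, (x,y), dp;
 *   ideal I = x9y2+x10, x2y7-y8;
 *   task t = "std", list(I);
 *   startTasks(t);
 *   int tries;
 *   int rc;
 *   while (!pollTask(t) && tries < 10) {
 *       tries++;
 *       rc = system("sh", "sleep 1");   // placeholder for other work
 *   }
 *   if (getState(t) == "completed") { getResult(t); }
 *   else { stopTask(t); }               // still 'started': discard it
 *   killTask(t);
 */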
proc pollTask(task t)
"USAGE:   pollTask(t), t task
RETURN:   1, if the computation of the task t has successfully finished;
          0, otherwise.
       @* The state of any task whose computation has successfully finished is
          set to 'completed'.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          polled.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* pollTask() should return immediately. However, receiving the result
          of the task may take some time.
SEE ALSO: startTasks, waitTasks, getResult, getState, printTask
EXAMPLE:  example pollTask; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot poll an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot poll an uninitialized task");
    }
    if (tasks[t.index].state != "started"
        && tasks[t.index].state != "completed") {
        ERROR("cannot poll a task whose state is not"+newline
            +"'started' or 'completed'");
    }
    if (tasks[t.index].state == "completed") {
        return(1);
    }
    // tasks[t.index].state == "started"
    write(tasks[t.index].links[2], 2);
    write(tasks[t.index].links[2], tasks[t.index].id);
    int state = read(tasks[t.index].links[2]);
    if (state == 0 || state == 1) {   // waiting or started
        return(0);
    }
    if (state == 2) {   // computed
        int index = read(tasks[t.index].links[1]);   // index == t.index
        tasks[t.index].result = read(tasks[t.index].links[1]);
        write(tasks[t.index].links[2], 3);
        write(tasks[t.index].links[2], tasks[t.index].id);
        tasks[t.index].id = 0;
        tasks[t.index].links = list();
        tasks[t.index].linkID = 0;
        tasks[t.index].state = "completed";
        return(1);
    }
    // state == -1 (stopped) or state == 3 (sent) should not happen
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    waitAllTasks(t);
    pollTask(t);   // task already completed
    t;
    getResult(t);
    killTask(t);
}

proc getCommand(task t)
"USAGE:   getCommand(t), t task
RETURN:   a string, the command of t.
NOTE:     This command cannot be applied to tasks whose state is
          'uninitialized'.
SEE ALSO: getArguments, getResult, getState, createTask, printTask
EXAMPLE:  example getCommand; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get command of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get command of an uninitialized task");
    }
    return(tasks[t.index].command);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getCommand(t);
    killTask(t);
}

proc getArguments(task t)
"USAGE:   getArguments(t), t task
RETURN:   a list, the arguments of t.
NOTE:     This command cannot be applied to tasks whose state is
          'uninitialized'.
SEE ALSO: getCommand, getResult, getState, createTask, printTask
EXAMPLE:  example getArguments; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get arguments of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get arguments of an uninitialized task");
    }
    return(tasks[t.index].arguments);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getArguments(t);
    killTask(t);
}

proc getResult(task t)
"USAGE:   getResult(t), t task
RETURN:   the result of t.
NOTE:     This command cannot be applied to tasks whose state is not
          'completed'.
SEE ALSO: getCommand, getArguments, getState, waitTasks, pollTask, printTask
EXAMPLE:  example getResult; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get result of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get result of an uninitialized task");
    }
    if (tasks[t.index].state != "completed") {
        ERROR("cannot get result of a task which is not completed");
    }
    return(tasks[t.index].result);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    waitAllTasks(t);
    getResult(t);
    killTask(t);
}

proc getState(task t)
"USAGE:   getState(t), t task
RETURN:   a string, the state of t.
SEE ALSO: getCommand, getArguments, getResult, printTask, createTask,
          startTasks, stopTask, waitTasks, pollTask, killTask
EXAMPLE:  example getState; shows an example"
{
    if (t.index == 0) {
        return("uninitialized");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        return("uninitialized");
    }
    return(tasks[t.index].state);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getState(t);
    startTasks(t);
    getState(t);
    waitAllTasks(t);
    getState(t);
    killTask(t);
    getState(t);
}

/* construct the string "name[1], name[2], name[3], ..., name[length]" */
static proc argsToString(string name, int length)
{
    string output;
    if (length > 0) {
        output = name+"[1]";
    }
    int i;
    for (i = 2; i <= length; i++) {
        output = output+", "+name+"["+string(i)+"]";
    }
    return(output);
}