//////////////////////////////////////////////////////////////////////
version="version tasks.lib 4.0.0.0 Dec_2013 "; // $Id$
category="General purpose";
info="
LIBRARY:   tasks.lib  A parallel framework based on tasks

AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de

OVERVIEW:
This library provides a parallel framework based on tasks. It introduces a new
Singular type @code{task}; an object of this type is a command (given by a
string) applied to a list of arguments. Tasks can be computed in parallel via
the procedures in this library and they can even be started recursively, i.e.
from within other tasks.

tasks.lib respects the limits for computational resources defined
in @ref{resources.lib}, i.e., all tasks within the same Singular session will
not use more computational resources than provided via resources.lib, even if
tasks are started recursively.

The Singular library @ref{parallel.lib} provides implementations of several
parallel 'skeletons' based on tasks.lib.

KEYWORDS:  parallelization; distributed computing; task

SEE ALSO:  resources_lib, parallel_lib

PROCEDURES:
  createTask();     create a task
  killTask();       kill a task
  copyTask();       copy a task
  compareTasks();   compare two tasks
  printTask();      print a task
  startTasks();     start tasks
  stopTask();       stop a task
  waitTasks();      wait for a certain number of tasks
  waitAllTasks();   wait for all tasks
  pollTask();       poll a task
  getCommand();     get the command of a task
  getArguments();   get the arguments of a task
  getResult();      get the result of a task
  getState();       get the state of a task
";

/*
RATIONALE FOR DEVELOPERS

The Singular type 'task'
------------------------
tasks.lib introduces a Singular type 'task' which makes use of a pointer-like
model in order to avoid unnecessary copying of data. 'task' is defined as a
newstruct whose only member is 'int index'. This index points to an entry in
the lib-internal list 'tasks'. The elements of this list are of the type
'internal_task' which is defined as a newstruct with the following members:
    int id            - the internal ID
    string command    - the command
    list arguments    - the arguments
    def result        - the result
    string state      - the current state, see 'The life cycle of a task'
    list links        - control handles, see 'Links'
    int linkID        - the ID of the control handles

The life cycle of a task
------------------------
'uninitialized' --> 'created' --> 'started' --> 'completed'
                                   |   ^
                                   v   |
                                 'stopped'

The state of a task t is 'uninitialized' iff
(t.index == 0) or (typeof(tasks[t.index]) != "internal_task").
A task can be reset to 'uninitialized' by killTask() at any time.

Assigned members for 'internal_task'
------------------------------------
For each state, the following members of an internal_task must be assigned:

created:       command arguments        state
started:    id command arguments        state links linkID
stopped:       command arguments        state
completed:     command arguments result state

All other members should be wiped out.

Local supervisors
-----------------
A call of 'startTasks(t(1..n));' for tasks t(1), ..., t(n) creates a child
process which plays the role of a supervisor for these tasks. The computation
of the tasks is done in child processes of the supervisor.

The supervisor assigns an internal state to each task which is represented by
an integer. The meaning of these integers and their relation to the global
state of each task is as follows:

internal state | meaning           | corresponding global state
---------------|-------------------|---------------------------
       0       | waiting           | started
       1       | started           | started
       2       | (result) computed | started
       3       | (result) sent     | completed
      -1       | stopped           | stopped

Links
-----
The ssi link between the main process and the supervisor is named 'l(pid)'
where pid is the PID of the main process. The links between the supervisor and
its child processes are named 'll(pid)(1)', 'll(pid)(2)', ... where pid is the
PID of the supervisor.
The link between the child processes of the supervisor and the main process is
named 'L(pid)' where pid is the PID of the main process. This link is only for
sending the results to the main process and must not be used in the other
direction!

For any task t whose state is 'started', tasks[t.index].links is
list(L(pid), l(pid)) where pid is the PID of the main process.

Communication model
-------------------
stopTask() <--> supervisor
    0, id
    -->

waitTasks() <--> supervisor
    (demanded_task is an intvec containing the IDs of the tasks which are
    being waited for; ndemanded is the number of tasks that is being waited
    for.)
    1, demanded_tasks, ndemanded
    -->
    [receiving results]
    1, 0:2, -1
    -->
    results_sent
    <--
    [receiving remaining results]

pollTask() <--> supervisor
    2, id
    -->
    state
    <--
    [receive result if state == 2 (computed)]

startTasks_child() <--> startTasks_grandchild()
    [compute the result]
    1, id
    <--
    [wait until the result is requested]
    1
    -->
    [send the result]
    2
    <--

sending and receiving results:
main process <--> supervisor <--> startTasks_grandchild()
    [request the result, see above]
    index, result
    (main process <-- startTasks_grandchild())
    3, id
    (main process --> supervisor)
*/

LIB "resources.lib";

/* Library initialization: sets up the semaphores used for scheduling, defines
 * the newstruct types 'task' and 'internal_task' (including the overloaded
 * operators '=', '==' and 'print' for 'task'), and creates the lib-global
 * bookkeeping variables. All of them are exported to the package Tasks.
 */
static proc mod_init()
{
    /* initialize the semaphores */
    if (!defined(Resources)) {
        LIB "resources.lib";
    }
    // the number of processor cores
    int sem_cores = Resources::sem_cores;
    exportto(Tasks, sem_cores);
    // the number of leaves in the parallel tree (not strict)
    int sem_leaves = semaphore(system("cpu")+10);
    exportto(Tasks, sem_leaves);
    // the number of processes waiting for sem_cores with low priority
    int sem_queue = semaphore(2);
    exportto(Tasks, sem_queue);

    /* define the Singular type 'task' */
    newstruct("task", "int index");
    newstruct("internal_task", "int id, string command, list arguments,"
        +"def result, string state, list links, int linkID");
    system("install", "task", "=", createTask, 1);
    system("install", "task", "==", compareTasks, 2);
    system("install", "task", "print", printTask, 1);

    /* define (lib-)global variables */
    list tasks;     // the lib-internal list of tasks
    exportto(Tasks, tasks);
    int ntasks;     // the current maximal index in 'tasks'
    exportto(Tasks, ntasks);
    int nlinkIDs;   // the current maximal linkID
    exportto(Tasks, nlinkIDs);
}

proc createTask(alias string command, alias list arguments)
"USAGE:   createTask(command, arguments), command string, arguments list
RETURN:   a task with the given command and arguments whose state is
          'created'.
NOTE:     't = command, arguments;' is a shortcut for
          't = createTask(command, arguments);'.
SEE ALSO: startTasks, getCommand, getArguments, getState, killTask, copyTask,
          compareTasks, printTask
EXAMPLE:  example createTask; shows an example"
{
    // append a fresh internal_task to the lib-global list and fill it in
    internal_task T;
    ntasks++;
    tasks[ntasks] = T;
    tasks[ntasks].command = command;
    tasks[ntasks].arguments = arguments;
    tasks[ntasks].state = "created";
    // the returned handle only stores the index into 'tasks'
    task t;
    t.index = ntasks;
    return(t);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = createTask("std", list(I));
    // This is the same as:
    // task t = "std", list(I);
    t;
    killTask(t);
}

proc killTask(task t)
"USAGE:   killTask(t), t task
RETURN:   nothing. If the state of t is 'started', then t is stopped first. The
          internal data structures of t are erased and its state is set to
          'uninitialized'.
NOTE:     'killTask(t);' is not the same as 'kill t;'. The latter command does
          not erase the internal data structures of t. Hence killTask() should
          be called for any no longer needed task in order to free memory.
SEE ALSO: stopTask, getState, createTask, printTask
EXAMPLE:  example killTask; shows an example"
{
    // nothing to do for uninitialized tasks
    if (t.index == 0) {
        return();
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        return();
    }
    if (tasks[t.index].state == "started") {
        stopTask(t);
    }
    // erase the entry in the lib-internal list
    tasks[t.index] = def(0);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    t;
    killTask(t);
    t;
    getState(t);
}

proc copyTask(task t)
"USAGE:   copyTask(t), t task
RETURN:   a copy of t.
NOTE:     'task t1 = copyTask(t2);' is not the same as 'task t1 = t2;'. After
          the latter command, t1 points to the same object as t2; any changes
          to t2 will also affect t1. In contrast to this, copyTask() creates a
          new independent task.
       @* A task whose state is 'started' cannot be copied.
SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask,
          compareTasks, printTask
EXAMPLE:  example copyTask; shows an example"
{
    task t_copy;
    // the copy of an uninitialized task is an uninitialized task
    if (t.index == 0) {
        return(t_copy);
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        return(t_copy);
    }
    if (tasks[t.index].state == "started") {
        ERROR("cannot copy a task whose state is 'started'");
    }
    // duplicate the internal_task into a new slot of 'tasks'
    ntasks++;
    tasks[ntasks] = tasks[t.index];
    t_copy.index = ntasks;
    return(t_copy);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    startTasks(t1);
    waitAllTasks(t1);
    task t2 = copyTask(t1);
    killTask(t1);
    t2;   // t2 survived
    getResult(t2);
    killTask(t2);
}

proc compareTasks(task t1, task t2)
"USAGE:   compareTasks(t1, t2), t1, t2 tasks
RETURN:   1, if t1 and t2 coincide;
          0, otherwise.
NOTE:     The arguments and the results of t1 and t2 are not compared.
       @* Two uninitialized tasks coincide; an uninitialized task never
          coincides with an initialized one.
       @* 't1 == t2' is a shortcut for 'compareTasks(t1, t2)'.
SEE ALSO: getCommand, getArguments, getResult, getState, copyTask, printTask
EXAMPLE:  example compareTasks; shows an example"
{
    /* Determine first whether each task is initialized, using the same two
     * guards as the other procedures of this library. The checks are nested
     * so that tasks[0] is never accessed. */
    int initialized1 = 0;
    int initialized2 = 0;
    if (t1.index != 0) {
        if (typeof(tasks[t1.index]) == "internal_task") {
            initialized1 = 1;
        }
    }
    if (t2.index != 0) {
        if (typeof(tasks[t2.index]) == "internal_task") {
            initialized2 = 1;
        }
    }
    if (!initialized1 || !initialized2) {
        // there are no members to compare; equal iff both are uninitialized
        return(initialized1 == initialized2);
    }

    /* compare the members (arguments and result are deliberately skipped) */
    if (tasks[t1.index].id != tasks[t2.index].id) {
        return(0);
    }
    if (tasks[t1.index].command != tasks[t2.index].command) {
        return(0);
    }
    if (tasks[t1.index].state != tasks[t2.index].state) {
        return(0);
    }
    if (tasks[t1.index].linkID != tasks[t2.index].linkID) {
        return(0);
    }
    return(1);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "std", list(I);
    compareTasks(t1, t2);
    startTasks(t1);
    waitAllTasks(t1);
    t1 == t2;   // the same as compareTasks(t1, t2);
    killTask(t1);
    killTask(t2);
    // The arguments and the result are not compared!
    ideal J = x;
    task t3 = "std", list(I);
    task t4 = "std", list(J);
    t3 == t4;
    killTask(t3);
    killTask(t4);
}

proc printTask(task t)
"USAGE:   printTask(t), t task
RETURN:   nothing. Prints information about t.
NOTE:     'print(t);' and 't;' are shortcuts for 'printTask(t)'.
SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask
EXAMPLE:  example printTask; shows an example"
{
    if (t.index == 0) {
        "An uninitialized task";
        return();
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        "An uninitialized task";
        return();
    }
    // a bare string expression is printed by the interpreter
    "A task with the following properties:"+newline
        +"command: "+tasks[t.index].command+newline
        +"no. of arguments: "+string(size(tasks[t.index].arguments))+newline
        +"state: "+tasks[t.index].state;
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t;
    printTask(t);
    t = "std", list(I);
    t;   // the same as printTask(t);
    startTasks(t);
    waitAllTasks(t);
    t;
    killTask(t);
}

proc startTasks(list #)
"USAGE:   startTasks(t1, t2, ...), t1, t2, ... tasks
RETURN:   nothing. Starts the tasks t1, t2, ... and sets their states to
          'started'.
NOTE:     A task whose state is neither 'created' nor 'stopped' cannot be
          started.
       @* If startTasks() is applied to a task whose state is 'stopped', then
          the computation of this task will be restarted from the beginning.
       @* Tasks can be started from within other tasks. A started task should
          not be accessed from within any task other than the one within which
          it was started.
       @* For each task, the start of its computation is subject to the
          internal scheduling.
SEE ALSO: stopTask, waitTasks, pollTask, getState, createTask, printTask
EXAMPLE:  example startTasks; shows an example"
{
    /* check the arguments before changing any state */
    int nargs = size(#);
    if (nargs == 0) {
        ERROR("missing argument");
    }
    int i;
    for (i = nargs; i > 0; i--) {
        if (typeof(#[i]) != "task") {
            ERROR("argument not of type 'task' (argument no. "+string(i)+")");
        }
        if (#[i].index == 0) {
            ERROR("cannot start an uninitialized task (task no. "
                +string(i)+")");
        }
        if (typeof(tasks[#[i].index]) != "internal_task") {
            ERROR("cannot start an uninitialized task (task no. "
                +string(i)+")");
        }
        if (tasks[#[i].index].state != "created"
            && tasks[#[i].index].state != "stopped") {
            ERROR("cannot start a task whose state is not"+newline
                +"'created' or 'stopped'");
        }
    }

    /* the ID of a task is its position in the argument list */
    for (i = nargs; i > 0; i--) {
        tasks[#[i].index].id = i;   // has to be set before forking
        tasks[#[i].index].state = "started";
    }

    /* fork the supervisor and set up both links to it (see rationale) */
    int pid = system("pid");
    link l(pid) = "ssi:fork";
    open(l(pid));
    write(l(pid), quote(startTasks_child(#, eval(pid))));
    // the supervisor answers with the port it reserved for the result link
    int port = read(l(pid));
    link L(pid) = "ssi:connect localhost:"+string(port);
    open(L(pid));

    /* all tasks of this call share the same control handles */
    nlinkIDs++;
    for (i = nargs; i > 0; i--) {
        tasks[#[i].index].links = list(L(pid), l(pid));
        tasks[#[i].index].linkID = nlinkIDs;
    }
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "slimgb", list(I);
    startTasks(t1, t2);
    waitAllTasks(t1, t2);
    getResult(t1);
    getResult(t2);
    killTask(t1);
    killTask(t2);
}

/* This procedure is started within the child after forking.
*/
/* Supervisor process for the tasks started by one call of startTasks().
 * localtasks: the tasks handed to startTasks(); the ID of a task is its
 *             position in this list
 * pid_parent: the PID of the main process (used in the link names)
 * The first loop runs while there are still waiting tasks and distributes
 * them to forked grandchildren; the second loop only serves requests until
 * every task is finished and no waitTasks() request is pending.
 * See 'Local supervisors' and 'Communication model' in the rationale.
 */
static proc startTasks_child(list localtasks, int pid_parent)
{
    /* reserve a port for the result link and report it to the main process */
    int port = system("reserve", 1);
    write(l(pid_parent), port);
    link L(pid_parent) = system("reservedLink");
    export(L(pid_parent));

    // serializes the writes of the grandchildren to L(pid_parent)
    int sem_write = semaphore(1);
    int pid = system("pid");

    int nlocaltasks = size(localtasks);
    intvec state = 0:nlocaltasks;
        // the internal state of each localtask (see rationale)
    int nwaiting = nlocaltasks;
        // the number of local tasks with internal state 0 (waiting)
    int nfinished;
        // the number of local tasks with internal state 3 (result sent) or
        // -1 (stopped)
    intvec queue = 0, 1..nlocaltasks, 0;
        // zeroes for easier handling
    list links;
    links[nlocaltasks+1] = l(pid_parent);
    intvec assignment = 0:nlocaltasks;
        // the task with id = i is running in link no. assignment[i]
    int nlinks;

    // data sent by other processes
    int code;
    int id;
    intvec demanded_tasks = 0, 0;
        // zeroes for easier handling
    int ndemanded = -1;

    // internal counts
    int granted_leaves;
    int results_sent;

    // auxiliary variables
    int wait;
    int deadlock;
    int tmp;
    int i;

    while (nwaiting > 0) {
        wait = 0;
        if (nlinks == 0) {
            // no grandchild yet: one link is granted unconditionally, further
            // ones only if sem_leaves can be acquired
            wait = -1;
            granted_leaves++;
            while (-wait < nwaiting) {
                if (system("semaphore", "try_acquire", sem_leaves) == 1) {
                    wait--;
                }
                else {
                    break;
                }
            }
        }
        while (wait == 0) {
            wait = waitfirst(links, 500);
            if (wait == 0) {
                // timeout: try again to get leaves for waiting tasks
                while (-wait < nwaiting) {
                    if (system("semaphore", "try_acquire", sem_leaves) == 1) {
                        wait--;
                    }
                    else {
                        break;
                    }
                }
            }
        }
        if (wait < 0) {   // open (-wait) new links
            while (wait < 0) {
                wait++;
                nlinks++;
                link ll(pid)(nlinks) = "ssi:fork";
                open(ll(pid)(nlinks));
                links[nlinks] = ll(pid)(nlinks);
                write(links[nlinks], quote(startTasks_grandchild(
                    eval(localtasks[queue[2]].index), eval(pid_parent),
                    eval(pid), eval(nlinks), eval(sem_write))));
                assignment[queue[2]] = nlinks;
                state[queue[2]] = 1;
                nwaiting--;
                queue = 0, queue[3..size(queue)];
            }
            // wait == 0
        }
        if (wait > 0 && wait <= nlocaltasks) {
            code = read(links[wait]);
            if (code == 1) {   // result computed
                id = read(links[wait]);
                state[id] = 2;
                if (ndemanded > 0 && isElement(demanded_tasks, id)) {
                    write(links[wait], 1);
                    demanded_tasks = removeFromIntvec(demanded_tasks, id);
                    ndemanded--;
                    results_sent++;
                }
            }
            // code == 2: startTasks_grandchild() ended, do nothing
        }
        if (wait == nlocaltasks+1) {
            code = read(l(pid_parent));
            if (code == 0) {   // stopTask
                id = read(l(pid_parent));
                if (state[id] == 0) {   // waiting
                    queue = removeFromIntvec(queue, id);
                }
                if (state[id] == 1 || state[id] == 2) {
                    // started or computed:
                    // reuse the link for the next waiting task
                    close(links[assignment[id]]);
                    open(links[assignment[id]]);
                    write(links[assignment[id]], quote(startTasks_grandchild(
                        eval(localtasks[queue[2]].index), eval(pid_parent),
                        eval(pid), eval(assignment[id]), eval(sem_write))));
                    assignment[queue[2]] = assignment[id];
                    assignment[id] = 0;
                    state[queue[2]] = 1;
                    queue = 0, queue[3..size(queue)];
                }
                // state[id] == -1 (stopped) or state[id] == 3 (sent)
                // should not happen
                nwaiting--;
                nfinished++;
                state[id] = -1;
            }
            if (code == 1) {   // waitTasks
                demanded_tasks = read(l(pid_parent));
                ndemanded = read(l(pid_parent));
                if (ndemanded > size(demanded_tasks)-2) {
                    ndemanded = size(demanded_tasks)-2;
                }
                if (demanded_tasks == 0:2 && ndemanded == -1) {
                    write(l(pid_parent), results_sent);
                }
                else {
                    results_sent = 0;
                }
                deadlock = 0;
                for (i = size(demanded_tasks)-1; i > 1; i--) {
                    id = demanded_tasks[i];
                    if (state[id] == 0) {   // waiting
                        queue = give_priority(queue, id);
                        deadlock = 1;
                    }
                    if (state[id] == 2) {   // computed
                        write(links[assignment[id]], 1);
                        demanded_tasks = removeFromIntvec(demanded_tasks, id);
                        ndemanded--;
                        results_sent++;
                    }
                }
                for (i = size(demanded_tasks)-1; i > 1; i--) {
                    id = demanded_tasks[i];
                    if (state[id] == 1) {   // started
                        deadlock = 0;
                    }
                }
                if (deadlock) {
                    // a demanded task is waiting and no demanded task is
                    // running: start one without acquiring sem_leaves
                    granted_leaves++;
                    nlinks++;
                    link ll(pid)(nlinks) = "ssi:fork";
                    open(ll(pid)(nlinks));
                    links[nlinks] = ll(pid)(nlinks);
                    write(links[nlinks], quote(startTasks_grandchild(
                        eval(localtasks[queue[2]].index), eval(pid_parent),
                        eval(pid), eval(nlinks), eval(sem_write))));
                    assignment[queue[2]] = nlinks;
                    state[queue[2]] = 1;
                    nwaiting--;
                    queue = 0, queue[3..size(queue)];
                }
            }
            if (code == 2) {   // pollTask
                id = read(l(pid_parent));
                if (state[id] == 0) {   // waiting
                    queue = give_priority(queue, id);
                }
                if (state[id] == 2) {   // computed
                    write(links[assignment[id]], 1);
                }
                write(l(pid_parent), state[id]);
            }
            if (code == 3) {   // got result
                id = read(l(pid_parent));
                // the link is free again: start the next waiting task in it
                write(links[assignment[id]], quote(startTasks_grandchild(
                    eval(localtasks[queue[2]].index), eval(pid_parent),
                    eval(pid), eval(assignment[id]), eval(sem_write))));
                assignment[queue[2]] = assignment[id];
                assignment[id] = 0;
                state[queue[2]] = 1;
                state[id] = 3;
                nwaiting--;
                nfinished++;
                queue = 0, queue[3..size(queue)];
            }
        }
    }

    while (nfinished < nlocaltasks || ndemanded != -1) {
        wait = waitfirst(links);
        if (wait <= nlocaltasks) {
            code = read(links[wait]);
            if (code == 1) {   // result computed
                id = read(links[wait]);
                state[id] = 2;
                if (ndemanded > 0 && isElement(demanded_tasks, id)) {
                    write(links[wait], 1);
                    demanded_tasks = removeFromIntvec(demanded_tasks, id);
                    ndemanded--;
                    results_sent++;
                }
            }
            // code == 2: startTasks_grandchild() ended, do nothing
        }
        if (wait == nlocaltasks+1) {
            code = read(l(pid_parent));
            if (code == 0) {   // stopTask
                id = read(l(pid_parent));
                if (state[id] == 1 || state[id] == 2) {
                    // started or computed
                    close(links[assignment[id]]);
                    if (nlinks > granted_leaves) {
                        tmp = system("semaphore", "release", sem_leaves);
                    }
                    links[assignment[id]] = def(0);
                    nlinks--;
                    assignment[id] = 0;
                    nfinished++;
                }
                // else: nothing to do
                state[id] = -1;
            }
            if (code == 1) {   // waitTasks
                demanded_tasks = read(l(pid_parent));
                ndemanded = read(l(pid_parent));
                if (ndemanded > size(demanded_tasks)-2) {
                    ndemanded = size(demanded_tasks)-2;
                }
                if (demanded_tasks == 0:2 && ndemanded == -1) {
                    write(l(pid_parent), results_sent);
                }
                else {
                    results_sent = 0;
                }
                for (i = size(demanded_tasks)-1; i > 1; i--) {
                    id = demanded_tasks[i];
                    if (state[id] == 2) {   // computed
                        write(links[assignment[id]], 1);
                        demanded_tasks = removeFromIntvec(demanded_tasks, id);
                        ndemanded--;
                        results_sent++;
                    }
                }
            }
            if (code == 2) {   // pollTask
                id = read(l(pid_parent));
                if (state[id] == 2) {   // computed
                    write(links[assignment[id]], 1);
                }
                write(l(pid_parent), state[id]);
            }
            if (code == 3) {   // got result
                id = read(l(pid_parent));
                close(links[assignment[id]]);
                if (nlinks > granted_leaves) {
                    tmp = system("semaphore", "release", sem_leaves);
                }
                links[assignment[id]] = def(0);
                nlinks--;
                assignment[id] = 0;
                state[id] = 3;
                nfinished++;
            }
        }
    }
}

/* This procedure has to be started within the grandchildren after forking.
 * It computes the task tasks[index], reports (1, id) to the supervisor via
 * ll(pid_parent)(link_no), waits until the result is requested and then
 * sends (index, result) to the main process via L(pid_grandparent).
 * The return value 2 is what the supervisor reads as 'grandchild ended'.
 * NOTE(review): the command string is passed to execute() unchecked; it is
 * supplied by the caller of createTask().
 */
static proc startTasks_grandchild(int index, int pid_grandparent,
    int pid_parent, int link_no, int sem_write)
{
    def result;
    // sem_queue is held only while waiting for a core
    int tmp = system("semaphore", "acquire", sem_queue);
    tmp = system("semaphore", "acquire", sem_cores);
    tmp = system("semaphore", "release", sem_queue);
    execute("result = "+tasks[index].command+"("
        +argsToString("tasks[index].arguments", size(tasks[index].arguments))
        +");");
    tmp = system("semaphore", "release", sem_cores);
    // announce the computed result to the supervisor
    write(ll(pid_parent)(link_no), 1);
    write(ll(pid_parent)(link_no), tasks[index].id);
    // block until the supervisor requests the result
    tmp = read(ll(pid_parent)(link_no));
    // only one grandchild at a time may write to the main process
    tmp = system("semaphore", "acquire", sem_write);
    write(L(pid_grandparent), index);
    write(L(pid_grandparent), result);
    tmp = system("semaphore", "release", sem_write);
    return(2);
}

/* Check if n is an element of v[2..(size(v)-1)].
 * Note that v[1] and v[size(v)] are not checked.
 */
static proc isElement(alias intvec v, alias int n)
{
    int i;
    for (i = size(v)-1; i > 1; i--) {
        if (v[i] == n) {
            return(1);
        }
    }
    return(0);
}

/* Remove one occurrence (if any) of n in v[2..(size(v)-1)]. The scan runs
 * from the back, so the occurrence with the largest index is removed.
 * Note that v[1] and v[size(v)] are not checked.
 */
static proc removeFromIntvec(intvec v, int n)
{
    int size_v = size(v);
    int i;
    for (i = size_v-1; i > 1; i--) {
        if (v[i] == n) {
            return(v[1..(i-1)], v[(i+1)..size_v]);
        }
    }
    return(v);
}

/* Move one occurrence (if any) of id in queue[3..(size(queue)-1)] to
 * queue[2]. Note that queue[1] and queue[size(queue)] are not checked.
*/
static proc give_priority(intvec queue, int id)
{
    int size_queue = size(queue);
    int i;
    // i > 2: an id which already sits at queue[2] needs no move
    for (i = size_queue-1; i > 2; i--) {
        if (queue[i] == id) {
            return(0, id, queue[2..(i-1)], queue[(i+1)..size_queue]);
        }
    }
    return(queue);
}

proc stopTask(task t)
"USAGE:   stopTask(t), t task
RETURN:   nothing. Stops the task t and sets its state to 'stopped'.
NOTE:     A task whose state is not 'started' cannot be stopped.
       @* Intermediate results are discarded when a task is stopped.
       @* killTask() should be called for any no longer needed task.
SEE ALSO: startTasks, waitTasks, pollTask, getState, killTask, printTask
EXAMPLE:  example stopTask; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot stop an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot stop an uninitialized task");
    }
    if (tasks[t.index].state != "started") {
        ERROR("cannot stop a task whose state is not 'started'");
    }
    // request code 0 (stopTask) followed by the ID, sent to the supervisor
    write(tasks[t.index].links[2], 0);
    write(tasks[t.index].links[2], tasks[t.index].id);
    // wipe out the members which are not assigned in state 'stopped'
    tasks[t.index].id = 0;
    tasks[t.index].links = list();
    tasks[t.index].linkID = 0;
    tasks[t.index].state = "stopped";
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    stopTask(t);
    t;
    killTask(t);
}

proc waitTasks(list T, int N, list #)
"USAGE:   waitTasks(T, N[, timeout]), T list of tasks, N int, timeout int
RETURN:   an ordered list of the indices of those tasks which have been
          successfully completed. The state of these tasks is set to
          'completed'.
       @* The procedure waits for N tasks to complete.
       @* An optional timeout in ms can be provided. Default is 0 which
          disables the timeout.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          waited for.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* The returned list may contain more than N entries if the computation
          of some tasks has already finished and/or if several tasks finish
          \"at the same time\". It may contain less than N entries in the case
          of timeout or errors occurring.
       @* Polling is guaranteed, i.e. the index of any task t for which
          'pollTask(t);' would return 1 will appear in the returned list.
SEE ALSO: startTasks, pollTask, getResult, getState, printTask
EXAMPLE:  example waitTasks; shows an example"
{
    /* read optional parameters */
    int timeout;
    if (size(#) > 0) {
        if (size(#) > 1 || typeof(#[1]) != "int") {
            ERROR("wrong optional parameter");
        }
        timeout = #[1];
    }

    /* check for errors */
    if (timeout < 0) {
        ERROR("negative timeout");
    }
    int nargs = size(T);
    if (nargs == 0) {
        ERROR("missing task");
    }
    if (N < 1 || N > nargs) {
        ERROR("wrong number of tasks to wait for");
    }
    int i;
    for (i = nargs; i > 0; i--) {
        if (typeof(T[i]) != "task") {
            ERROR("element not of type 'task' (element no. "+string(i)+")");
        }
        if (T[i].index == 0) {
            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
                +")");
        }
        if (typeof(tasks[T[i].index]) != "internal_task") {
            ERROR("cannot wait for an uninitialized task (task no. "+string(i)
                +")");
        }
        if (tasks[T[i].index].state != "started"
            && tasks[T[i].index].state != "completed") {
            ERROR("cannot wait for a task whose state is not"+newline
                +"'started' or 'completed' (task no. "+string(i)+")");
        }
    }

    /* initialize the timer
     * This is done only after the arguments have been validated so that an
     * ERROR raised above cannot leave the global timer resolution changed. */
    int oldtimerresolution = system("--ticks-per-sec");
    system("--ticks-per-sec", 1000);
    int starting_time = rtimer;

    /* sort the tasks
     * requests[j] = list(linkID, intvec of demanded IDs, control link);
     * links[j] is the corresponding result link */
    int ncompleted;
    list requests;
    list links;
    int sorted_in;
    int j;
    for (i = 1; i <= nargs; i++) {
        if (tasks[T[i].index].state == "completed") {
            ncompleted++;
        }
        else {   // tasks[T[i].index].state == "started"
            sorted_in = 0;
            for (j = size(requests); j > 0; j--) {
                if (requests[j][1] == tasks[T[i].index].linkID) {
                    requests[j][2][size(requests[j][2])+1] =
                        tasks[T[i].index].id;
                    sorted_in = 1;
                    break;
                }
            }
            if (!sorted_in) {
                requests[size(requests)+1] = list(tasks[T[i].index].linkID,
                    intvec(0, tasks[T[i].index].id),
                    tasks[T[i].index].links[2]);
                links[size(links)+1] = tasks[T[i].index].links[1];
            }
        }
    }

    /* send the requests */
    for (j = size(requests); j > 0; j--) {
        // trailing zero, see the 'zeroes for easier handling' convention
        requests[j][2][size(requests[j][2])+1] = 0;
        write(requests[j][3], 1);
        write(requests[j][3], requests[j][2]);
        write(requests[j][3], N-ncompleted);
    }

    /* wait for the results */
    int wait;
    int index;
    int results_got;
    int remaining_time;
    int tmp;
    while (ncompleted < N) {
        // poll first; only give up the core if we really have to block
        wait = waitfirst(links, 0);
        if (wait == 0) {
            if (timeout == 0) {
                tmp = system("semaphore", "release", sem_cores);
                wait = waitfirst(links);
                tmp = system("semaphore", "acquire", sem_cores);
            }
            else {
                remaining_time = timeout-(rtimer-starting_time);
                if (remaining_time < 0) {
                    break;
                }
                else {
                    tmp = system("semaphore", "release", sem_cores);
                    wait = waitfirst(links, remaining_time);
                    tmp = system("semaphore", "acquire", sem_cores);
                }
            }
        }
        if (wait < 1) {
            break;
        }
        index = read(links[wait]);
        tasks[index].result = read(links[wait]);
        // acknowledge: request code 3 (got result) followed by the ID
        write(tasks[index].links[2], 3);
        write(tasks[index].links[2], tasks[index].id);
        tasks[index].id = 0;
        tasks[index].links = list();
        tasks[index].linkID = 0;
        tasks[index].state = "completed";
        ncompleted++;
        results_got++;
    }
    if (wait == -1) {
        system("--ticks-per-sec", oldtimerresolution);
        ERROR("error in waitfirst()");
    }

    /* end communication process */
    for (j = size(requests); j > 0; j--) {
        write(requests[j][3], 1);
        write(requests[j][3], 0:2);
        write(requests[j][3], -1);
    }
    int results_sent;
    for (j = size(requests); j > 0; j--) {
        results_sent = results_sent + read(requests[j][3]);
    }
    // receive the results which were sent but have not been read yet
    while (results_sent > results_got) {
        wait = waitfirst(links);
        if (wait == -1) {
            system("--ticks-per-sec", oldtimerresolution);
            ERROR("error in waitfirst()");
        }
        index = read(links[wait]);
        tasks[index].result = read(links[wait]);
        write(tasks[index].links[2], 3);
        write(tasks[index].links[2], tasks[index].id);
        tasks[index].id = 0;
        tasks[index].links = list();
        tasks[index].linkID = 0;
        tasks[index].state = "completed";
        results_got++;
    }

    /* list completed tasks */
    list completed;
    completed[nargs+1] = 0;
    j = 0;
    for (i = 1; i <= nargs; i++) {
        if (tasks[T[i].index].state == "completed") {
            j++;
            completed[j] = i;
        }
    }
    completed[nargs+1] = def(0);

    /* return the result */
    system("--ticks-per-sec", oldtimerresolution);
    return(completed);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "slimgb", list(I);
    startTasks(t1, t2);
    waitTasks(list(t1, t2), 2);   // wait for both tasks
    getResult(t1);
    getResult(t2);
    killTask(t1);
    killTask(t2);
}

proc waitAllTasks(list #)
"USAGE:   waitAllTasks(t1, t2, ...), t1, t2, ... tasks
RETURN:   nothing. Waits for all the tasks t1, t2, ... to complete. The state
          of the tasks is set to 'completed'.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          waited for.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* 'waitAllTasks(t1, t2, ...);' is a shortcut for
          'waitTasks(list(t1, t2, ...), size(list(t1, t2, ...)));'. Since
          returning a list of the indices of the completed tasks does not make
          sense in this case, nothing is returned.
SEE ALSO: waitTasks, startTasks, pollTask, getResult, getState, printTask
EXAMPLE:  example waitAllTasks; shows an example"
{
    // the returned list is assigned to a local so that nothing is printed
    list tmp = waitTasks(#, size(#));
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t1 = "std", list(I);
    task t2 = "slimgb", list(I);
    startTasks(t1, t2);
    waitAllTasks(t1, t2);   // the same as 'waitTasks(list(t1, t2), 2);',
                            // but without return value
    getResult(t1);
    getResult(t2);
    killTask(t1);
    killTask(t2);
}

proc pollTask(task t)
"USAGE:   pollTask(t), t task
RETURN:   1, if the computation of the task t has successfully finished;
          0, otherwise.
       @* The state of any task whose computation has successfully finished is
          set to 'completed'.
NOTE:     A task whose state is neither 'started' nor 'completed' cannot be
          polled.
       @* The result of any completed task can be accessed via @ref{getResult}.
       @* pollTask() should return immediately. However, receiving the result
          of the task may take some time.
SEE ALSO: startTasks, waitTasks, getResult, getState, printTask
EXAMPLE:  example pollTask; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot poll an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot poll an uninitialized task");
    }
    if (tasks[t.index].state != "started"
        && tasks[t.index].state != "completed") {
        ERROR("cannot poll a task whose state is not"+newline
            +"'started' or 'completed'");
    }
    if (tasks[t.index].state == "completed") {
        return(1);
    }

    // tasks[t.index].state == "started"
    // request code 2 (pollTask) followed by the ID; the supervisor answers
    // with the internal state of the task
    write(tasks[t.index].links[2], 2);
    write(tasks[t.index].links[2], tasks[t.index].id);
    int state = read(tasks[t.index].links[2]);
    if (state == 0 || state == 1) {   // waiting or started
        return(0);
    }
    if (state == 2) {   // computed
        int index = read(tasks[t.index].links[1]);   // index == t.index
        tasks[t.index].result = read(tasks[t.index].links[1]);
        write(tasks[t.index].links[2], 3);
        write(tasks[t.index].links[2], tasks[t.index].id);
        tasks[t.index].id = 0;
        tasks[t.index].links = list();
        tasks[t.index].linkID = 0;
        tasks[t.index].state = "completed";
        return(1);
    }
    // state == -1 (stopped) or state == 3 (sent) should not happen;
    // no result has been received, so report 'not finished' as documented
    return(0);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    waitAllTasks(t);
    pollTask(t);   // task already completed
    t;
    getResult(t);
    killTask(t);
}

proc getCommand(task t)
"USAGE:   getCommand(t), t task
RETURN:   a string, the command of t.
NOTE:     This command cannot be applied to tasks whose state is
          'uninitialized'.
SEE ALSO: getArguments, getResult, getState, createTask, printTask
EXAMPLE:  example getCommand; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get command of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get command of an uninitialized task");
    }
    return(tasks[t.index].command);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getCommand(t);
    killTask(t);
}

proc getArguments(task t)
"USAGE:   getArguments(t), t task
RETURN:   a list, the arguments of t.
NOTE:     This command cannot be applied to tasks whose state is
          'uninitialized'.
SEE ALSO: getCommand, getResult, getState, createTask, printTask
EXAMPLE:  example getArguments; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get arguments of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get arguments of an uninitialized task");
    }
    return(tasks[t.index].arguments);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getArguments(t);
    killTask(t);
}

proc getResult(task t)
"USAGE:   getResult(t), t task
RETURN:   the result of t.
NOTE:     This command cannot be applied to tasks whose state is not
          'completed'.
SEE ALSO: getCommand, getArguments, getState, waitTasks, pollTask, printTask
EXAMPLE:  example getResult; shows an example"
{
    if (t.index == 0) {
        ERROR("cannot get result of an uninitialized task");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        ERROR("cannot get result of an uninitialized task");
    }
    if (tasks[t.index].state != "completed") {
        ERROR("cannot get result of a task which is not completed");
    }
    return(tasks[t.index].result);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    startTasks(t);
    waitAllTasks(t);
    getResult(t);
    killTask(t);
}

proc getState(task t)
"USAGE:   getState(t), t task
RETURN:   a string, the state of t.
SEE ALSO: getCommand, getArguments, getResult, printTask, createTask,
          startTasks, stopTask, waitTasks, pollTask, killTask
EXAMPLE:  example getState; shows an example"
{
    if (t.index == 0) {
        return("uninitialized");
    }
    if (typeof(tasks[t.index]) != "internal_task") {
        return("uninitialized");
    }
    return(tasks[t.index].state);
}
example
{
    "EXAMPLE:";
    echo = 2;
    ring R = 0, (x,y), dp;
    ideal I = x9y2+x10, x2y7-y8;
    task t = "std", list(I);
    getState(t);
    startTasks(t);
    getState(t);
    waitAllTasks(t);
    getState(t);
    killTask(t);
    getState(t);
}

/* construct the string "name[1], name[2], name[3], ..., name[length]"
 * (the empty string if length < 1)
 */
static proc argsToString(string name, int length)
{
    string output;
    if (length > 0) {
        output = name+"[1]";
    }
    int i;
    for (i = 2; i <= length; i++) {
        output = output+", "+name+"["+string(i)+"]";
    }
    return(output);
}