/////////////////////////////////////////////////////////////////// version="version parallel.lib 4.0.0.0 Jun_2013 "; // $Id$ category="General purpose"; info=" LIBRARY: parallel.lib Tools for Parallelization AUTHOR: Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de OVERVIEW: This library provides tools to do several computations in parallel. They are aimed at ordinary Singular users as well as authors of Singular libraries. @* Even without this library, it is possible to do execute self-defined Singular commands in parallel using @ref{link}, but the handling of such links can be quite tedious. With the pocedures described below, this can be done by one-line commands. @* There are many parallel 'skeletons' (i.e. ways in which parallel tasks rely upon and interact with each other). A few of them are already implemented. Future plans include an abstraction layer for modular techniques, 'worker farms', and parallel tests. SEE ALSO: link, modstd_lib, assprimeszerodim_lib KEYWORDS: parallel.lib; Parallelization; Links, user interface; Skeletons for parallelization; Distributed computing PROCEDURES: parallelWaitN(...) execute several jobs in parallel, wait for N of them to finish parallelWaitFirst(...) execute several jobs in parallel, wait for the first to finish parallelWaitAll(...) execute several jobs in parallel, wait for all of them to finish "; proc parallelWaitN(list commands, list args, int N, list #) "USAGE: parallelWaitN(commands, args, N[, timeout, linktype, servers, maxmemory]); commands list, args list, N int, timeout int, linktype string, servers list, maxmemory intvec RETURN: a list, containing the results of commands[i] applied to arg[i], i = 1, ..., size(commands). @* The procedure waits for N jobs to finish. @* OPTIONAL PARAMETERS: An optional timeout in ms can be provided. Default is 0 which disables the timeout. Supported linktypes are up to now \"ssi\" and \"mp\", see @ref{Ssi links}. Servers: @* Each server is given by a list containing the address of the server, the number of cores to use on this server and the command to start Singular. @* If the address is \"localhost\", the processes will be generated on the same machine using forks. If the command to start Singular is \"\" (the empty string), \"Singular\" will be used. @* Default is @code{list(\"localhost\", system(\"cpu\"), \"\")}. @* There are some obvious shortcuts for servers, e.g. \"myserver\" is a shortcut for @code{list(\"myserver\", [nb. of cores on myserver], \"\")}, or 3 for @code{list(\"localhost\", 3, \"\")}. Memory limits: @* If an intvec maxmemory of size @code{size(commands)} is given, the i-th job will be killed if it uses more than maxmemory[i] MB of memory. If maxmemory[i] is 0, there will be no restraint for the i-th job. Default is @code{0:size(commands)}. NOTE: The entries of the list commands must be strings. @* The entries of the list args must be lists. @* The returned list may contain more than N results if several jobs finished \"at the same time\". It may contain less than N results in the case of timeout or errors occurring. @* The check whether the jobs exceed the memory sizes given by maxmemory is only done from time to time. This feature is experimental and should be used with care. SEE ALSO: Ssi links, waitfirst, waitall KEYWORDS: parallelWaitN; Parallelization; Links, user interface; Skeletons for parallelization; Distributed computing EXAMPLE: @code{example parallelWaitN;} shows an example." { // initialize the timer int oldtimerresolution = system("--ticks-per-sec"); system("--ticks-per-sec", 1000); int t = rtimer; // auxiliary variables int i, j, m, tt; // read optional parameters list defaultserver = list("localhost", system("cpu"), ""); list defaults = list(0, "ssi", list(defaultserver), 0:size(commands)); for(i = 1; i <= size(defaults); i++) { if(typeof(#[i]) != typeof(defaults[i])) { # = insert(#, defaults[i], i-1); } } if(size(#) != size(defaults)) { ERROR("wrong optional parameters"); } for(j = size(#[3]); j > 0; j--) { if(typeof(#[3][j][1]) != typeof(defaultserver[1])) { #[3][j] = insert(#[3][j], defaultserver[1], 0); } defaultserver[3] = ""; // only for ssi:tcp links, the default program is system("Singular") if(#[2] == "ssi" && #[3][j][1] != "localhost") { defaultserver[3] = system("Singular"); } for(i = 2; i <= size(defaultserver); i++) { if(typeof(#[3][j][i]) != typeof(defaultserver[i])) { #[3][j] = insert(#[3][j], defaultserver[i], i-1); } } if(size(#[3][j]) != size(defaultserver)) { ERROR("wrong declaration for server no. "+string(j)); } } int timeout = #[1]; string linktype = #[2]; list servers = #[3]; intvec maxmems = #[4]; // error checking int njobs = size(commands); if(njobs != size(args)) { ERROR("The number of commands does not match the number of lists" +newline+"of arguments."); } if(njobs == 0) { ERROR("no commands specified"); } for(i = 1; i <= njobs; i++) { if(typeof(commands[i]) != "string") { ERROR("The first argument is not a list of strings."); } if(typeof(args[i]) != "list") { ERROR("The second argument is not a list of lists."); } } if(N < 0) { ERROR("The number of jobs which you want to wait for is negative."); } if(N > njobs) { ERROR("The number of jobs which you wnat to wait for is greater" +newline+"than the number of jobs itself."); } if(timeout < 0) { ERROR("The given timeout is negative."); } if(linktype != "ssi" && linktype != "mp") { ERROR("The given linktype is not recognized."); } int nservers = size(servers); if(nservers <= 0) { ERROR("no server specified"); } for(i = 1; i <= nservers; i++) { if(servers[i][1] != "localhost") { if(system("sh", "ssh "+servers[i][1]+" exit")) { ERROR("Could not connect to server \""+servers[i][1]+"\""); } } if(servers[i][2] < 0) { ERROR("The number of cores to be used on server \""+servers[i][1]+"\"" +newline+" is negative."); } if(servers[i][1] == "localhost") { int ncpus(i) = system("cpu"); } else { //if(linktype == "ssi") //{ link lcpu(i) = "ssi:tcp "+servers[i][1]+":"+servers[i][3]; //} open(lcpu(i)); write(lcpu(i), quote(system("cpu"))); int ncpus(i) = read(lcpu(i)); close(lcpu(i)); kill lcpu(i); } if(servers[i][2] == 0) { servers[i][2] = ncpus(i); } else { if(servers[i][2] > ncpus(i)) { ERROR("The number of cores to use on server \""+servers[i][1]+"\"" +newline+"is greater than the number of available cores"); } } if(servers[i][1] != "localhost") { if(system("sh", "ssh "+servers[i][1]+ " 'test -e `which "+servers[i][3]+"`'")) { ERROR("\""+servers[i][3]+"\" was not found on" +"\""+servers[i][1]+"\"."); } } } if(size(maxmems) != njobs) { ERROR("The size of the intvec which specifies the maximal amount of memory" +newline+"to be used for each job does not match the number of jobs."); } int havemaxmem; for(i = 1; i <= njobs; i++) { if(maxmems[i] < 0) { ERROR("The maximal amount of memory to be used for job no. "+string(i) +"is negative."); } havemaxmem = havemaxmem+maxmems[i]; } // skip those cores which won't be needed int nlinks; for(i = 1; i <= nservers; i++) { if(nlinks+servers[i][2] <= njobs) { nlinks = nlinks+servers[i][2]; } else { if(nlinks == njobs) { servers = list(servers[1..(i-1)]); } else { servers = list(servers[1..i]); servers[i][2] = njobs-nlinks; nlinks = njobs; } nservers = size(servers); } } // open the links string server; int ncores; string program; int k = 1; // the index of the link for(i = 1; i <= nservers; i++) { server = servers[i][1]; ncores = servers[i][2]; program = servers[i][3]; for(j = 1; j <= ncores; j++) { if(server == "localhost") { //if(linktype == "ssi") //{ link l(k) = "ssi:fork"; //} } else { //if(linktype == "ssi") //{ link l(k) = "ssi:tcp "+server+":"+program; //} } open(l(k)); k++; } } list links = list(l(1..nlinks)); // start a memory watchdog if needed if(havemaxmem) { //if(linktype == "ssi") //{ link mempatrol = "ssi:fork"; //} open(mempatrol); write(mempatrol, quote(watchlinks())); links = insert(links, mempatrol, nlinks); } int nkilled; // the number of jobs killed by the mempatrol // distribute work to the links k = 1; // from here on, k is the index of the next job which must be // distributed to some link intvec assignment = 0:nlinks; // link number i is currently doing // job number assignment[i] int pid; for(i = 1; i <= nlinks; i++) { write(l(i), quote(execute("option(noredefine);"))); read(l(i)); write(l(i), quote(execute("def result;"))); read(l(i)); write(l(i), quote(execute("list currentargs;"))); read(l(i)); if(status(l(i), "mode", "fork")) { write(l(i), quote(currentargs = args[eval(k)])); } else { write(l(i), quote(currentargs = eval(args[k]))); } read(l(i)); if(maxmems[k] > 0) { m = i; for(j = 1; j <= nservers; j++) { if(servers[j][2] > m) { server = servers[j][1]; break; } else { m = m-servers[j][2]; } } write(l(i), quote(system("pid"))); pid = read(l(i)); write(mempatrol, list(server, pid, i, maxmems[k])); } write(l(i), quote(execute("result = "+eval(commands[k]) +"("+argsToString("currentargs", size(currentargs))+");"))); assignment[i] = k; k++; } // distribute the rest of the work list results; for(i = njobs; i > 0; i--) { results[i] = list(); // TODO: What if the result of one of the commands is // list()? } int nfinished; // the number of finished jobs int wait; // the index of the link which is finished, or 0 for timeout while(k <= njobs && nfinished < N-1) { if(timeout == 0) { wait = waitfirst(links); } else { tt = timeout-(rtimer-t); if(tt < 0) { wait = waitfirst(links, 0); wait = 0; } else { wait = waitfirst(links, tt); } } if(wait == -1) { ERROR("All links crashed."); } if(wait) { if(wait == nlinks+1) { wait = read(mempatrol); close(l(wait)); open(l(wait)); results[assignment[wait]] = "out of memory"; nkilled++; } else { read(l(wait)); write(l(wait), quote(result)); results[assignment[wait]] = read(l(wait)); if(maxmems[assignment[wait]] > 0) { write(mempatrol, assignment[wait]); } nfinished++; } if(status(l(wait), "mode", "fork")) { write(l(wait), quote(currentargs = args[eval(k)])); } else { write(l(wait), quote(currentargs = eval(args[k]))); } read(l(wait)); if(maxmems[k] > 0) { m = wait; for(j = 1; j <= nservers; j++) { if(servers[j][2] > m) { server = servers[j][1]; break; } else { m = m-servers[j][2]; } } write(l(wait), quote(system("pid"))); pid = read(l(wait)); write(mempatrol, list(server, pid, wait, maxmems[k])); } write(l(wait), quote(execute("def result = "+eval(commands[k]) +"("+argsToString("currentargs", size(currentargs))+");"))); assignment[wait] = k; k++; } else { break; } } // collect the rest of the results while(nfinished < N && nfinished+nkilled < njobs) { if(timeout == 0) { wait = waitfirst(links); } else { tt = timeout-(rtimer-t); if(tt < 0) { wait = waitfirst(links, 0); wait = 0; } else { wait = waitfirst(links, tt); } } if(wait == -1) { ERROR("All links crashed."); } if(wait) { if(wait == nlinks+1) { wait = read(mempatrol); close(l(wait)); results[assignment[wait]] = "out of memory"; nkilled++; } else { read(l(wait)); write(l(wait), quote(result)); results[assignment[wait]] = read(l(wait)); if(maxmems[assignment[wait]] > 0) { write(mempatrol, assignment[wait]); } nfinished++; } } else { break; } } //close all links for(i = 1; i <= nlinks; i++) { if(status(l(i), "read", "ready")) { read(l(i)); write(l(i), quote(result)); results[assignment[i]] = read(l(i)); } close(l(i)); } if(havemaxmem) { close(mempatrol); } system("--ticks-per-sec", oldtimerresolution); return(results); } example { "EXAMPLE:"; echo = 2; LIB "primdec.lib"; ring r = 0, (x,y,z), lp; ideal i = z8+z6+4z5+4z3+4z2+4, y-z2; ideal j = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4; list commands = list("std", "primdecGTZ", "primdecSY", "std", "primdecGTZ", "primdecSY"); list args = list(list(i), list(i), list(i), list(j), list(j), list(j)); parallelWaitN(commands, args, 3); } proc parallelWaitFirst(list commands, list args, list #) "USAGE: parallelWaitFirst(commands, args[, timeout, linktype, servers, maxmemory]); commands list, args list, timeout int, linktype string, servers list, maxmemory intvec RETURN: a list, containing at least one (if no timeout occurs) of the results of commands[i] applied to arg[i], i = 1, ..., size(commands). @* The command @code{parallelWaitFirst(list commands, list args, list #)} is synonymous to @code{parallelWaitN(list commands, list args, 1, list #)}. See @ref{parallelWaitN} for details on optional arguments and other remarks. SEE ALSO: Ssi links, waitfirst KEYWORDS: parallelWaitFirst; Parallelization; Links, user interface; Skeletons for parallelization; Distributed computing EXAMPLE: @code{example parallelWaitFirst;} shows an example." { return(parallelWaitN(commands, args, 1, #)); } example { "EXAMPLE:"; echo = 2; LIB "primdec.lib"; ring r = 0, (x,y,z), lp; ideal i = z8+z6+4z5+4z3+4z2+4, y-z2; list commands = list("primdecGTZ", "primdecSY"); list args = list(list(i), list(i)); parallelWaitFirst(commands, args); } proc parallelWaitAll(def commands, list args, list #) "USAGE: parallelWaitAll(commands, args[, timeout, linktype, servers, maxmemory]); commands list or string, args list, timeout int, linktype string, servers list, maxmemory intvec RETURN: a list, containing the results of commands[i] applied to arg[i], i = 1, ..., size(commands). @* The command @code{parallelWaitAll(list commands, list args, list #)} is synonymous to @code{parallelWaitN(list commands, list args, size(args), list #)}. See @ref{parallelWaitN} for details on optional arguments and other remarks. If commands is of type string, this is a shortcut for a list of size @code{size(args)} whose entries are just this string. SEE ALSO: Ssi links, waitall KEYWORDS: parallelWaitAll; Parallelization; Links, user interface; Skeletons for parallelization; Distributed computing EXAMPLE: @code{example parallelWaitAll;} shows an example." { if(typeof(commands) != "list" && typeof(commands) != "string") { ERROR("invalid type of first argument"); } if(typeof(commands) == "list") { return(parallelWaitN(commands, args, size(args), #)); } else { list cmds; for(int i = size(args); i > 0; i--) { cmds[i] = commands; } return(parallelWaitN(cmds, args, size(args), #)); } } example { "EXAMPLE:"; echo = 2; ring r = 0, (x,y,z), dp; ideal i1 = z8+z6+4z5+4z3+4z2+4, y-z2; ideal i2 = x10+x9y2, y8-x2y7; ideal i3 = x3-2xy, x2y-2y2+x; string command = "std"; list args = list(list(i1), list(i2), list(i3)); parallelWaitAll(command, args); } // TODO /// http://arxiv.org/abs/1005.5663v2 static proc doModular(command, args, proc deleteUnlucksPrimes, proc testInChar0) { } // TODO /* worker farm */ static proc Create() {} /* auxiliary procedures */ static proc watchlinks() { list parent = list(mempatrol); list watchedlinks; int wait; int i, sys; while(1) { if(size(watchedlinks) == 0) { wait = waitall(parent); } else { wait = waitall(parent, 10000); } if(wait == -1) { ERROR("The main process crashed."); } if(wait) { def query = read(mempatrol); if(typeof(query) == "list") { watchedlinks = insert(watchedlinks, query); } else // in this case, typeof(query) is assumed to be "int", the // index of the link { for(i = size(watchedlinks); i > 0; i--) { if(watchedlinks[i][3] == query) { watchedlinks = delete(watchedlinks, i); break; } } } } for(i = size(watchedlinks); i > 0; i--) { if(getusedmemory(watchedlinks[i][1], watchedlinks[i][2]) > watchedlinks[i][4]) { if(watchedlinks[i][1] == "localhost") { sys = system("sh", "kill "+string(watchedlinks[i][2])); } else { sys = system("sh", "ssh "+watchedlinks[i][1]+" kill " +string(watchedlinks[i][2])); } write(mempatrol, watchedlinks[i][3]); watchedlinks = delete(watchedlinks, i); } } } } static proc getusedmemory(string server, int pid) { string s; if(server == "localhost") { s = read("|: grep VmSize /proc/"+string(pid)+"/status "+ "| awk '{ print $2; }'"); } else { s = read("|: ssh "+server+" grep VmSize /proc/"+string(pid)+"/status "+ "| awk '{ print $2; }'"); } bigint b; execute("b = "+s+";"); int i = int(b/1000); return(i); } static proc argsToString(string name, int length) { string arglist; if(length > 0) { arglist = name+"[1]"; } int i; for(i = 2; i <= length; i++) { arglist = arglist+", "+name+"["+string(i)+"]"; } return(arglist); }