source: git/Singular/LIB/parallel.lib @ e1b841

spielwiese
Last change on this file since e1b841 was e1b841, checked in by Andreas Steenpass <steenpass@…>, 12 years ago
update documentation for parallel.lib and realclassify.lib
  • Property mode set to 100644
File size: 20.0 KB
Line 
////////////////////////////////////////////////////////////////////
// parallel.lib: library header (version, category and help text)
version="$Id$";
category="General purpose";
info="
LIBRARY:   parallel.lib  Tools for Parallelization
AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de

OVERVIEW:
This library provides tools to do several computations in parallel. They
are aimed at ordinary Singular users as well as authors of Singular
libraries.
@* Even without this library, it is possible to execute self-defined
Singular commands in parallel using @ref{links}, but the handling of
such links can be quite tedious. With the procedures described below,
this can be done by one-line commands.
@* There are many parallel 'skeletons' (i.e. ways in which parallel
tasks rely upon and interact with each other). A few of them are already
implemented. Future plans include an abstraction layer for modular
techniques, 'worker farms', and parallel tests.

SEE ALSO:  link, modstd_lib, assprimeszerodim_lib

KEYWORDS:  parallel.lib; Parallelization; Links, user interface;
           Skeletons for parallelization; Distributed computing

PROCEDURES:
  parallelWaitN(...)     execute several jobs in parallel,
                         wait for N of them to finish
  parallelWaitFirst(...) execute several jobs in parallel,
                         wait for the first to finish
  parallelWaitAll(...)   execute several jobs in parallel,
                         wait for all of them to finish
";
34
proc parallelWaitN(list commands, list args, int N, list #)
"USAGE:  parallelWaitN(commands, args, N[, timeout, linktype, servers,
         maxmemory]); commands list, args list, N int, timeout int,
         linktype string, servers list, maxmemory intvec
RETURN:  a list, containing the results of commands[i] applied to args[i],
         i = 1, ..., size(commands).
         @* The procedure waits for N jobs to finish.

         @* OPTIONAL PARAMETERS:

            An optional timeout in ms can be provided. Default is 0 which
            disables the timeout.

            Supported linktypes are up to now \"ssi\" and \"mp\", see
            @ref{Ssi links} and @ref{MP links}.

            Servers:
         @* Each server is given by a list containing the address of the server,
            the number of cores to use on this server and the command to start
            Singular.
         @* If the address is \"localhost\", the processes will be generated on
            the same machine using forks. If the command to start Singular is
            \"\" (the empty string), \"Singular\" will be used. If the number
            of cores is 0, all cores available on that server are used.
         @* Default is @code{list(\"localhost\", system(\"cpu\"), \"\")}.
         @* There are some obvious shortcuts for servers, e.g. \"myserver\" is
            a shortcut for
            @code{list(\"myserver\", [nb. of cores on myserver], \"\")}, or 3
            for @code{list(\"localhost\", 3, \"\")}.

            Memory limits:
         @* If an intvec maxmemory of size @code{size(commands)} is given, the
            i-th job will be killed if it uses more than maxmemory[i] MB of
            memory. If maxmemory[i] is 0, there will be no restriction for the
            i-th job. Default is @code{0:size(commands)}.
NOTE:       The entries of the list commands must be strings.
         @* The entries of the list args must be lists.
         @* The returned list may contain more than N results if several jobs
            finished \"at the same time\". It may contain less than N results in
            the case of timeout or errors occurring. Entries of jobs which did
            not finish are empty lists.
         @* The entry of a job which was killed because it exceeded its memory
            limit is the string \"out of memory\".
         @* MP links do not work with a 64 bit version of Singular. If you want
            to use MP links, make sure that MP is available. This can be checked
            by the Singular command @code{system(\"with\", \"MP\");}.
         @* The check whether the jobs exceed the memory sizes given by
            maxmemory is only done from time to time. This feature is
            experimental and should be used with care.
SEE ALSO: Ssi links, MP links, waitfirst, waitall
KEYWORDS: parallelWaitN; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitN;} shows an example."
{
  // measure time in ms (the unit of the timeout); the old resolution is
  // restored before returning
  int oldtimerresolution = system("--ticks-per-sec");
  system("--ticks-per-sec", 1000);
  int t = rtimer;

  // auxiliary variables
  int i, j, tt;

  // read optional parameters, inserting defaults for the missing ones;
  // a number of cores of 0 is replaced by the number of available cores below
  list defaultserver = list("localhost", 0, "");
  list defaults = list(0, "ssi", list(defaultserver), 0:size(commands));
  for(i = 1; i <= size(defaults); i++)
  {
    if(typeof(#[i]) != typeof(defaults[i]))
    {
      # = insert(#, defaults[i], i-1);
    }
  }
  if(size(#) != size(defaults))
  {
    ERROR("wrong optional parameters");
  }
  for(j = size(#[3]); j > 0; j--)
  {
    // shortcuts such as "myserver" or 3 are turned into a list first, so
    // that the missing entries can be filled in like for any other server
    if(typeof(#[3][j]) != "list")
    {
      #[3][j] = list(#[3][j]);
    }
    if(typeof(#[3][j][1]) != typeof(defaultserver[1]))
    {
      #[3][j] = insert(#[3][j], defaultserver[1], 0);
    }
    defaultserver[3] = "";
    // only for ssi:tcp links, the default program is system("Singular")
    if(#[2] == "ssi" && #[3][j][1] != "localhost")
    {
      defaultserver[3] = system("Singular");
    }
    for(i = 2; i <= size(defaultserver); i++)
    {
      if(typeof(#[3][j][i]) != typeof(defaultserver[i]))
      {
        #[3][j] = insert(#[3][j], defaultserver[i], i-1);
      }
    }
    if(size(#[3][j]) != size(defaultserver))
    {
      ERROR("wrong declaration for server no. "+string(j));
    }
  }
  int timeout = #[1];
  string linktype = #[2];
  list servers = #[3];
  intvec maxmems = #[4];

  // error checking
  int njobs = size(commands);
  if(njobs != size(args))
  {
    ERROR("The number of commands does not match the number of lists"
         +newline+"of arguments.");
  }
  if(njobs == 0)
  {
    ERROR("no commands specified");
  }
  for(i = 1; i <= njobs; i++)
  {
    if(typeof(commands[i]) != "string")
    {
      ERROR("The first argument is not a list of strings.");
    }
    if(typeof(args[i]) != "list")
    {
      ERROR("The second argument is not a list of lists.");
    }
  }
  if(N < 0)
  {
    ERROR("The number of jobs which you want to wait for is negative.");
  }
  if(N > njobs)
  {
    ERROR("The number of jobs which you want to wait for is greater"
         +newline+"than the number of jobs itself.");
  }
  if(timeout < 0)
  {
    ERROR("The given timeout is negative.");
  }
  if(linktype != "ssi" && linktype != "mp")
  {
    ERROR("The given linktype is not recognized.");
  }
  int nservers = size(servers);
  if(nservers <= 0)
  {
    ERROR("no server specified");
  }
  for(i = 1; i <= nservers; i++)
  {
    if(servers[i][1] != "localhost")
    {
      if(system("sh", "ssh "+servers[i][1]+" exit"))
      {
        ERROR("Could not connect to server \""+servers[i][1]+"\"");
      }
    }
    if(servers[i][2] < 0)
    {
      ERROR("The number of cores to be used on server \""+servers[i][1]+"\""
           +newline+"is negative.");
    }
    if(servers[i][1] == "localhost")
    {
      int ncpus(i) = system("cpu");
    }
    else
    {
      // ask a Singular process on the remote server for its number of cores
      link lcpu(i) = linkString(linktype, servers[i][1], servers[i][3]);
      open(lcpu(i));
      write(lcpu(i), quote(system("cpu")));
      int ncpus(i) = read(lcpu(i));
      close(lcpu(i));
      kill lcpu(i);
    }
    if(servers[i][2] == 0)
    {
      servers[i][2] = ncpus(i);
    }
    else
    {
      if(servers[i][2] > ncpus(i))
      {
        ERROR("The number of cores to use on server \""+servers[i][1]+"\""
             +newline+"is greater than the number of available cores");
      }
    }
    if(servers[i][1] != "localhost")
    {
      if(system("sh", "ssh "+servers[i][1]+
                      " 'test -e `which "+servers[i][3]+"`'"))
      {
        ERROR("\""+servers[i][3]+"\" was not found on "
             +"\""+servers[i][1]+"\".");
      }
    }
  }
  if(size(maxmems) != njobs)
  {
    ERROR("The size of the intvec which specifies the maximal amount of memory"
         +newline+"to be used for each job does not match the number of jobs.");
  }
  int havemaxmem;
  for(i = 1; i <= njobs; i++)
  {
    if(maxmems[i] < 0)
    {
      ERROR("The maximal amount of memory to be used for job no. "+string(i)
           +" is negative.");
    }
    havemaxmem = havemaxmem+maxmems[i];
  }

  // skip those cores which won't be needed
  int nlinks;
  for(i = 1; i <= nservers; i++)
  {
    if(nlinks+servers[i][2] <= njobs)
    {
      nlinks = nlinks+servers[i][2];
    }
    else
    {
      if(nlinks == njobs)
      {
        servers = list(servers[1..(i-1)]);
      }
      else
      {
        servers = list(servers[1..i]);
        servers[i][2] = njobs-nlinks;
        nlinks = njobs;
      }
      nservers = size(servers);
    }
  }

  // open the links (numbered consecutively, server by server) and declare
  // the variables which are used on the other side
  string server;
  int ncores;
  string program;
  int k = 1;   // the index of the link
  for(i = 1; i <= nservers; i++)
  {
    server = servers[i][1];
    ncores = servers[i][2];
    program = servers[i][3];
    for(j = 1; j <= ncores; j++)
    {
      link l(k) = linkString(linktype, server, program);
      open(l(k));
      write(l(k),
        quote(execute("option(noredefine); def result; list currentargs;")));
      read(l(k));
      k++;
    }
  }
  list links = list(l(1..nlinks));

  // start a memory watchdog if needed; it becomes link number nlinks+1
  if(havemaxmem)
  {
    if(linktype == "ssi")
    {
      link mempatrol = "ssi:fork";
    }
    else
    {
      link mempatrol = "MPtcp:fork";
    }
    open(mempatrol);
    write(mempatrol, quote(watchlinks()));
    links = insert(links, mempatrol, nlinks);
  }
  int nkilled;   // the number of jobs killed by the mempatrol
  string cmd;    // the command which starts a job on a link

  // distribute work to the links
  k = 1; // from here on, k is the index of the next job which must be
         // distributed to some link
  intvec assignment = 0:nlinks;  // link number i is currently doing
                                 // job number assignment[i]
  int pid;
  for(i = 1; i <= nlinks; i++)
  {
    if(status(l(i), "mode", "fork"))
    {
      write(l(i), quote(currentargs = args[eval(k)]));
    }
    else
    {
      write(l(i), quote(currentargs = eval(args[k])));
    }
    read(l(i));
    if(maxmems[k] > 0)
    {
      // the mempatrol identifies watched processes by their link index
      write(l(i), quote(system("pid")));
      pid = read(l(i));
      write(mempatrol, list(serverOfLink(servers, i), pid, i, maxmems[k]));
    }
    // build the command here, so the other side does not need this library;
    // "def" allows the type of result to differ from that of the last job
    cmd = "def result = "+commands[k]+"("
          +argsToString("currentargs", size(args[k]))+");";
    write(l(i), quote(execute(eval(cmd))));
    assignment[i] = k;
    k++;
  }

  // distribute the rest of the work
  list results;
  for(i = njobs; i > 0; i--)
  {
    results[i] = list();  // TODO: What if the result of one of the commands is
                          // list()?
  }
  int nfinished;  // the number of finished jobs
  int wait;   // the index of the link which is finished, or 0 for timeout
  while(k <= njobs && nfinished < N-1)
  {
    if(timeout == 0)
    {
      wait = waitfirst(links);
    }
    else
    {
      tt = timeout-(rtimer-t);
      if(tt < 0)
      {
        wait = 0;   // the timeout has already expired
      }
      else
      {
        wait = waitfirst(links, tt);
      }
    }
    if(wait == -1)
    {
      ERROR("All links crashed.");
    }
    if(wait == 0)
    {
      break;   // timeout
    }
    if(wait == nlinks+1)
    {
      // the mempatrol killed the job running on link number read(mempatrol);
      // restart that link and declare its variables again
      wait = read(mempatrol);
      close(l(wait));
      open(l(wait));
      write(l(wait),
        quote(execute("option(noredefine); def result; list currentargs;")));
      read(l(wait));
      results[assignment[wait]] = "out of memory";
      nkilled++;
    }
    else
    {
      read(l(wait));
      write(l(wait), quote(result));
      results[assignment[wait]] = read(l(wait));
      if(maxmems[assignment[wait]] > 0)
      {
        write(mempatrol, wait);   // stop watching link number wait
      }
      nfinished++;
    }
    // start job number k on the link which has just become free
    if(status(l(wait), "mode", "fork"))
    {
      write(l(wait), quote(currentargs = args[eval(k)]));
    }
    else
    {
      write(l(wait), quote(currentargs = eval(args[k])));
    }
    read(l(wait));
    if(maxmems[k] > 0)
    {
      write(l(wait), quote(system("pid")));
      pid = read(l(wait));
      write(mempatrol, list(serverOfLink(servers, wait), pid, wait,
                            maxmems[k]));
    }
    cmd = "def result = "+commands[k]+"("
          +argsToString("currentargs", size(args[k]))+");";
    write(l(wait), quote(execute(eval(cmd))));
    assignment[wait] = k;
    k++;
  }

  // collect the rest of the results
  while(nfinished < N && nfinished+nkilled < njobs)
  {
    if(timeout == 0)
    {
      wait = waitfirst(links);
    }
    else
    {
      tt = timeout-(rtimer-t);
      if(tt < 0)
      {
        wait = 0;   // the timeout has already expired
      }
      else
      {
        wait = waitfirst(links, tt);
      }
    }
    if(wait == -1)
    {
      ERROR("All links crashed.");
    }
    if(wait == 0)
    {
      break;   // timeout
    }
    if(wait == nlinks+1)
    {
      // no more jobs to distribute, so the killed link is not restarted
      wait = read(mempatrol);
      close(l(wait));
      results[assignment[wait]] = "out of memory";
      nkilled++;
    }
    else
    {
      read(l(wait));
      write(l(wait), quote(result));
      results[assignment[wait]] = read(l(wait));
      if(maxmems[assignment[wait]] > 0)
      {
        write(mempatrol, wait);   // stop watching link number wait
      }
      nfinished++;
    }
  }

  // close all links, collecting results which are ready by now
  for(i = 1; i <= nlinks; i++)
  {
    if(status(l(i), "read", "ready"))
    {
      read(l(i));
      write(l(i), quote(result));
      results[assignment[i]] = read(l(i));
    }
    close(l(i));
  }
  if(havemaxmem)
  {
    close(mempatrol);
  }

  system("--ticks-per-sec", oldtimerresolution);
  return(results);
}
example
{
  "EXAMPLE:"; echo = 2;
  LIB "primdec.lib";
  ring r = 0, (x,y,z), lp;
  ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
  ideal j = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4;
  list commands = list("std", "primdecGTZ", "primdecSY",
                       "std", "primdecGTZ", "primdecSY");
  list args = list(list(i), list(i), list(i), list(j), list(j), list(j));
  parallelWaitN(commands, args, 3);
}

// Returns the link description string for a link of type linktype
// ("ssi" or "mp") to server, where program (if non-empty) is the command
// which starts Singular there. For "localhost", forked links are used.
static proc linkString(string linktype, string server, string program)
{
  if(server == "localhost")
  {
    if(linktype == "ssi")
    {
      return("ssi:fork");
    }
    return("MPtcp:fork");
  }
  if(linktype == "ssi")
  {
    return("ssi:tcp "+server+":"+program);
  }
  if(program == "")
  {
    return("MPtcp:launch --MPhost "+server);
  }
  return("MPtcp:launch --MPhost "+server+" --MPapplication "+program);
}

// Returns the address of the server which hosts link number linkindex,
// given that the links are numbered consecutively server by server and
// servers[j][2] is the number of links on server j.
static proc serverOfLink(list servers, int linkindex)
{
  int j;
  int m = linkindex;
  for(j = 1; j <= size(servers); j++)
  {
    if(m <= servers[j][2])
    {
      return(servers[j][1]);
    }
    m = m-servers[j][2];
  }
  ERROR("link index out of range");
}
571
proc parallelWaitFirst(list commands, list args, list #)
"USAGE:  parallelWaitFirst(commands, args[, timeout, linktype, servers,
         maxmemory]); commands list, args list, timeout int, linktype string,
         servers list, maxmemory intvec
RETURN:  a list, containing at least one (if no timeout occurs) of the results
         of commands[i] applied to args[i], i = 1, ..., size(commands).
         @* The command
         @code{parallelWaitFirst(list commands, list args, list #)} is
         synonymous to
         @code{parallelWaitN(list commands, list args, 1, list #)}. See
         @ref{parallelWaitN} for details on optional arguments and other
         remarks.
SEE ALSO: Ssi links, MP links, waitfirst
KEYWORDS: parallelWaitFirst; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitFirst;} shows an example."
{
  // waiting for the first job means waiting for N = 1 jobs; all optional
  // arguments are passed through unchanged
  return(parallelWaitN(commands, args, 1, #));
}
example
{
  "EXAMPLE:"; echo = 2;
  LIB "primdec.lib";
  ring r = 0, (x,y,z), lp;
  ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
  list commands = list("primdecGTZ", "primdecSY");
  list args = list(list(i), list(i));
  parallelWaitFirst(commands, args);
}
601
proc parallelWaitAll(def commands, list args, list #)
"USAGE:  parallelWaitAll(commands, args[, timeout, linktype, servers,
         maxmemory]); commands list or string, args list, timeout int,
         linktype string, servers list, maxmemory intvec
RETURN:  a list, containing the results of commands[i] applied to args[i],
         i = 1, ..., size(commands).
         @* The command
         @code{parallelWaitAll(list commands, list args, list #)} is
         synonymous to
         @code{parallelWaitN(list commands, list args, size(args), list #)}. See
         @ref{parallelWaitN} for details on optional arguments and other
         remarks.
         @* If commands is of type string, this is a shortcut for a list of size
         @code{size(args)} whose entries are just this string.
SEE ALSO: Ssi links, MP links, waitall
KEYWORDS: parallelWaitAll; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitAll;} shows an example."
{
  if(typeof(commands) == "list")
  {
    return(parallelWaitN(commands, args, size(args), #));
  }
  if(typeof(commands) != "string")
  {
    ERROR("invalid type of first argument");
  }
  // a single command string is applied to every list of arguments
  list cmds;
  int i;
  for(i = size(args); i > 0; i--)
  {
    cmds[i] = commands;
  }
  return(parallelWaitN(cmds, args, size(args), #));
}
example
{
  "EXAMPLE:"; echo = 2;
  ring r = 0, (x,y,z), dp;
  ideal i1 = z8+z6+4z5+4z3+4z2+4, y-z2;
  ideal i2 = x10+x9y2, y8-x2y7;
  ideal i3 = x3-2xy, x2y-2y2+x;
  string command = "std";
  list args = list(list(i1), list(i2), list(i3));
  parallelWaitAll(command, args);
}
650
// TODO: not implemented yet. Planned skeleton for modular techniques, cf.
// http://arxiv.org/abs/1005.5663v2
// The procedure is currently empty and not called from this library.
static proc doModular(command, args, proc deleteUnluckyPrimes, proc testInChar0)
{
}
656
/* worker farm */
// TODO: placeholder for a 'worker farm' skeleton; it does nothing yet.
static proc Create()
{
}
660
/* auxiliary procedures */

// Memory watchdog; it is started by parallelWaitN in a separate process and
// communicates with the main process via the link mempatrol.
// Messages from the main process:
//   list(server, pid, linkindex, maxmemory): watch process pid on server,
//     which runs the job of link number linkindex, with a limit of
//     maxmemory MB;
//   int linkindex: stop watching the process of link number linkindex.
// If a watched process exceeds its limit, it is killed and its link index
// is sent to the main process. This procedure never returns.
static proc watchlinks()
{
  list parent = list(mempatrol);
  list watchedlinks;
  int wait;
  int i, sys;
  while(1)
  {
    if(size(watchedlinks) == 0)
    {
      // nothing to watch: block until the main process sends something
      wait = waitall(parent);
    }
    else
    {
      // check the memory usage at least every 10000 ms
      wait = waitall(parent, 10000);
    }
    if(wait == -1)
    {
      ERROR("The main process crashed.");
    }
    if(wait)
    {
      def query = read(mempatrol);
      if(typeof(query) == "list")
      {
        watchedlinks = insert(watchedlinks, query);
      }
      else // in this case, typeof(query) is assumed to be "int", the
           // index of the link
      {
        for(i = size(watchedlinks); i > 0; i--)
        {
          if(watchedlinks[i][3] == query)
          {
            watchedlinks = delete(watchedlinks, i);
            break;
          }
        }
      }
      // the next message may have a different type; removing query avoids
      // redefining it in the next iteration
      kill query;
    }
    for(i = size(watchedlinks); i > 0; i--)
    {
      if(getusedmemory(watchedlinks[i][1], watchedlinks[i][2])
           > watchedlinks[i][4])
      {
        if(watchedlinks[i][1] == "localhost")
        {
          sys = system("sh", "kill "+string(watchedlinks[i][2]));
        }
        else
        {
          sys = system("sh", "ssh "+watchedlinks[i][1]+" kill "
                             +string(watchedlinks[i][2]));
        }
        write(mempatrol, watchedlinks[i][3]);
        watchedlinks = delete(watchedlinks, i);
      }
    }
  }
}
722
// Returns the virtual memory size (VmSize from /proc/<pid>/status, divided
// by 1000) of process pid on server, i.e. roughly the memory usage in MB.
// Returns 0 if the process does not exist (any more).
static proc getusedmemory(string server, int pid)
{
  string cmd = "grep VmSize /proc/"+string(pid)+"/status "+
               "| awk '{ print $2; }'";
  if(server != "localhost")
  {
    // only grep runs on the server; awk runs locally
    cmd = "ssh "+server+" "+cmd;
  }
  string s = read("|: "+cmd);
  // the process may have terminated meanwhile; then there is no number to
  // parse and execute("b = ;") below would fail
  if(s == "" || s == newline)
  {
    return(0);
  }
  bigint b;
  execute("b = "+s+";");
  int i = int(b/1000);
  return(i);
}
741
// Builds the argument list "name[1], name[2], ..., name[length]";
// the result is the empty string if length <= 0.
static proc argsToString(string name, int length)
{
  string s;
  int idx;
  for(idx = 1; idx <= length; idx++)
  {
    if(idx > 1)
    {
      s = s+", ";
    }
    s = s+name+"["+string(idx)+"]";
  }
  return(s);
}
Note: See TracBrowser for help on using the repository browser.