source: git/Singular/LIB/parallel.lib @ 4c20ee

spielwiese
Last change on this file since 4c20ee was 4c20ee, checked in by Hans Schoenemann <hannes@…>, 11 years ago
fix: new version numbers for libs
  • Property mode set to 100644
File size: 19.1 KB
Line 
////////////////////////////////////////////////////////////////////
version="version parallel.lib 4.0.0.0 Jun_2013 ";
category="General purpose";
info="
LIBRARY:   parallel.lib  Tools for Parallelization
AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de

OVERVIEW:
This library provides tools to do several computations in parallel. They
are aimed at ordinary Singular users as well as authors of Singular
libraries.
@* Even without this library, it is possible to execute self-defined
Singular commands in parallel using @ref{link}, but the handling of
such links can be quite tedious. With the procedures described below,
this can be done by one-line commands.
@* There are many parallel 'skeletons' (i.e. ways in which parallel
tasks rely upon and interact with each other). A few of them are already
implemented. Future plans include an abstraction layer for modular
techniques, 'worker farms', and parallel tests.

SEE ALSO:  link, modstd_lib, assprimeszerodim_lib

KEYWORDS:  parallel.lib; Parallelization; Links, user interface;
           Skeletons for parallelization; Distributed computing

PROCEDURES:
  parallelWaitN(...)     execute several jobs in parallel,
                         wait for N of them to finish
  parallelWaitFirst(...) execute several jobs in parallel,
                         wait for the first to finish
  parallelWaitAll(...)   execute several jobs in parallel,
                         wait for all of them to finish
";
34
proc parallelWaitN(list commands, list args, int N, list #)
"USAGE:  parallelWaitN(commands, args, N[, timeout, linktype, servers,
         maxmemory]); commands list, args list, N int, timeout int,
         linktype string, servers list, maxmemory intvec
RETURN:  a list, containing the results of commands[i] applied to args[i],
         i = 1, ..., size(commands).
         @* The procedure waits for N jobs to finish.

         @* OPTIONAL PARAMETERS:

            An optional timeout in ms can be provided. Default is 0 which
            disables the timeout.

            Supported linktypes are up to now \"ssi\" and \"mp\", see
            @ref{Ssi links}.

            Servers:
         @* Each server is given by a list containing the address of the server,
            the number of cores to use on this server and the command to start
            Singular.
         @* If the address is \"localhost\", the processes will be generated on
            the same machine using forks. If the command to start Singular is
            \"\" (the empty string), \"Singular\" will be used.
         @* Default is @code{list(\"localhost\", system(\"cpu\"), \"\")}.
         @* There are some obvious shortcuts for servers, e.g. \"myserver\" is
            a shortcut for
            @code{list(\"myserver\", [nb. of cores on myserver], \"\")}, or 3
            for @code{list(\"localhost\", 3, \"\")}.

            Memory limits:
         @* If an intvec maxmemory of size @code{size(commands)} is given, the
            i-th job will be killed if it uses more than maxmemory[i] MB of
            memory. If maxmemory[i] is 0, there will be no restraint for the
            i-th job. Default is @code{0:size(commands)}.
NOTE:       The entries of the list commands must be strings.
         @* The entries of the list args must be lists.
         @* The returned list may contain more than N results if several jobs
            finished \"at the same time\". It may contain less than N results in
            the case of timeout or errors occurring.
         @* The check whether the jobs exceed the memory sizes given by
            maxmemory is only done from time to time. This feature is
            experimental and should be used with care.
SEE ALSO: Ssi links, waitfirst, waitall
KEYWORDS: parallelWaitN; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitN;} shows an example."
{
  // initialize the timer with millisecond resolution (used for the
  // timeout); the previous resolution is restored before returning
  int oldtimerresolution = system("--ticks-per-sec");
  system("--ticks-per-sec", 1000);
  int t = rtimer;

  // auxiliary variables
  int i, j, m, tt;

  // read optional parameters: every position of # whose type does not match
  // the corresponding default gets that default inserted in front of it
  list defaultserver = list("localhost", system("cpu"), "");
  list defaults = list(0, "ssi", list(defaultserver), 0:size(commands));
  for(i = 1; i <= size(defaults); i++)
  {
    if(typeof(#[i]) != typeof(defaults[i]))
    {
      # = insert(#, defaults[i], i-1);
    }
  }
  if(size(#) != size(defaults))
  {
    ERROR("wrong optional parameters");
  }
  // complete every server description to list(address, ncores, program)
  for(j = size(#[3]); j > 0; j--)
  {
    if(typeof(#[3][j][1]) != typeof(defaultserver[1]))
    {
      #[3][j] = insert(#[3][j], defaultserver[1], 0);
    }
    defaultserver[3] = "";
    // only for ssi:tcp links, the default program is system("Singular")
    if(#[2] == "ssi" && #[3][j][1] != "localhost")
    {
      defaultserver[3] = system("Singular");
    }
    for(i = 2; i <= size(defaultserver); i++)
    {
      if(typeof(#[3][j][i]) != typeof(defaultserver[i]))
      {
        #[3][j] = insert(#[3][j], defaultserver[i], i-1);
      }
    }
    if(size(#[3][j]) != size(defaultserver))
    {
      ERROR("wrong declaration for server no. "+string(j));
    }
  }
  int timeout = #[1];
  string linktype = #[2];
  list servers = #[3];
  intvec maxmems = #[4];

  // error checking
  int njobs = size(commands);
  if(njobs != size(args))
  {
    ERROR("The number of commands does not match the number of lists"
         +newline+"of arguments.");
  }
  if(njobs == 0)
  {
    ERROR("no commands specified");
  }
  for(i = 1; i <= njobs; i++)
  {
    if(typeof(commands[i]) != "string")
    {
      ERROR("The first argument is not a list of strings.");
    }
    if(typeof(args[i]) != "list")
    {
      ERROR("The second argument is not a list of lists.");
    }
  }
  if(N < 0)
  {
    ERROR("The number of jobs which you want to wait for is negative.");
  }
  if(N > njobs)
  {
    ERROR("The number of jobs which you want to wait for is greater"
         +newline+"than the number of jobs itself.");
  }
  if(timeout < 0)
  {
    ERROR("The given timeout is negative.");
  }
  // NOTE: "mp" is accepted here, but only ssi links are created below
  if(linktype != "ssi" && linktype != "mp")
  {
    ERROR("The given linktype is not recognized.");
  }
  int nservers = size(servers);
  if(nservers <= 0)
  {
    ERROR("no server specified");
  }
  for(i = 1; i <= nservers; i++)
  {
    if(servers[i][1] != "localhost")
    {
      if(system("sh", "ssh "+servers[i][1]+" exit"))
      {
        ERROR("Could not connect to server \""+servers[i][1]+"\"");
      }
    }
    if(servers[i][2] < 0)
    {
      ERROR("The number of cores to be used on server \""+servers[i][1]+"\""
           +newline+" is negative.");
    }
    // determine the number of cores available on the server
    if(servers[i][1] == "localhost")
    {
      int ncpus(i) = system("cpu");
    }
    else
    {
      link lcpu(i) = "ssi:tcp "+servers[i][1]+":"+servers[i][3];
      open(lcpu(i));
      write(lcpu(i), quote(system("cpu")));
      int ncpus(i) = read(lcpu(i));
      close(lcpu(i));
      kill lcpu(i);
    }
    // 0 cores means: use all available cores of this server
    if(servers[i][2] == 0)
    {
      servers[i][2] = ncpus(i);
    }
    else
    {
      if(servers[i][2] > ncpus(i))
      {
        ERROR("The number of cores to use on server \""+servers[i][1]+"\""
             +newline+"is greater than the number of available cores");
      }
    }
    if(servers[i][1] != "localhost")
    {
      if(system("sh", "ssh "+servers[i][1]+
                      " 'test -e `which "+servers[i][3]+"`'"))
      {
        ERROR("\""+servers[i][3]+"\" was not found on "
             +"\""+servers[i][1]+"\".");
      }
    }
  }
  if(size(maxmems) != njobs)
  {
    ERROR("The size of the intvec which specifies the maximal amount of memory"
         +newline+"to be used for each job does not match the number of jobs.");
  }
  int havemaxmem;
  for(i = 1; i <= njobs; i++)
  {
    if(maxmems[i] < 0)
    {
      ERROR("The maximal amount of memory to be used for job no. "+string(i)
           +" is negative.");
    }
    havemaxmem = havemaxmem+maxmems[i];
  }

  // skip those cores which won't be needed (never more links than jobs)
  int nlinks;
  for(i = 1; i <= nservers; i++)
  {
    if(nlinks+servers[i][2] <= njobs)
    {
      nlinks = nlinks+servers[i][2];
    }
    else
    {
      if(nlinks == njobs)
      {
        servers = list(servers[1..(i-1)]);
      }
      else
      {
        servers = list(servers[1..i]);
        servers[i][2] = njobs-nlinks;
        nlinks = njobs;
      }
      nservers = size(servers);
    }
  }

  // open the links; they are numbered consecutively, server by server
  string server;
  int ncores;
  string program;
  int k = 1;   // the index of the link
  for(i = 1; i <= nservers; i++)
  {
    server = servers[i][1];
    ncores = servers[i][2];
    program = servers[i][3];
    for(j = 1; j <= ncores; j++)
    {
      if(server == "localhost")
      {
        link l(k) = "ssi:fork";
      }
      else
      {
        link l(k) = "ssi:tcp "+server+":"+program;
      }
      open(l(k));
      k++;
    }
  }
  list links = list(l(1..nlinks));

  // start a memory watchdog if needed; it becomes entry nlinks+1 of links
  if(havemaxmem)
  {
    link mempatrol = "ssi:fork";
    open(mempatrol);
    write(mempatrol, quote(watchlinks()));
    links = insert(links, mempatrol, nlinks);
  }
  int nkilled;   // the number of jobs killed by the mempatrol

  // distribute work to the links
  k = 1; // from here on, k is the index of the next job which must be
         // distributed to some link
  intvec assignment = 0:nlinks;  // link number i is currently doing
                                 // job number assignment[i]
  int pid;
  for(i = 1; i <= nlinks; i++)
  {
    write(l(i), quote(execute("option(noredefine);")));
    read(l(i));
    write(l(i), quote(execute("def result;")));
    read(l(i));
    write(l(i), quote(execute("list currentargs;")));
    read(l(i));
    if(status(l(i), "mode", "fork"))
    {
      write(l(i), quote(currentargs = args[eval(k)]));
    }
    else
    {
      write(l(i), quote(currentargs = eval(args[k])));
    }
    read(l(i));
    if(maxmems[k] > 0)
    {
      // find the server of link no. i: link i belongs to the first server j
      // with i <= (number of cores of servers 1..j)
      m = i;
      for(j = 1; j <= nservers; j++)
      {
        if(servers[j][2] >= m)
        {
          server = servers[j][1];
          break;
        }
        else
        {
          m = m-servers[j][2];
        }
      }
      write(l(i), quote(system("pid")));
      pid = read(l(i));
      // register the process with the watchdog, keyed by the link index i
      write(mempatrol, list(server, pid, i, maxmems[k]));
    }
    write(l(i), quote(execute("result = "+eval(commands[k])
      +"("+argsToString("currentargs", size(currentargs))+");")));
    assignment[i] = k;
    k++;
  }

  // distribute the rest of the work
  list results;
  for(i = njobs; i > 0; i--)
  {
    results[i] = list();  // TODO: What if the result of one of the commands is
                          // list()?
  }
  int nfinished;  // the number of finished jobs
  int wait;   // the index of the link which is finished, or 0 for timeout
  while(k <= njobs && nfinished < N-1)
  {
    if(timeout == 0)
    {
      wait = waitfirst(links);
    }
    else
    {
      tt = timeout-(rtimer-t);
      if(tt < 0)
      {
        // deadline already passed: treat as timeout
        wait = waitfirst(links, 0);
        wait = 0;
      }
      else
      {
        wait = waitfirst(links, tt);
      }
    }
    if(wait == -1)
    {
      ERROR("All links crashed.");
    }
    if(wait)
    {
      if(wait == nlinks+1)
      {
        // the watchdog killed a job; it reports the index of the link
        wait = read(mempatrol);
        close(l(wait));
        open(l(wait));
        // the restarted process is a fresh Singular session, so prepare it
        // exactly like the links in the initial distribution loop
        write(l(wait), quote(execute("option(noredefine);")));
        read(l(wait));
        write(l(wait), quote(execute("def result;")));
        read(l(wait));
        write(l(wait), quote(execute("list currentargs;")));
        read(l(wait));
        results[assignment[wait]] = "out of memory";
        nkilled++;
      }
      else
      {
        read(l(wait));
        write(l(wait), quote(result));
        results[assignment[wait]] = read(l(wait));
        if(maxmems[assignment[wait]] > 0)
        {
          // deregister by link index, matching the registration key
          write(mempatrol, wait);
        }
        nfinished++;
      }
      if(status(l(wait), "mode", "fork"))
      {
        write(l(wait), quote(currentargs = args[eval(k)]));
      }
      else
      {
        write(l(wait), quote(currentargs = eval(args[k])));
      }
      read(l(wait));
      if(maxmems[k] > 0)
      {
        // find the server of link no. wait (see the lookup above)
        m = wait;
        for(j = 1; j <= nservers; j++)
        {
          if(servers[j][2] >= m)
          {
            server = servers[j][1];
            break;
          }
          else
          {
            m = m-servers[j][2];
          }
        }
        write(l(wait), quote(system("pid")));
        pid = read(l(wait));
        write(mempatrol, list(server, pid, wait, maxmems[k]));
      }
      // "def" because the result type may differ from the previous job's
      write(l(wait), quote(execute("def result = "+eval(commands[k])
        +"("+argsToString("currentargs", size(currentargs))+");")));
      assignment[wait] = k;
      k++;
    }
    else
    {
      break;
    }
  }

  // collect the rest of the results
  while(nfinished < N && nfinished+nkilled < njobs)
  {
    if(timeout == 0)
    {
      wait = waitfirst(links);
    }
    else
    {
      tt = timeout-(rtimer-t);
      if(tt < 0)
      {
        // deadline already passed: treat as timeout
        wait = waitfirst(links, 0);
        wait = 0;
      }
      else
      {
        wait = waitfirst(links, tt);
      }
    }
    if(wait == -1)
    {
      ERROR("All links crashed.");
    }
    if(wait)
    {
      if(wait == nlinks+1)
      {
        wait = read(mempatrol);
        close(l(wait));
        results[assignment[wait]] = "out of memory";
        nkilled++;
      }
      else
      {
        read(l(wait));
        write(l(wait), quote(result));
        results[assignment[wait]] = read(l(wait));
        if(maxmems[assignment[wait]] > 0)
        {
          // deregister by link index, matching the registration key
          write(mempatrol, wait);
        }
        nfinished++;
      }
    }
    else
    {
      break;
    }
  }

  // close all links, collecting results which happen to be ready
  for(i = 1; i <= nlinks; i++)
  {
    if(status(l(i), "read", "ready"))
    {
      read(l(i));
      write(l(i), quote(result));
      results[assignment[i]] = read(l(i));
    }
    close(l(i));
  }
  if(havemaxmem)
  {
    close(mempatrol);
  }

  system("--ticks-per-sec", oldtimerresolution);
  return(results);
}
example
{
  "EXAMPLE:"; echo = 2;
  LIB "primdec.lib";
  ring r = 0, (x,y,z), lp;
  ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
  ideal j = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4;
  list commands = list("std", "primdecGTZ", "primdecSY",
                       "std", "primdecGTZ", "primdecSY");
  list args = list(list(i), list(i), list(i), list(j), list(j), list(j));
  parallelWaitN(commands, args, 3);
}
536
proc parallelWaitFirst(list commands, list args, list #)
"USAGE:  parallelWaitFirst(commands, args[, timeout, linktype, servers,
         maxmemory]); commands list, args list, timeout int, linktype string,
         servers list, maxmemory intvec
RETURN:  a list, containing at least one (if no timeout occurs) of the results
         of commands[i] applied to arg[i], i = 1, ..., size(commands).
         @* The command
         @code{parallelWaitFirst(list commands, list args, list #)} is
         synonymous to
         @code{parallelWaitN(list commands, list args, 1, list #)}. See
         @ref{parallelWaitN} for details on optional arguments and other
         remarks.
SEE ALSO: Ssi links, waitfirst
KEYWORDS: parallelWaitFirst; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitFirst;} shows an example."
{
  // waiting for the first job is waiting for exactly one job to finish;
  // all optional arguments are forwarded unchanged
  int nwait = 1;
  list res = parallelWaitN(commands, args, nwait, #);
  return(res);
}
example
{
  "EXAMPLE:"; echo = 2;
  LIB "primdec.lib";
  ring r = 0, (x,y,z), lp;
  ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
  list commands = list("primdecGTZ", "primdecSY");
  list args = list(list(i), list(i));
  parallelWaitFirst(commands, args);
}
566
proc parallelWaitAll(def commands, list args, list #)
"USAGE:  parallelWaitAll(commands, args[, timeout, linktype, servers,
         maxmemory]); commands list or string, args list, timeout int,
         linktype string, servers list, maxmemory intvec
RETURN:  a list, containing the results of commands[i] applied to arg[i],
         i = 1, ..., size(commands).
         @* The command
         @code{parallelWaitAll(list commands, list args, list #)} is
         synonymous to
         @code{parallelWaitN(list commands, list args, size(args), list #)}. See
         @ref{parallelWaitN} for details on optional arguments and other
         remarks.
         If commands is of type string, this is a shortcut for a list of size
         @code{size(args)} whose entries are just this string.
SEE ALSO: Ssi links, waitall
KEYWORDS: parallelWaitAll; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitAll;} shows an example."
{
  string ctype = typeof(commands);
  if(ctype == "list")
  {
    // one command per argument list: pass through unchanged
    return(parallelWaitN(commands, args, size(args), #));
  }
  if(ctype == "string")
  {
    // a single command: apply it to every argument list
    list cmds;
    int idx;
    for(idx = 1; idx <= size(args); idx++)
    {
      cmds[idx] = commands;
    }
    return(parallelWaitN(cmds, args, size(args), #));
  }
  ERROR("invalid type of first argument");
}
example
{
  "EXAMPLE:"; echo = 2;
  ring r = 0, (x,y,z), dp;
  ideal i1 = z8+z6+4z5+4z3+4z2+4, y-z2;
  ideal i2 = x10+x9y2, y8-x2y7;
  ideal i3 = x3-2xy, x2y-2y2+x;
  string command = "std";
  list args = list(list(i1), list(i2), list(i3));
  parallelWaitAll(command, args);
}
615
// TODO: not implemented yet. Placeholder for the planned abstraction layer
// for modular techniques mentioned in the library overview.
/// http://arxiv.org/abs/1005.5663v2
// NOTE(review): the parameters presumably are the command to run, its
// arguments, and callbacks for discarding unlucky primes and for a final
// verification in characteristic 0 -- inferred from the names only; confirm
// when implementing.
static proc doModular(command, args, proc deleteUnlucksPrimes, proc testInChar0)
{
  // intentionally empty: this skeleton is not implemented
}
621
// TODO: not implemented yet. Placeholder for the planned 'worker farm'
// skeleton mentioned in the library overview; currently an empty procedure.
/* worker farm */
static proc Create() {}
625
/* auxiliary procedures */
// Memory watchdog; parallelWaitN starts it inside the forked link
// 'mempatrol' and never returns from it. Messages read from 'mempatrol':
//   - list(server, pid, link index, maxmemory in MB): start watching pid;
//   - int (a link index): stop watching the process of that link.
// When a watched process uses more than its limit, it is killed, removed
// from the watch list, and its link index is written back to 'mempatrol'.
// NOTE(review): 'mempatrol' is not a parameter; it is resolved from the
// context in which parallelWaitN sent the call.
static proc watchlinks()
{
  list parent = list(mempatrol);
  list watchedlinks;
  int wait;
  int i, sys;
  while(1)
  {
    // nothing to watch: block until the parent writes; otherwise wake up
    // at least every 10000 ms to check the memory usage
    if(size(watchedlinks) == 0)
    {
      wait = waitall(parent);
    }
    else
    {
      wait = waitall(parent, 10000);
    }
    if(wait == -1)
    {
      ERROR("The main process crashed.");
    }
    if(wait)
    {
      def query = read(mempatrol);
      if(typeof(query) == "list")
      {
        watchedlinks = insert(watchedlinks, query);
      }
      else // in this case, typeof(query) is assumed to be "int", the
           // index of the link
      {
        for(i = size(watchedlinks); i > 0; i--)
        {
          if(watchedlinks[i][3] == query)
          {
            watchedlinks = delete(watchedlinks, i);
            break;
          }
        }
      }
      // query is declared again in the next iteration (possibly with a
      // different type), so remove it now instead of redefining it
      kill query;
    }
    // iterate backwards so that deleting entry i keeps indices 1..i-1 valid
    for(i = size(watchedlinks); i > 0; i--)
    {
      if(getusedmemory(watchedlinks[i][1], watchedlinks[i][2])
           > watchedlinks[i][4])
      {
        if(watchedlinks[i][1] == "localhost")
        {
          sys = system("sh", "kill "+string(watchedlinks[i][2]));
        }
        else
        {
          sys = system("sh", "ssh "+watchedlinks[i][1]+" kill "
                             +string(watchedlinks[i][2]));
        }
        write(mempatrol, watchedlinks[i][3]);
        watchedlinks = delete(watchedlinks, i);
      }
    }
  }
}
687
// Returns the virtual memory size (VmSize from /proc/<pid>/status, which is
// given in kB) of process pid on server, divided by 1000, i.e. in MB.
// Non-localhost servers are queried via ssh.
// Returns 0 if the value cannot be read (process already gone, no /proc as
// on non-Linux systems, ssh failure); previously the empty shell output made
// execute("b = ;") raise an error and terminate the memory watchdog.
static proc getusedmemory(string server, int pid)
{
  string cmd = "grep VmSize /proc/"+string(pid)+"/status 2>/dev/null";
  if(server != "localhost")
  {
    cmd = "ssh "+server+" "+cmd;
  }
  // the END block always prints a number: the last VmSize value seen, or 0
  // if grep produced no output
  string s = read("|: "+cmd+" | awk '{ v = $2; } END { print v+0; }'");
  bigint b;
  execute("b = "+s+";");
  int i = int(b/1000);
  return(i);
}
706
// Builds the argument text "name[1], name[2], ..., name[length]" used to
// call a command with the entries of the list called name; returns the empty
// string if length <= 0.
static proc argsToString(string name, int length)
{
  string arglist = "";
  string sep = "";
  int idx;
  for(idx = 1; idx <= length; idx++)
  {
    arglist = arglist+sep+name+"["+string(idx)+"]";
    sep = ", ";
  }
  return(arglist);
}
Note: See TracBrowser for help on using the repository browser.