source: git/Singular/LIB/parallel.lib @ 5f0de8

jengelh-datetimespielwiese
Last change on this file since 5f0de8 was 5f0de8, checked in by Andreas Steenpaß <steenpas@…>, 12 years ago
parallel.lib git-svn-id: file:///usr/local/Singular/svn/trunk@14356 2c84dea3-7e68-4137-9b89-c4e89433aadc
  • Property mode set to 100644
File size: 19.1 KB
Line 
////////////////////////////////////////////////////////////////////
// Library header: version stamp, category and the info string which
// lists the exported procedures of this library.
version="$Id$";
category="General purpose";
info="
LIBRARY:   parallel.lib  An Interface for Parallelization
AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de

SEE ALSO:  link, modstd_lib, assprimeszerodim_lib

KEYWORDS:  parallel.lib; Parallelization; Links, user interface;
           Skeletons for parallelization; Distributed computing

PROCEDURES:
  parallelWaitN(...)     execute several jobs in parallel,
                         wait for N of them to finish
  parallelWaitFirst(...) execute several jobs in parallel,
                         wait for the first to finish
  parallelWaitAll(...)   execute several jobs in parallel,
                         wait for all of them to finish
";
21
proc parallelWaitN(list commands, list args, int N, list #)
"USAGE:  parallelWaitN(commands, args, N[, timeout, linktype, servers,
         maxmemory]); commands list, args list, N int, timeout int,
         linktype string, servers list, maxmemory intvec
RETURN:  a list, containing the results of commands[i] applied to args[i],
         i = 1, ..., size(commands).
         @* The entries of the list commands must be strings.
         @* The entries of the list args must be lists.
         @* The procedure waits for N jobs to finish.
         @* An optional timeout in ms can be provided. Default is 0 which
            disables the timeout.
         @* Supported linktypes are up to now \"ssi\" and \"mp\", see
            @ref{Ssi links} and @ref{MP links}.
         @* Each server is given by a list containing the address of the server,
            the number of cores to use on this server and the command to start
            Singular. If the address is \"localhost\", the processes will
            be generated on the same machine using forks. If the command to
            start Singular is \"\" (the empty string), \"Singular\" will be
            used. Default is @code{list(\"localhost\", system(\"cpu\"), \"\")}.
            There are some obvious shortcuts for servers, e.g. \"myserver\" is
            a shortcut for
            @code{list(\"myserver\", [nb. of cores on myserver], \"\")}, or 3
            for @code{list(\"localhost\", 3, \"\")}.
         @* If an intvec maxmemory of size @code{size(commands)} is given, the
            i-th job will be killed if it uses more than maxmemory[i] MB of
            memory. If maxmemory[i] is 0, there will be no restraint for the
            i-th job. Default is @code{0:size(commands)}.
NOTE:       The returned list may contain more than N results if several jobs
            finished \"at the same time\". It may contain less than N results in
            the case of timeout or errors occurring.
         @* MP links do not work with a 64 bit version of Singular. If you want
            to use MP links, make sure that MP is available. This can be checked
            by the Singular command @code{system(\"with\", \"MP\");}.
         @* The check whether the jobs exceed the memory sizes given by
            maxmemory is only done from time to time. This feature is
            experimental and should be used with care.
SEE ALSO: Ssi links, MP links, waitfirst, waitall
KEYWORDS: parallelWaitN; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitN;} shows an example."
{
  // initialize the timer: rtimer is switched to 1000 ticks per second so
  // that it can be compared with the timeout, which is given in ms; the
  // previous resolution is restored right before returning
  int oldtimerresolution = system("--ticks-per-sec");
  system("--ticks-per-sec", 1000);
  int t = rtimer;

  // auxiliary variables
  int i, j, m, tt;

  // read optional parameters: every entry of # whose type does not match
  // the expected one is assumed to be missing and the default is inserted
  list defaultserver = list("localhost", system("cpu"), "");
  list defaults = list(0, "ssi", list(defaultserver), 0:size(commands));
  for(i = 1; i <= size(defaults); i++)
  {
    if(typeof(#[i]) != typeof(defaults[i]))
    {
      # = insert(#, defaults[i], i-1);
    }
  }
  if(size(#) != size(defaults))
  {
    ERROR("wrong optional parameters");
  }
  // complete each server description to (address, cores, program)
  for(j = size(#[3]); j > 0; j--)
  {
    if(typeof(#[3][j][1]) != typeof(defaultserver[1]))
    {
      #[3][j] = insert(#[3][j], defaultserver[1], 0);
    }
    defaultserver[3] = "";
    // only for ssi:tcp links, the default program is system("Singular")
    if(#[2] == "ssi" && #[3][j][1] != "localhost")
    {
      defaultserver[3] = system("Singular");
    }
    for(i = 2; i <= size(defaultserver); i++)
    {
      if(typeof(#[3][j][i]) != typeof(defaultserver[i]))
      {
        #[3][j] = insert(#[3][j], defaultserver[i], i-1);
      }
    }
    if(size(#[3][j]) != size(defaultserver))
    {
      ERROR("wrong declaration for server no. "+string(j));
    }
  }
  int timeout = #[1];
  string linktype = #[2];
  list servers = #[3];
  intvec maxmems = #[4];

  // error checking
  int njobs = size(commands);
  if(njobs != size(args))
  {
    ERROR("The number of commands does not match the number of lists"
         +newline+"of arguments.");
  }
  if(njobs == 0)
  {
    ERROR("no commands specified");
  }
  for(i = 1; i <= njobs; i++)
  {
    if(typeof(commands[i]) != "string")
    {
      ERROR("The first argument is not a list of strings.");
    }
    if(typeof(args[i]) != "list")
    {
      ERROR("The second argument is not a list of lists.");
    }
  }
  if(N < 0)
  {
    ERROR("The number of jobs which you want to wait for is negative.");
  }
  if(N > njobs)
  {
    ERROR("The number of jobs which you want to wait for is greater"
         +newline+"than the number of jobs itself.");
  }
  if(timeout < 0)
  {
    ERROR("The given timeout is negative.");
  }
  if(linktype != "ssi" && linktype != "mp")
  {
    ERROR("The given linktype is not recognized.");
  }
  int nservers = size(servers);
  if(nservers <= 0)
  {
    ERROR("no server specified");
  }
  for(i = 1; i <= nservers; i++)
  {
    if(servers[i][1] != "localhost")
    {
      // system("sh", ...) returns the exit status of the shell command
      if(system("sh", "ssh "+servers[i][1]+" exit"))
      {
        ERROR("Could not connect to server \""+servers[i][1]+"\"");
      }
    }
    if(servers[i][2] < 0)
    {
      ERROR("The number of cores to be used on server \""+servers[i][1]+"\""
           +newline+" is negative.");
    }
    // determine the number of cores which are available on server no. i
    if(servers[i][1] == "localhost")
    {
      int ncpus(i) = system("cpu");
    }
    else
    {
      if(linktype == "ssi")
      {
        link lcpu(i) = "ssi:tcp "+servers[i][1]+":"+servers[i][3];
      }
      else
      {
        // the program of this server is servers[i][3]; the variable
        // "program" is not yet declared at this point
        if(servers[i][3] == "")
        {
          link lcpu(i) = "MPtcp:launch"+" --MPhost "+servers[i][1];
        }
        else
        {
          link lcpu(i) = "MPtcp:launch"+" --MPhost "+servers[i][1]
            +" --MPapplication "+servers[i][3];
        }
      }
      open(lcpu(i));
      write(lcpu(i), quote(system("cpu")));
      int ncpus(i) = read(lcpu(i));
      close(lcpu(i));
      kill lcpu(i);
    }
    // 0 cores means: use all cores which are available
    if(servers[i][2] == 0)
    {
      servers[i][2] = ncpus(i);
    }
    else
    {
      if(servers[i][2] > ncpus(i))
      {
        ERROR("The number of cores to use on server \""+servers[i][1]+"\""
             +newline+"is greater than the number of available cores");
      }
    }
    if(servers[i][1] != "localhost")
    {
      if(system("sh", "ssh "+servers[i][1]+
                      " 'test -e `which "+servers[i][3]+"`'"))
      {
        ERROR("\""+servers[i][3]+"\" was not found on "
             +"\""+servers[i][1]+"\".");
      }
    }
  }
  if(size(maxmems) != njobs)
  {
    ERROR("The size of the intvec which specifies the maximal amount of memory"
         +newline+"to be used for each job does not match the number of jobs.");
  }
  int havemaxmem;   // nonzero iff at least one job has a memory limit
  for(i = 1; i <= njobs; i++)
  {
    if(maxmems[i] < 0)
    {
      ERROR("The maximal amount of memory to be used for job no. "+string(i)
           +" is negative.");
    }
    havemaxmem = havemaxmem+maxmems[i];
  }

  // skip those cores which won't be needed
  int nlinks;
  for(i = 1; i <= nservers; i++)
  {
    if(nlinks+servers[i][2] <= njobs)
    {
      nlinks = nlinks+servers[i][2];
    }
    else
    {
      if(nlinks == njobs)
      {
        servers = list(servers[1..(i-1)]);
      }
      else
      {
        servers = list(servers[1..i]);
        servers[i][2] = njobs-nlinks;
        nlinks = njobs;
      }
      nservers = size(servers);
    }
  }

  // open the links
  string server;
  int ncores;
  string program;
  int k = 1;   // the index of the link
  for(i = 1; i <= nservers; i++)
  {
    server = servers[i][1];
    ncores = servers[i][2];
    program = servers[i][3];
    for(j = 1; j <= ncores; j++)
    {
      if(server == "localhost")
      {
        if(linktype == "ssi")
        {
          link l(k) = "ssi:fork";
        }
        else
        {
          link l(k) = "MPtcp:fork";
        }
      }
      else
      {
        if(linktype == "ssi")
        {
          link l(k) = "ssi:tcp "+server+":"+program;
        }
        else
        {
          if(program == "")
          {
            link l(k) = "MPtcp:launch"+" --MPhost "+server;
          }
          else
          {
            link l(k) = "MPtcp:launch"+" --MPhost "+server
              +" --MPapplication "+program;
          }
        }
      }
      open(l(k));
      k++;
    }
  }
  list links = list(l(1..nlinks));

  // start a memory watchdog if needed; it becomes entry no. nlinks+1 of
  // the list links, so that waitfirst() also reports its messages
  if(havemaxmem)
  {
    if(linktype == "ssi")
    {
      link mempatrol = "ssi:fork";
    }
    else
    {
      link mempatrol = "MPtcp:fork";
    }
    open(mempatrol);
    write(mempatrol, quote(watchlinks()));
    links = insert(links, mempatrol, nlinks);
  }
  int nkilled;   // the number of jobs killed by the mempatrol

  // distribute work to the links
  k = 1; // from here on, k is the index of the next job which must be
         // distributed to some link
  intvec assignment = 0:nlinks;  // link number i is currently doing
                                 // job number assignment[i]
  int pid;
  for(i = 1; i <= nlinks; i++)
  {
    write(l(i), quote(execute("option(noredefine);")));
    read(l(i));
    write(l(i), quote(execute("def result;")));
    read(l(i));
    write(l(i), quote(execute("list currentargs;")));
    read(l(i));
    // a forked process already knows args, only the index is sent;
    // other processes get the arguments themselves
    if(status(l(i), "mode", "fork"))
    {
      write(l(i), quote(currentargs = args[eval(k)]));
    }
    else
    {
      write(l(i), quote(currentargs = eval(args[k])));
    }
    read(l(i));
    if(maxmems[k] > 0)
    {
      // find the server on which link no. i is running: the links are
      // numbered consecutively, servers[j][2] links per server
      m = i;
      for(j = 1; j <= nservers; j++)
      {
        if(servers[j][2] >= m)
        {
          server = servers[j][1];
          break;
        }
        else
        {
          m = m-servers[j][2];
        }
      }
      write(l(i), quote(system("pid")));
      pid = read(l(i));
      // register the job at the watchdog, identified by the link index
      write(mempatrol, list(server, pid, i, maxmems[k]));
    }
    write(l(i), quote(execute("result = "+eval(commands[k])
                                    +"(currentargs[1..size(currentargs)]);")));
    assignment[i] = k;
    k++;
  }

  // distribute the rest of the work
  list results;
  for(i = njobs; i > 0; i--)
  {
    results[i] = list();  // TODO: What if the result of one of the commands is
                          // list()?
  }
  int nfinished;  // the number of finished jobs
  int wait;   // the index of the link which is finished, or 0 for timeout
  while(k <= njobs && nfinished < N-1)
  {
    if(timeout == 0)
    {
      wait = waitfirst(links);
    }
    else
    {
      tt = timeout-(rtimer-t);
      if(tt < 0)
      {
        wait = waitfirst(links, 0);
        wait = 0;
      }
      else
      {
        wait = waitfirst(links, tt);
      }
    }
    if(wait == -1)
    {
      ERROR("All links crashed.");
    }
    if(wait)
    {
      if(wait == nlinks+1)
      {
        // message from the watchdog: it killed the job on the link whose
        // index it reports; restart that link
        // NOTE(review): the reopened link has not received the setup
        // commands (option(noredefine), currentargs) of the first round
        // -- confirm that the following assignment works there.
        wait = read(mempatrol);
        close(l(wait));
        open(l(wait));
        results[assignment[wait]] = "out of memory";
        nkilled++;
      }
      else
      {
        read(l(wait));
        write(l(wait), quote(result));
        results[assignment[wait]] = read(l(wait));
        if(maxmems[assignment[wait]] > 0)
        {
          // unregister the finished job; the watchdog identifies its
          // entries by the link index, not by the job number
          write(mempatrol, wait);
        }
        nfinished++;
      }
      if(status(l(wait), "mode", "fork"))
      {
        write(l(wait), quote(currentargs = args[eval(k)]));
      }
      else
      {
        write(l(wait), quote(currentargs = eval(args[k])));
      }
      read(l(wait));
      if(maxmems[k] > 0)
      {
        // find the server on which link no. wait is running
        m = wait;
        for(j = 1; j <= nservers; j++)
        {
          if(servers[j][2] >= m)
          {
            server = servers[j][1];
            break;
          }
          else
          {
            m = m-servers[j][2];
          }
        }
        write(l(wait), quote(system("pid")));
        pid = read(l(wait));
        write(mempatrol, list(server, pid, wait, maxmems[k]));
      }
      // "def result = ..." since the type of the new result may differ
      // from the type of the previous one
      write(l(wait), quote(execute("def result = "+eval(commands[k])
                                     +"(currentargs[1..size(currentargs)]);")));
      assignment[wait] = k;
      k++;
    }
    else
    {
      // timeout
      break;
    }
  }

  // collect the rest of the results
  while(nfinished < N && nfinished+nkilled < njobs)
  {
    if(timeout == 0)
    {
      wait = waitfirst(links);
    }
    else
    {
      tt = timeout-(rtimer-t);
      if(tt < 0)
      {
        wait = waitfirst(links, 0);
        wait = 0;
      }
      else
      {
        wait = waitfirst(links, tt);
      }
    }
    if(wait == -1)
    {
      ERROR("All links crashed.");
    }
    if(wait)
    {
      if(wait == nlinks+1)
      {
        // the watchdog killed a job; no new job is started on this link
        wait = read(mempatrol);
        close(l(wait));
        results[assignment[wait]] = "out of memory";
        nkilled++;
      }
      else
      {
        read(l(wait));
        write(l(wait), quote(result));
        results[assignment[wait]] = read(l(wait));
        if(maxmems[assignment[wait]] > 0)
        {
          // unregister the finished job by its link index (see above)
          write(mempatrol, wait);
        }
        nfinished++;
      }
    }
    else
    {
      // timeout
      break;
    }
  }

  //close all links, fetching those results which are ready by now
  for(i = 1; i <= nlinks; i++)
  {
    if(status(l(i), "read", "ready"))
    {
      read(l(i));
      write(l(i), quote(result));
      results[assignment[i]] = read(l(i));
    }
    close(l(i));
  }
  if(havemaxmem)
  {
    close(mempatrol);
  }

  system("--ticks-per-sec", oldtimerresolution);
  return(results);
}
example
{
  "EXAMPLE:"; echo = 2;
  LIB "primdec.lib";
  ring r = 0, (x,y,z), lp;
  ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
  ideal j = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4;
  list commands = list("std", "primdecGTZ", "primdecSY", "std", "primdecGTZ",
                       "primdecSY");
  list args = list(list(i), list(i), list(i), list(j), list(j), list(j));
  parallelWaitN(commands, args, 3);
}
549
proc parallelWaitFirst(list commands, list args, list #)
"USAGE:  parallelWaitFirst(commands, args[, timeout, linktype, servers,
         maxmemory]); commands list, args list, timeout int, linktype string,
         servers list, maxmemory intvec
RETURN:  a list, containing at least one (if no timeout occurs) of the results
         of commands[i] applied to args[i], i = 1, ..., size(commands).
         @* The command
         @code{parallelWaitFirst(list commands, list args, list #)} is
         synonymous to
         @code{parallelWaitN(list commands, list args, 1, list #)}. See
         @ref{parallelWaitN} for details on optional arguments and other
         remarks.
SEE ALSO: Ssi links, MP links, waitfirst
KEYWORDS: parallelWaitFirst; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitFirst;} shows an example."
{
  // waiting for the first job is waiting for N = 1 jobs; the optional
  // parameters are passed on unchanged
  list firstresults = parallelWaitN(commands, args, 1, #);
  return(firstresults);
}
example
{
  "EXAMPLE:"; echo = 2;
  LIB "primdec.lib";
  ring r = 0, (x,y,z), lp;
  ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
  list commands = list("primdecGTZ", "primdecSY");
  list args = list(list(i), list(i));
  parallelWaitFirst(commands, args);
}
579
proc parallelWaitAll(def commands, list args, list #)
"USAGE:  parallelWaitAll(commands, args[, timeout, linktype, servers,
         maxmemory]); commands list or string, args list, timeout int,
         linktype string, servers list, maxmemory intvec
RETURN:  a list, containing the results of commands[i] applied to args[i],
         i = 1, ..., size(commands).
         @* The command
         @code{parallelWaitAll(list commands, list args, list #)} is
         synonymous to
         @code{parallelWaitN(list commands, list args, size(args), list #)}. See
         @ref{parallelWaitN} for details on optional arguments and other
         remarks.
         If commands is of type string, this is a shortcut for a list of size
         @code{size(args)} whose entries are just this string.
SEE ALSO: Ssi links, MP links, waitall
KEYWORDS: parallelWaitAll; Parallelization; Links, user interface;
          Skeletons for parallelization; Distributed computing
EXAMPLE:  @code{example parallelWaitAll;} shows an example."
{
  string cmdtype = typeof(commands);
  if(cmdtype != "list" && cmdtype != "string")
  {
    ERROR("invalid type of first argument");
  }
  // all jobs have to finish, i.e. N is the number of argument lists
  int njobs = size(args);
  if(cmdtype == "string")
  {
    // shortcut: apply the same command to each of the argument lists
    list cmds;
    for(int idx = njobs; idx > 0; idx--)
    {
      cmds[idx] = commands;
    }
    return(parallelWaitN(cmds, args, njobs, #));
  }
  return(parallelWaitN(commands, args, njobs, #));
}
example
{
  "EXAMPLE:"; echo = 2;
  ring r = 0, (x,y,z), dp;
  ideal i1 = z8+z6+4z5+4z3+4z2+4, y-z2;
  ideal i2 = x10+x9y2, y8-x2y7;
  ideal i3 = x3-2xy, x2y-2y2+x;
  string command = "std";
  list args = list(list(i1), list(i2), list(i3));
  parallelWaitAll(command, args);
}
628
// TODO
/// http://arxiv.org/abs/1005.5663v2
// Placeholder: the body is empty, nothing is computed or returned yet.
// NOTE(review): presumably intended as a skeleton for modular methods as
// described in the paper referenced above -- confirm before implementing.
static proc doModular(command, args, proc deleteUnlucksPrimes, proc testInChar0)
{
}
634
// TODO
/* worker farm */
// Placeholder: takes no arguments and has an empty body.
static proc Create() {}
638
639/* auxiliary procedures */
// Memory watchdog. parallelWaitN starts this procedure in a separate
// process via write(mempatrol, quote(watchlinks())); it communicates with
// the main process through the link mempatrol and never returns.
// Messages read from mempatrol:
//   - a list (server, pid, index of the link, max. memory): start watching
//     this process,
//   - anything else (assumed to be an int, the index of a link): stop
//     watching the process which belongs to this link.
// If a watched process uses more memory than allowed, it is killed and the
// index of its link is written to mempatrol.
static proc watchlinks()
{
  list parent = list(mempatrol);
  list watchedlinks;   // entries: list(server, pid, link index, max. memory)
  int wait;
  int i, sys;
  while(1)
  {
    // with nothing to watch, block until the main process sends a message;
    // otherwise wake up after the timeout in order to check the memory
    if(size(watchedlinks) == 0)
    {
      wait = waitall(parent);
    }
    else
    {
      wait = waitall(parent, 10000);
    }
    if(wait == -1)
    {
      ERROR("The main process crashed.");
    }
    if(wait)
    {
      def query = read(mempatrol);
      if(typeof(query) == "list")
      {
        watchedlinks = insert(watchedlinks, query);
      }
      else // in this case, typeof(query) is assumed to be "int", the
           // index of the link
      {
        for(i = size(watchedlinks); i > 0; i--)
        {
          if(watchedlinks[i][3] == query)
          {
            watchedlinks = delete(watchedlinks, i);
            break;
          }
        }
      }
      // query is declared anew for each message, so remove it here
      // instead of redefining an existing identifier in the next round
      kill query;
    }
    // check all watched processes, backwards since entries may be deleted
    for(i = size(watchedlinks); i > 0; i--)
    {
      if(getusedmemory(watchedlinks[i][1], watchedlinks[i][2])
           > watchedlinks[i][4])
      {
        if(watchedlinks[i][1] == "localhost")
        {
          sys = system("sh", "kill "+string(watchedlinks[i][2]));
        }
        else
        {
          sys = system("sh", "ssh "+watchedlinks[i][1]+" kill "
                             +string(watchedlinks[i][2]));
        }
        // tell the main process which link has lost its job
        write(mempatrol, watchedlinks[i][3]);
        watchedlinks = delete(watchedlinks, i);
      }
    }
  }
}
700
// Returns the VmSize entry of /proc/<pid>/status on the given server,
// divided by 1000, as an int. For servers other than "localhost", the
// file is read via ssh. Returns 0 if no value could be read.
// NOTE(review): VmSize is presumably reported in kB, so the result is
// roughly the memory usage in MB -- confirm.
static proc getusedmemory(string server, int pid)
{
  // the command is the same for all servers, up to the ssh prefix
  string cmd = "grep VmSize /proc/"+string(pid)+"/status "+
               "| awk '{ print $2; }'";
  if(server != "localhost")
  {
    cmd = "ssh "+server+" "+cmd;
  }
  string s = read("|: "+cmd);
  if(s == "")
  {
    // no output, e.g. because the process does not exist anymore;
    // executing "b = ;" would be a syntax error
    return(0);
  }
  bigint b;
  execute("b = "+s+";");
  int i = int(b/1000);
  return(i);
}
Note: See TracBrowser for help on using the repository browser.