Changeset dc8c23 in git for Singular/LIB/parallel.lib


Ignore:
Timestamp:
Dec 3, 2013, 1:55:20 PM (10 years ago)
Author:
Andreas Steenpass <steenpass@…>
Branches:
(u'spielwiese', '4a9821a93ffdc22a6696668bd4f6b8c9de3e6c5f')
Children:
0d845f767bf695f0cd705a3bf6aa0e3e1d2dc12f
Parents:
857ee3f6fb71d46b4b00c727b2614a570426ce36
git-author:
Andreas Steenpass <steenpass@mathematik.uni-kl.de>2013-12-03 13:55:20+01:00
git-committer:
Andreas Steenpass <steenpass@mathematik.uni-kl.de>2013-12-03 23:19:03+01:00
Message:
chg: base parallel.lib on tasks.lib
(cherry picked from commit 0cbbf3c2c18164653d67edfd8628967b92fa77cc)

Signed-off-by: Andreas Steenpass <steenpass@mathematik.uni-kl.de>

Conflicts:
	Singular/LIB/parallel.lib
File:
1 edited

Legend:

Unmodified
Added
Removed
  • Singular/LIB/parallel.lib

    r857ee3 rdc8c23  
    1 ///////////////////////////////////////////////////////////////////
    2 version="version parallel.lib 4.0.0.0 Jun_2013 "; // $Id$
     1////////////////////////////////////////////////////////////////////
     2version="version parallel.lib 4.0.0.0 Dec_2013 "; // $Id$
    33category="General purpose";
    44info="
    5 LIBRARY:   parallel.lib  Tools for Parallelization
     5LIBRARY:   parallel.lib  An abstraction layer for parallel skeletons
     6
    67AUTHOR:    Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de
    78
    89OVERVIEW:
    9 This library provides tools to do several computations in parallel. They
    10 are aimed at ordinary Singular users as well as authors of Singular
    11 libraries.
    12 @* Even without this library, it is possible to do execute self-defined
    13 Singular commands in parallel using @ref{link}, but the handling of
    14 such links can be quite tedious. With the pocedures described below,
    15 this can be done by one-line commands.
    16 @* There are many parallel 'skeletons' (i.e. ways in which parallel
    17 tasks rely upon and interact with each other). A few of them are already
    18 implemented. Future plans include an abstraction layer for modular
    19 techniques, 'worker farms', and parallel tests.
    20 
    21 SEE ALSO:  link, modstd_lib, assprimeszerodim_lib
    22 
    23 KEYWORDS:  parallel.lib; Parallelization; Links, user interface;
    24            Skeletons for parallelization; Distributed computing
     10This library provides implementations of several parallel 'skeletons' (i.e.
     11ways in which parallel tasks rely upon and interact with each other). It is
     12based on the library tasks.lib and aims at both ordinary Singular users as well
     13as authors of Singular libraries.
     14
     15KEYWORDS:  parallelization; parallel skeletons; distributed computing
     16
     17SEE ALSO:  resources_lib, tasks_lib, modstd_lib, modnormal_lib
    2518
    2619PROCEDURES:
    27   parallelWaitN(...)     execute several jobs in parallel,
    28                          wait for N of them to finish
    29   parallelWaitFirst(...) execute several jobs in parallel,
    30                          wait for the first to finish
    31   parallelWaitAll(...)   execute several jobs in parallel,
    32                          wait for all of them to finish
     20  parallelWaitN();      execute several jobs in parallel
     21                        and wait for N of them to finish
     22  parallelWaitFirst();  execute several jobs in parallel
     23                        and wait for the first to finish
     24  parallelWaitAll();    execute several jobs in parallel
     25                        and wait for all of them to finish
     26  parallelTestAND();    run several tests in parallel
     27                        and determine if they all succeed
     28  parallelTestOR();     run several tests in parallel
     29                        and determine if any of them succeeds
    3330";
    3431
    35 proc parallelWaitN(list commands, list args, int N, list #)
    36 "USAGE:  parallelWaitN(commands, args, N[, timeout, linktype, servers,
    37          maxmemory]); commands list, args list, N int, timeout int,
    38          linktype string, servers list, maxmemory intvec
    39 RETURN:  a list, containing the results of commands[i] applied to arg[i],
    40          i = 1, ..., size(commands).
    41          @* The procedure waits for N jobs to finish.
    42 
    43          @* OPTIONAL PARAMETERS:
    44 
    45             An optional timeout in ms can be provided. Default is 0 which
    46             disables the timeout.
    47 
    48             Supported linktypes are up to now \"ssi\" and \"mp\", see
    49             @ref{Ssi links}.
    50 
    51             Servers:
    52          @* Each server is given by a list containing the address of the server,
    53             the number of cores to use on this server and the command to start
    54             Singular.
    55          @* If the address is \"localhost\", the processes will be generated on
    56             the same machine using forks. If the command to start Singular is
    57             \"\" (the empty string), \"Singular\" will be used.
    58          @* Default is @code{list(\"localhost\", system(\"cpu\"), \"\")}.
    59          @* There are some obvious shortcuts for servers, e.g. \"myserver\" is
    60             a shortcut for
    61             @code{list(\"myserver\", [nb. of cores on myserver], \"\")}, or 3
    62             for @code{list(\"localhost\", 3, \"\")}.
    63 
    64             Memory limits:
    65          @* If an intvec maxmemory of size @code{size(commands)} is given, the
    66             i-th job will be killed if it uses more than maxmemory[i] MB of
    67             memory. If maxmemory[i] is 0, there will be no restraint for the
    68             i-th job. Default is @code{0:size(commands)}.
    69 NOTE:       The entries of the list commands must be strings.
    70          @* The entries of the list args must be lists.
    71          @* The returned list may contain more than N results if several jobs
    72             finished \"at the same time\". It may contain less than N results in
    73             the case of timeout or errors occurring.
    74          @* The check whether the jobs exceed the memory sizes given by
    75             maxmemory is only done from time to time. This feature is
    76             experimental and should be used with care.
    77 SEE ALSO: Ssi links, waitfirst, waitall
    78 KEYWORDS: parallelWaitN; Parallelization; Links, user interface;
    79           Skeletons for parallelization; Distributed computing
    80 EXAMPLE:  @code{example parallelWaitN;} shows an example."
    81 {
    82   // initialize the timer
    83   int oldtimerresolution = system("--ticks-per-sec");
    84   system("--ticks-per-sec", 1000);
    85   int t = rtimer;
    86 
    87   // auxiliary variables
    88   int i, j, m, tt;
    89 
    90   // read optional parameters
    91   list defaultserver = list("localhost", system("cpu"), "");
    92   list defaults = list(0, "ssi", list(defaultserver), 0:size(commands));
    93   for(i = 1; i <= size(defaults); i++)
    94   {
    95     if(typeof(#[i]) != typeof(defaults[i]))
    96     {
    97       # = insert(#, defaults[i], i-1);
    98     }
    99   }
    100   if(size(#) != size(defaults))
    101   {
    102     ERROR("wrong optional parameters");
    103   }
    104   for(j = size(#[3]); j > 0; j--)
    105   {
    106     if(typeof(#[3][j][1]) != typeof(defaultserver[1]))
    107     {
    108       #[3][j] = insert(#[3][j], defaultserver[1], 0);
    109     }
    110     defaultserver[3] = "";
    111     // only for ssi:tcp links, the default program is system("Singular")
    112     if(#[2] == "ssi" && #[3][j][1] != "localhost")
    113     {
    114       defaultserver[3] = system("Singular");
    115     }
    116     for(i = 2; i <= size(defaultserver); i++)
    117     {
    118       if(typeof(#[3][j][i]) != typeof(defaultserver[i]))
    119       {
    120         #[3][j] = insert(#[3][j], defaultserver[i], i-1);
    121       }
    122     }
    123     if(size(#[3][j]) != size(defaultserver))
    124     {
    125       ERROR("wrong declaration for server no. "+string(j));
    126     }
    127   }
    128   int timeout = #[1];
    129   string linktype = #[2];
    130   list servers = #[3];
    131   intvec maxmems = #[4];
    132 
    133   // error checking
    134   int njobs = size(commands);
    135   if(njobs != size(args))
    136   {
    137     ERROR("The number of commands does not match the number of lists"
    138          +newline+"of arguments.");
    139   }
    140   if(njobs == 0)
    141   {
    142     ERROR("no commands specified");
    143   }
    144   for(i = 1; i <= njobs; i++)
    145   {
    146     if(typeof(commands[i]) != "string")
    147     {
    148       ERROR("The first argument is not a list of strings.");
    149     }
    150     if(typeof(args[i]) != "list")
    151     {
    152       ERROR("The second argument is not a list of lists.");
    153     }
    154   }
    155   if(N < 0)
    156   {
    157     ERROR("The number of jobs which you want to wait for is negative.");
    158   }
    159   if(N > njobs)
    160   {
    161     ERROR("The number of jobs which you wnat to wait for is greater"
    162          +newline+"than the number of jobs itself.");
    163   }
    164   if(timeout < 0)
    165   {
    166     ERROR("The given timeout is negative.");
    167   }
    168   if(linktype != "ssi" && linktype != "mp")
    169   {
    170     ERROR("The given linktype is not recognized.");
    171   }
    172   int nservers = size(servers);
    173   if(nservers <= 0)
    174   {
    175     ERROR("no server specified");
    176   }
    177   for(i = 1; i <= nservers; i++)
    178   {
    179     if(servers[i][1] != "localhost")
    180     {
    181       if(system("sh", "ssh "+servers[i][1]+" exit"))
    182       {
    183         ERROR("Could not connect to server \""+servers[i][1]+"\"");
    184       }
    185     }
    186     if(servers[i][2] < 0)
    187     {
    188       ERROR("The number of cores to be used on server \""+servers[i][1]+"\""
    189            +newline+" is negative.");
    190     }
    191     if(servers[i][1] == "localhost")
    192     {
    193       int ncpus(i) = system("cpu");
    194     }
    195     else
    196     {
    197       //if(linktype == "ssi")
    198       //{
    199         link lcpu(i) = "ssi:tcp "+servers[i][1]+":"+servers[i][3];
    200       //}
    201       open(lcpu(i));
    202       write(lcpu(i), quote(system("cpu")));
    203       int ncpus(i) = read(lcpu(i));
    204       close(lcpu(i));
    205       kill lcpu(i);
    206     }
    207     if(servers[i][2] == 0)
    208     {
    209       servers[i][2] = ncpus(i);
    210     }
    211     else
    212     {
    213       if(servers[i][2] > ncpus(i))
    214       {
    215         ERROR("The number of cores to use on server \""+servers[i][1]+"\""
    216              +newline+"is greater than the number of available cores");
    217       }
    218     }
    219     if(servers[i][1] != "localhost")
    220     {
    221       if(system("sh", "ssh "+servers[i][1]+
    222                       " 'test -e `which "+servers[i][3]+"`'"))
    223       {
    224         ERROR("\""+servers[i][3]+"\" was not found on"
    225              +"\""+servers[i][1]+"\".");
    226       }
    227     }
    228   }
    229   if(size(maxmems) != njobs)
    230   {
    231     ERROR("The size of the intvec which specifies the maximal amount of memory"
    232          +newline+"to be used for each job does not match the number of jobs.");
    233   }
    234   int havemaxmem;
    235   for(i = 1; i <= njobs; i++)
    236   {
    237     if(maxmems[i] < 0)
    238     {
    239       ERROR("The maximal amount of memory to be used for job no. "+string(i)
    240            +"is negative.");
    241     }
    242     havemaxmem = havemaxmem+maxmems[i];
    243   }
    244 
    245   // skip those cores which won't be needed
    246   int nlinks;
    247   for(i = 1; i <= nservers; i++)
    248   {
    249     if(nlinks+servers[i][2] <= njobs)
    250     {
    251       nlinks = nlinks+servers[i][2];
    252     }
    253     else
    254     {
    255       if(nlinks == njobs)
    256       {
    257         servers = list(servers[1..(i-1)]);
    258       }
    259       else
    260       {
    261         servers = list(servers[1..i]);
    262         servers[i][2] = njobs-nlinks;
    263         nlinks = njobs;
    264       }
    265       nservers = size(servers);
    266     }
    267   }
    268 
    269   // open the links
    270   string server;
    271   int ncores;
    272   string program;
    273   int k = 1;   // the index of the link
    274   for(i = 1; i <= nservers; i++)
    275   {
    276     server = servers[i][1];
    277     ncores = servers[i][2];
    278     program = servers[i][3];
    279     for(j = 1; j <= ncores; j++)
    280     {
    281       if(server == "localhost")
    282       {
    283         //if(linktype == "ssi")
    284         //{
    285           link l(k) = "ssi:fork";
    286         //}
    287       }
    288       else
    289       {
    290         //if(linktype == "ssi")
    291         //{
    292           link l(k) = "ssi:tcp "+server+":"+program;
    293         //}
    294       }
    295       open(l(k));
    296       k++;
    297     }
    298   }
    299   list links = list(l(1..nlinks));
    300 
    301   // start a memory watchdog if needed
    302   if(havemaxmem)
    303   {
    304     //if(linktype == "ssi")
    305     //{
    306       link mempatrol = "ssi:fork";
    307     //}
    308     open(mempatrol);
    309     write(mempatrol, quote(watchlinks()));
    310     links = insert(links, mempatrol, nlinks);
    311   }
    312   int nkilled;   // the number of jobs killed by the mempatrol
    313 
    314   // distribute work to the links
    315   k = 1; // from here on, k is the index of the next job which must be
    316          // distributed to some link
    317   intvec assignment = 0:nlinks;  // link number i is currently doing
    318                                  // job number assignment[i]
    319   int pid;
    320   for(i = 1; i <= nlinks; i++)
    321   {
    322     write(l(i), quote(execute("option(noredefine);")));
    323     read(l(i));
    324     write(l(i), quote(execute("def result;")));
    325     read(l(i));
    326     write(l(i), quote(execute("list currentargs;")));
    327     read(l(i));
    328     if(status(l(i), "mode", "fork"))
    329     {
    330       write(l(i), quote(currentargs = args[eval(k)]));
    331     }
    332     else
    333     {
    334       write(l(i), quote(currentargs = eval(args[k])));
    335     }
    336     read(l(i));
    337     if(maxmems[k] > 0)
    338     {
    339       m = i;
    340       for(j = 1; j <= nservers; j++)
    341       {
    342         if(servers[j][2] > m)
    343         {
    344           server = servers[j][1];
    345           break;
    346         }
    347         else
    348         {
    349           m = m-servers[j][2];
    350         }
    351       }
    352       write(l(i), quote(system("pid")));
    353       pid = read(l(i));
    354       write(mempatrol, list(server, pid, i, maxmems[k]));
    355     }
    356     write(l(i), quote(execute("result = "+eval(commands[k])
    357       +"("+argsToString("currentargs", size(currentargs))+");")));
    358     assignment[i] = k;
    359     k++;
    360   }
    361 
    362   // distribute the rest of the work
    363   list results;
    364   for(i = njobs; i > 0; i--)
    365   {
    366     results[i] = list();  // TODO: What if the result of one of the commands is
    367                           // list()?
    368   }
    369   int nfinished;  // the number of finished jobs
    370   int wait;   // the index of the link which is finished, or 0 for timeout
    371   while(k <= njobs && nfinished < N-1)
    372   {
    373     if(timeout == 0)
    374     {
    375       wait = waitfirst(links);
    376     }
    377     else
    378     {
    379       tt = timeout-(rtimer-t);
    380       if(tt < 0)
    381       {
    382         wait = waitfirst(links, 0);
    383         wait = 0;
    384       }
    385       else
    386       {
    387         wait = waitfirst(links, tt);
    388       }
    389     }
    390     if(wait == -1)
    391     {
    392       ERROR("All links crashed.");
    393     }
    394     if(wait)
    395     {
    396       if(wait == nlinks+1)
    397       {
    398         wait = read(mempatrol);
    399         close(l(wait));
    400         open(l(wait));
    401         results[assignment[wait]] = "out of memory";
    402         nkilled++;
    403       }
    404       else
    405       {
    406         read(l(wait));
    407         write(l(wait), quote(result));
    408         results[assignment[wait]] = read(l(wait));
    409         if(maxmems[assignment[wait]] > 0)
    410         {
    411           write(mempatrol, assignment[wait]);
    412         }
    413         nfinished++;
    414       }
    415       if(status(l(wait), "mode", "fork"))
    416       {
    417         write(l(wait), quote(currentargs = args[eval(k)]));
    418       }
    419       else
    420       {
    421         write(l(wait), quote(currentargs = eval(args[k])));
    422       }
    423       read(l(wait));
    424       if(maxmems[k] > 0)
    425       {
    426         m = wait;
    427         for(j = 1; j <= nservers; j++)
    428         {
    429           if(servers[j][2] > m)
    430           {
    431             server = servers[j][1];
    432             break;
    433           }
    434           else
    435           {
    436             m = m-servers[j][2];
    437           }
    438         }
    439         write(l(wait), quote(system("pid")));
    440         pid = read(l(wait));
    441         write(mempatrol, list(server, pid, wait, maxmems[k]));
    442       }
    443       write(l(wait), quote(execute("def result = "+eval(commands[k])
    444         +"("+argsToString("currentargs", size(currentargs))+");")));
    445       assignment[wait] = k;
    446       k++;
    447     }
    448     else
    449     {
    450       break;
    451     }
    452   }
    453 
    454   // collect the rest of the results
    455   while(nfinished < N && nfinished+nkilled < njobs)
    456   {
    457     if(timeout == 0)
    458     {
    459       wait = waitfirst(links);
    460     }
    461     else
    462     {
    463       tt = timeout-(rtimer-t);
    464       if(tt < 0)
    465       {
    466         wait = waitfirst(links, 0);
    467         wait = 0;
    468       }
    469       else
    470       {
    471         wait = waitfirst(links, tt);
    472       }
    473     }
    474     if(wait == -1)
    475     {
    476       ERROR("All links crashed.");
    477     }
    478     if(wait)
    479     {
    480       if(wait == nlinks+1)
    481       {
    482         wait = read(mempatrol);
    483         close(l(wait));
    484         results[assignment[wait]] = "out of memory";
    485         nkilled++;
    486       }
    487       else
    488       {
    489         read(l(wait));
    490         write(l(wait), quote(result));
    491         results[assignment[wait]] = read(l(wait));
    492         if(maxmems[assignment[wait]] > 0)
    493         {
    494           write(mempatrol, assignment[wait]);
    495         }
    496         nfinished++;
    497       }
    498     }
    499     else
    500     {
    501       break;
    502     }
    503   }
    504 
    505   //close all links
    506   for(i = 1; i <= nlinks; i++)
    507   {
    508     if(status(l(i), "read", "ready"))
    509     {
    510       read(l(i));
    511       write(l(i), quote(result));
    512       results[assignment[i]] = read(l(i));
    513     }
    514     close(l(i));
    515   }
    516   if(havemaxmem)
    517   {
    518     close(mempatrol);
    519   }
    520 
    521   system("--ticks-per-sec", oldtimerresolution);
    522   return(results);
    523 }
    524 example
    525 {
    526   "EXAMPLE:"; echo = 2;
    527   LIB "primdec.lib";
    528   ring r = 0, (x,y,z), lp;
    529   ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
    530   ideal j = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4;
    531   list commands = list("std", "primdecGTZ", "primdecSY",
    532                        "std", "primdecGTZ", "primdecSY");
    533   list args = list(list(i), list(i), list(i), list(j), list(j), list(j));
    534   parallelWaitN(commands, args, 3);
    535 }
    536 
    537 proc parallelWaitFirst(list commands, list args, list #)
    538 "USAGE:  parallelWaitFirst(commands, args[, timeout, linktype, servers,
    539          maxmemory]); commands list, args list, timeout int, linktype string,
    540          servers list, maxmemory intvec
    541 RETURN:  a list, containing at least one (if no timeout occurs) of the results
    542          of commands[i] applied to arg[i], i = 1, ..., size(commands).
    543          @* The command
    544          @code{parallelWaitFirst(list commands, list args, list #)} is
    545          synonymous to
    546          @code{parallelWaitN(list commands, list args, 1, list #)}. See
    547          @ref{parallelWaitN} for details on optional arguments and other
    548          remarks.
    549 SEE ALSO: Ssi links, waitfirst
    550 KEYWORDS: parallelWaitFirst; Parallelization; Links, user interface;
    551           Skeletons for parallelization; Distributed computing
    552 EXAMPLE:  @code{example parallelWaitFirst;} shows an example."
    553 {
    554   return(parallelWaitN(commands, args, 1, #));
    555 }
    556 example
    557 {
    558   "EXAMPLE:"; echo = 2;
    559   LIB "primdec.lib";
    560   ring r = 0, (x,y,z), lp;
    561   ideal i = z8+z6+4z5+4z3+4z2+4, y-z2;
    562   list commands = list("primdecGTZ", "primdecSY");
    563   list args = list(list(i), list(i));
    564   parallelWaitFirst(commands, args);
    565 }
    566 
    567 proc parallelWaitAll(def commands, list args, list #)
    568 "USAGE:  parallelWaitAll(commands, args[, timeout, linktype, servers,
    569          maxmemory]); commands list or string, args list, timeout int,
    570          linktype string, servers list, maxmemory intvec
    571 RETURN:  a list, containing the results of commands[i] applied to arg[i],
    572          i = 1, ..., size(commands).
    573          @* The command
    574          @code{parallelWaitAll(list commands, list args, list #)} is
    575          synonymous to
    576          @code{parallelWaitN(list commands, list args, size(args), list #)}. See
    577          @ref{parallelWaitN} for details on optional arguments and other
    578          remarks.
    579          If commands is of type string, this is a shortcut for a list of size
    580          @code{size(args)} whose entries are just this string.
    581 SEE ALSO: Ssi links, waitall
    582 KEYWORDS: parallelWaitAll; Parallelization; Links, user interface;
    583           Skeletons for parallelization; Distributed computing
    584 EXAMPLE:  @code{example parallelWaitAll;} shows an example."
    585 {
    586   if(typeof(commands) != "list" && typeof(commands) != "string")
    587   {
    588     ERROR("invalid type of first argument");
    589   }
    590   if(typeof(commands) == "list")
    591   {
    592     return(parallelWaitN(commands, args, size(args), #));
    593   }
    594   else
    595   {
    596     list cmds;
    597     for(int i = size(args); i > 0; i--)
    598     {
    599       cmds[i] = commands;
    600     }
    601     return(parallelWaitN(cmds, args, size(args), #));
    602   }
    603 }
    604 example
    605 {
    606   "EXAMPLE:"; echo = 2;
    607   ring r = 0, (x,y,z), dp;
    608   ideal i1 = z8+z6+4z5+4z3+4z2+4, y-z2;
    609   ideal i2 = x10+x9y2, y8-x2y7;
    610   ideal i3 = x3-2xy, x2y-2y2+x;
    611   string command = "std";
    612   list args = list(list(i1), list(i2), list(i3));
    613   parallelWaitAll(command, args);
    614 }
    615 
    616 // TODO
    617 /// http://arxiv.org/abs/1005.5663v2
    618 static proc doModular(command, args, proc deleteUnlucksPrimes, proc testInChar0)
    619 {
    620 }
    621 
    622 // TODO
    623 /* worker farm */
    624 static proc Create() {}
    625 
    626 /* auxiliary procedures */
    627 static proc watchlinks()
    628 {
    629   list parent = list(mempatrol);
    630   list watchedlinks;
    631   int wait;
    632   int i, sys;
    633   while(1)
    634   {
    635     if(size(watchedlinks) == 0)
    636     {
    637       wait = waitall(parent);
    638     }
    639     else
    640     {
    641       wait = waitall(parent, 10000);
    642     }
    643     if(wait == -1)
    644     {
    645       ERROR("The main process crashed.");
    646     }
    647     if(wait)
    648     {
    649       def query = read(mempatrol);
    650       if(typeof(query) == "list")
    651       {
    652         watchedlinks = insert(watchedlinks, query);
    653       }
    654       else // in this case, typeof(query) is assumed to be "int", the
    655            // index of the link
    656       {
    657         for(i = size(watchedlinks); i > 0; i--)
    658         {
    659           if(watchedlinks[i][3] == query)
    660           {
    661             watchedlinks = delete(watchedlinks, i);
    662             break;
    663           }
    664         }
    665       }
    666     }
    667     for(i = size(watchedlinks); i > 0; i--)
    668     {
    669       if(getusedmemory(watchedlinks[i][1], watchedlinks[i][2])
    670            > watchedlinks[i][4])
    671       {
    672         if(watchedlinks[i][1] == "localhost")
    673         {
    674           sys = system("sh", "kill "+string(watchedlinks[i][2]));
    675         }
    676         else
    677         {
    678           sys = system("sh", "ssh "+watchedlinks[i][1]+" kill "
    679                              +string(watchedlinks[i][2]));
    680         }
    681         write(mempatrol, watchedlinks[i][3]);
    682         watchedlinks = delete(watchedlinks, i);
    683       }
    684     }
    685   }
    686 }
    687 
    688 static proc getusedmemory(string server, int pid)
    689 {
    690   string s;
    691   if(server == "localhost")
    692   {
    693     s = read("|: grep VmSize /proc/"+string(pid)+"/status "+
    694              "| awk '{ print $2; }'");
    695   }
    696   else
    697   {
    698     s = read("|: ssh "+server+" grep VmSize /proc/"+string(pid)+"/status "+
    699              "| awk '{ print $2; }'");
    700   }
    701   bigint b;
    702   execute("b = "+s+";");
    703   int i = int(b/1000);
    704   return(i);
    705 }
    706 
    707 static proc argsToString(string name, int length)
    708 {
    709   string arglist;
    710   if(length > 0) {
    711     arglist = name+"[1]";
    712   }
    713   int i;
    714   for(i = 2; i <= length; i++) {
    715     arglist = arglist+", "+name+"["+string(i)+"]";
    716   }
    717   return(arglist);
    718 }
     32LIB "tasks.lib";
     33
     34proc parallelWaitN(alias list commands, alias list args, int N, list #)
     35"USAGE:   parallelWaitN(commands, arguments, N[, timeout]); commands list,
     36          arguments list, N int, timeout int
     37RETURN:   a list, containing the results of commands[i] applied to
     38          arguments[i], i = 1, ..., size(arguments).
     39       @* The procedure waits for N jobs to finish.
     40       @* An optional timeout in ms can be provided. Default is 0 which
     41          disables the timeout.
     42NOTE:     The entries of the list commands must be strings. The entries of the
     43          list arguments must be lists.
     44       @* The type of any entry of the returned list whose corresponding task
     45          did not finish (due to timeout or error) is \"none\".
     46       @* The returned list may contain more than N results if several jobs
     47          finished \"at the same time\". It may contain less than N results in
     48          the case of timeout or errors occurring.
     49SEE ALSO: parallelWaitAll, parallelWaitFirst, tasks_lib
     50EXAMPLE:  example parallelWaitN; shows an example"
     51{
     52    // auxiliary variables
     53    int i;
     54
     55    // read optional parameters
     56    int timeout;
     57    int ncores;   // obsolete, but kept for compatibility with old libraries
     58    if (size(#) > 0) {
     59        if (typeof(#[1]) != "int") {
     60            ERROR("wrong optional parameters");
     61        }
     62        timeout = #[1];
     63        if (size(#) > 1) {
     64            if (size(#) > 2 || typeof(#[2]) != "int") {
     65                ERROR("wrong optional parameters");
     66            }
     67            ncores = #[2];
     68        }
     69    }
     70
     71    // apply wrapper for obsolete optional parameter ncores
     72    if (ncores) {
     73        list semaphore_save = Resources::setcores_subtree(ncores);
     74    }
     75
     76    // error checking
     77    int njobs = size(commands);
     78    if (njobs != size(args)) {
     79        ERROR("The number of commands does not match the number of lists"
     80            +newline+"of arguments.");
     81    }
     82    if (njobs == 0) {
     83        ERROR("no commands specified");
     84    }
     85    for (i = 1; i <= njobs; i++) {
     86        if (typeof(commands[i]) != "string") {
     87            ERROR("The first argument is not a list of strings.");
     88        }
     89        if (typeof(args[i]) != "list") {
     90            ERROR("The second argument is not a list of lists.");
     91        }
     92    }
     93
     94    // compute the tasks
     95    for (i = 1; i <= njobs; i++) {
     96        task t(i) = commands[i], args[i];
     97    }
     98    startTasks(t(1..njobs));
     99    list indices = waitTasks(list(t(1..njobs)), N, timeout);
     100
     101    // wrap back to saved semaphore
     102    if (ncores) {
     103        Resources::resetcores_subtree(semaphore_save);
     104    }
     105
     106    // return results
     107    list results;
     108    for (i = size(indices); i > 0; i--) {
     109        results[indices[i]] = getResult(t(indices[i]));
     110    }
     111    for (i = 1; i <= njobs; i++) {
     112        killTask(t(i));
     113    }
     114    return(results);
     115}
     116example
     117{
     118    "EXAMPLE:";
     119    echo = 2;
     120    ring R = 0, (x,y,z), lp;
     121    ideal I = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4;
     122    ideal J = x10+x9y2, x2y7-y8;
     123    list commands = list("std", "std");
     124    list arguments = list(list(I), list(J));
     125    parallelWaitN(commands, arguments, 1);
     126}
     127
     128proc parallelWaitFirst(alias list commands, alias list args, list #)
     129"USAGE:   parallelWaitFirst(commands, args[, timeout]); commands list,
     130          arguments list, timeout int
     131RETURN:   a list, containing at least one (if no timeout occurs) of the results
     132          of commands[i] applied to arguments[i], i = 1, ..., size(arguments).
     133       @* The command @code{parallelWaitFirst(commands, arguments[, timeout])}
     134          is synonymous to
     135          @code{parallelWaitN(commands, arguments, 1[, timeout])}. See
     136          @ref{parallelWaitN} for details on optional arguments and other
     137          remarks.
     138SEE ALSO: parallelWaitN, parallelWaitAll, tasks_lib
     139EXAMPLE:  example parallelWaitFirst; shows an example"
     140{
     141    return(parallelWaitN(commands, args, 1, #));
     142}
     143example
     144{
     145    "EXAMPLE:";
     146    echo = 2;
     147    ring R = 0, (x,y,z), lp;
     148    ideal I = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4;
     149    ideal J = x10+x9y2, x2y7-y8;
     150    list commands = list("std", "std");
     151    list arguments = list(list(I), list(J));
     152    parallelWaitFirst(commands, arguments);
     153}
     154
     155proc parallelWaitAll(def commands, alias list args, list #)
     156"USAGE:   parallelWaitAll(commands, arguments[, timeout]); commands list or
     157          string, arguments list, timeout int
     158RETURN:   a list, containing the results of commands[i] applied to
     159          arguments[i], i = 1, ..., size(arguments).
     160       @* The command @code{parallelWaitAll(commands, arguments[, timeout])} is
     161          synonymous to @code{parallelWaitN(commands, arguments,
     162          size(arguments)[, timeout])}. See @ref{parallelWaitN} for details on
     163          optional arguments and other remarks.
     164NOTE:     As a shortcut, @code{commands} can be a string. This is synonymous to
     165          providing a list of @code{size(arguments)} copies of this string.
     166SEE ALSO: parallelWaitFirst, parallelWaitN, tasks_lib
     167EXAMPLE:  example parallelWaitAll; shows an example"
     168{
     169    if (typeof(commands) != "list" && typeof(commands) != "string") {
     170        ERROR("invalid type of first argument");
     171    }
     172    if (typeof(commands) == "list") {
     173        return(parallelWaitN(commands, args, size(args), #));
     174    }
     175    else {
     176        list cmds;
     177        for (int i = size(args); i > 0; i--) {
     178            cmds[i] = commands;
     179        }
     180        return(parallelWaitN(cmds, args, size(args), #));
     181    }
     182}
     183example
     184{
     185    "EXAMPLE:";
     186    echo = 2;
     187    ring R = 0, (x,y,z), dp;
     188    ideal I1 = z8+z6+4z5+4z3+4z2+4, -z2+y;
     189    ideal I2 = x9y2+x10, x2y7-y8;
     190    ideal I3 = x3-2xy, x2y-2y2+x;
     191    string command = "std";
     192    list arguments = list(list(I1), list(I2), list(I3));
     193    parallelWaitAll(command, arguments);
     194}
     195
     196proc parallelTestAND(def commands, alias list args, list #)
     197"USAGE:   parallelTestAND(commands, arguments[, timeout]); commands list or
     198          string, arguments list, timeout int
     199RETURN:   1, if commands[i] applied to arguments[i] is not equal to zero for
     200          all i = 1, ..., size(arguments);
     201          0, otherwise.
     202       @* An optional timeout in ms can be provided. Default is 0 which
     203          disables the timeout. In case of timeout, -1 is returned.
     204NOTE:     The entries of the list commands must be strings. The entries of the
     205          list arguments must be lists.
     206       @* commands[i] applied to arguments[i] must evaluate to an integer for
     207          i = 1, ..., size(arguments).
     208       @* As a shortcut, @code{commands} can be a string. This is synonymous to
     209          providing a list of @code{size(arguments)} copies of this string.
     210SEE ALSO: parallelTestOR, tasks_lib
     211EXAMPLE:  example parallelTestAND; shows an example"
     212{
     213    // note: this can be improved
     214    list results = parallelWaitAll(commands, args, #);
     215    int i;
     216    for (i = size(args); i > 0; i--) {
     217        if (typeof(results[i]) != "int" && typeof(results[i]) != "none") {
     218            ERROR("result no. "+string(i)+" not of type int");
     219        }
     220    }
     221    for (i = size(args); i > 0; i--) {
     222        if (typeof(results[i]) == "none") {   // timeout
     223            return(-1);
     224        }
     225    }
     226    for (i = size(results); i > 0; i--) {
     227        if (!results[i]) {
     228            return(0);
     229        }
     230    }
     231    return(1);
     232}
     233example
     234{
     235    "EXAMPLE:";
     236    echo = 2;
     237    ring R = 0, (x,y,z), dp;
     238    ideal I = x, y, z;
     239    intvec v = 0:3;
     240    list l = list(I, v);
     241    module m1 = x*gen(1);
     242    module m2;
     243    string command = "size";
     244    list arguments1 = list(list(I), list(v), list(l), list(m1));
     245    list arguments2 = list(list(I), list(v), list(l), list(m2));
     246    // test if all the arguments have non-zero size
     247    parallelTestAND(command, arguments1);
     248    parallelTestAND(command, arguments2);
     249}
     250
     251proc parallelTestOR(def commands, alias list args, list #)
     252"USAGE:   parallelTestOR(commands, arguments[, timeout]); commands list or
     253          string, arguments list, timeout int
     254RETURN:   1, if commands[i] applied to arguments[i] is not equal to zero for
     255          any i = 1, ..., size(arguments);
     256          0, otherwise.
     257       @* An optional timeout in ms can be provided. Default is 0 which
     258          disables the timeout. In case of timeout, -1 is returned.
     259NOTE:     The entries of the list commands must be strings. The entries of the
     260          list arguments must be lists.
     261       @* commands[i] applied to arguments[i] must evaluate to an integer for
     262          i = 1, ..., size(arguments).
     263       @* As a shortcut, @code{commands} can be a string. This is synonymous to
     264          providing a list of @code{size(arguments)} copies of this string.
     265SEE ALSO: parallelTestAND, tasks_lib
     266EXAMPLE:  example parallelTestAND; shows an example"
     267{
     268    // note: this can be improved
     269    list results = parallelWaitAll(commands, args, #);
     270    int i;
     271    for (i = size(args); i > 0; i--) {
     272        if (typeof(results[i]) != "int" && typeof(results[i]) != "none") {
     273            ERROR("result no. "+string(i)+" not of type int");
     274        }
     275    }
     276    for (i = size(args); i > 0; i--) {
     277        if (typeof(results[i]) == "none") {   // timeout
     278            return(-1);
     279        }
     280    }
     281    for (i = size(results); i > 0; i--) {
     282        if (results[i]) {
     283            return(1);
     284        }
     285    }
     286    return(0);
     287}
     288example
     289{
     290    "EXAMPLE:";
     291    echo = 2;
     292    ring R = 0, (x,y,z), dp;
     293    ideal I;
     294    string s;
     295    list l;
     296    module m1 = x*gen(1);
     297    module m2;
     298    string command = "size";
     299    list arguments1 = list(list(I), list(s), list(l), list(m1));
     300    list arguments2 = list(list(I), list(s), list(l), list(m2));
     301    // test if any of the arguments has non-zero size
     302    parallelTestOR(command, arguments1);
     303    parallelTestOR(command, arguments2);
     304}
     305
Note: See TracChangeset for help on using the changeset viewer.