Changeset b7fbc4 in git for Singular/LIB/parallel.lib
- Timestamp:
- Dec 4, 2013, 8:50:48 PM (9 years ago)
- Branches:
- (u'jengelh-datetime', 'ceac47cbc86fe4a15902392bdbb9bd2ae0ea02c6')(u'spielwiese', 'a800fe4b3e9d37a38c5a10cc0ae9dfa0c15a4ee6')
- Children:
- 6c8ba30f3cbc3b424a708defb57649b060a4b243843b28b90e7bb33ae285dcdd56c7dc85a67490f8
- Parents:
- 2e553acee0b24bae7b2fe1cb119ef17007180257c5c7aa0443a28ee851d552fdd34d96d06af91ae1
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
Singular/LIB/parallel.lib
r2e553a rb7fbc4 1 /////////////////////////////////////////////////////////////////// 2 version="version parallel.lib 4.0.0.0 Jun_2013 "; // $Id$1 //////////////////////////////////////////////////////////////////// 2 version="version parallel.lib 4.0.0.0 Dec_2013 "; // $Id$ 3 3 category="General purpose"; 4 4 info=" 5 LIBRARY: parallel.lib Tools for Parallelization 5 LIBRARY: parallel.lib An abstraction layer for parallel skeletons 6 6 7 AUTHOR: Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de 7 8 8 9 OVERVIEW: 9 This library provides tools to do several computations in parallel. They 10 are aimed at ordinary Singular users as well as authors of Singular 11 libraries. 12 @* Even without this library, it is possible to do execute self-defined 13 Singular commands in parallel using @ref{link}, but the handling of 14 such links can be quite tedious. With the pocedures described below, 15 this can be done by one-line commands. 16 @* There are many parallel 'skeletons' (i.e. ways in which parallel 17 tasks rely upon and interact with each other). A few of them are already 18 implemented. Future plans include an abstraction layer for modular 19 techniques, 'worker farms', and parallel tests. 20 21 SEE ALSO: link, modstd_lib, assprimeszerodim_lib 22 23 KEYWORDS: parallel.lib; Parallelization; Links, user interface; 24 Skeletons for parallelization; Distributed computing 10 This library provides implementations of several parallel 'skeletons' (i.e. 11 ways in which parallel tasks rely upon and interact with each other). It is 12 based on the library tasks.lib and aims at both ordinary Singular users as well 13 as authors of Singular libraries. 14 15 KEYWORDS: parallelization; parallel skeletons; distributed computing 16 17 SEE ALSO: resources_lib, tasks_lib, modstd_lib, modnormal_lib 25 18 26 19 PROCEDURES: 27 parallelWaitN(...) execute several jobs in parallel, 28 wait for N of them to finish 29 parallelWaitFirst(...) execute several jobs in parallel, 30 wait for the first to finish 31 parallelWaitAll(...) execute several jobs in parallel, 32 wait for all of them to finish 20 parallelWaitN(); execute several jobs in parallel 21 and wait for N of them to finish 22 parallelWaitFirst(); execute several jobs in parallel 23 and wait for the first to finish 24 parallelWaitAll(); execute several jobs in parallel 25 and wait for all of them to finish 26 parallelTestAND(); run several tests in parallel 27 and determine if they all succeed 28 parallelTestOR(); run several tests in parallel 29 and determine if any of them succeeds 33 30 "; 34 31 35 proc parallelWaitN(list commands, list args, int N, list #) 36 "USAGE: parallelWaitN(commands, args, N[, timeout, linktype, servers, 37 maxmemory]); commands list, args list, N int, timeout int, 38 linktype string, servers list, maxmemory intvec 39 RETURN: a list, containing the results of commands[i] applied to arg[i], 40 i = 1, ..., size(commands). 41 @* The procedure waits for N jobs to finish. 42 43 @* OPTIONAL PARAMETERS: 44 45 An optional timeout in ms can be provided. Default is 0 which 46 disables the timeout. 47 48 Supported linktypes are up to now \"ssi\" and \"mp\", see 49 @ref{Ssi links}. 50 51 Servers: 52 @* Each server is given by a list containing the address of the server, 53 the number of cores to use on this server and the command to start 54 Singular. 55 @* If the address is \"localhost\", the processes will be generated on 56 the same machine using forks. If the command to start Singular is 57 \"\" (the empty string), \"Singular\" will be used. 58 @* Default is @code{list(\"localhost\", system(\"cpu\"), \"\")}. 59 @* There are some obvious shortcuts for servers, e.g. \"myserver\" is 60 a shortcut for 61 @code{list(\"myserver\", [nb. of cores on myserver], \"\")}, or 3 62 for @code{list(\"localhost\", 3, \"\")}. 63 64 Memory limits: 65 @* If an intvec maxmemory of size @code{size(commands)} is given, the 66 i-th job will be killed if it uses more than maxmemory[i] MB of 67 memory. If maxmemory[i] is 0, there will be no restraint for the 68 i-th job. Default is @code{0:size(commands)}. 69 NOTE: The entries of the list commands must be strings. 70 @* The entries of the list args must be lists. 71 @* The returned list may contain more than N results if several jobs 72 finished \"at the same time\". It may contain less than N results in 73 the case of timeout or errors occurring. 74 @* The check whether the jobs exceed the memory sizes given by 75 maxmemory is only done from time to time. This feature is 76 experimental and should be used with care. 77 SEE ALSO: Ssi links, waitfirst, waitall 78 KEYWORDS: parallelWaitN; Parallelization; Links, user interface; 79 Skeletons for parallelization; Distributed computing 80 EXAMPLE: @code{example parallelWaitN;} shows an example." 81 { 82 // initialize the timer 83 int oldtimerresolution = system("--ticks-per-sec"); 84 system("--ticks-per-sec", 1000); 85 int t = rtimer; 86 87 // auxiliary variables 88 int i, j, m, tt; 89 90 // read optional parameters 91 list defaultserver = list("localhost", system("cpu"), ""); 92 list defaults = list(0, "ssi", list(defaultserver), 0:size(commands)); 93 for(i = 1; i <= size(defaults); i++) 94 { 95 if(typeof(#[i]) != typeof(defaults[i])) 96 { 97 # = insert(#, defaults[i], i-1); 98 } 99 } 100 if(size(#) != size(defaults)) 101 { 102 ERROR("wrong optional parameters"); 103 } 104 for(j = size(#[3]); j > 0; j--) 105 { 106 if(typeof(#[3][j][1]) != typeof(defaultserver[1])) 107 { 108 #[3][j] = insert(#[3][j], defaultserver[1], 0); 109 } 110 defaultserver[3] = ""; 111 // only for ssi:tcp links, the default program is system("Singular") 112 if(#[2] == "ssi" && #[3][j][1] != "localhost") 113 { 114 defaultserver[3] = system("Singular"); 115 } 116 for(i = 2; i <= size(defaultserver); i++) 117 { 118 if(typeof(#[3][j][i]) != typeof(defaultserver[i])) 119 { 120 #[3][j] = insert(#[3][j], defaultserver[i], i-1); 121 } 122 } 123 if(size(#[3][j]) != size(defaultserver)) 124 { 125 ERROR("wrong declaration for server no. "+string(j)); 126 } 127 } 128 int timeout = #[1]; 129 string linktype = #[2]; 130 list servers = #[3]; 131 intvec maxmems = #[4]; 132 133 // error checking 134 int njobs = size(commands); 135 if(njobs != size(args)) 136 { 137 ERROR("The number of commands does not match the number of lists" 138 +newline+"of arguments."); 139 } 140 if(njobs == 0) 141 { 142 ERROR("no commands specified"); 143 } 144 for(i = 1; i <= njobs; i++) 145 { 146 if(typeof(commands[i]) != "string") 147 { 148 ERROR("The first argument is not a list of strings."); 149 } 150 if(typeof(args[i]) != "list") 151 { 152 ERROR("The second argument is not a list of lists."); 153 } 154 } 155 if(N < 0) 156 { 157 ERROR("The number of jobs which you want to wait for is negative."); 158 } 159 if(N > njobs) 160 { 161 ERROR("The number of jobs which you wnat to wait for is greater" 162 +newline+"than the number of jobs itself."); 163 } 164 if(timeout < 0) 165 { 166 ERROR("The given timeout is negative."); 167 } 168 if(linktype != "ssi" && linktype != "mp") 169 { 170 ERROR("The given linktype is not recognized."); 171 } 172 int nservers = size(servers); 173 if(nservers <= 0) 174 { 175 ERROR("no server specified"); 176 } 177 for(i = 1; i <= nservers; i++) 178 { 179 if(servers[i][1] != "localhost") 180 { 181 if(system("sh", "ssh "+servers[i][1]+" exit")) 182 { 183 ERROR("Could not connect to server \""+servers[i][1]+"\""); 184 } 185 } 186 if(servers[i][2] < 0) 187 { 188 ERROR("The number of cores to be used on server \""+servers[i][1]+"\"" 189 +newline+" is negative."); 190 } 191 if(servers[i][1] == "localhost") 192 { 193 int ncpus(i) = system("cpu"); 194 } 195 else 196 { 197 //if(linktype == "ssi") 198 //{ 199 link lcpu(i) = "ssi:tcp "+servers[i][1]+":"+servers[i][3]; 200 //} 201 open(lcpu(i)); 202 write(lcpu(i), quote(system("cpu"))); 203 int ncpus(i) = read(lcpu(i)); 204 close(lcpu(i)); 205 kill lcpu(i); 206 } 207 if(servers[i][2] == 0) 208 { 209 servers[i][2] = ncpus(i); 210 } 211 else 212 { 213 if(servers[i][2] > ncpus(i)) 214 { 215 ERROR("The number of cores to use on server \""+servers[i][1]+"\"" 216 +newline+"is greater than the number of available cores"); 217 } 218 } 219 if(servers[i][1] != "localhost") 220 { 221 if(system("sh", "ssh "+servers[i][1]+ 222 " 'test -e `which "+servers[i][3]+"`'")) 223 { 224 ERROR("\""+servers[i][3]+"\" was not found on" 225 +"\""+servers[i][1]+"\"."); 226 } 227 } 228 } 229 if(size(maxmems) != njobs) 230 { 231 ERROR("The size of the intvec which specifies the maximal amount of memory" 232 +newline+"to be used for each job does not match the number of jobs."); 233 } 234 int havemaxmem; 235 for(i = 1; i <= njobs; i++) 236 { 237 if(maxmems[i] < 0) 238 { 239 ERROR("The maximal amount of memory to be used for job no. "+string(i) 240 +"is negative."); 241 } 242 havemaxmem = havemaxmem+maxmems[i]; 243 } 244 245 // skip those cores which won't be needed 246 int nlinks; 247 for(i = 1; i <= nservers; i++) 248 { 249 if(nlinks+servers[i][2] <= njobs) 250 { 251 nlinks = nlinks+servers[i][2]; 252 } 253 else 254 { 255 if(nlinks == njobs) 256 { 257 servers = list(servers[1..(i-1)]); 258 } 259 else 260 { 261 servers = list(servers[1..i]); 262 servers[i][2] = njobs-nlinks; 263 nlinks = njobs; 264 } 265 nservers = size(servers); 266 } 267 } 268 269 // open the links 270 string server; 271 int ncores; 272 string program; 273 int k = 1; // the index of the link 274 for(i = 1; i <= nservers; i++) 275 { 276 server = servers[i][1]; 277 ncores = servers[i][2]; 278 program = servers[i][3]; 279 for(j = 1; j <= ncores; j++) 280 { 281 if(server == "localhost") 282 { 283 //if(linktype == "ssi") 284 //{ 285 link l(k) = "ssi:fork"; 286 //} 287 } 288 else 289 { 290 //if(linktype == "ssi") 291 //{ 292 link l(k) = "ssi:tcp "+server+":"+program; 293 //} 294 } 295 open(l(k)); 296 k++; 297 } 298 } 299 list links = list(l(1..nlinks)); 300 301 // start a memory watchdog if needed 302 if(havemaxmem) 303 { 304 //if(linktype == "ssi") 305 //{ 306 link mempatrol = "ssi:fork"; 307 //} 308 open(mempatrol); 309 write(mempatrol, quote(watchlinks())); 310 links = insert(links, mempatrol, nlinks); 311 } 312 int nkilled; // the number of jobs killed by the mempatrol 313 314 // distribute work to the links 315 k = 1; // from here on, k is the index of the next job which must be 316 // distributed to some link 317 intvec assignment = 0:nlinks; // link number i is currently doing 318 // job number assignment[i] 319 int pid; 320 for(i = 1; i <= nlinks; i++) 321 { 322 write(l(i), quote(execute("option(noredefine);"))); 323 read(l(i)); 324 write(l(i), quote(execute("def result;"))); 325 read(l(i)); 326 write(l(i), quote(execute("list currentargs;"))); 327 read(l(i)); 328 if(status(l(i), "mode", "fork")) 329 { 330 write(l(i), quote(currentargs = args[eval(k)])); 331 } 332 else 333 { 334 write(l(i), quote(currentargs = eval(args[k]))); 335 } 336 read(l(i)); 337 if(maxmems[k] > 0) 338 { 339 m = i; 340 for(j = 1; j <= nservers; j++) 341 { 342 if(servers[j][2] > m) 343 { 344 server = servers[j][1]; 345 break; 346 } 347 else 348 { 349 m = m-servers[j][2]; 350 } 351 } 352 write(l(i), quote(system("pid"))); 353 pid = read(l(i)); 354 write(mempatrol, list(server, pid, i, maxmems[k])); 355 } 356 write(l(i), quote(execute("result = "+eval(commands[k]) 357 +"("+argsToString("currentargs", size(currentargs))+");"))); 358 assignment[i] = k; 359 k++; 360 } 361 362 // distribute the rest of the work 363 list results; 364 for(i = njobs; i > 0; i--) 365 { 366 results[i] = list(); // TODO: What if the result of one of the commands is 367 // list()? 368 } 369 int nfinished; // the number of finished jobs 370 int wait; // the index of the link which is finished, or 0 for timeout 371 while(k <= njobs && nfinished < N-1) 372 { 373 if(timeout == 0) 374 { 375 wait = waitfirst(links); 376 } 377 else 378 { 379 tt = timeout-(rtimer-t); 380 if(tt < 0) 381 { 382 wait = waitfirst(links, 0); 383 wait = 0; 384 } 385 else 386 { 387 wait = waitfirst(links, tt); 388 } 389 } 390 if(wait == -1) 391 { 392 ERROR("All links crashed."); 393 } 394 if(wait) 395 { 396 if(wait == nlinks+1) 397 { 398 wait = read(mempatrol); 399 close(l(wait)); 400 open(l(wait)); 401 results[assignment[wait]] = "out of memory"; 402 nkilled++; 403 } 404 else 405 { 406 read(l(wait)); 407 write(l(wait), quote(result)); 408 results[assignment[wait]] = read(l(wait)); 409 if(maxmems[assignment[wait]] > 0) 410 { 411 write(mempatrol, assignment[wait]); 412 } 413 nfinished++; 414 } 415 if(status(l(wait), "mode", "fork")) 416 { 417 write(l(wait), quote(currentargs = args[eval(k)])); 418 } 419 else 420 { 421 write(l(wait), quote(currentargs = eval(args[k]))); 422 } 423 read(l(wait)); 424 if(maxmems[k] > 0) 425 { 426 m = wait; 427 for(j = 1; j <= nservers; j++) 428 { 429 if(servers[j][2] > m) 430 { 431 server = servers[j][1]; 432 break; 433 } 434 else 435 { 436 m = m-servers[j][2]; 437 } 438 } 439 write(l(wait), quote(system("pid"))); 440 pid = read(l(wait)); 441 write(mempatrol, list(server, pid, wait, maxmems[k])); 442 } 443 write(l(wait), quote(execute("def result = "+eval(commands[k]) 444 +"("+argsToString("currentargs", size(currentargs))+");"))); 445 assignment[wait] = k; 446 k++; 447 } 448 else 449 { 450 break; 451 } 452 } 453 454 // collect the rest of the results 455 while(nfinished < N && nfinished+nkilled < njobs) 456 { 457 if(timeout == 0) 458 { 459 wait = waitfirst(links); 460 } 461 else 462 { 463 tt = timeout-(rtimer-t); 464 if(tt < 0) 465 { 466 wait = waitfirst(links, 0); 467 wait = 0; 468 } 469 else 470 { 471 wait = waitfirst(links, tt); 472 } 473 } 474 if(wait == -1) 475 { 476 ERROR("All links crashed."); 477 } 478 if(wait) 479 { 480 if(wait == nlinks+1) 481 { 482 wait = read(mempatrol); 483 close(l(wait)); 484 results[assignment[wait]] = "out of memory"; 485 nkilled++; 486 } 487 else 488 { 489 read(l(wait)); 490 write(l(wait), quote(result)); 491 results[assignment[wait]] = read(l(wait)); 492 if(maxmems[assignment[wait]] > 0) 493 { 494 write(mempatrol, assignment[wait]); 495 } 496 nfinished++; 497 } 498 } 499 else 500 { 501 break; 502 } 503 } 504 505 //close all links 506 for(i = 1; i <= nlinks; i++) 507 { 508 if(status(l(i), "read", "ready")) 509 { 510 read(l(i)); 511 write(l(i), quote(result)); 512 results[assignment[i]] = read(l(i)); 513 } 514 close(l(i)); 515 } 516 if(havemaxmem) 517 { 518 close(mempatrol); 519 } 520 521 system("--ticks-per-sec", oldtimerresolution); 522 return(results); 523 } 524 example 525 { 526 "EXAMPLE:"; echo = 2; 527 LIB "primdec.lib"; 528 ring r = 0, (x,y,z), lp; 529 ideal i = z8+z6+4z5+4z3+4z2+4, y-z2; 530 ideal j = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4; 531 list commands = list("std", "primdecGTZ", "primdecSY", 532 "std", "primdecGTZ", "primdecSY"); 533 list args = list(list(i), list(i), list(i), list(j), list(j), list(j)); 534 parallelWaitN(commands, args, 3); 535 } 536 537 proc parallelWaitFirst(list commands, list args, list #) 538 "USAGE: parallelWaitFirst(commands, args[, timeout, linktype, servers, 539 maxmemory]); commands list, args list, timeout int, linktype string, 540 servers list, maxmemory intvec 541 RETURN: a list, containing at least one (if no timeout occurs) of the results 542 of commands[i] applied to arg[i], i = 1, ..., size(commands). 543 @* The command 544 @code{parallelWaitFirst(list commands, list args, list #)} is 545 synonymous to 546 @code{parallelWaitN(list commands, list args, 1, list #)}. See 547 @ref{parallelWaitN} for details on optional arguments and other 548 remarks. 549 SEE ALSO: Ssi links, waitfirst 550 KEYWORDS: parallelWaitFirst; Parallelization; Links, user interface; 551 Skeletons for parallelization; Distributed computing 552 EXAMPLE: @code{example parallelWaitFirst;} shows an example." 553 { 554 return(parallelWaitN(commands, args, 1, #)); 555 } 556 example 557 { 558 "EXAMPLE:"; echo = 2; 559 LIB "primdec.lib"; 560 ring r = 0, (x,y,z), lp; 561 ideal i = z8+z6+4z5+4z3+4z2+4, y-z2; 562 list commands = list("primdecGTZ", "primdecSY"); 563 list args = list(list(i), list(i)); 564 parallelWaitFirst(commands, args); 565 } 566 567 proc parallelWaitAll(def commands, list args, list #) 568 "USAGE: parallelWaitAll(commands, args[, timeout, linktype, servers, 569 maxmemory]); commands list or string, args list, timeout int, 570 linktype string, servers list, maxmemory intvec 571 RETURN: a list, containing the results of commands[i] applied to arg[i], 572 i = 1, ..., size(commands). 573 @* The command 574 @code{parallelWaitAll(list commands, list args, list #)} is 575 synonymous to 576 @code{parallelWaitN(list commands, list args, size(args), list #)}. See 577 @ref{parallelWaitN} for details on optional arguments and other 578 remarks. 579 If commands is of type string, this is a shortcut for a list of size 580 @code{size(args)} whose entries are just this string. 581 SEE ALSO: Ssi links, waitall 582 KEYWORDS: parallelWaitAll; Parallelization; Links, user interface; 583 Skeletons for parallelization; Distributed computing 584 EXAMPLE: @code{example parallelWaitAll;} shows an example." 585 { 586 if(typeof(commands) != "list" && typeof(commands) != "string") 587 { 588 ERROR("invalid type of first argument"); 589 } 590 if(typeof(commands) == "list") 591 { 592 return(parallelWaitN(commands, args, size(args), #)); 593 } 594 else 595 { 596 list cmds; 597 for(int i = size(args); i > 0; i--) 598 { 599 cmds[i] = commands; 600 } 601 return(parallelWaitN(cmds, args, size(args), #)); 602 } 603 } 604 example 605 { 606 "EXAMPLE:"; echo = 2; 607 ring r = 0, (x,y,z), dp; 608 ideal i1 = z8+z6+4z5+4z3+4z2+4, y-z2; 609 ideal i2 = x10+x9y2, y8-x2y7; 610 ideal i3 = x3-2xy, x2y-2y2+x; 611 string command = "std"; 612 list args = list(list(i1), list(i2), list(i3)); 613 parallelWaitAll(command, args); 614 } 615 616 // TODO 617 /// http://arxiv.org/abs/1005.5663v2 618 static proc doModular(command, args, proc deleteUnlucksPrimes, proc testInChar0) 619 { 620 } 621 622 // TODO 623 /* worker farm */ 624 static proc Create() {} 625 626 /* auxiliary procedures */ 627 static proc watchlinks() 628 { 629 list parent = list(mempatrol); 630 list watchedlinks; 631 int wait; 632 int i, sys; 633 while(1) 634 { 635 if(size(watchedlinks) == 0) 636 { 637 wait = waitall(parent); 638 } 639 else 640 { 641 wait = waitall(parent, 10000); 642 } 643 if(wait == -1) 644 { 645 ERROR("The main process crashed."); 646 } 647 if(wait) 648 { 649 def query = read(mempatrol); 650 if(typeof(query) == "list") 651 { 652 watchedlinks = insert(watchedlinks, query); 653 } 654 else // in this case, typeof(query) is assumed to be "int", the 655 // index of the link 656 { 657 for(i = size(watchedlinks); i > 0; i--) 658 { 659 if(watchedlinks[i][3] == query) 660 { 661 watchedlinks = delete(watchedlinks, i); 662 break; 663 } 664 } 665 } 666 } 667 for(i = size(watchedlinks); i > 0; i--) 668 { 669 if(getusedmemory(watchedlinks[i][1], watchedlinks[i][2]) 670 > watchedlinks[i][4]) 671 { 672 if(watchedlinks[i][1] == "localhost") 673 { 674 sys = system("sh", "kill "+string(watchedlinks[i][2])); 675 } 676 else 677 { 678 sys = system("sh", "ssh "+watchedlinks[i][1]+" kill " 679 +string(watchedlinks[i][2])); 680 } 681 write(mempatrol, watchedlinks[i][3]); 682 watchedlinks = delete(watchedlinks, i); 683 } 684 } 685 } 686 } 687 688 static proc getusedmemory(string server, int pid) 689 { 690 string s; 691 if(server == "localhost") 692 { 693 s = read("|: grep VmSize /proc/"+string(pid)+"/status "+ 694 "| awk '{ print $2; }'"); 695 } 696 else 697 { 698 s = read("|: ssh "+server+" grep VmSize /proc/"+string(pid)+"/status "+ 699 "| awk '{ print $2; }'"); 700 } 701 bigint b; 702 execute("b = "+s+";"); 703 int i = int(b/1000); 704 return(i); 705 } 706 707 static proc argsToString(string name, int length) 708 { 709 string arglist; 710 if(length > 0) { 711 arglist = name+"[1]"; 712 } 713 int i; 714 for(i = 2; i <= length; i++) { 715 arglist = arglist+", "+name+"["+string(i)+"]"; 716 } 717 return(arglist); 718 } 32 LIB "tasks.lib"; 33 34 proc parallelWaitN(alias list commands, alias list args, int N, list #) 35 "USAGE: parallelWaitN(commands, arguments, N[, timeout]); commands list, 36 arguments list, N int, timeout int 37 RETURN: a list, containing the results of commands[i] applied to 38 arguments[i], i = 1, ..., size(arguments). 39 @* The procedure waits for N jobs to finish. 40 @* An optional timeout in ms can be provided. Default is 0 which 41 disables the timeout. 42 NOTE: The entries of the list commands must be strings. The entries of the 43 list arguments must be lists. 44 @* The type of any entry of the returned list whose corresponding task 45 did not finish (due to timeout or error) is \"none\". 46 @* The returned list may contain more than N results if several jobs 47 finished \"at the same time\". It may contain less than N results in 48 the case of timeout or errors occurring. 49 SEE ALSO: parallelWaitAll, parallelWaitFirst, tasks_lib 50 EXAMPLE: example parallelWaitN; shows an example" 51 { 52 // auxiliary variables 53 int i; 54 55 // read optional parameters 56 int timeout; 57 int ncores; // obsolete, but kept for compatibility with old libraries 58 if (size(#) > 0) { 59 if (typeof(#[1]) != "int") { 60 ERROR("wrong optional parameters"); 61 } 62 timeout = #[1]; 63 if (size(#) > 1) { 64 if (size(#) > 2 || typeof(#[2]) != "int") { 65 ERROR("wrong optional parameters"); 66 } 67 ncores = #[2]; 68 } 69 } 70 71 // apply wrapper for obsolete optional parameter ncores 72 if (ncores) { 73 list semaphore_save = Resources::setcores_subtree(ncores); 74 } 75 76 // error checking 77 int njobs = size(commands); 78 if (njobs != size(args)) { 79 ERROR("The number of commands does not match the number of lists" 80 +newline+"of arguments."); 81 } 82 if (njobs == 0) { 83 ERROR("no commands specified"); 84 } 85 for (i = 1; i <= njobs; i++) { 86 if (typeof(commands[i]) != "string") { 87 ERROR("The first argument is not a list of strings."); 88 } 89 if (typeof(args[i]) != "list") { 90 ERROR("The second argument is not a list of lists."); 91 } 92 } 93 94 // compute the tasks 95 for (i = 1; i <= njobs; i++) { 96 task t(i) = commands[i], args[i]; 97 } 98 startTasks(t(1..njobs)); 99 list indices = waitTasks(list(t(1..njobs)), N, timeout); 100 101 // wrap back to saved semaphore 102 if (ncores) { 103 Resources::resetcores_subtree(semaphore_save); 104 } 105 106 // return results 107 list results; 108 for (i = size(indices); i > 0; i--) { 109 results[indices[i]] = getResult(t(indices[i])); 110 } 111 for (i = 1; i <= njobs; i++) { 112 killTask(t(i)); 113 } 114 return(results); 115 } 116 example 117 { 118 "EXAMPLE:"; 119 echo = 2; 120 ring R = 0, (x,y,z), lp; 121 ideal I = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4; 122 ideal J = x10+x9y2, x2y7-y8; 123 list commands = list("std", "std"); 124 list arguments = list(list(I), list(J)); 125 parallelWaitN(commands, arguments, 1); 126 } 127 128 proc parallelWaitFirst(alias list commands, alias list args, list #) 129 "USAGE: parallelWaitFirst(commands, args[, timeout]); commands list, 130 arguments list, timeout int 131 RETURN: a list, containing at least one (if no timeout occurs) of the results 132 of commands[i] applied to arguments[i], i = 1, ..., size(arguments). 133 @* The command @code{parallelWaitFirst(commands, arguments[, timeout])} 134 is synonymous to 135 @code{parallelWaitN(commands, arguments, 1[, timeout])}. See 136 @ref{parallelWaitN} for details on optional arguments and other 137 remarks. 138 SEE ALSO: parallelWaitN, parallelWaitAll, tasks_lib 139 EXAMPLE: example parallelWaitFirst; shows an example" 140 { 141 return(parallelWaitN(commands, args, 1, #)); 142 } 143 example 144 { 145 "EXAMPLE:"; 146 echo = 2; 147 ring R = 0, (x,y,z), lp; 148 ideal I = 3x3y+x3+xy3+y2z2, 2x3z-xy-xz3-y4-z2, 2x2yz-2xy2+xz2-y4; 149 ideal J = x10+x9y2, x2y7-y8; 150 list commands = list("std", "std"); 151 list arguments = list(list(I), list(J)); 152 parallelWaitFirst(commands, arguments); 153 } 154 155 proc parallelWaitAll(def commands, alias list args, list #) 156 "USAGE: parallelWaitAll(commands, arguments[, timeout]); commands list or 157 string, arguments list, timeout int 158 RETURN: a list, containing the results of commands[i] applied to 159 arguments[i], i = 1, ..., size(arguments). 160 @* The command @code{parallelWaitAll(commands, arguments[, timeout])} is 161 synonymous to @code{parallelWaitN(commands, arguments, 162 size(arguments)[, timeout])}. See @ref{parallelWaitN} for details on 163 optional arguments and other remarks. 164 NOTE: As a shortcut, @code{commands} can be a string. This is synonymous to 165 providing a list of @code{size(arguments)} copies of this string. 166 SEE ALSO: parallelWaitFirst, parallelWaitN, tasks_lib 167 EXAMPLE: example parallelWaitAll; shows an example" 168 { 169 if (typeof(commands) != "list" && typeof(commands) != "string") { 170 ERROR("invalid type of first argument"); 171 } 172 if (typeof(commands) == "list") { 173 return(parallelWaitN(commands, args, size(args), #)); 174 } 175 else { 176 list cmds; 177 for (int i = size(args); i > 0; i--) { 178 cmds[i] = commands; 179 } 180 return(parallelWaitN(cmds, args, size(args), #)); 181 } 182 } 183 example 184 { 185 "EXAMPLE:"; 186 echo = 2; 187 ring R = 0, (x,y,z), dp; 188 ideal I1 = z8+z6+4z5+4z3+4z2+4, -z2+y; 189 ideal I2 = x9y2+x10, x2y7-y8; 190 ideal I3 = x3-2xy, x2y-2y2+x; 191 string command = "std"; 192 list arguments = list(list(I1), list(I2), list(I3)); 193 parallelWaitAll(command, arguments); 194 } 195 196 proc parallelTestAND(def commands, alias list args, list #) 197 "USAGE: parallelTestAND(commands, arguments[, timeout]); commands list or 198 string, arguments list, timeout int 199 RETURN: 1, if commands[i] applied to arguments[i] is not equal to zero for 200 all i = 1, ..., size(arguments); 201 0, otherwise. 202 @* An optional timeout in ms can be provided. Default is 0 which 203 disables the timeout. In case of timeout, -1 is returned. 204 NOTE: The entries of the list commands must be strings. The entries of the 205 list arguments must be lists. 206 @* commands[i] applied to arguments[i] must evaluate to an integer for 207 i = 1, ..., size(arguments). 208 @* As a shortcut, @code{commands} can be a string. This is synonymous to 209 providing a list of @code{size(arguments)} copies of this string. 210 SEE ALSO: parallelTestOR, tasks_lib 211 EXAMPLE: example parallelTestAND; shows an example" 212 { 213 // note: this can be improved 214 list results = parallelWaitAll(commands, args, #); 215 int i; 216 for (i = size(args); i > 0; i--) { 217 if (typeof(results[i]) != "int" && typeof(results[i]) != "none") { 218 ERROR("result no. "+string(i)+" not of type int"); 219 } 220 } 221 for (i = size(args); i > 0; i--) { 222 if (typeof(results[i]) == "none") { // timeout 223 return(-1); 224 } 225 } 226 for (i = size(results); i > 0; i--) { 227 if (!results[i]) { 228 return(0); 229 } 230 } 231 return(1); 232 } 233 example 234 { 235 "EXAMPLE:"; 236 echo = 2; 237 ring R = 0, (x,y,z), dp; 238 ideal I = x, y, z; 239 intvec v = 0:3; 240 list l = list(I, v); 241 module m1 = x*gen(1); 242 module m2; 243 string command = "size"; 244 list arguments1 = list(list(I), list(v), list(l), list(m1)); 245 list arguments2 = list(list(I), list(v), list(l), list(m2)); 246 // test if all the arguments have non-zero size 247 parallelTestAND(command, arguments1); 248 parallelTestAND(command, arguments2); 249 } 250 251 proc parallelTestOR(def commands, alias list args, list #) 252 "USAGE: parallelTestOR(commands, arguments[, timeout]); commands list or 253 string, arguments list, timeout int 254 RETURN: 1, if commands[i] applied to arguments[i] is not equal to zero for 255 any i = 1, ..., size(arguments); 256 0, otherwise. 257 @* An optional timeout in ms can be provided. Default is 0 which 258 disables the timeout. In case of timeout, -1 is returned. 259 NOTE: The entries of the list commands must be strings. The entries of the 260 list arguments must be lists. 261 @* commands[i] applied to arguments[i] must evaluate to an integer for 262 i = 1, ..., size(arguments). 263 @* As a shortcut, @code{commands} can be a string. This is synonymous to 264 providing a list of @code{size(arguments)} copies of this string. 265 SEE ALSO: parallelTestAND, tasks_lib 266 EXAMPLE: example parallelTestAND; shows an example" 267 { 268 // note: this can be improved 269 list results = parallelWaitAll(commands, args, #); 270 int i; 271 for (i = size(args); i > 0; i--) { 272 if (typeof(results[i]) != "int" && typeof(results[i]) != "none") { 273 ERROR("result no. "+string(i)+" not of type int"); 274 } 275 } 276 for (i = size(args); i > 0; i--) { 277 if (typeof(results[i]) == "none") { // timeout 278 return(-1); 279 } 280 } 281 for (i = size(results); i > 0; i--) { 282 if (results[i]) { 283 return(1); 284 } 285 } 286 return(0); 287 } 288 example 289 { 290 "EXAMPLE:"; 291 echo = 2; 292 ring R = 0, (x,y,z), dp; 293 ideal I; 294 string s; 295 list l; 296 module m1 = x*gen(1); 297 module m2; 298 string command = "size"; 299 list arguments1 = list(list(I), list(s), list(l), list(m1)); 300 list arguments2 = list(list(I), list(s), list(l), list(m2)); 301 // test if any of the arguments has non-zero size 302 parallelTestOR(command, arguments1); 303 parallelTestOR(command, arguments2); 304 } 305
Note: See TracChangeset
for help on using the changeset viewer.