Line data Source code
1 : //# Applicator.cc: Implementation of Applicator.h
2 : //# Copyright (C) 1999,2000,2002
3 : //# Associated Universities, Inc. Washington DC, USA.
4 : //#
5 : //# This library is free software; you can redistribute it and/or modify it
6 : //# under the terms of the GNU Library General Public License as published by
7 : //# the Free Software Foundation; either version 2 of the License, or (at your
8 : //# option) any later version.
9 : //#
10 : //# This library is distributed in the hope that it will be useful, but WITHOUT
11 : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 : //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13 : //# License for more details.
14 : //#
15 : //# You should have received a copy of the GNU Library General Public License
16 : //# along with this library; if not, write to the Free Software Foundation,
17 : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
18 : //#
19 : //# Correspondence concerning AIPS++ should be addressed as follows:
20 : //# Internet email: casa-feedback@nrao.edu.
21 : //# Postal address: AIPS++ Project Office
22 : //# National Radio Astronomy Observatory
23 : //# 520 Edgemont Road
24 : //# Charlottesville, VA 22903-2475 USA
25 : //#
26 : //# $Id$
27 :
28 : #include <casacore/casa/Utilities/Assert.h>
29 :
30 : #include <synthesis/Parallel/Applicator.h>
31 : #include <synthesis/Parallel/MPITransport.h>
32 : #include <synthesis/Parallel/SerialTransport.h>
33 : #include <synthesis/Parallel/Algorithm.h>
34 : #include <synthesis/MeasurementComponents/ClarkCleanAlgorithm.h>
35 : #include <synthesis/MeasurementComponents/ReadMSAlgorithm.h>
36 : #include <synthesis/MeasurementComponents/MakeApproxPSFAlgorithm.h>
37 : #include <synthesis/MeasurementComponents/PredictAlgorithm.h>
38 : #include <synthesis/MeasurementComponents/ResidualAlgorithm.h>
39 : #include <synthesis/ImagerObjects/CubeMajorCycleAlgorithm.h>
40 : #include <synthesis/ImagerObjects/CubeMakeImageAlgorithm.h>
41 : #include <synthesis/ImagerObjects/CubeMinorCycleAlgorithm.h>
42 : #include <synthesis/Parallel/MPIError.h>
43 :
44 : using namespace casacore;
45 : using namespace std;
46 : namespace casa { //# NAMESPACE CASA - BEGIN
47 :
48 8 : Applicator::Applicator() : comm(0), algorithmIds( ),
49 8 : knownAlgorithms( ), LastID(101), usedAllThreads(false),
50 8 : serial(true), nProcs(0), procStatus(0), initialized_p(false)
51 : {
52 : // Default constructor; requires later init().
53 8 : }
54 :
55 8 : Applicator::~Applicator()
56 : {
57 : // Default destructor
58 : //
59 8 : if (comm) {
60 : // If controller, then stop all worker processes
61 1 : if (isController() && !(comm->isFinalized())) {
62 0 : comm->setTag(STOP);
63 0 : for (Int i=0; i<nProcs; i++) {
64 0 : if (i != comm->controllerRank()) {
65 0 : comm->connect(i);
66 0 : put(STOP);
67 : }
68 : }
69 : }
70 1 : delete comm;
71 : }
72 :
73 16 : for (auto &algo : knownAlgorithms) {
74 8 : delete algo.second;
75 : }
76 8 : }
77 :
78 1 : void Applicator::initThreads(Int argc, Char *argv[]){
79 :
80 1 : Int numprocs=0;
81 :
82 : // A no-op if not using MPI
83 : #ifdef HAVE_MPI
84 : //if (debug_p) {
85 :
86 1 : if(initialized_p) return;
87 :
88 : //If detecting only 1 proc is offered to OpenMPI but compiling with MPI
89 1 : if (!getenv("OMPI_COMM_WORLD_LOCAL_SIZE") || (String::toInt(getenv("OMPI_COMM_WORLD_LOCAL_SIZE")) <2) ) {
90 : //go serial
91 1 : initThreads();
92 : }
93 : else {
94 : //cerr << "In initThreads. argc: " << argc << ", argv: " << argv << '\n';
95 0 : int flag=0;
96 0 : MPI_Initialized(&flag);
97 : //cerr << "FLAG " << flag << endl;
98 0 : if(flag || MPI_Init(&argc, &argv)==MPI_SUCCESS){
99 0 : Int numproc=0;
100 0 : MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
101 0 : if(numprocs < 2){
102 0 : initThreads();
103 0 : MPI_Finalize();
104 0 : return;
105 : }
106 : }
107 :
108 : // cerr << "In initThreads. argc: " << argc << ", argv: " << argv << '\n';
109 : // Initialize the MPI transport layer
110 : try {
111 0 : comm = new MPITransport(argc, argv);
112 :
113 : // Initialize the process status list
114 0 : setupProcStatus();
115 :
116 : // If controller then exit, else loop, waiting for an assigned task
117 0 : if (isWorker()) {
118 0 : loop();
119 : }
120 :
121 0 : } catch (MPIError x) {
122 0 : cerr << x.getMesg() << " doing serial "<< endl;
123 0 : initThreads();
124 0 : }
125 : }
126 :
127 : #else
128 : (void)argc;
129 : (void)argv;
130 : cerr << " doing serial "<< endl;
131 : initThreads();
132 : #endif
133 : }
134 :
135 : // Serial transport all around.
136 1 : void Applicator::initThreads(){
137 : // Initialize a serial transport layer
138 1 : comm = new SerialTransport();
139 : // Initialize the process status list
140 1 : setupProcStatus();
141 1 : }
142 0 : void Applicator::destroyThreads(){
143 0 : if(initialized_p){
144 0 : if (comm) {
145 : // If controller, then stop all worker processes
146 0 : if (isController() && !isSerial() && !(comm->isFinalized())) {
147 : //comm->setTag(STOP);
148 0 : for (Int i=0; i<nProcs; i++) {
149 0 : if (i != comm->controllerRank()) {
150 0 : comm->connect(i);
151 0 : comm->setTag(STOP);
152 0 : put(STOP);
153 :
154 : }
155 : }
156 : }
157 : //delete comm; ///leaking this for now as if initialized from python..it brings down the whole house
158 : //comm=nullptr;
159 : }
160 :
161 : }
162 :
163 0 : }
164 1091 : void Applicator::init(Int argc, Char *argv[])
165 : {
166 : // Initialize the process and parallel transport layer
167 : //
168 : //cerr <<"Applicatorinit " << initialized_p << endl;
169 1091 : if(comm){
170 : //if worker was released from loop...want it back now
171 1090 : if(comm && isWorker() && !isSerial())
172 0 : loop();
173 1090 : return;
174 : }
175 : // Fill the map of known algorithms
176 : //cerr << "APPINIT defining algorithms " << endl;
177 1 : defineAlgorithms();
178 :
179 : #ifdef HAVE_MPI
180 1 : if (debug_p) {
181 0 : cerr << "In init threads, HAVE_MPI...\n";
182 : }
183 1 : initThreads(argc, argv);
184 : #else
185 : if (debug_p) {
186 : cerr << "In init threads, not HAVE_MPI...\n";
187 : }
188 : (void)argc;
189 : (void)argv;
190 : initThreads();
191 : #endif
192 1 : initialized_p=true;
193 1 : return;
194 : }
195 :
196 2183 : Bool Applicator::isController()
197 : {
198 : // Return T if the current process is the controller
199 : //
200 : Bool result;
201 2183 : if (comm) {
202 2183 : result = comm->isController();
203 : } else {
204 0 : throw(AipsError("Parallel transport layer not initialized"));
205 : }
206 2183 : return result;
207 : }
208 :
209 3272 : Bool Applicator::isWorker()
210 : {
211 : // Return T if the current process is a worker process
212 : //
213 : Bool result;
214 3272 : if (comm) {
215 3272 : result = comm->isWorker();
216 : } else {
217 0 : throw(AipsError("Parallel transport layer not initialized"));
218 : }
219 3272 : return result;
220 : }
221 :
222 0 : void Applicator::loop()
223 : {
224 : // Loop, if a worker process, waiting for an assigned task
225 : //
226 0 : Bool die(false);
227 : Int what;
228 : // Wait for a message from the controller with any Algorithm tag
229 0 : while(!die){
230 0 : comm->connectToController();
231 0 : comm->setAnyTag();
232 : //cerr << "in loop get" << endl;
233 0 : comm->get(what);
234 0 : if (debug_p) {
235 0 : cerr << "worker, got what (algID/stop): " << what << endl;
236 : }
237 0 : switch(what){
238 0 : case STOP :
239 0 : die = true;
240 0 : break;
241 0 : default :
242 : // In this case, an Algorithm tag is expected.
243 : // First check that it is known.
244 0 : if (knownAlgorithms.find(what) != knownAlgorithms.end( )) {
245 : // Identified algorithm tag; set for subsequent communication
246 0 : comm->setTag(what);
247 : // Execute (apply) the algorithm
248 0 : knownAlgorithms.at(what)->apply();
249 : } else {
250 0 : throw(AipsError("Unidentified parallel algorithm code"));
251 : }
252 0 : break;
253 : }
254 : }
255 : //cerr <<"getting out of loop " <<endl;
256 0 : return;
257 : }
258 :
259 1091 : Bool Applicator::nextAvailProcess(Algorithm &a, Int &rank)
260 : {
261 : // Assign the next available process for the specified Algorithm
262 : //
263 : // Must be the controller to request a worker process
264 1091 : Bool assigned=False;
265 1091 : if (isWorker()) {
266 0 : throw(AipsError("Must be the controller to assign a worker process"));
267 : } else {
268 1091 : if (!usedAllThreads) {
269 : // Connect to the next available process in the list
270 : Bool lastOne;
271 1091 : rank = findFreeProc(lastOne);
272 1091 : AlwaysAssert(rank >= 0, AipsError);
273 1091 : if (lastOne) usedAllThreads = true;
274 1091 : Int tag = algorithmIds.find(a.name()) == algorithmIds.end( ) ? 0 : algorithmIds.at(a.name());
275 :
276 : // Send wake-up message (containing the Algorithm tag) to
277 : // the assigned worker process to activate it (see loop()).
278 1091 : comm->connect(rank);
279 1091 : comm->setTag(tag);
280 : //cerr << "nextAvailproc settag " << tag << " rank " << rank << " name " << a.name() << endl;
281 1091 : put(tag);
282 : /*
283 : if (not isWorker() and numProcs() <= 1){
284 : // the first int, algID, is consumed in the loop for the workers when running
285 : // in multiprocess mode and there are at least 2 processes. When not multiprocess or a
286 : // single process, we need to consume it:
287 : // TODO - it could be consumed up here, right after the put()
288 : int algID;
289 : comm->get(algID);
290 : if (debug_p) {
291 : cerr << "nextAvailproc controller, got algID: " << algID << " assigned " << assigned << " donesig " << donesig_p<< endl;
292 : }
293 : }
294 : */
295 1091 : assigned = true;
296 1091 : procStatus(rank) = ASSIGNED;
297 : } else {
298 0 : assigned = false;
299 : }
300 : }
301 : //cerr << "nextAvailproc controller assigned " << assigned << endl;
302 :
303 1091 : if ((!isWorker()) && (numProcs() <= 1) && assigned){
304 : // the first int, algID, is consumed in the loop for the workers when running
305 : // in multiprocess mode and there are at least 2 processes. When not multiprocess or a
306 : // single process, we need to consume it:
307 : // TODO - it could be consumed up here, right after the put()
308 : Int algID;
309 : //comm->get(algID);
310 1091 : get(algID);
311 1091 : if (debug_p) {
312 0 : cerr << "nextAvailproc controller, got algID: " << algID << " assigned " << assigned << " donesig " << donesig_p<< endl;
313 : }
314 : }
315 :
316 1091 : return assigned;
317 : }
318 :
319 0 : bool Applicator::initialized(){
320 : #ifdef HAVE_MPI
321 0 : return initialized_p;
322 : #endif
323 :
324 : return false;
325 : }
326 2182 : Int Applicator::nextProcessDone(Algorithm &a, Bool &allDone)
327 : {
328 : // Return the rank of the next process to complete the specified algorithm
329 : //
330 2182 : Int rank = -1;
331 2182 : allDone = true;
332 : //cerr << "nextprocess done procstatus " << procStatus << endl;
333 4364 : for (uInt i=0; i<procStatus.nelements(); i++) {
334 2182 : if (procStatus(i) == ASSIGNED) {
335 1091 : if (isSerial()) {
336 : // In the serial case, the controller can be assigned
337 1091 : allDone = false;
338 : } else {
339 : // In the parallel case, the controller is not assigned
340 0 : if (i != static_cast<uInt>(comm->controllerRank())) {
341 0 : allDone = false;
342 : }
343 : }
344 : }
345 : }
346 2182 : if (!allDone) {
347 : // Wait for a process to finish with the correct algorithm tag
348 1091 : comm->connectAnySource();
349 1091 : Int tag = algorithmIds.find(a.name()) == algorithmIds.end( ) ? 0 : algorithmIds.at(a.name());
350 : //cerr <<"procdone name" << a.name() << " id " << tag << endl;
351 1091 : comm->setTag(tag);
352 : Int doneSignal;
353 1091 : rank = get(doneSignal);
354 : //cerr <<" procdone rank " << rank << " donesig " << doneSignal << endl;
355 : // Consistency check; should return a DONE signal to contoller
356 : // on completion.
357 1091 : if (doneSignal != DONE) {
358 0 : throw(AipsError("Worker process terminated unexpectedly"));
359 : } else {
360 : // Set source in parallel transport layer
361 1091 : comm->connect(rank);
362 : // Mark process as free
363 1091 : procStatus(rank) = FREE;
364 : //cerr << "NEXTProcDone connect rank" << rank << " procstat " << procStatus << endl;
365 1091 : usedAllThreads = false;
366 : }
367 : }
368 2182 : return rank;
369 : }
370 :
371 1091 : void Applicator::done()
372 : {
373 : // Signal that a worker process is done
374 : //
375 1091 : donesig_p=DONE;
376 1091 : Int donesig=DONE;
377 1091 : if(isSerial())
378 1091 : put(donesig_p);
379 : else
380 0 : put(donesig);
381 2182 : return;
382 : }
383 :
384 1091 : void Applicator::apply(Algorithm &a)
385 : {
386 : // Execute an algorithm directly
387 : //
388 : // Null operation unless serial, in which case the
389 : // controller needs to execute the algorithm directly.
390 : // In the parallel case, the algorithm applies are
391 : // performed in workers processes' applicator.init().
392 1091 : donesig_p=10000;
393 1091 : if (isSerial() && isController()) {
394 1091 : a.apply();
395 : }
396 1091 : return;
397 : }
398 :
399 0 : void Applicator::defineAlgorithm(Algorithm *a)
400 : {
401 : //no need to add if it is already defined
402 : // if(algorithmIds.count(a->name()) <1){
403 : //knownAlgorithms.insert( std::pair<casacore::Int,Algorithm*>(LastID, a) );
404 : // algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a->name(), LastID) );
405 0 : Int theid=LastID;
406 0 : if(algorithmIds.count(a->name()) >0){
407 0 : theid=algorithmIds[a->name()];
408 : }
409 : else{
410 0 : theid=LastID;
411 0 : algorithmIds[a->name()]=LastID;
412 0 : ++LastID;
413 : }
414 0 : knownAlgorithms[theid]=a;
415 : // }
416 0 : return;
417 : }
418 :
419 1 : void Applicator::defineAlgorithms()
420 : {
421 : // Fill the algorithm map
422 : //
423 : // Clark CLEAN parallel deconvolution
424 1 : Algorithm *a1 = new ClarkCleanAlgorithm;
425 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a1) );
426 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a1->name(), LastID) );
427 1 : LastID++;
428 1 : Algorithm *a2 = new ReadMSAlgorithm;
429 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a2) );
430 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a2->name(), LastID) );
431 1 : LastID++;
432 1 : Algorithm *a3 = new MakeApproxPSFAlgorithm;
433 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a3) );
434 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a3->name(), LastID) );
435 1 : LastID++;
436 1 : Algorithm *a4 = new PredictAlgorithm;
437 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a4) );
438 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a4->name(), LastID) );
439 1 : LastID++;
440 1 : Algorithm *a5 = new ResidualAlgorithm;
441 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a5) );
442 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a5->name(), LastID) );
443 1 : LastID++;
444 1 : Algorithm *a6 = new CubeMajorCycleAlgorithm;
445 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a6) );
446 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a6->name(), LastID) );
447 1 : LastID++;
448 1 : Algorithm *a7 = new CubeMakeImageAlgorithm;
449 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a7) );
450 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a7->name(), LastID) );
451 1 : LastID++;
452 1 : Algorithm *a8 = new CubeMinorCycleAlgorithm;
453 1 : knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a8) );
454 1 : algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a8->name(), LastID) );
455 1 : LastID++;
456 2 : return;
457 : }
458 :
459 1 : void Applicator::setupProcStatus()
460 : {
461 : // Set up the process status list
462 : //
463 1 : nProcs = comm->numThreads();
464 1 : if (nProcs <= 1) {
465 1 : serial = true;
466 : } else {
467 0 : serial = false;
468 : }
469 : // Resize the process list, and mark as unassigned (except for controller)
470 1 : usedAllThreads = false;
471 1 : procStatus.resize(max(nProcs,1));
472 1 : procStatus = FREE;
473 : // In the parallel case, the controller is never assigned
474 1 : if (!isSerial())
475 0 : procStatus(comm->controllerRank()) = ASSIGNED;
476 1 : }
477 :
478 1091 : Int Applicator::findFreeProc(Bool &lastOne)
479 : {
480 : // Search the process status list for the next free process
481 : //
482 1091 : Int freeProc = -1;
483 1091 : Int nfree = 0;
484 :
485 2182 : for (uInt i=0; i<procStatus.nelements(); i++) {
486 1091 : if (procStatus(i) == FREE) {
487 1091 : nfree++;
488 1091 : if (freeProc < 0) freeProc = i;
489 : }
490 : }
491 1091 : lastOne = (nfree==1);
492 : //cerr <<"FreeProc procstat "<< procStatus << " nfree " << nfree << endl;
493 1091 : return freeProc;
494 : }
495 :
496 : // The applicator is ominpresent.
497 : // Moved here for shared libraries.
498 : Applicator applicator;
499 :
500 :
501 : } //# NAMESPACE CASA - END
502 :
|