Line data Source code
1 : #include <synthesis/ImagerObjects/grpcInteractiveClean.h>
2 : #include <synthesis/ImagerObjects/SIMinorCycleController.h>
3 : #include <casatools/Config/State.h>
4 : #include <casacore/casa/Logging/LogIO.h>
5 : #include <casacore/images/Images/PagedImage.h>
6 : #include <stdcasa/StdCasa/CasacSupport.h>
7 : #include <string.h>
8 : #include <stdlib.h>
9 : #include <sys/types.h>
10 : #include <sys/wait.h>
11 : #include <iostream>
12 :
13 : #include <sys/types.h>
14 : #include <sys/stat.h>
15 : #include <unistd.h>
16 : #include <array>
17 : #include <regex>
18 : #include <string>
19 :
20 : #include <algorithm>
21 : #include <cctype>
22 : #include <locale>
23 :
24 : #include <grpc++/grpc++.h>
25 : #include "shutdown.grpc.pb.h"
26 : #include "img.grpc.pb.h"
27 : #include "ping.grpc.pb.h"
28 :
29 : #include <stdcasa/StdCasa/CasacSupport.h>
30 :
31 : #ifdef __APPLE__
32 : extern "C" char **environ;
33 : #include <unistd.h>
34 : #endif
35 :
36 : using namespace casacore;
37 :
// In-place whitespace trimming helpers, adapted from
// https://stackoverflow.com/questions/216823/whats-the-best-way-to-trim-stdstring
//
// The predicates deliberately take `unsigned char`: handing std::isspace a negative
// value (any byte >= 0x80 on platforms where `char` is signed) is undefined behaviour.

// Removes leading whitespace from s (in place).
static inline void ltrim(std::string &s) {
    s.erase(s.begin(), std::find_if(s.begin(), s.end(), [](unsigned char ch) {
        return !std::isspace(ch);
    }));
}

// Removes trailing whitespace from s (in place).
static inline void rtrim(std::string &s) {
    s.erase(std::find_if(s.rbegin(), s.rend(), [](unsigned char ch) {
        return !std::isspace(ch);
    }).base(), s.end());
}

// Removes whitespace from both ends of s (in place).
static inline void trim(std::string &s) {
    ltrim(s);
    rtrim(s);
}
58 :
// Executes the given program, with the given arguments and the given environment, in a
// forked child process and collects the child's stdout into `output`.
//
// @param pathname  Path of the executable handed to execve().
// @param argv      NULL-terminated argument vector for the child.
// @param envp      NULL-terminated environment for the child. To get around the MPI issue
//                  from CAS-13252, this should probably come from getenv_sansmpi().
// @param output    Destination buffer. Whenever outputlen > 0 it is NUL-terminated on
//                  return, even if the child printed nothing.
// @param outputlen Capacity of `output` in bytes, *including* the terminating NUL. At most
//                  outputlen-1 bytes of child output are stored; the rest is read and
//                  discarded so the child never blocks on a full pipe.
//
// A pipe/fork/read failure prints a message to stderr and terminates the calling process
// with exit(1). The child is reaped before returning, so no zombie is left behind.
static void execve_getstdout(char *pathname, char *argv[], char *envp[], char *output, ssize_t outputlen)
{
    // We use execve here instead of popen to get around issues related to using MPI.
    // MPI crashes when starting a process that calls MPI_Init in a process spawned using popen or exec.
    // We can trick MPI into behaving itself by removing all the MPI environmental variables for
    // the child process (thus getenv_sansmpi and execve).

    // Number of payload bytes we may store; one byte is reserved for the terminator.
    const bool have_buffer = (output != nullptr && outputlen > 0);
    const ssize_t capacity = have_buffer ? outputlen - 1 : 0;
    if (have_buffer) {
        output[0] = '\0';
    }

    int filedes[2];
    if (pipe(filedes) == -1) {
        std::cerr << "pipe error" << std::endl;
        exit(1);
    }

    const pid_t pid = fork();
    if (pid == -1) {
        std::cerr << "fork error" << std::endl;
        exit(1);
    }

    if (pid == 0) { // child
        // close stdout and connect it to the input of the pipe
        while ((dup2(filedes[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {}
        close(filedes[1]);
        close(filedes[0]);
        // exec on the child process
        execve(pathname, argv, envp);
        // Only reached when execve failed. Use _exit so the forked copy does not run the
        // parent's atexit handlers or flush the parent's stdio buffers a second time.
        _exit(1);
    }

    // parent: don't care about the input end of the pipe
    close(filedes[1]);

    char tmp[128];
    ssize_t total = 0;
    while (true) {
        const ssize_t count = read(filedes[0], tmp, sizeof(tmp));
        if (count == -1) {
            if (errno == EINTR) {
                continue;
            }
            std::cerr << "read error" << std::endl;
            exit(1);
        }
        if (count == 0) {
            break; // EOF: every write end of the pipe has been closed
        }

        const ssize_t remaining = capacity - total;
        const ssize_t cpysize = (count < remaining) ? count : remaining;
        if (cpysize > 0) {
            memcpy(output + total, tmp, cpysize);
            total += cpysize;
            output[total] = '\0'; // total <= outputlen-1, so this stays in bounds
        }
    }

    close(filedes[0]);

    // Reap the child so it does not linger as a zombie.
    while (waitpid(pid, nullptr, 0) == -1 && errno == EINTR) {}
}
117 :
// Builds a filtered view of the process environment (the posix "environ" variable):
// every "NAME=value" entry whose text contains the substring "MPI" is left out.
// @return A malloc'ed, NULL-terminated array of pointers. The strings themselves are the
//         ones held by environ (they are not copied), so only the returned array should
//         be passed to free() after use.
static char **getenv_sansmpi()
{
    const auto mentions_mpi = [](const char *entry) {
        return strstr(entry, "MPI") != NULL;
    };

    // First pass: how many entries survive the filter?
    size_t kept = 0;
    for (char **cur = environ; *cur != NULL; ++cur) {
        if (!mentions_mpi(*cur)) {
            ++kept;
        }
    }

    // Second pass: collect the survivors, leaving room for the terminating NULL.
    char **filtered = (char**)malloc(sizeof(char*) * (kept + 1));
    char **slot = filtered;
    for (char **cur = environ; *cur != NULL; ++cur) {
        if (!mentions_mpi(*cur)) {
            *slot++ = *cur;
        }
    }
    filtered[kept] = NULL;

    return filtered;
}
145 :
146 : namespace casa { //# NAMESPACE CASA - BEGIN
147 :
148 0 : grpcInteractiveCleanManager &grpcInteractiveClean::getManager( ) {
149 0 : static grpcInteractiveCleanManager mgr;
150 0 : return mgr;
151 : }
152 :
153 0 : void grpcInteractiveCleanManager::pushDetails() {
154 0 : }
155 :
156 0 : grpcInteractiveCleanState::grpcInteractiveCleanState( ) : SummaryMinor(casacore::IPosition(2,
157 : SIMinorCycleController::nSummaryFields, // temporary CAS-13683 workaround
158 : // SIMinorCycleController::useSmallSummaryminor() ? 6 : SIMinorCycleController::nSummaryFields, // temporary CAS-13683 workaround
159 : 0)),
160 0 : SummaryMajor(casacore::IPosition(1,0)) {
161 0 : LogIO os( LogOrigin("grpcInteractiveCleanState",__FUNCTION__,WHERE) );
162 0 : reset( );
163 0 : }
164 :
165 0 : void grpcInteractiveCleanState::reset( ) {
166 0 : Niter = 0;
167 0 : MajorDone = 0;
168 0 : CycleNiter = 0;
169 0 : InteractiveNiter = 0;
170 0 : Threshold = 0;
171 0 : CycleThreshold = 0;
172 0 : InteractiveThreshold = 0.0;
173 0 : IsCycleThresholdAuto = true;
174 0 : IsCycleThresholdMutable = true;
175 0 : IsThresholdAuto = false;
176 0 : CycleFactor = 1.0;
177 0 : LoopGain = 0.1;
178 0 : StopFlag = false;
179 0 : PauseFlag = false;
180 0 : InteractiveMode = false;
181 0 : UpdatedModelFlag = false;
182 0 : InteractiveIterDone = 0;
183 0 : IterDone = 0;
184 0 : StopCode = 0;
185 0 : Nsigma = 0.0;
186 0 : MaxPsfSidelobe = 0.0;
187 0 : MinPsfFraction = 0.05;
188 0 : MaxPsfFraction = 0.8;
189 0 : PeakResidual = 0.0;
190 0 : MinorCyclePeakResidual = 0.0;
191 0 : PrevPeakResidual = -1.0;
192 0 : NsigmaThreshold = 0.0;
193 0 : PrevMajorCycleCount = 0;
194 0 : PeakResidualNoMask = 0.0;
195 0 : PrevPeakResidualNoMask = -1.0;
196 0 : MinPeakResidualNoMask = 1e+9;
197 0 : MinPeakResidual = 1e+9;
198 0 : MaskSum = -1.0;
199 0 : MadRMS = 0.0;
200 : //int nSummaryFields = SIMinorCycleController::useSmallSummaryminor() ? 6 : SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
201 0 : int nSummaryFields = SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
202 : //int nSummaryFields = !FullSummary ? 6 : SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
203 0 : SummaryMinor.reformOrResize(casacore::IPosition(2, nSummaryFields ,0));
204 0 : SummaryMajor.reformOrResize(casacore::IPosition(1,0));
205 0 : SummaryMinor = 0;
206 0 : SummaryMajor = 0;
207 0 : }
208 :
209 0 : void grpcInteractiveCleanManager::setControls( int niter, int ncycle, float threshold ) {
210 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager", __FUNCTION__, WHERE) );
211 0 : static const auto debug = getenv("GRPC_DEBUG");
212 0 : if ( debug ) std::cerr << "setting clean controls:";
213 0 : access( (void*) 0,
214 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
215 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
216 :
217 0 : state.Niter = niter;
218 0 : if ( debug ) std::cerr << " niter=" << state.Niter;
219 0 : state.CycleNiter = ncycle;
220 0 : if ( debug ) std::cerr << " cycleniter=" << state.CycleNiter;
221 0 : state.Threshold = threshold;
222 0 : if ( debug ) std::cerr << " threshold=" << state.Threshold;
223 0 : return dummy;
224 :
225 : } ) );
226 :
227 0 : if ( debug ) {
228 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
229 0 : std::this_thread::get_id() << ")" << std::endl;
230 0 : fflush(stderr);
231 : }
232 :
233 0 : }
234 0 : void grpcInteractiveCleanManager::setControlsFromRecord(const casac::record &iterpars) {
235 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager", __FUNCTION__, WHERE) );
236 0 : static const auto debug = getenv("GRPC_DEBUG");
237 :
238 0 : if ( debug ) std::cerr << "initializing clean controls:";
239 :
240 0 : access( (void*) 0,
241 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
242 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
243 :
244 0 : auto oldNiter = state.Niter;
245 0 : auto oldCycleNiter = state.CycleNiter;
246 0 : auto oldThreshold = state.Threshold;
247 0 : auto oldCycleThreshold = state.CycleThreshold;
248 :
249 0 : state.reset( );
250 :
251 : /* Note it is important that niter get set first as we catch
252 : negative values in the cycleniter, and set it equal to niter */
253 0 : auto niter = iterpars.find("niter");
254 0 : if ( niter != iterpars.end( ) ) {
255 0 : state.Niter = niter->second.toInt( );
256 0 : if ( debug ) std::cerr << " niter=" << state.Niter;
257 : }
258 0 : auto newNiter = state.Niter;
259 :
260 0 : auto cycleniter = iterpars.find("cycleniter");
261 0 : if ( cycleniter != iterpars.end( ) ) {
262 0 : int val = cycleniter->second.toInt( );
263 0 : if ( val <= 0 )
264 0 : state.CycleNiter = state.Niter;
265 : else
266 0 : state.CycleNiter = val;
267 0 : if ( debug ) std::cerr << " cycleniter=" << state.CycleNiter;
268 : }
269 0 : auto newCycleNiter = state.CycleNiter;
270 :
271 0 : auto interactiveniter = iterpars.find("interactiveniter");
272 0 : if ( interactiveniter != iterpars.end( ) ) {
273 0 : state.InteractiveNiter = interactiveniter->second.toInt( );
274 0 : if ( debug ) std::cerr << " interactiveniter=" << state.InteractiveNiter;
275 : }
276 :
277 0 : auto threshold = iterpars.find("threshold");
278 0 : if ( threshold != iterpars.end( ) ) {
279 0 : auto quant = casaQuantity(threshold->second);
280 0 : if ( quant.getUnit( ) == "" ) quant.setUnit("Jy");
281 0 : auto val = quant.getValue(Unit("Jy"));
282 0 : if ( val == -1.0 ) {
283 0 : state.Threshold = 0.0;
284 0 : state.IsThresholdAuto = true;
285 : } else {
286 0 : state.Threshold = (float) val;
287 0 : state.IsThresholdAuto = false;
288 : }
289 0 : if ( debug ) {
290 0 : std::cerr << " threshold=" << state.Threshold;
291 : std::cerr << " isthresholdauto=" <<
292 0 : (state.IsThresholdAuto ? "true" : "false");
293 : }
294 0 : }
295 0 : auto newThreshold = state.Threshold;
296 :
297 0 : auto cyclethreshold = iterpars.find("cyclethreshold");
298 0 : if ( cyclethreshold != iterpars.end( ) ) {
299 0 : state.CycleThreshold = casaQuantity(cyclethreshold->second).getValue(Unit("Jy"));
300 0 : state.IsCycleThresholdAuto = false;
301 0 : if ( debug ) {
302 0 : std::cerr << " cyclethreshold=" << state.CycleThreshold;
303 : std::cerr << " iscyclethresholdauto=" <<
304 0 : (state.IsCycleThresholdAuto ? "true" : "false");
305 0 : fflush(stderr);
306 : }
307 : }
308 0 : auto newCycleThreshold = state.CycleThreshold;
309 :
310 0 : auto cyclethresholdismutable = iterpars.find("cyclethresholdismutable");
311 0 : if ( cyclethresholdismutable != iterpars.end( ) ) {
312 0 : state.IsCycleThresholdMutable = cyclethresholdismutable->second.toBool( );
313 0 : state.IsCycleThresholdAuto = state.IsCycleThresholdAuto && state.IsCycleThresholdMutable;
314 0 : if ( debug ) {
315 0 : std::cerr << " iscyclethresholdmutable=" << (state.IsCycleThresholdMutable ? "true" : "false");
316 0 : std::cerr << " iscyclethresholdauto=" << (state.IsCycleThresholdAuto ? "true" : "false");
317 0 : fflush(stderr);
318 : }
319 : }
320 :
321 0 : auto interactivethreshold = iterpars.find("interactivethreshold");
322 0 : if ( interactivethreshold != iterpars.end( ) ) {
323 0 : state.InteractiveThreshold = casaQuantity(interactivethreshold->second).getValue(Unit("Jy"));
324 0 : if ( debug ) std::cerr << " interactivethreshold=" << state.InteractiveThreshold;
325 : }
326 :
327 0 : auto loopgain = iterpars.find("loopgain");
328 0 : if ( loopgain != iterpars.end( ) ) {
329 0 : state.LoopGain = (float) loopgain->second.toDouble( );
330 0 : if ( debug ) std::cerr << " loopgain=" << state.LoopGain;
331 : }
332 0 : auto cyclefactor = iterpars.find("cyclefactor");
333 0 : if ( cyclefactor != iterpars.end( ) ) {
334 0 : state.CycleFactor = (float) cyclefactor->second.toDouble( );
335 0 : if ( debug ) std::cerr << " cyclefactor=" << state.CycleFactor;
336 : }
337 :
338 0 : auto interactivemode = iterpars.find("interactive");
339 0 : if ( interactivemode != iterpars.end( ) ) {
340 0 : state.InteractiveMode = interactivemode->second.toBool( );
341 0 : if ( debug ) std::cerr << " interactive=" <<
342 0 : (state.InteractiveMode ? "true" : "false");
343 : }
344 :
345 0 : auto minpsffraction = iterpars.find("minpsffraction");
346 0 : if ( minpsffraction != iterpars.end( ) ) {
347 0 : state.MinPsfFraction = (float) minpsffraction->second.toDouble( );
348 0 : if ( debug ) std::cerr << " minpsffraction=" << state.MinPsfFraction;
349 : }
350 :
351 0 : auto maxpsffraction = iterpars.find("maxpsffraction");
352 0 : if ( maxpsffraction != iterpars.end( ) ) {
353 0 : state.MaxPsfFraction = (float) maxpsffraction->second.toDouble( );
354 0 : if ( debug ) std::cerr << " maxpsffraction=" << state.MaxPsfFraction;
355 : }
356 :
357 0 : auto nsigma = iterpars.find("nsigma");
358 0 : if ( nsigma != iterpars.end( ) ) {
359 0 : state.Nsigma = (float) nsigma->second.toDouble( );
360 0 : if ( debug ) std::cerr << " nsigma=" << state.Nsigma;
361 : }
362 0 : auto fullsummary = iterpars.find("fullsummary");
363 0 : if ( fullsummary != iterpars.end() ) {
364 0 : state.FullSummary = fullsummary->second.toBool();
365 0 : if ( debug ) std::cerr << " fullsummry=" << state.FullSummary;
366 : }
367 0 : if ( debug ) std::cerr << std::endl;
368 :
369 0 : if ( debug ) {
370 0 : std::cerr << "-------------------------------------------" << this << " / " << &state << std::endl;
371 0 : std::cerr << " exported python state: " << std::endl;
372 0 : std::cerr << "-------------------------------------------" << std::endl;
373 0 : std::cerr << " Niter " << oldNiter <<
374 0 : " ---> " << newNiter << std::endl;
375 0 : std::cerr << " CycleNiter " << oldCycleNiter <<
376 0 : " ---> " << newCycleNiter << std::endl;
377 0 : std::cerr << " Threshold " << oldThreshold <<
378 0 : " ---> " << newThreshold << std::endl;
379 0 : std::cerr << " CycleThreshold " << oldCycleThreshold <<
380 0 : " ---> " << newCycleThreshold << std::endl;
381 0 : std::cerr << " IsCycleThresholdAuto " << state.IsCycleThresholdAuto << std::endl;
382 0 : std::cerr << "-------------------------------------------" << std::endl;
383 :
384 :
385 : }
386 0 : return dummy;
387 :
388 : } ) );
389 :
390 0 : if ( debug ) {
391 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
392 0 : std::this_thread::get_id() << ")" << std::endl;
393 0 : fflush(stderr);
394 : }
395 0 : }
396 : /*
397 : Float SIIterBot_state::readThreshold( Record recordIn, String id ) {
398 : LogIO os( LogOrigin("SIIterBot_state",__FUNCTION__,WHERE) );
399 : std::lock_guard<std::recursive_mutex> guard(recordMutex);
400 : // Threshold can be a variant, either Float or String(with units).
401 : Float fthresh=0.0;
402 : // If a number, treat it as a number in units of Jy.
403 : if( recordIn.dataType(id) == TpFloat ||
404 : recordIn.dataType(id) == TpDouble ||
405 : recordIn.dataType(id) == TpInt )
406 : { fthresh = recordIn.asFloat( RecordFieldId(id)); }
407 : // If a string, try to convert to a Quantity
408 : else if( recordIn.dataType(id) == TpString )
409 : {
410 : Quantity thresh;
411 : // If it cannot be converted to a Quantity.... complain, and use zero.
412 : if( ! casacore::Quantity::read( thresh, recordIn.asString( RecordFieldId(id) ) ) )
413 : {os << LogIO::WARN << "Cannot parse threshold value. Setting to zero." << LogIO::POST;
414 : fthresh=0.0;}
415 : // If converted to Quantity, get value in Jy.
416 : // ( Note : This does not check for wrong units, e.g. if the user says '100m' ! )
417 : else { fthresh = thresh.getValue(Unit("Jy")); }
418 : }
419 : // If neither valid datatype, print a warning and use zero.
420 : else {os << LogIO::WARN << id << " is neither a number nor a string Quantity. Setting to zero." << LogIO::POST;
421 : fthresh=0.0; }
422 :
423 : return fthresh;
424 : }
425 : */
426 0 : void grpcInteractiveCleanManager::setIterationDetails(const casac::record &iterpars) {
427 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
428 0 : static const auto debug = getenv("GRPC_DEBUG");
429 :
430 : // Setup interactive masking : list of image names.
431 0 : if ( clean_images.size( ) == 0 ) {
432 : try {
433 : ////////////////////////////////////////////////////////////////////////////////////////////////////////
434 : ///// START : code to get a list of image names for interactive masking
435 :
436 0 : auto allimages = iterpars.find("allimages");
437 0 : if ( allimages != iterpars.end( ) ) {
438 0 : auto rec = allimages->second.getRecord( );
439 0 : for ( auto it = rec.begin( ); it != rec.end( ); ++it ) {
440 0 : auto oneimg = it->second.getRecord( );
441 0 : auto img_name = oneimg.find("imagename");
442 0 : auto img_multiterm = oneimg.find("multiterm");
443 0 : if ( img_name != oneimg.end( ) && img_multiterm != oneimg.end( ) ) {
444 0 : clean_images.push_back( std::make_tuple( img_name->second.getString(),
445 0 : img_multiterm->second.toBool( ), false, 0) );
446 : }
447 0 : }
448 0 : } else {
449 0 : throw( AipsError("Need image names and nterms in iteration parameter list") );
450 : }
451 0 : if ( clean_images.size( ) <= 0 ) {
452 0 : throw( AipsError("Need image names for iteration") );
453 : }
454 :
455 0 : if ( debug ) {
456 0 : std::cerr << "clean images specified: ";
457 0 : for ( auto it = clean_images.begin( ); it != clean_images.end( ); ++it ) {
458 0 : if ( it != clean_images.begin( ) ) std::cerr << ", ";
459 0 : std::cerr << std::get<0>(*it) << " [" << (std::get<1>(*it) ? "true" : "false") << "]";
460 : }
461 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
462 0 : std::this_thread::get_id() << ")" << std::endl;
463 0 : fflush(stderr);
464 : }
465 :
466 : ///// END : code to get a list of image names for interactive masking
467 : ////////////////////////////////////////////////////////////////////////////////////////////////////////
468 :
469 0 : setControlsFromRecord( iterpars );
470 0 : Int nSummaryFields = !state.FullSummary ? 7 : SIMinorCycleController::nSummaryFields;
471 0 : state.SummaryMinor.reformOrResize(casacore::IPosition(2, nSummaryFields ,0));
472 :
473 0 : } catch( AipsError &x ) {
474 0 : throw( AipsError("Error in updating iteration parameters : " + x.getMesg()) );
475 0 : }
476 : }
477 0 : }
478 :
479 0 : void grpcInteractiveCleanManager::updateCycleThreshold( grpcInteractiveCleanState &state ) {
480 0 : static const auto debug = getenv("GRPC_DEBUG");
481 :
482 0 : Float psffraction = state.MaxPsfSidelobe * state.CycleFactor;
483 :
484 0 : psffraction = max(psffraction, state.MinPsfFraction);
485 0 : psffraction = min(psffraction, state.MaxPsfFraction);
486 :
487 0 : if ( debug ) {
488 0 : std::cerr << "------------------------------------------- " << this << " / " << &state << std::endl;
489 0 : std::cerr << " algorithmic update of cycle threshold: " << std::endl;
490 0 : std::cerr << " CycleThreshold " << state.CycleThreshold <<
491 0 : " ---> " << (state.PeakResidual * psffraction) << std::endl;
492 0 : std::cerr << " IsCycleThresholdAuto " << state.IsCycleThresholdAuto << std::endl;
493 0 : std::cerr << "-------------------------------------------" << std::endl;
494 : }
495 :
496 0 : state.CycleThreshold = state.PeakResidual * psffraction;
497 0 : pushDetails();
498 0 : }
499 :
500 0 : void grpcInteractiveCleanManager::addSummaryMajor( ) {
501 0 : access( (void*) 0,
502 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
503 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
504 0 : IPosition shp = state.SummaryMajor.shape();
505 0 : if( shp.nelements() != 1 )
506 0 : throw(AipsError("Internal error in shape of major-cycle summary record"));
507 :
508 0 : state.SummaryMajor.resize( IPosition( 1, shp[0]+1 ) , true );
509 0 : state.SummaryMajor( IPosition(1, shp[0] ) ) = state.IterDone;
510 0 : return dummy; } ) );
511 0 : }
512 :
513 0 : casacore::Record grpcInteractiveCleanManager::getDetailsRecord( bool includeSummary ) {
514 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
515 :
516 0 : Record returnRecord;
517 :
518 : Record result = access( returnRecord,
519 0 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
520 0 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
521 : //*** Control Variables **************************************************
522 0 : rec.define( RecordFieldId("niter"), state.Niter );
523 0 : rec.define( RecordFieldId("cycleniter"), state.CycleNiter );
524 0 : rec.define( RecordFieldId("interactiveniter"), state.InteractiveNiter );
525 :
526 0 : rec.define( RecordFieldId("threshold"), state.Threshold );
527 0 : rec.define( RecordFieldId("nsigma"), state.Nsigma );
528 :
529 0 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
530 0 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; // Reset this, for the next round
531 :
532 0 : rec.define( RecordFieldId("cyclethreshold"), state.CycleThreshold );
533 0 : rec.define( RecordFieldId("interactivethreshold"), state.InteractiveThreshold );
534 :
535 0 : rec.define( RecordFieldId("loopgain"), state.LoopGain );
536 0 : rec.define( RecordFieldId("cyclefactor"), state.CycleFactor );
537 :
538 : //*** Status Reporting Variables *****************************************
539 0 : rec.define( RecordFieldId("iterdone"), state.IterDone );
540 0 : rec.define( RecordFieldId("cycleiterdone"), state.MaxCycleIterDone );
541 0 : rec.define( RecordFieldId("interactiveiterdone"),
542 0 : state.InteractiveIterDone + state.MaxCycleIterDone);
543 :
544 0 : rec.define( RecordFieldId("nmajordone"), state.MajorDone );
545 0 : rec.define( RecordFieldId("maxpsfsidelobe"), state.MaxPsfSidelobe );
546 0 : rec.define( RecordFieldId("maxpsffraction"), state.MaxPsfFraction );
547 0 : rec.define( RecordFieldId("minpsffraction"), state.MinPsfFraction );
548 0 : rec.define( RecordFieldId("interactivemode"), state.InteractiveMode );
549 :
550 0 : rec.define( RecordFieldId("stopcode"), state.StopCode );
551 :
552 : //*** report clean's state ***********************************************
553 0 : rec.define( RecordFieldId("cleanstate"),
554 0 : state.StopFlag ? "stopped" : state.PauseFlag ? "paused" : "running" );
555 :
556 0 : if ( includeSummary ) {
557 0 : rec.define( RecordFieldId("summaryminor"), state.SummaryMinor );
558 0 : rec.define( RecordFieldId("summarymajor"), state.SummaryMajor );
559 : }
560 :
561 0 : return rec; }) );
562 :
563 0 : return result;
564 :
565 :
566 : /* return access( returnRecord,
567 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
568 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
569 : //*** Control Variables **************************************************
570 : rec.define( RecordFieldId("niter"), state.Niter );
571 : rec.define( RecordFieldId("cycleniter"), state.CycleNiter );
572 : rec.define( RecordFieldId("interactiveniter"), state.InteractiveNiter );
573 :
574 : rec.define( RecordFieldId("threshold"), state.Threshold );
575 : rec.define( RecordFieldId("nsigma"), state.Nsigma );
576 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
577 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; // Reset this, for the next round
578 :
579 : rec.define( RecordFieldId("cyclethreshold"), state.CycleThreshold );
580 : rec.define( RecordFieldId("interactivethreshold"), state.InteractiveThreshold );
581 :
582 : rec.define( RecordFieldId("loopgain"), state.LoopGain );
583 : rec.define( RecordFieldId("cyclefactor"), state.CycleFactor );
584 :
585 : //*** Status Reporting Variables *****************************************
586 : rec.define( RecordFieldId("iterdone"), state.IterDone );
587 : rec.define( RecordFieldId("cycleiterdone"), state.MaxCycleIterDone );
588 : rec.define( RecordFieldId("interactiveiterdone"),
589 : state.InteractiveIterDone + state.MaxCycleIterDone);
590 :
591 : rec.define( RecordFieldId("nmajordone"), state.MajorDone );
592 : rec.define( RecordFieldId("maxpsfsidelobe"), state.MaxPsfSidelobe );
593 : rec.define( RecordFieldId("maxpsffraction"), state.MaxPsfFraction );
594 : rec.define( RecordFieldId("minpsffraction"), state.MinPsfFraction );
595 : rec.define( RecordFieldId("interactivemode"), state.InteractiveMode );
596 :
597 : rec.define( RecordFieldId("stopcode"), state.StopCode );
598 :
599 : //*** report clean's state ***********************************************
600 : rec.define( RecordFieldId("cleanstate"),
601 : state.StopFlag ? "stopped" : state.PauseFlag ? "paused" : "running" );
602 :
603 : if ( includeSummary ) {
604 : rec.define( RecordFieldId("summaryminor"), state.SummaryMinor );
605 : rec.define( RecordFieldId("summarymajor"), state.SummaryMajor );
606 : }
607 :
608 : return rec; }) );
609 : */
610 0 : }
611 :
612 :
613 0 : Record grpcInteractiveCleanManager::getMinorCycleControls( ){
614 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
615 :
616 : /* This returns a record suitable for initializing the minor cycle controls. */
617 0 : Record returnRecord;
618 :
619 : return access( returnRecord,
620 0 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
621 0 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
622 :
623 : /* If autocalc, compute cyclethresh from peak res, cyclefactor and psf sidelobe
624 : Otherwise, the user has explicitly set it (interactively) for this minor cycle */
625 0 : if( state.IsCycleThresholdAuto == true ) { updateCycleThreshold(state); }
626 0 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; /* Reset this, for the next round */
627 :
628 : /* The minor cycle will stop based on the cycle parameters. */
629 0 : int maxCycleIterations = state.CycleNiter;
630 0 : float cycleThreshold = state.CycleThreshold;
631 0 : maxCycleIterations = min(maxCycleIterations, state.Niter - state.IterDone);
632 0 : cycleThreshold = max(cycleThreshold, state.Threshold);
633 0 : bool thresholdReached = (cycleThreshold==state.Threshold)? True : False;
634 :
635 0 : rec.define( RecordFieldId("cycleniter"), maxCycleIterations);
636 0 : rec.define( RecordFieldId("cyclethreshold"), cycleThreshold);
637 0 : rec.define( RecordFieldId("loopgain"), state.LoopGain);
638 0 : rec.define( RecordFieldId("thresholdreached"), thresholdReached);
639 0 : rec.define( RecordFieldId("nsigma"), state.Nsigma);
640 :
641 0 : return rec; }) );
642 0 : }
643 :
644 0 : int grpcInteractiveCleanManager::cleanComplete( bool lastcyclecheck, bool reachedMajorLimit ){
645 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
646 :
647 0 : int stopCode=0;
648 :
649 0 : return access( stopCode,
650 0 : std::function< int ( int, grpcInteractiveCleanState & )>(
651 0 : [&]( int stop_code, grpcInteractiveCleanState &state ) -> int {
652 :
653 : float usePeakRes;
654 :
655 0 : if( lastcyclecheck==True ) { usePeakRes = state.MinorCyclePeakResidual; }
656 0 : else { usePeakRes = state.PeakResidual; }
657 :
658 : // for debugging, remove it later
659 0 : os<<LogIO::DEBUG1<<"cleanComplete-- CycleThreshold without Threshold limit="<<state.CycleThreshold<<LogIO::POST;
660 :
661 0 : if( state.PeakResidual > 0 && state.PrevPeakResidual>0 &&
662 0 : fabs( state.PeakResidual - state.PrevPeakResidual)/fabs(state.PrevPeakResidual) > 2.0 ) {
663 0 : os << "[WARN] Peak residual (within the mask) increased from " << state.PrevPeakResidual << " to " << state.PeakResidual << LogIO::POST;
664 : }
665 : // for debugging, remove it later
666 0 : os <<LogIO::DEBUG1<<"Threshold="<<state.Threshold<<" itsNsigmaThreshold===="<<state.NsigmaThreshold<<LogIO::POST;
667 0 : os <<LogIO::DEBUG1<<"usePeakRes="<<usePeakRes<<" itsPeakResidual="<<state.PeakResidual<<" itsPrevPeakRes="<<state.PrevPeakResidual<<LogIO::POST;
668 0 : os <<LogIO::DEBUG1<<"itsIterDone="<<state.IterDone<<" itsNiter="<<state.Niter<<LogIO::POST;
669 0 : os <<LogIO::DEBUG1<<"FullSummary="<<state.FullSummary<<LogIO::POST;
670 :
671 : /// This may interfere with some other criterion... check.
672 0 : float tol = 0.01; // threshold test torelance (CAS-11278)
673 0 : if ( state.MajorDone==0 && state.IterDone==0 && state.MaskSum==0.0) {
674 0 : stopCode=7; // if zero mask is detected it should exit right away
675 0 : } else if ( state.IterDone >= state.Niter ||
676 0 : usePeakRes <= state.Threshold ||
677 0 : state.PeakResidual <= state.NsigmaThreshold ||
678 0 : fabs(usePeakRes-state.Threshold)/state.Threshold < tol ||
679 0 : fabs(state.PeakResidual - state.NsigmaThreshold)/state.NsigmaThreshold < tol ||
680 0 : state.StopFlag ) {
681 : // os << "Reached global stopping criteria : ";
682 :
683 0 : if ( state.IterDone >= state.Niter ) { stopCode=1; }
684 : //os << "Numer of iterations. "; // (" << state.IterDone << ") >= limit (" << state.Niter << ")" ;
685 0 : if( usePeakRes <= state.Threshold || (usePeakRes-state.Threshold)/state.Threshold < tol) {stopCode=2; }
686 0 : else if ( usePeakRes <= state.NsigmaThreshold || (state.PeakResidual - state.NsigmaThreshold)/state.NsigmaThreshold < tol ) {
687 0 : if (state.NsigmaThreshold!=0.0) { stopCode=8; } // for nsigma=0.0 this mode is turned off
688 : }
689 :
690 : //os << "Peak residual (" << state.PeakResidual << ") <= threshold(" << state.Threshold << ")";
691 0 : if( state.StopFlag ) {stopCode=3;}
692 : //os << "Forced stop. ";
693 : // os << LogIO::POST;
694 :
695 : //return true;
696 :
697 : } else { // not converged yet... but....if nothing has changed in this round... also stop
698 :
699 0 : if (state.MaskSum==0.0) {
700 : //cout << "(7) Mask is all zero.Stopping" << endl;
701 0 : stopCode = 7;
702 : }
703 : // Nothing has changed across the last set of minor cycle iterations and major cycle.
704 0 : else if( state.IterDone>0 && (state.MajorDone>state.PrevMajorCycleCount) &&
705 0 : fabs(state.PrevPeakResidual - state.PeakResidual)<1e-10)
706 0 : {stopCode = 4;}
707 :
708 : // another non-convergent condition: diverging (relative increase is more than 3 times across one major cycle)
709 0 : else if ( state.IterDone > 0 &&
710 0 : fabs(state.PeakResidualNoMask-state.PrevPeakResidualNoMask)/fabs(state.PrevPeakResidualNoMask) > 3.0) {
711 : //cout << "(5) Peak res (no mask) : " << state.PeakResidualNoMask
712 : // << " Dev from prev peak res " << state.PrevPeakResidualNoMask << endl;
713 0 : stopCode = 5;}
714 :
715 : // divergence check, 3 times increase from the minimum peak residual so far (across all previous major cycles).
716 0 : else if ( state.IterDone > 0 &&
717 0 : (fabs(state.PeakResidualNoMask)-state.MinPeakResidualNoMask)/state.MinPeakResidualNoMask > 3.0 )
718 : {
719 : //cout << "(6) Peak res (no mask): " << state.PeakResidualNoMask
720 : // << " Dev from min peak res " << state.MinPeakResidualNoMask << endl;
721 0 : stopCode = 6;
722 : }
723 :
724 : }
725 :
726 0 : if (stopCode == 0 && reachedMajorLimit) {
727 0 : stopCode = 9;
728 : }
729 :
730 : /*
731 : if( lastcyclecheck==False)
732 : {
733 : cout << "*****" << endl;
734 : cout << "Peak residual : " << state.PeakResidual << " No Mask : " << state.PeakResidualNoMask << endl;
735 : cout << "Prev Peak residual : " << state.PrevPeakResidual << " No Mask : " << state.PrevPeakResidualNoMask << endl;
736 : cout << "Min Peak residual : " << state.MinPeakResidual << " No Mask : " << state.MinPeakResidualNoMask << endl;
737 : }
738 : */
739 :
740 : // os << "Peak residual : " << state.PeakResidual << " and " << state.IterDone << " iterations."<< LogIO::POST;
741 : //cout << "cleancomp : stopcode : " << stopCode << endl;
742 :
743 : //cout << "peak res : " << state.PeakResidual << " state.minPR : " << state.MinPeakResidual << endl;
744 :
745 0 : if( lastcyclecheck==False)
746 : {
747 0 : if( fabs(state.PeakResidual) < state.MinPeakResidual )
748 0 : {state.MinPeakResidual = fabs(state.PeakResidual);}
749 :
750 0 : state.PrevPeakResidual = state.PeakResidual;
751 :
752 :
753 0 : if( fabs(state.PeakResidualNoMask) < state.MinPeakResidualNoMask )
754 0 : {state.MinPeakResidualNoMask = fabs(state.PeakResidualNoMask);}
755 :
756 0 : state.PrevPeakResidualNoMask = state.PeakResidualNoMask;
757 :
758 0 : state.PrevMajorCycleCount = state.MajorDone;
759 :
760 : }
761 :
762 0 : state.StopCode=stopCode;
763 0 : return stopCode; } ) );
764 0 : }
765 :
766 0 : void grpcInteractiveCleanManager::resetMinorCycleInitInfo( grpcInteractiveCleanState &state ) {
767 : /* Get ready to do the minor cycle */
768 0 : state.PeakResidual = 0;
769 0 : state.PeakResidualNoMask = 0;
770 0 : state.MaxPsfSidelobe = 0;
771 0 : state.MaxCycleIterDone = 0;
772 0 : state.MaskSum = -1.0;
773 0 : }
774 :
775 0 : void grpcInteractiveCleanManager::resetMinorCycleInitInfo( ) {
776 0 : access( (void*) 0,
777 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
778 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
779 0 : resetMinorCycleInitInfo(state);
780 0 : return dummy; } ) );
781 0 : }
782 :
783 0 : void grpcInteractiveCleanManager::incrementMajorCycleCount( ) {
784 :
785 0 : access( (void*) 0,
786 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
787 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
788 0 : state.PrevMajorCycleCount = state.MajorDone;
789 0 : state.MajorDone++;
790 :
791 : /* Interactive iteractions update */
792 0 : state.InteractiveIterDone += state.MaxCycleIterDone;
793 :
794 0 : resetMinorCycleInitInfo(state);
795 0 : return dummy; } ) );
796 0 : }
797 :
798 0 : void grpcInteractiveCleanManager::mergeCycleInitializationRecord( Record &initRecord, casacore::Int immod ){
799 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
800 :
801 0 : access( (void*) 0,
802 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
803 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
804 :
805 0 : mergeMinorCycleSummary( initRecord.asArrayDouble( RecordFieldId("summaryminor")), state, immod );
806 0 : state.PeakResidual = max(state.PeakResidual, initRecord.asFloat(RecordFieldId("peakresidual")));
807 0 : state.MaxPsfSidelobe = max(state.MaxPsfSidelobe, initRecord.asFloat(RecordFieldId("maxpsfsidelobe")));
808 :
809 0 : state.PeakResidualNoMask = max( state.PeakResidualNoMask, initRecord.asFloat(RecordFieldId("peakresidualnomask")));
810 0 : state.MadRMS = max(state.MadRMS, initRecord.asFloat(RecordFieldId("madrms")));
811 0 : state.NsigmaThreshold = initRecord.asFloat(RecordFieldId("nsigmathreshold"));
812 :
813 : /*
814 : It has been reset to -1.0.
815 : If no masks have changed, it should remain at -1.0
816 : If any mask has changed, the sum will come in, and should be added to this.
817 : */
818 0 : float thismasksum = initRecord.asFloat(RecordFieldId("masksum"));
819 0 : if( thismasksum != -1.0 ) {
820 0 : if ( state.MaskSum == -1.0 ) state.MaskSum = thismasksum;
821 0 : else state.MaskSum += thismasksum;
822 : }
823 :
824 0 : if ( state.PrevPeakResidual == -1.0 ) state.PrevPeakResidual = state.PeakResidual;
825 0 : if ( state.PrevPeakResidualNoMask == -1.0 ) state.PrevPeakResidualNoMask = state.PeakResidualNoMask;
826 0 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
827 :
828 0 : return dummy; } ) );
829 0 : }
830 :
831 :
832 0 : void grpcInteractiveCleanManager::mergeMinorCycleSummary( const Array<Double> &summary, grpcInteractiveCleanState &state, Int immod ){
833 0 : IPosition cShp = state.SummaryMinor.shape();
834 0 : IPosition nShp = summary.shape();
835 :
836 : //bool uss = SIMinorCycleController::useSmallSummaryminor(); // temporary CAS-13683 workaround
837 : //int nSummaryFields = uss ? 6 : SIMinorCycleController::nSummaryFields;
838 0 : int nSummaryFields = !state.FullSummary ? 7 : SIMinorCycleController::nSummaryFields;
839 :
840 0 : if( cShp.nelements() != 2 || cShp[0] != nSummaryFields ||
841 0 : nShp.nelements() != 2 || nShp[0] != nSummaryFields )
842 0 : throw(AipsError("Internal error in shape of global minor-cycle summary record"));
843 :
844 0 : state.SummaryMinor.resize( IPosition( 2, nSummaryFields, cShp[1]+nShp[1] ) ,true );
845 :
846 0 : for (unsigned int row = 0; row < nShp[1]; row++) {
847 : // iterations done
848 0 : state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
849 : //if (state.FullSummary){
850 : // state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,0,row));
851 : //}
852 : //else{
853 : // state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
854 : //}
855 : //state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
856 : // peak residual
857 0 : state.SummaryMinor( IPosition(2,1,cShp[1]+row) ) = summary(IPosition(2,1,row));
858 : // model flux
859 0 : state.SummaryMinor( IPosition(2,2,cShp[1]+row) ) = summary(IPosition(2,2,row));
860 : // cycle threshold
861 0 : state.SummaryMinor( IPosition(2,3,cShp[1]+row) ) = summary(IPosition(2,3,row));
862 : //if (uss) { // temporary CAS-13683 workaround
863 0 : if (!state.FullSummary) { // temporary CAS-13683 workaround
864 : // swap out mapper id with multifield id
865 0 : state.SummaryMinor( IPosition(2,4,cShp[1]+row) ) = immod;
866 : // chunk id (channel/stokes)
867 0 : state.SummaryMinor( IPosition(2,5,cShp[1]+row) ) = summary(IPosition(2,5,row));
868 : // polarization id
869 0 : state.SummaryMinor( IPosition(2,6,cShp[1]+row) ) = summary(IPosition(2,6,row));
870 : } else {
871 : // mapper id
872 0 : state.SummaryMinor( IPosition(2,4,cShp[1]+row) ) = summary(IPosition(2,4,row));
873 : // channel id
874 0 : state.SummaryMinor( IPosition(2,5,cShp[1]+row) ) = summary(IPosition(2,5,row));
875 : // polarization id
876 0 : state.SummaryMinor( IPosition(2,6,cShp[1]+row) ) = summary(IPosition(2,6,row));
877 : // cycle start iterations done
878 0 : state.SummaryMinor( IPosition(2,7,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,7,row));
879 : // starting iterations done
880 0 : state.SummaryMinor( IPosition(2,8,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,8,row));
881 : // starting peak residual
882 0 : state.SummaryMinor( IPosition(2,9,cShp[1]+row) ) = summary(IPosition(2,9,row));
883 : // starting model flux
884 0 : state.SummaryMinor( IPosition(2,10,cShp[1]+row) ) = summary(IPosition(2,10,row));
885 : // starting peak residual, not limited to the user's mask
886 0 : state.SummaryMinor( IPosition(2,11,cShp[1]+row) ) = summary(IPosition(2,11,row));
887 : // peak residual, not limited to the user's mask
888 0 : state.SummaryMinor( IPosition(2,12,cShp[1]+row) ) = summary(IPosition(2,12,row));
889 : // number of pixels in the mask
890 0 : state.SummaryMinor( IPosition(2,13,cShp[1]+row) ) = summary(IPosition(2,13,row));
891 : // mpi server
892 0 : state.SummaryMinor( IPosition(2,14,cShp[1]+row) ) = summary(IPosition(2,14,row));
893 : // outlier field id
894 0 : state.SummaryMinor( IPosition(2,15,cShp[1]+row) ) = immod;
895 : // stopcode
896 0 : state.SummaryMinor( IPosition(2,16,cShp[1]+row) ) = summary(IPosition(2,16,row));
897 : }
898 :
899 : }
900 0 : }
901 :
902 0 : void grpcInteractiveCleanManager::mergeCycleExecutionRecord( Record& execRecord, Int immod ){
903 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
904 :
905 0 : access( (void*) 0,
906 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
907 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
908 0 : mergeMinorCycleSummary( execRecord.asArrayDouble( RecordFieldId("summaryminor")), state, immod );
909 :
910 0 : state.IterDone += execRecord.asInt(RecordFieldId("iterdone"));
911 :
912 0 : state.MaxCycleIterDone = max( state.MaxCycleIterDone, execRecord.asInt(RecordFieldId("maxcycleiterdone")) );
913 :
914 0 : state.MinorCyclePeakResidual = max( state.PeakResidual, execRecord.asFloat(RecordFieldId("peakresidual")) );
915 :
916 0 : state.UpdatedModelFlag |=execRecord.asBool( RecordFieldId("updatedmodelflag") );
917 :
918 0 : os << "Completed " << state.IterDone << " iterations." << LogIO::POST;
919 : //with peak residual "<< state.PeakResidual << LogIO::POST;
920 0 : return dummy; } ) );
921 0 : }
922 :
923 0 : void grpcInteractiveCleanManager::changeStopFlag( bool stopEnabled ) {
924 0 : access( (void*) 0,
925 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
926 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
927 0 : state.StopFlag = stopEnabled;
928 0 : return dummy;
929 : } ) );
930 0 : }
931 :
932 : //====================================================================================================
933 :
// Returns true only when `path` can be stat()ed and refers to a directory;
// any stat() failure (missing path, no permission, ...) yields false.
static bool isdir( const char *path ) {
    struct stat info;
    if ( stat(path, &info) == -1 ) {
        return false;
    }
    return S_ISDIR(info.st_mode) ? true : false;
}
941 :
// Returns a copy of `str` with every trailing '/' removed.
//
// A string consisting only of slashes becomes empty, and a null `str` is
// treated as an empty string. Implemented with std::string so there is no
// manual allocation whose failure would have to be checked.
static std::string trim_trailing_slash( const char *str ) {
    if ( str == nullptr ) return std::string( );

    std::string result( str );
    const std::string::size_type last_kept = result.find_last_not_of( '/' );
    if ( last_kept == std::string::npos ) {
        // nothing but slashes (or empty input)
        result.clear( );
    } else {
        result.erase( last_kept + 1 );
    }
    return result;
}
952 :
953 0 : grpcInteractiveCleanGui::grpcInteractiveCleanGui( ) : viewer_pid(0), viewer_started(false) { }
954 0 : grpcInteractiveCleanGui::~grpcInteractiveCleanGui( ) {
955 0 : static const auto debug = getenv("GRPC_DEBUG");
956 :
957 0 : if ( ! viewer_started ) {
958 0 : if ( debug ) {
959 0 : std::cerr << "viewer shutdown required (" << viewer_uri << ")" <<
960 0 : " (process " << getpid( ) << ", thread " <<
961 0 : std::this_thread::get_id() << ")" << std::endl;
962 0 : fflush(stderr);
963 : }
964 : } else {
965 0 : if ( debug ) {
966 0 : std::cerr << "sending shutdown message to viewer (" << viewer_uri << ")" <<
967 0 : " (process " << getpid( ) << ", thread " <<
968 0 : std::this_thread::get_id() << ")" << std::endl;
969 0 : fflush(stderr);
970 : }
971 :
972 0 : bool stopped = stop_viewer( );
973 :
974 0 : if ( debug ) {
975 0 : if ( stopped ) {
976 0 : std::cerr << "viewer shutdown successful (" << viewer_uri << ")" <<
977 0 : " (process " << getpid( ) << ", thread " <<
978 0 : std::this_thread::get_id() << ")" << std::endl;
979 : } else {
980 0 : std::cerr << "viewer shutdown failed (" << viewer_uri << ")" <<
981 0 : " (process " << getpid( ) << ", thread " <<
982 0 : std::this_thread::get_id() << ")" << std::endl;
983 : }
984 0 : fflush(stderr);
985 : }
986 : }
987 0 : }
988 :
989 :
990 0 : bool grpcInteractiveCleanGui::alive( ) {
991 0 : static const auto debug = getenv("GRPC_DEBUG");
992 0 : if ( debug ) {
993 0 : std::cerr << "pinging viewer (" << viewer_uri << ")" <<
994 0 : " (process " << getpid( ) << ", thread " <<
995 0 : std::this_thread::get_id() << ")" << std::endl;
996 0 : fflush(stderr);
997 : }
998 0 : grpc::ClientContext context;
999 0 : ::google::protobuf::Empty resp;
1000 0 : ::google::protobuf::Empty msg;
1001 0 : auto ping = casatools::rpc::Ping::NewStub( grpc::CreateChannel( viewer_uri, grpc::InsecureChannelCredentials( ) ) );
1002 0 : ::grpc::Status status = ping->now( &context, msg, &resp );
1003 0 : bool ping_result = status.ok( );
1004 0 : if ( debug ) {
1005 : std::cerr << "ping result: " << (ping_result ? "OK" : "FAIL")<<
1006 0 : " (process " << getpid( ) << ", thread " <<
1007 0 : std::this_thread::get_id() << ")" << std::endl;
1008 0 : fflush(stderr);
1009 : }
1010 0 : if ( ping_result == false ) {
1011 : int proc_status;
1012 0 : waitpid( viewer_pid, &proc_status, WUNTRACED | WCONTINUED | WNOHANG );
1013 0 : viewer_pid = 0;
1014 0 : viewer_proxy.release( );
1015 0 : viewer_started = false;
1016 0 : if ( debug ) {
1017 : std::cerr << "ping failed resetting state" <<
1018 0 : " (process " << getpid( ) << ", thread " <<
1019 0 : std::this_thread::get_id() << ")" << std::endl;
1020 0 : fflush(stderr);
1021 : }
1022 : }
1023 0 : return ping_result;
1024 0 : }
1025 :
1026 0 : bool grpcInteractiveCleanGui::launch( ) {
1027 0 : static const auto debug = getenv("GRPC_DEBUG");
1028 0 : if ( viewer_started == false ) {
1029 : // start the viewer process if it is not already running...
1030 0 : if ( debug ) {
1031 : std::cerr << "spawning viewer process" <<
1032 0 : " (process " << getpid( ) << ", thread " <<
1033 0 : std::this_thread::get_id() << ")" << std::endl;
1034 0 : fflush(stderr);
1035 : }
1036 0 : return spawn_viewer( );
1037 : } else {
1038 0 : if ( alive( ) ) {
1039 0 : if ( debug ) {
1040 : std::cerr << "viewer process available" <<
1041 0 : " (process " << getpid( ) << ", thread " <<
1042 0 : std::this_thread::get_id() << ")" << std::endl;
1043 0 : fflush(stderr);
1044 : }
1045 0 : return true;
1046 : } else {
1047 0 : if ( debug ) {
1048 : std::cerr << "re-spawning viewer process" <<
1049 0 : " (process " << getpid( ) << ", thread " <<
1050 0 : std::this_thread::get_id() << ")" << std::endl;
1051 0 : fflush(stderr);
1052 : }
1053 0 : return launch( );
1054 : }
1055 : }
1056 : return false;
1057 : }
1058 :
1059 0 : void grpcInteractiveCleanGui::close_panel( int id ) {
1060 0 : static const auto debug = getenv("GRPC_DEBUG");
1061 0 : if ( debug ) {
1062 0 : std::cerr << "close_panel(" << id << ")" <<
1063 0 : " (process " << getpid( ) << ", thread " <<
1064 0 : std::this_thread::get_id() << ")" << std::endl;
1065 0 : fflush(stderr);
1066 : }
1067 0 : if ( id != -1 && alive( ) ) {
1068 0 : if ( debug ) {
1069 0 : std::cerr << "close_panel(" << id << ") -- closing panel" <<
1070 0 : " (process " << getpid( ) << ", thread " <<
1071 0 : std::this_thread::get_id() << ")" << std::endl;
1072 0 : fflush(stderr);
1073 : }
1074 : {
1075 : // unload panel's images
1076 0 : rpc::img::Id panel;
1077 0 : grpc::ClientContext context;
1078 0 : ::google::protobuf::Empty resp;
1079 0 : panel.set_id(id);
1080 0 : viewer_proxy->unload( &context, panel, &resp );
1081 0 : }
1082 : {
1083 : // close panel
1084 0 : rpc::img::Id panel;
1085 0 : grpc::ClientContext context;
1086 0 : ::google::protobuf::Empty resp;
1087 0 : panel.set_id(id);
1088 0 : viewer_proxy->close( &context, panel, &resp );
1089 0 : }
1090 : }
1091 0 : }
1092 :
1093 0 : int grpcInteractiveCleanGui::open_panel( std::list<std::tuple<std::string,bool,bool,int>> images ) {
1094 0 : static const auto debug = getenv("GRPC_DEBUG");
1095 0 : if ( viewer_started == false ) {
1096 0 : if ( launch( ) == false ) return -1;
1097 : }
1098 0 : if ( debug ) {
1099 : std::cerr << "opening viewer panel" <<
1100 0 : " (process " << getpid( ) << ", thread " <<
1101 0 : std::this_thread::get_id() << ")" << std::endl;
1102 0 : fflush(stderr);
1103 : }
1104 0 : grpc::ClientContext context;
1105 0 : ::rpc::img::NewPanel np;
1106 0 : rpc::img::Id resp;
1107 0 : np.set_type("clean2");
1108 0 : np.set_hidden(false);
1109 0 : viewer_proxy->panel( &context, np, &resp );
1110 0 : int result = resp.id( );
1111 :
1112 0 : if ( debug ) {
1113 0 : std::cerr << "opened viewer panel " << result <<
1114 0 : " (process " << getpid( ) << ", thread " <<
1115 0 : std::this_thread::get_id() << ")" << std::endl;
1116 0 : fflush(stderr);
1117 : }
1118 :
1119 : // state for interactive masking in the new viewer panel
1120 0 : clean_state.insert( std::pair<int,CleanState>(result, CleanState( )) );
1121 :
1122 0 : if ( debug ) {
1123 0 : std::cerr << "created panel " << result <<
1124 0 : " (process " << getpid( ) << ", thread " <<
1125 0 : std::this_thread::get_id() << ")" << std::endl;
1126 0 : fflush(stderr);
1127 : }
1128 0 : return result;
1129 0 : }
1130 :
1131 0 : void grpcInteractiveCleanGui::unload( int id ) {
1132 0 : grpc::ClientContext context;
1133 0 : ::rpc::img::Id data;
1134 0 : ::google::protobuf::Empty resp;
1135 0 : data.set_id(id);
1136 0 : viewer_proxy->unload( &context, data, &resp );
1137 0 : }
1138 :
1139 0 : bool grpcInteractiveCleanGui::clone( const std::string &imageName, const std::string &newImageName ) {
1140 0 : LogIO os(LogOrigin("grpcInteractiveCleanGui", __FUNCTION__, WHERE));
1141 :
1142 : try {
1143 0 : PagedImage<Float> oldImage( imageName );
1144 0 : PagedImage<Float> newImage( TiledShape( oldImage.shape(), oldImage.niceCursorShape()),
1145 0 : oldImage.coordinates(), newImageName );
1146 0 : newImage.set(0.0);
1147 0 : newImage.table().flush(true, true);
1148 0 : } catch (AipsError x) {
1149 0 : os << LogIO::SEVERE << "Exception: " << x.getMesg() << LogIO::POST;
1150 0 : return false;
1151 0 : }
1152 0 : return true;
1153 0 : }
1154 :
1155 0 : float grpcInteractiveCleanGui::maskSum(const std::string &maskname) {
1156 :
1157 0 : PagedImage<Float> mask( maskname );
1158 :
1159 0 : LatticeExprNode msum( sum( mask ) );
1160 0 : float maskSum = msum.getFloat( );
1161 :
1162 0 : mask.unlock();
1163 0 : mask.tempClose();
1164 :
1165 0 : return maskSum;
1166 0 : }
1167 :
1168 0 : int grpcInteractiveCleanGui::interactivemask( int panel, const std::string &image, const std::string &mask,
1169 : int &niter, int &cycleniter, std::string &thresh,
1170 : std::string &cyclethresh, const bool forceReload ) {
1171 :
1172 0 : static const auto debug = getenv("GRPC_DEBUG");
1173 0 : LogIO os( LogOrigin("grpcInteractiveCleanGui",__FUNCTION__,WHERE) );
1174 :
1175 0 : if ( debug ) {
1176 0 : std::cerr << "starting interactivemask( " <<
1177 0 : panel << ", " << image << ", " << mask << ", " <<
1178 0 : niter << ", " << cycleniter << ", " << thresh << ", " <<
1179 : cyclethresh << ", " << (forceReload ? "true" : "false") << ")" <<
1180 0 : " (process " << getpid( ) << ", thread " <<
1181 0 : std::this_thread::get_id() << ")" << std::endl;
1182 0 : fflush(stderr);
1183 : }
1184 :
1185 0 : if ( viewer_started == false ) {
1186 : // viewer should be started before calling interactivemask(...)
1187 0 : os << LogIO::WARN << "Viewer GUI Not Available" << LogIO::POST;
1188 0 : return 0;
1189 : }
1190 :
1191 0 : auto state = clean_state.find(panel);
1192 0 : if ( state == clean_state.end( ) ) {
1193 0 : os << LogIO::WARN << "Invalid clean panel id used for interactive masking" << LogIO::POST;
1194 0 : return 0;
1195 : }
1196 :
1197 0 : if( Table::isReadable(mask) ) {
1198 0 : if ( ! Table::isWritable(mask) ) {
1199 0 : os << LogIO::WARN << "Mask image is not modifiable " << LogIO::POST;
1200 0 : return 0;
1201 : }
1202 : // we should regrid here if image and mask do not match
1203 : } else {
1204 0 : clone(image, mask);
1205 : }
1206 :
1207 0 : double startmask = maskSum(mask);
1208 :
1209 0 : if ( state->second.image_id == 0 || state->second.mask_id == 0 || forceReload ) {
1210 :
1211 : //Make sure image left after a "no more" is pressed is cleared
1212 0 : if ( forceReload && state->second.image_id !=0 )
1213 0 : state->second.prev_image_id = state->second.image_id;
1214 0 : if ( forceReload && state->second.mask_id !=0 )
1215 0 : state->second.prev_mask_id = state->second.mask_id;
1216 :
1217 0 : if ( state->second.prev_image_id ){
1218 0 : if ( debug ) {
1219 0 : std::cerr << "preparing to unload prev_image_id " <<
1220 0 : state->second.prev_image_id << " (panel " << panel << ")" <<
1221 0 : " (process " << getpid( ) << ", thread " <<
1222 0 : std::this_thread::get_id() << ")" << std::endl;
1223 0 : fflush(stderr);
1224 : }
1225 0 : unload( state->second.prev_image_id );
1226 : }
1227 0 : if ( state->second.prev_mask_id ) {
1228 0 : if ( debug ) {
1229 0 : std::cerr << "preparing to unload prev_mask_id " <<
1230 0 : state->second.prev_mask_id << " (panel " << panel << ")" <<
1231 0 : " (process " << getpid( ) << ", thread " <<
1232 0 : std::this_thread::get_id() << ")" << std::endl;
1233 0 : fflush(stderr);
1234 : }
1235 0 : unload( state->second.prev_mask_id );
1236 : }
1237 :
1238 0 : state->second.prev_image_id = 0;
1239 0 : state->second.prev_mask_id = 0;
1240 :
1241 : {
1242 0 : grpc::ClientContext context;
1243 0 : ::rpc::img::NewData nd;
1244 0 : rpc::img::Id resp;
1245 0 : nd.mutable_panel( )->set_id(panel);
1246 0 : nd.set_path(image);
1247 0 : nd.set_type("raster");
1248 0 : nd.set_scale(0);
1249 0 : viewer_proxy->load( &context, nd, &resp );
1250 0 : state->second.image_id = resp.id( );
1251 0 : }
1252 : {
1253 0 : grpc::ClientContext context;
1254 0 : ::rpc::img::NewData nd;
1255 0 : rpc::img::Id resp;
1256 0 : nd.mutable_panel( )->set_id(panel);
1257 0 : nd.set_path(mask);
1258 0 : nd.set_type("contour");
1259 0 : nd.set_scale(0);
1260 0 : viewer_proxy->load( &context, nd, &resp );
1261 0 : state->second.mask_id = resp.id( );
1262 0 : }
1263 :
1264 : } else {
1265 0 : grpc::ClientContext context;
1266 0 : ::rpc::img::Id id;
1267 0 : ::google::protobuf::Empty resp;
1268 0 : id.set_id(state->second.image_id);
1269 0 : viewer_proxy->reload( &context, id, &resp );
1270 0 : id.set_id(state->second.mask_id);
1271 0 : viewer_proxy->reload( &context, id, &resp );
1272 0 : }
1273 :
1274 0 : grpc::ClientContext context;
1275 0 : ::rpc::img::InteractiveMaskOptions options;
1276 0 : options.mutable_panel( )->set_id(state->first);
1277 0 : options.set_niter(niter);
1278 0 : options.set_cycleniter(cycleniter);
1279 0 : options.set_threshold(thresh);
1280 0 : options.set_cyclethreshold(cyclethresh);
1281 0 : ::rpc::img::InteractiveMaskResult imresult;
1282 0 : ::grpc::Status s = viewer_proxy->interactivemask( &context, options, &imresult );
1283 :
1284 0 : if ( ! s.ok( ) ) {
1285 0 : std::cerr << "interactive mask failed: " << s.error_details( ) << std::endl;
1286 0 : fflush(stderr);
1287 : }
1288 :
1289 0 : niter = imresult.state( ).niter( );
1290 0 : cycleniter = imresult.state( ).cycleniter( );
1291 0 : thresh = imresult.state( ).threshold( );
1292 0 : cyclethresh = imresult.state( ).cyclethreshold( );
1293 0 : int result = 1;
1294 0 : std::string action = imresult.action( );
1295 :
1296 0 : if ( debug ) {
1297 0 : std::cerr << "-------------------------------------------" << std::endl;
1298 0 : std::cerr << " gui state from interactive masking" << std::endl;
1299 0 : std::cerr << "-------------------------------------------" << std::endl;
1300 0 : std::cerr << " action: " << action << std::endl;
1301 0 : std::cerr << " niter: " << niter << std::endl;
1302 0 : std::cerr << " cycle niter: " << cycleniter << std::endl;
1303 0 : std::cerr << " threshold: " << thresh << std::endl;
1304 0 : std::cerr << " cycle threshold: " << cyclethresh << std::endl;
1305 0 : std::cerr << "-------------------------------------------" << std::endl;
1306 : }
1307 :
1308 0 : if ( action == "stop" ) result = 3;
1309 0 : else if ( action == "no more" ) result = 2;
1310 0 : else if ( action == "continue" ) result = 1;
1311 : else {
1312 0 : os << "ill-formed action result (" << action << ")" << LogIO::WARN << LogIO::POST;
1313 0 : return 0;
1314 : }
1315 :
1316 0 : state->second.prev_image_id = state->second.image_id;
1317 0 : state->second.prev_mask_id = state->second.mask_id;
1318 :
1319 0 : state->second.image_id = 0;
1320 0 : state->second.mask_id = 0;
1321 :
1322 0 : if ( debug ) {
1323 0 : std::cerr << "set prev_image_id to " << state->second.prev_image_id << " (panel " << panel << ")" <<
1324 0 : " (process " << getpid( ) << ", thread " <<
1325 0 : std::this_thread::get_id() << ")" << std::endl;
1326 0 : std::cerr << "set prev_mask_id to " << state->second.prev_mask_id << " (panel " << panel << ")" <<
1327 0 : " (process " << getpid( ) << ", thread " <<
1328 0 : std::this_thread::get_id() << ")" << std::endl;
1329 0 : fflush(stderr);
1330 : }
1331 :
1332 0 : double endmask = maskSum(mask);
1333 :
1334 0 : if( startmask != endmask ) {
1335 0 : result = -1 * result;
1336 0 : LogIO os( LogOrigin("grpcInteractiveCleanGui",__FUNCTION__,WHERE) );
1337 0 : os << "[" << mask << "] Mask modified from " << startmask << " pixels to " << endmask << " pixels " << LogIO::POST;
1338 0 : }
1339 :
1340 0 : return result;
1341 0 : }
1342 :
1343 0 : bool grpcInteractiveCleanGui::stop_viewer( ) {
1344 : // viewer is not running...
1345 0 : if ( ! viewer_started ) return false;
1346 0 : static const auto debug = getenv("GRPC_DEBUG");
1347 0 : if ( debug ) {
1348 0 : std::cerr << "sending shutdown message to viewer (" << viewer_uri << ")" <<
1349 0 : " (process " << getpid( ) << ", thread " <<
1350 0 : std::this_thread::get_id() << ")" << std::endl;
1351 0 : fflush(stderr);
1352 : }
1353 :
1354 : // send shutdown message to viewer...
1355 0 : grpc::ClientContext context;
1356 0 : ::google::protobuf::Empty req;
1357 0 : ::google::protobuf::Empty resp;
1358 0 : auto shutdown = casatools::rpc::Shutdown::NewStub( grpc::CreateChannel( viewer_uri,
1359 0 : grpc::InsecureChannelCredentials( ) ) );
1360 0 : shutdown->now( &context, req, &resp );
1361 :
1362 : // wait on viewer (appimage) to exit...
1363 : int status;
1364 0 : pid_t w = waitpid( viewer_pid, &status, WUNTRACED | WCONTINUED );
1365 0 : if ( w == -1 ){
1366 0 : if ( debug ) {
1367 : std::cerr << "viewer process waitpid failed " <<
1368 0 : " (process " << getpid( ) << ", thread " <<
1369 0 : std::this_thread::get_id() << ")" << std::endl;
1370 0 : fflush(stderr);
1371 : }
1372 : // waitpid failed
1373 0 : return false;
1374 0 : } else if ( w == 0 ) {
1375 0 : if ( debug ) {
1376 : std::cerr << "viewer process not found " <<
1377 0 : " (process " << getpid( ) << ", thread " <<
1378 0 : std::this_thread::get_id() << ")" << std::endl;
1379 0 : fflush(stderr);
1380 : }
1381 0 : return false;
1382 : } else {
1383 0 : if ( debug ) {
1384 : std::cerr << "viewer process exited, status fetched " <<
1385 0 : " (process " << getpid( ) << ", thread " <<
1386 0 : std::this_thread::get_id() << ")" << std::endl;
1387 0 : fflush(stderr);
1388 : }
1389 0 : return true;
1390 : }
1391 :
1392 : viewer_pid = 0;
1393 : viewer_proxy.release( );
1394 : viewer_started = false;
1395 : return true;
1396 0 : }
1397 :
1398 0 : bool grpcInteractiveCleanGui::spawn_viewer( ) {
1399 0 : static const auto debug = getenv("GRPC_DEBUG");
1400 :
1401 0 : std::string viewer_path = get_viewer_path( );
1402 0 : if ( viewer_path.size( ) == 0 ) return false;
1403 :
1404 : // To minimize package size for distribution via pypi.org, the
1405 : // data repo has been moved out of the viewer appImage/app and
1406 : // into a separate package. The path to this needs to be specified
1407 : // when starting the viewer now...
1408 0 : std::string distro_data_path_arg = get_distro_data_path( );
1409 :
1410 : // sanity check on viewer path...
1411 : struct stat statbuf;
1412 0 : if ( stat( viewer_path.c_str( ), &statbuf ) < 0 ) {
1413 : // file (or dir) does not exist... e.g.
1414 : // >>>>>>registry available at 0.0.0.0:40939
1415 : // stopping registry<<<<<<
1416 0 : return false;
1417 : }
1418 :
1419 0 : std::string fifo = get_fifo( );
1420 0 : if ( fifo.size( ) == 0 ) return false;
1421 :
1422 : // here we start the viewer in a very basic manner... we do not bother
1423 : // with all of the theatrics needed to daemonize the launched process
1424 : // (see https://stackoverflow.com/questions/17954432/creating-a-daemon-in-linux)
1425 : // it could be that this should be done in the future, but for now we
1426 : // will adopt the simple...
1427 :
1428 0 : const int maxargc = 5;
1429 : char *arguments[maxargc];
1430 0 : for (int i = 0; i < maxargc; i++) { arguments[i] = (char*)""; };
1431 :
1432 0 : arguments[0] = strdup(viewer_path.c_str( ));
1433 0 : arguments[1] = (char*) malloc(sizeof(char) * (fifo.size( ) + 12));
1434 0 : sprintf( arguments[1], "--server=%s", fifo.c_str( ) );
1435 0 : arguments[2] = strdup("--oldregions");
1436 0 : int argc =3;
1437 0 : if ( distro_data_path_arg.size( ) > 0 ) {
1438 0 : distro_data_path_arg = std::string("--datapath=") + distro_data_path_arg;
1439 0 : arguments[argc] = strdup(distro_data_path_arg.c_str( ));
1440 0 : argc++;
1441 : }
1442 0 : std::string log_path = casatools::get_state( ).logPath( );
1443 0 : if ( log_path.size( ) > 0 ) {
1444 0 : arguments[argc] = (char*) malloc(sizeof(char) * (log_path.size( ) + 17));
1445 0 : sprintf( arguments[argc], "--casalogfile=%s", log_path.c_str( ) );
1446 0 : argc++;
1447 : }
1448 :
1449 0 : if ( debug ) {
1450 0 : std::cerr << "forking viewer process: ";
1451 0 : for (int i=0; i < argc; ++i) std::cout << arguments[i] << " ";
1452 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
1453 0 : std::this_thread::get_id() << ")" << std::endl;
1454 0 : fflush(stderr);
1455 : }
1456 0 : pid_t pid = fork( );
1457 :
1458 0 : if ( pid == 0 ) {
1459 0 : if ( debug ) {
1460 0 : std::cerr << "execing viewer process: ";
1461 0 : for (int i=0; i < argc; ++i) std::cout << arguments[i] << " ";
1462 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
1463 0 : std::this_thread::get_id() << ")" << std::endl;
1464 0 : fflush(stderr);
1465 : }
1466 0 : char **envp = getenv_sansmpi(); // bugfix: run the viewer without MPI CAS-13252
1467 0 : execle( arguments[0], arguments[0], arguments[1], arguments[2], arguments[3], arguments[4], NULL, envp );
1468 0 : perror( "grpcInteractiveCleanGui::launch(...) child process exec failed" );
1469 0 : exit(1);
1470 : }
1471 :
1472 0 : for ( int i=0; i < argc; ++i ) free(arguments[i]);
1473 :
1474 0 : if ( pid == -1 ) {
1475 0 : perror( "grpcInteractiveCleanGui::launch(...) child process fork failed" );
1476 0 : return false;
1477 : }
1478 :
1479 : // perform a health check, after a delay...
1480 : int status;
1481 0 : sleep(2);
1482 0 : pid_t w = waitpid( pid, &status, WUNTRACED | WCONTINUED | WNOHANG );
1483 0 : if ( w == -1 ){
1484 0 : if ( debug ) {
1485 : std::cerr << "viewer process failed " <<
1486 0 : " (process " << getpid( ) << ", thread " <<
1487 0 : std::this_thread::get_id() << ")" << std::endl;
1488 0 : fflush(stderr);
1489 : }
1490 : // waitpid failed
1491 0 : return false;
1492 0 : } else if ( w != 0 ) {
1493 0 : if ( debug ) {
1494 : std::cerr << "viewer process died " <<
1495 0 : " (process " << getpid( ) << ", thread " <<
1496 0 : std::this_thread::get_id() << ")" << std::endl;
1497 0 : fflush(stderr);
1498 : }
1499 : // process exited
1500 0 : if ( WIFEXITED(status) ) {
1501 0 : printf("exited, status=%d\n", WEXITSTATUS(status));
1502 0 : } else if (WIFSIGNALED(status)) {
1503 0 : printf("killed by signal %d\n", WTERMSIG(status));
1504 0 : } else if (WIFSTOPPED(status)) {
1505 0 : printf("stopped by signal %d\n", WSTOPSIG(status));
1506 : }
1507 0 : return false;
1508 : }
1509 :
1510 0 : if ( debug ) {
1511 : std::cerr << "fetching viewer uri from " << fifo <<
1512 0 : " (process " << getpid( ) << ", thread " <<
1513 0 : std::this_thread::get_id() << ")" << std::endl;
1514 0 : fflush(stderr);
1515 : }
1516 : char buffer[512];
1517 0 : std::string uri_buffer;
1518 0 : FILE *fp = fopen(fifo.c_str( ), "r");
1519 0 : while ( fgets( buffer, sizeof(buffer), fp ) ) { uri_buffer = uri_buffer + buffer; }
1520 0 : fclose(fp);
1521 0 : trim(uri_buffer);
1522 :
1523 : // validate viewer uri...
1524 0 : if ( ! std::regex_match( uri_buffer, std::regex("^([0-9]+\\.){3}[0-9]+:[0-9]+$") ) ) {
1525 : //rework of regex required for IPv6...
1526 0 : if ( debug ) {
1527 : std::cerr << "bad viewer uri " << uri_buffer <<
1528 0 : " (process " << getpid( ) << ", thread " <<
1529 0 : std::this_thread::get_id() << ")" << std::endl;
1530 0 : fflush(stderr);
1531 : }
1532 0 : return false;
1533 : }
1534 :
1535 0 : if ( debug ) {
1536 : std::cerr << "received viewer uri: " << uri_buffer <<
1537 0 : " (process " << getpid( ) << ", thread " <<
1538 0 : std::this_thread::get_id() << ")" << std::endl;
1539 0 : fflush(stderr);
1540 : }
1541 :
1542 0 : viewer_uri = uri_buffer;
1543 0 : viewer_pid = pid;
1544 0 : viewer_proxy = rpc::img::view::NewStub( grpc::CreateChannel( viewer_uri,
1545 0 : grpc::InsecureChannelCredentials( ) ) );
1546 0 : viewer_started = true;
1547 :
1548 0 : return true;
1549 0 : }
1550 :
1551 0 : std::string grpcInteractiveCleanGui::get_python_path( ) {
1552 0 : std::string ret = casatools::get_state( ).pythonPath( );
1553 0 : return ret;
1554 : }
1555 :
1556 0 : std::string grpcInteractiveCleanGui::get_distro_data_path( ) {
1557 : static bool initialized = false;
1558 0 : static std::string result;
1559 0 : if ( initialized == false ) {
1560 0 : initialized = true;
1561 0 : result = casatools::get_state( ).distroDataPath( );
1562 : struct stat statbuf;
1563 0 : if ( stat( result.c_str( ), &statbuf ) < 0 ) {
1564 : // file (or dir) does not exist...
1565 0 : result = "";
1566 : }
1567 : }
1568 0 : return result;
1569 : }
1570 :
1571 0 : std::string grpcInteractiveCleanGui::get_viewer_path( ) {
1572 : // Get the path to the casaviewer Qt application, to be called in spawn_viewer()
1573 0 : std::string python_path = get_python_path( );
1574 0 : if ( python_path.size( ) == 0 ) return std::string( );
1575 :
1576 : //*** python3 -m casaviewer --app-path
1577 : char buffer[1024];
1578 0 : std::string result;
1579 0 : char **envp = getenv_sansmpi(); // bugfix: run the viewer without MPI CAS-13252
1580 0 : char *python_args[] = { (char*)python_path.c_str(), (char*)"-m", (char*)"casaviewer", (char*)"--app-path", NULL };
1581 0 : execve_getstdout((char*)python_path.c_str(), python_args, envp, buffer, 1024);
1582 0 : result = buffer;
1583 0 : free(envp);
1584 :
1585 0 : trim(result);
1586 0 : if ( result.size( ) == 0 ) return std::string( );
1587 0 : return result;
1588 0 : }
1589 :
1590 0 : std::string grpcInteractiveCleanGui::get_fifo( ) {
1591 0 : static const char *env_tmpdir = getenv("TMPDIR");
1592 0 : static std::string fifo_template = trim_trailing_slash(env_tmpdir && isdir(env_tmpdir) ? env_tmpdir : P_tmpdir) + "/vwr-XXXXXXXXXX";
1593 0 : static int fifo_template_size = fifo_template.size( );
1594 0 : char fifo_path[fifo_template_size+1];
1595 0 : strncpy( fifo_path, fifo_template.c_str( ), fifo_template_size );
1596 0 : fifo_path[fifo_template_size] = '\0';
1597 0 : int fd = mkstemp(fifo_path);
1598 0 : if ( fd == -1 ) throw std::runtime_error("mkstemp failed...");
1599 0 : close( fd );
1600 0 : unlink(fifo_path);
1601 0 : mkfifo( fifo_path, 0666 );
1602 0 : return fifo_path;
1603 0 : }
1604 :
1605 0 : casacore::Record grpcInteractiveCleanManager::pauseForUserInteraction( ) {
1606 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
1607 0 : static const auto debug = getenv("GRPC_DEBUG");
1608 :
1609 0 : if ( clean_images.size( ) == 0 ) {
1610 : // cannot open clean panel in viewer if not images are available...
1611 0 : if ( debug ) {
1612 : std::cerr << "no clean images available" <<
1613 0 : " (process " << getpid( ) << ", thread " <<
1614 0 : std::this_thread::get_id() << ")" << std::endl;
1615 0 : fflush(stderr);
1616 : }
1617 0 : return Record( );
1618 : }
1619 :
1620 0 : if ( clean_panel_id == -1 || ! gui.alive( ) ) {
1621 : // open panel if it is not already open...
1622 0 : clean_panel_id = gui.open_panel( clean_images );
1623 : }
1624 :
1625 0 : int niter=0,cycleniter=0,iterdone;
1626 0 : float threshold=0.0, cyclethreshold=0.0;
1627 0 : access( (void*) 0,
1628 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1629 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1630 0 : niter = state.Niter;
1631 0 : cycleniter = state.CycleNiter;
1632 0 : threshold = state.Threshold;
1633 0 : cyclethreshold = state.CycleThreshold;
1634 0 : iterdone = state.IterDone;
1635 0 : return dummy;
1636 : } ) );
1637 :
1638 0 : std::string strthresh = std::to_string(threshold)+"Jy";
1639 0 : std::string strcycthresh = std::to_string(cyclethreshold)+"Jy";
1640 :
1641 0 : int iterleft = niter - iterdone;
1642 0 : if( iterleft<0 ) iterleft=0;
1643 :
1644 0 : casacore::Vector<int> itsActionCodes(clean_images.size( ));
1645 0 : itsActionCodes = 1.0;
1646 :
1647 0 : unsigned ind = 0;
1648 0 : for ( auto it = clean_images.begin( ); it != clean_images.end( ); ++it, ++ind ) {
1649 0 : if ( std::get<2>(*it) ) {
1650 0 : itsActionCodes[ind] = std::get<3>(*it);
1651 0 : continue;
1652 : }
1653 0 : if ( fabs(itsActionCodes[ind]) == 1.0 ) {
1654 0 : std::string imageName = std::get<0>(*it) + ".residual" + ( std::get<1>(*it) ? ".tt0" : "" );
1655 0 : std::string maskName = std::get<0>(*it) + ".mask";
1656 0 : std::string last_strcycthresh = strcycthresh;
1657 0 : itsActionCodes[ind] = gui.interactivemask( clean_panel_id, imageName, maskName, iterleft,
1658 : cycleniter, strthresh, strcycthresh );
1659 :
1660 0 : if ( strcycthresh != last_strcycthresh ) {
1661 0 : access( (void*) 0,
1662 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1663 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1664 : // if this is not set to false, the users cyclethreshold
1665 : // change are recomputed...
1666 0 : state.IsCycleThresholdAuto = false;
1667 0 : return dummy;
1668 : } ) );
1669 : }
1670 :
1671 0 : if( itsActionCodes[ind] < 0 ) os << "[" << std::get<0>(*it) <<"] Mask changed interactively." << LogIO::POST;
1672 0 : if( fabs(itsActionCodes[ind])==3 || fabs(itsActionCodes[ind])==2 ) {
1673 : // fabs(itsActionCodes[ind])==3 --> stop
1674 : // fabs(itsActionCodes[ind])==2 --> no more
1675 0 : std::get<2>(*it) = true;
1676 0 : std::get<3>(*it) = fabs(itsActionCodes[ind]);
1677 : }
1678 0 : }
1679 : }
1680 :
1681 :
1682 0 : Quantity qa;
1683 0 : casacore::Quantity::read(qa,strthresh);
1684 0 : threshold = qa.getValue(Unit("Jy"));
1685 :
1686 :
1687 0 : float oldcyclethreshold = cyclethreshold;
1688 0 : Quantity qb;
1689 0 : casacore::Quantity::read(qb,strcycthresh);
1690 0 : cyclethreshold = qb.getValue(Unit("Jy"));
1691 :
1692 0 : access( (void*) 0,
1693 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1694 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1695 0 : if ( debug ) {
1696 0 : std::cerr << "-------------------------------------------" << std::endl;
1697 0 : std::cerr << " exporting gui state: " << std::endl;
1698 0 : std::cerr << "-------------------------------------------" << std::endl;
1699 0 : std::cerr << " Niter " << state.Niter <<
1700 0 : " ---> " << iterdone+iterleft << std::endl;
1701 0 : std::cerr << " CycleNiter " << state.CycleNiter <<
1702 0 : " ---> " << cycleniter << std::endl;
1703 0 : std::cerr << " Threshold " << state.Threshold <<
1704 0 : " ---> " << threshold << std::endl;
1705 0 : std::cerr << " CycleThreshold " << oldcyclethreshold <<
1706 0 : ( fabs( cyclethreshold - oldcyclethreshold ) > 1e-06 &&
1707 0 : cyclethreshold != 0 && oldcyclethreshold != 0 ?
1708 0 : " ---> " : " -x-> ") << cyclethreshold << std::endl;
1709 0 : std::cerr << "-------------------------------------------" << std::endl;
1710 : }
1711 :
1712 0 : state.Niter = iterdone+iterleft;
1713 0 : state.CycleNiter = cycleniter;
1714 0 : state.Threshold = threshold;
1715 0 : if ( cyclethreshold != 0 && oldcyclethreshold != 0 &&
1716 0 : fabs( cyclethreshold - oldcyclethreshold ) > 1e-06 )
1717 0 : state.CycleThreshold = cyclethreshold;
1718 :
1719 0 : return dummy;
1720 : } ) );
1721 :
1722 0 : Bool alldone=true;
1723 0 : for ( ind = 0; ind < clean_images.size( ); ++ind ) {
1724 0 : alldone = alldone & ( fabs(itsActionCodes[ind])==3 );
1725 : }
1726 0 : if( alldone==true ) changeStopFlag( true );
1727 :
1728 0 : Record returnRec;
1729 0 : for( ind = 0; ind < clean_images.size( ); ind++ ){
1730 0 : returnRec.define( RecordFieldId( String::toString(ind)), itsActionCodes[ind] );
1731 : }
1732 :
1733 0 : return returnRec;
1734 0 : }
1735 :
1736 0 : void grpcInteractiveCleanManager::closePanel( ) {
1737 0 : gui.close_panel(clean_panel_id);
1738 0 : clean_panel_id = -1;
1739 0 : clean_images.clear( );
1740 0 : access( (void*) 0,
1741 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1742 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1743 0 : state.reset( );
1744 0 : return dummy; } ) );
1745 0 : }
1746 :
1747 : } //# NAMESPACE CASA - END
|