Line data Source code
1 : #include <synthesis/ImagerObjects/grpcInteractiveClean.h>
2 : #include <synthesis/ImagerObjects/SIMinorCycleController.h>
3 : #include <casatools/Config/State.h>
4 : #include <casacore/casa/Logging/LogIO.h>
5 : #include <casacore/images/Images/PagedImage.h>
6 : #include <stdcasa/StdCasa/CasacSupport.h>
7 : #include <string.h>
8 : #include <stdlib.h>
9 : #include <sys/types.h>
10 : #include <sys/wait.h>
11 : #include <iostream>
12 :
13 : #include <sys/types.h>
14 : #include <sys/stat.h>
15 : #include <unistd.h>
16 : #include <array>
17 : #include <regex>
18 : #include <string>
19 :
20 : #include <algorithm>
21 : #include <cctype>
22 : #include <locale>
23 :
24 : #include <grpc++/grpc++.h>
25 : #include "shutdown.grpc.pb.h"
26 : #include "img.grpc.pb.h"
27 : #include "ping.grpc.pb.h"
28 :
29 : #include <stdcasa/StdCasa/CasacSupport.h>
30 :
31 : #ifdef __APPLE__
32 : extern "C" char **environ;
33 : #include <unistd.h>
34 : #endif
35 :
36 : using namespace casacore;
37 :
38 : // https://stackoverflow.com/questions/216823/whats-the-best-way-to-trim-stdstring
39 : // C++ is so ridiculous... trim from start (in place)
40 0 : static inline void ltrim(std::string &s) {
41 0 : s.erase(s.begin(), std::find_if(s.begin(), s.end(), [](int ch) {
42 0 : return !std::isspace(ch);
43 : }));
44 0 : }
45 :
46 : // trim from end (in place)
47 0 : static inline void rtrim(std::string &s) {
48 0 : s.erase(std::find_if(s.rbegin(), s.rend(), [](int ch) {
49 0 : return !std::isspace(ch);
50 0 : }).base(), s.end());
51 0 : }
52 :
53 : // trim from both ends (in place)
54 0 : static inline void trim(std::string &s) {
55 0 : ltrim(s);
56 0 : rtrim(s);
57 0 : }
58 :
59 : // Executes the given program, with the given arguments and the given environment.
60 : // The stdout from the program is collected and returned in output, up to outputlen characters.
61 : // @param envp To get around the MPI issue from CAS-13252, this should probably come from getenv_sansmpi().
62 0 : static void execve_getstdout(char *pathname, char *argv[], char *envp[], char *output, ssize_t outputlen)
63 : {
64 : // We use execve here instead of popen to get around issues related to using MPI.
65 : // MPI crashes when starting a process that calls MPI_Init in a process spawned using popen or exec.
66 : // We can trick MPI into behaving itself by removing all the MPI environmental variables for
67 : // the child precess (thus getenv_sansmpi and execve).
68 :
69 : int filedes[2];
70 0 : if (pipe(filedes) == -1) {
71 0 : std::cerr << "pipe error" << std::endl;
72 0 : exit(1);
73 : }
74 :
75 0 : pid_t pid = fork();
76 0 : if (pid == -1) {
77 0 : std::cerr << "fork error" << std::endl;
78 0 : exit(1);
79 0 : } else if (pid == 0) { // child
80 : // close stdout and connect it to the input of the pipe
81 0 : while ((dup2(filedes[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {}
82 0 : close(filedes[1]);
83 0 : close(filedes[0]);
84 : // exec on the child process
85 0 : execve(pathname, argv, envp);
86 0 : exit(1);
87 : } else { // parent
88 : // don't care about the input end of the pipe
89 0 : close(filedes[1]);
90 :
91 0 : const ssize_t tmplen = 128;
92 : char tmp[tmplen];
93 0 : ssize_t total = 0;
94 : while (1) {
95 0 : ssize_t count = read(filedes[0], tmp, tmplen);
96 0 : if (count == -1) {
97 0 : if (errno == EINTR) {
98 0 : continue;
99 : } else {
100 0 : std::cerr << "read error" << std::endl;
101 0 : exit(1);
102 : }
103 0 : } else if (count == 0) {
104 0 : break;
105 : } else {
106 0 : ssize_t remaining = outputlen - total;
107 0 : ssize_t cpysize = (count < remaining) ? count : remaining;
108 0 : memcpy(output+total, tmp, cpysize);
109 0 : total += cpysize;
110 0 : output[total] = '\0';
111 : }
112 0 : }
113 :
114 0 : close(filedes[0]);
115 : }
116 0 : }
117 :
118 : // Get all environment parameters (as from the "environ" posix variable),
119 : // but don't include any environment parameters that match "*MPI*".
120 : // @return A malloc'ed set of environment parameters. Should call free after use.
121 0 : static char **getenv_sansmpi()
122 : {
123 0 : int nvars = 0, nvars_sansmpi = 0;
124 0 : for (nvars = 0; environ[nvars] != NULL; nvars++) {
125 : // printf("%s\n", environ[nvars]);
126 0 : std::string envvar = environ[nvars];
127 0 : if (envvar.find("MPI") == std::string::npos) {
128 0 : nvars_sansmpi++;
129 : }
130 0 : }
131 :
132 0 : char **ret = (char**)malloc(sizeof(char*) * (nvars_sansmpi+1));
133 0 : int retidx = 0;
134 0 : for (int i = 0; environ[i] != NULL; i++) {
135 0 : std::string envvar = environ[i];
136 0 : if (envvar.find("MPI") == std::string::npos) {
137 0 : ret[retidx] = environ[i];
138 0 : retidx++;
139 : }
140 0 : }
141 0 : ret[nvars_sansmpi] = NULL;
142 :
143 0 : return ret;
144 : }
145 :
146 : namespace casa { //# NAMESPACE CASA - BEGIN
147 :
148 638 : grpcInteractiveCleanManager &grpcInteractiveClean::getManager( ) {
149 638 : static grpcInteractiveCleanManager mgr;
150 638 : return mgr;
151 : }
152 :
153 3410 : void grpcInteractiveCleanManager::pushDetails() {
154 3410 : }
155 :
156 1 : grpcInteractiveCleanState::grpcInteractiveCleanState( ) : SummaryMinor(casacore::IPosition(2,
157 : SIMinorCycleController::nSummaryFields, // temporary CAS-13683 workaround
158 : // SIMinorCycleController::useSmallSummaryminor() ? 6 : SIMinorCycleController::nSummaryFields, // temporary CAS-13683 workaround
159 : 0)),
160 1 : SummaryMajor(casacore::IPosition(1,0)) {
161 2 : LogIO os( LogOrigin("grpcInteractiveCleanState",__FUNCTION__,WHERE) );
162 1 : reset( );
163 1 : }
164 :
165 1277 : void grpcInteractiveCleanState::reset( ) {
166 1277 : Niter = 0;
167 1277 : MajorDone = 0;
168 1277 : CycleNiter = 0;
169 1277 : InteractiveNiter = 0;
170 1277 : Threshold = 0;
171 1277 : CycleThreshold = 0;
172 1277 : InteractiveThreshold = 0.0;
173 1277 : IsCycleThresholdAuto = true;
174 1277 : IsCycleThresholdMutable = true;
175 1277 : IsThresholdAuto = false;
176 1277 : CycleFactor = 1.0;
177 1277 : LoopGain = 0.1;
178 1277 : StopFlag = false;
179 1277 : PauseFlag = false;
180 1277 : InteractiveMode = false;
181 1277 : UpdatedModelFlag = false;
182 1277 : InteractiveIterDone = 0;
183 1277 : IterDone = 0;
184 1277 : StopCode = 0;
185 1277 : Nsigma = 0.0;
186 1277 : MaxPsfSidelobe = 0.0;
187 1277 : MinPsfFraction = 0.05;
188 1277 : MaxPsfFraction = 0.8;
189 1277 : PeakResidual = 0.0;
190 1277 : MinorCyclePeakResidual = 0.0;
191 1277 : PrevPeakResidual = -1.0;
192 1277 : NsigmaThreshold = 0.0;
193 1277 : PrevMajorCycleCount = 0;
194 1277 : PeakResidualNoMask = 0.0;
195 1277 : PrevPeakResidualNoMask = -1.0;
196 1277 : MinPeakResidualNoMask = 1e+9;
197 1277 : MinPeakResidual = 1e+9;
198 1277 : MaskSum = -1.0;
199 1277 : MadRMS = 0.0;
200 : //int nSummaryFields = SIMinorCycleController::useSmallSummaryminor() ? 6 : SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
201 1277 : int nSummaryFields = SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
202 : //int nSummaryFields = !FullSummary ? 6 : SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
203 1277 : SummaryMinor.reformOrResize(casacore::IPosition(2, nSummaryFields ,0));
204 1277 : SummaryMajor.reformOrResize(casacore::IPosition(1,0));
205 1277 : SummaryMinor = 0;
206 1277 : SummaryMajor = 0;
207 1277 : }
208 :
209 0 : void grpcInteractiveCleanManager::setControls( int niter, int ncycle, float threshold ) {
210 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager", __FUNCTION__, WHERE) );
211 0 : static const auto debug = getenv("GRPC_DEBUG");
212 0 : if ( debug ) std::cerr << "setting clean controls:";
213 0 : access( (void*) 0,
214 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
215 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
216 :
217 0 : state.Niter = niter;
218 0 : if ( debug ) std::cerr << " niter=" << state.Niter;
219 0 : state.CycleNiter = ncycle;
220 0 : if ( debug ) std::cerr << " cycleniter=" << state.CycleNiter;
221 0 : state.Threshold = threshold;
222 0 : if ( debug ) std::cerr << " threshold=" << state.Threshold;
223 0 : return dummy;
224 :
225 : } ) );
226 :
227 0 : if ( debug ) {
228 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
229 0 : std::this_thread::get_id() << ")" << std::endl;
230 0 : fflush(stderr);
231 : }
232 :
233 0 : }
234 638 : void grpcInteractiveCleanManager::setControlsFromRecord(const casac::record &iterpars) {
235 1276 : LogIO os( LogOrigin("grpcInteractiveCleanManager", __FUNCTION__, WHERE) );
236 638 : static const auto debug = getenv("GRPC_DEBUG");
237 :
238 638 : if ( debug ) std::cerr << "initializing clean controls:";
239 :
240 638 : access( (void*) 0,
241 1276 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
242 638 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
243 :
244 638 : auto oldNiter = state.Niter;
245 638 : auto oldCycleNiter = state.CycleNiter;
246 638 : auto oldThreshold = state.Threshold;
247 638 : auto oldCycleThreshold = state.CycleThreshold;
248 :
249 638 : state.reset( );
250 :
251 : /* Note it is important that niter get set first as we catch
252 : negative values in the cycleniter, and set it equal to niter */
253 638 : auto niter = iterpars.find("niter");
254 638 : if ( niter != iterpars.end( ) ) {
255 638 : state.Niter = niter->second.toInt( );
256 638 : if ( debug ) std::cerr << " niter=" << state.Niter;
257 : }
258 638 : auto newNiter = state.Niter;
259 :
260 638 : auto cycleniter = iterpars.find("cycleniter");
261 638 : if ( cycleniter != iterpars.end( ) ) {
262 638 : int val = cycleniter->second.toInt( );
263 638 : if ( val <= 0 )
264 4 : state.CycleNiter = state.Niter;
265 : else
266 634 : state.CycleNiter = val;
267 638 : if ( debug ) std::cerr << " cycleniter=" << state.CycleNiter;
268 : }
269 638 : auto newCycleNiter = state.CycleNiter;
270 :
271 638 : auto interactiveniter = iterpars.find("interactiveniter");
272 638 : if ( interactiveniter != iterpars.end( ) ) {
273 0 : state.InteractiveNiter = interactiveniter->second.toInt( );
274 0 : if ( debug ) std::cerr << " interactiveniter=" << state.InteractiveNiter;
275 : }
276 :
277 638 : auto threshold = iterpars.find("threshold");
278 638 : if ( threshold != iterpars.end( ) ) {
279 638 : auto quant = casaQuantity(threshold->second);
280 638 : if ( quant.getUnit( ) == "" ) quant.setUnit("Jy");
281 638 : auto val = quant.getValue(Unit("Jy"));
282 638 : if ( val == -1.0 ) {
283 0 : state.Threshold = 0.0;
284 0 : state.IsThresholdAuto = true;
285 : } else {
286 638 : state.Threshold = (float) val;
287 638 : state.IsThresholdAuto = false;
288 : }
289 638 : if ( debug ) {
290 0 : std::cerr << " threshold=" << state.Threshold;
291 : std::cerr << " isthresholdauto=" <<
292 0 : (state.IsThresholdAuto ? "true" : "false");
293 : }
294 638 : }
295 638 : auto newThreshold = state.Threshold;
296 :
297 638 : auto cyclethreshold = iterpars.find("cyclethreshold");
298 638 : if ( cyclethreshold != iterpars.end( ) ) {
299 176 : state.CycleThreshold = casaQuantity(cyclethreshold->second).getValue(Unit("Jy"));
300 176 : state.IsCycleThresholdAuto = false;
301 176 : if ( debug ) {
302 0 : std::cerr << " cyclethreshold=" << state.CycleThreshold;
303 : std::cerr << " iscyclethresholdauto=" <<
304 0 : (state.IsCycleThresholdAuto ? "true" : "false");
305 0 : fflush(stderr);
306 : }
307 : }
308 638 : auto newCycleThreshold = state.CycleThreshold;
309 :
310 638 : auto cyclethresholdismutable = iterpars.find("cyclethresholdismutable");
311 638 : if ( cyclethresholdismutable != iterpars.end( ) ) {
312 176 : state.IsCycleThresholdMutable = cyclethresholdismutable->second.toBool( );
313 176 : state.IsCycleThresholdAuto = state.IsCycleThresholdAuto && state.IsCycleThresholdMutable;
314 176 : if ( debug ) {
315 0 : std::cerr << " iscyclethresholdmutable=" << (state.IsCycleThresholdMutable ? "true" : "false");
316 0 : std::cerr << " iscyclethresholdauto=" << (state.IsCycleThresholdAuto ? "true" : "false");
317 0 : fflush(stderr);
318 : }
319 : }
320 :
321 638 : auto interactivethreshold = iterpars.find("interactivethreshold");
322 638 : if ( interactivethreshold != iterpars.end( ) ) {
323 0 : state.InteractiveThreshold = casaQuantity(interactivethreshold->second).getValue(Unit("Jy"));
324 0 : if ( debug ) std::cerr << " interactivethreshold=" << state.InteractiveThreshold;
325 : }
326 :
327 638 : auto loopgain = iterpars.find("loopgain");
328 638 : if ( loopgain != iterpars.end( ) ) {
329 638 : state.LoopGain = (float) loopgain->second.toDouble( );
330 638 : if ( debug ) std::cerr << " loopgain=" << state.LoopGain;
331 : }
332 638 : auto cyclefactor = iterpars.find("cyclefactor");
333 638 : if ( cyclefactor != iterpars.end( ) ) {
334 638 : state.CycleFactor = (float) cyclefactor->second.toDouble( );
335 638 : if ( debug ) std::cerr << " cyclefactor=" << state.CycleFactor;
336 : }
337 :
338 638 : auto interactivemode = iterpars.find("interactive");
339 638 : if ( interactivemode != iterpars.end( ) ) {
340 638 : state.InteractiveMode = interactivemode->second.toBool( );
341 638 : if ( debug ) std::cerr << " interactive=" <<
342 0 : (state.InteractiveMode ? "true" : "false");
343 : }
344 :
345 638 : auto minpsffraction = iterpars.find("minpsffraction");
346 638 : if ( minpsffraction != iterpars.end( ) ) {
347 638 : state.MinPsfFraction = (float) minpsffraction->second.toDouble( );
348 638 : if ( debug ) std::cerr << " minpsffraction=" << state.MinPsfFraction;
349 : }
350 :
351 638 : auto maxpsffraction = iterpars.find("maxpsffraction");
352 638 : if ( maxpsffraction != iterpars.end( ) ) {
353 638 : state.MaxPsfFraction = (float) maxpsffraction->second.toDouble( );
354 638 : if ( debug ) std::cerr << " maxpsffraction=" << state.MaxPsfFraction;
355 : }
356 :
357 638 : auto nsigma = iterpars.find("nsigma");
358 638 : if ( nsigma != iterpars.end( ) ) {
359 638 : state.Nsigma = (float) nsigma->second.toDouble( );
360 638 : if ( debug ) std::cerr << " nsigma=" << state.Nsigma;
361 : }
362 638 : auto fullsummary = iterpars.find("fullsummary");
363 638 : if ( fullsummary != iterpars.end() ) {
364 638 : state.FullSummary = fullsummary->second.toBool();
365 638 : if ( debug ) std::cerr << " fullsummry=" << state.FullSummary;
366 : }
367 638 : if ( debug ) std::cerr << std::endl;
368 :
369 638 : if ( debug ) {
370 0 : std::cerr << "-------------------------------------------" << this << " / " << &state << std::endl;
371 0 : std::cerr << " exported python state: " << std::endl;
372 0 : std::cerr << "-------------------------------------------" << std::endl;
373 0 : std::cerr << " Niter " << oldNiter <<
374 0 : " ---> " << newNiter << std::endl;
375 0 : std::cerr << " CycleNiter " << oldCycleNiter <<
376 0 : " ---> " << newCycleNiter << std::endl;
377 0 : std::cerr << " Threshold " << oldThreshold <<
378 0 : " ---> " << newThreshold << std::endl;
379 0 : std::cerr << " CycleThreshold " << oldCycleThreshold <<
380 0 : " ---> " << newCycleThreshold << std::endl;
381 0 : std::cerr << " IsCycleThresholdAuto " << state.IsCycleThresholdAuto << std::endl;
382 0 : std::cerr << "-------------------------------------------" << std::endl;
383 :
384 :
385 : }
386 638 : return dummy;
387 :
388 : } ) );
389 :
390 638 : if ( debug ) {
391 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
392 0 : std::this_thread::get_id() << ")" << std::endl;
393 0 : fflush(stderr);
394 : }
395 638 : }
396 : /*
397 : Float SIIterBot_state::readThreshold( Record recordIn, String id ) {
398 : LogIO os( LogOrigin("SIIterBot_state",__FUNCTION__,WHERE) );
399 : std::lock_guard<std::recursive_mutex> guard(recordMutex);
400 : // Threshold can be a variant, either Float or String(with units).
401 : Float fthresh=0.0;
402 : // If a number, treat it as a number in units of Jy.
403 : if( recordIn.dataType(id) == TpFloat ||
404 : recordIn.dataType(id) == TpDouble ||
405 : recordIn.dataType(id) == TpInt )
406 : { fthresh = recordIn.asFloat( RecordFieldId(id)); }
407 : // If a string, try to convert to a Quantity
408 : else if( recordIn.dataType(id) == TpString )
409 : {
410 : Quantity thresh;
411 : // If it cannot be converted to a Quantity.... complain, and use zero.
412 : if( ! casacore::Quantity::read( thresh, recordIn.asString( RecordFieldId(id) ) ) )
413 : {os << LogIO::WARN << "Cannot parse threshold value. Setting to zero." << LogIO::POST;
414 : fthresh=0.0;}
415 : // If converted to Quantity, get value in Jy.
416 : // ( Note : This does not check for wrong units, e.g. if the user says '100m' ! )
417 : else { fthresh = thresh.getValue(Unit("Jy")); }
418 : }
419 : // If neither valid datatype, print a warning and use zero.
420 : else {os << LogIO::WARN << id << " is neither a number nor a string Quantity. Setting to zero." << LogIO::POST;
421 : fthresh=0.0; }
422 :
423 : return fthresh;
424 : }
425 : */
426 638 : void grpcInteractiveCleanManager::setIterationDetails(const casac::record &iterpars) {
427 1276 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
428 638 : static const auto debug = getenv("GRPC_DEBUG");
429 :
430 : // Setup interactive masking : list of image names.
431 638 : if ( clean_images.size( ) == 0 ) {
432 : try {
433 : ////////////////////////////////////////////////////////////////////////////////////////////////////////
434 : ///// START : code to get a list of image names for interactive masking
435 :
436 638 : auto allimages = iterpars.find("allimages");
437 638 : if ( allimages != iterpars.end( ) ) {
438 638 : auto rec = allimages->second.getRecord( );
439 1287 : for ( auto it = rec.begin( ); it != rec.end( ); ++it ) {
440 649 : auto oneimg = it->second.getRecord( );
441 649 : auto img_name = oneimg.find("imagename");
442 649 : auto img_multiterm = oneimg.find("multiterm");
443 649 : if ( img_name != oneimg.end( ) && img_multiterm != oneimg.end( ) ) {
444 649 : clean_images.push_back( std::make_tuple( img_name->second.getString(),
445 1298 : img_multiterm->second.toBool( ), false, 0) );
446 : }
447 649 : }
448 638 : } else {
449 0 : throw( AipsError("Need image names and nterms in iteration parameter list") );
450 : }
451 638 : if ( clean_images.size( ) <= 0 ) {
452 0 : throw( AipsError("Need image names for iteration") );
453 : }
454 :
455 638 : if ( debug ) {
456 0 : std::cerr << "clean images specified: ";
457 0 : for ( auto it = clean_images.begin( ); it != clean_images.end( ); ++it ) {
458 0 : if ( it != clean_images.begin( ) ) std::cerr << ", ";
459 0 : std::cerr << std::get<0>(*it) << " [" << (std::get<1>(*it) ? "true" : "false") << "]";
460 : }
461 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
462 0 : std::this_thread::get_id() << ")" << std::endl;
463 0 : fflush(stderr);
464 : }
465 :
466 : ///// END : code to get a list of image names for interactive masking
467 : ////////////////////////////////////////////////////////////////////////////////////////////////////////
468 :
469 638 : setControlsFromRecord( iterpars );
470 638 : Int nSummaryFields = !state.FullSummary ? 6 : SIMinorCycleController::nSummaryFields;
471 638 : state.SummaryMinor.reformOrResize(casacore::IPosition(2, nSummaryFields ,0));
472 :
473 0 : } catch( AipsError &x ) {
474 0 : throw( AipsError("Error in updating iteration parameters : " + x.getMesg()) );
475 0 : }
476 : }
477 638 : }
478 :
479 3410 : void grpcInteractiveCleanManager::updateCycleThreshold( grpcInteractiveCleanState &state ) {
480 3410 : static const auto debug = getenv("GRPC_DEBUG");
481 :
482 3410 : Float psffraction = state.MaxPsfSidelobe * state.CycleFactor;
483 :
484 3410 : psffraction = max(psffraction, state.MinPsfFraction);
485 3410 : psffraction = min(psffraction, state.MaxPsfFraction);
486 :
487 3410 : if ( debug ) {
488 0 : std::cerr << "------------------------------------------- " << this << " / " << &state << std::endl;
489 0 : std::cerr << " algorithmic update of cycle threshold: " << std::endl;
490 0 : std::cerr << " CycleThreshold " << state.CycleThreshold <<
491 0 : " ---> " << (state.PeakResidual * psffraction) << std::endl;
492 0 : std::cerr << " IsCycleThresholdAuto " << state.IsCycleThresholdAuto << std::endl;
493 0 : std::cerr << "-------------------------------------------" << std::endl;
494 : }
495 :
496 3410 : state.CycleThreshold = state.PeakResidual * psffraction;
497 3410 : pushDetails();
498 3410 : }
499 :
500 1175 : void grpcInteractiveCleanManager::addSummaryMajor( ) {
501 1175 : access( (void*) 0,
502 2350 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
503 1175 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
504 1175 : IPosition shp = state.SummaryMajor.shape();
505 1175 : if( shp.nelements() != 1 )
506 0 : throw(AipsError("Internal error in shape of major-cycle summary record"));
507 :
508 1175 : state.SummaryMajor.resize( IPosition( 1, shp[0]+1 ) , true );
509 1175 : state.SummaryMajor( IPosition(1, shp[0] ) ) = state.IterDone;
510 2350 : return dummy; } ) );
511 1175 : }
512 :
513 1266 : casacore::Record grpcInteractiveCleanManager::getDetailsRecord( bool includeSummary ) {
514 2532 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
515 :
516 1266 : Record returnRecord;
517 :
518 : Record result = access( returnRecord,
519 1266 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
520 1266 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
521 : //*** Control Variables **************************************************
522 1266 : rec.define( RecordFieldId("niter"), state.Niter );
523 1266 : rec.define( RecordFieldId("cycleniter"), state.CycleNiter );
524 1266 : rec.define( RecordFieldId("interactiveniter"), state.InteractiveNiter );
525 :
526 1266 : rec.define( RecordFieldId("threshold"), state.Threshold );
527 1266 : rec.define( RecordFieldId("nsigma"), state.Nsigma );
528 :
529 1266 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
530 1266 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; // Reset this, for the next round
531 :
532 1266 : rec.define( RecordFieldId("cyclethreshold"), state.CycleThreshold );
533 1266 : rec.define( RecordFieldId("interactivethreshold"), state.InteractiveThreshold );
534 :
535 1266 : rec.define( RecordFieldId("loopgain"), state.LoopGain );
536 1266 : rec.define( RecordFieldId("cyclefactor"), state.CycleFactor );
537 :
538 : //*** Status Reporting Variables *****************************************
539 1266 : rec.define( RecordFieldId("iterdone"), state.IterDone );
540 1266 : rec.define( RecordFieldId("cycleiterdone"), state.MaxCycleIterDone );
541 1266 : rec.define( RecordFieldId("interactiveiterdone"),
542 1266 : state.InteractiveIterDone + state.MaxCycleIterDone);
543 :
544 1266 : rec.define( RecordFieldId("nmajordone"), state.MajorDone );
545 1266 : rec.define( RecordFieldId("maxpsfsidelobe"), state.MaxPsfSidelobe );
546 1266 : rec.define( RecordFieldId("maxpsffraction"), state.MaxPsfFraction );
547 1266 : rec.define( RecordFieldId("minpsffraction"), state.MinPsfFraction );
548 1266 : rec.define( RecordFieldId("interactivemode"), state.InteractiveMode );
549 :
550 1266 : rec.define( RecordFieldId("stopcode"), state.StopCode );
551 :
552 : //*** report clean's state ***********************************************
553 1266 : rec.define( RecordFieldId("cleanstate"),
554 1266 : state.StopFlag ? "stopped" : state.PauseFlag ? "paused" : "running" );
555 :
556 1266 : if ( includeSummary ) {
557 1266 : rec.define( RecordFieldId("summaryminor"), state.SummaryMinor );
558 1266 : rec.define( RecordFieldId("summarymajor"), state.SummaryMajor );
559 : }
560 :
561 3798 : return rec; }) );
562 :
563 2532 : return result;
564 :
565 :
566 : /* return access( returnRecord,
567 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
568 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
569 : //*** Control Variables **************************************************
570 : rec.define( RecordFieldId("niter"), state.Niter );
571 : rec.define( RecordFieldId("cycleniter"), state.CycleNiter );
572 : rec.define( RecordFieldId("interactiveniter"), state.InteractiveNiter );
573 :
574 : rec.define( RecordFieldId("threshold"), state.Threshold );
575 : rec.define( RecordFieldId("nsigma"), state.Nsigma );
576 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
577 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; // Reset this, for the next round
578 :
579 : rec.define( RecordFieldId("cyclethreshold"), state.CycleThreshold );
580 : rec.define( RecordFieldId("interactivethreshold"), state.InteractiveThreshold );
581 :
582 : rec.define( RecordFieldId("loopgain"), state.LoopGain );
583 : rec.define( RecordFieldId("cyclefactor"), state.CycleFactor );
584 :
585 : //*** Status Reporting Variables *****************************************
586 : rec.define( RecordFieldId("iterdone"), state.IterDone );
587 : rec.define( RecordFieldId("cycleiterdone"), state.MaxCycleIterDone );
588 : rec.define( RecordFieldId("interactiveiterdone"),
589 : state.InteractiveIterDone + state.MaxCycleIterDone);
590 :
591 : rec.define( RecordFieldId("nmajordone"), state.MajorDone );
592 : rec.define( RecordFieldId("maxpsfsidelobe"), state.MaxPsfSidelobe );
593 : rec.define( RecordFieldId("maxpsffraction"), state.MaxPsfFraction );
594 : rec.define( RecordFieldId("minpsffraction"), state.MinPsfFraction );
595 : rec.define( RecordFieldId("interactivemode"), state.InteractiveMode );
596 :
597 : rec.define( RecordFieldId("stopcode"), state.StopCode );
598 :
599 : //*** report clean's state ***********************************************
600 : rec.define( RecordFieldId("cleanstate"),
601 : state.StopFlag ? "stopped" : state.PauseFlag ? "paused" : "running" );
602 :
603 : if ( includeSummary ) {
604 : rec.define( RecordFieldId("summaryminor"), state.SummaryMinor );
605 : rec.define( RecordFieldId("summarymajor"), state.SummaryMajor );
606 : }
607 :
608 : return rec; }) );
609 : */
610 1266 : }
611 :
612 :
613 923 : Record grpcInteractiveCleanManager::getMinorCycleControls( ){
614 1846 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
615 :
616 : /* This returns a record suitable for initializing the minor cycle controls. */
617 923 : Record returnRecord;
618 :
619 : return access( returnRecord,
620 923 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
621 923 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
622 :
623 : /* If autocalc, compute cyclethresh from peak res, cyclefactor and psf sidelobe
624 : Otherwise, the user has explicitly set it (interactively) for this minor cycle */
625 923 : if( state.IsCycleThresholdAuto == true ) { updateCycleThreshold(state); }
626 923 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; /* Reset this, for the next round */
627 :
628 : /* The minor cycle will stop based on the cycle parameters. */
629 923 : int maxCycleIterations = state.CycleNiter;
630 923 : float cycleThreshold = state.CycleThreshold;
631 923 : maxCycleIterations = min(maxCycleIterations, state.Niter - state.IterDone);
632 923 : cycleThreshold = max(cycleThreshold, state.Threshold);
633 923 : bool thresholdReached = (cycleThreshold==state.Threshold)? True : False;
634 :
635 923 : rec.define( RecordFieldId("cycleniter"), maxCycleIterations);
636 923 : rec.define( RecordFieldId("cyclethreshold"), cycleThreshold);
637 923 : rec.define( RecordFieldId("loopgain"), state.LoopGain);
638 923 : rec.define( RecordFieldId("thresholdreached"), thresholdReached);
639 923 : rec.define( RecordFieldId("nsigma"), state.Nsigma);
640 :
641 2769 : return rec; }) );
642 923 : }
643 :
644 3341 : int grpcInteractiveCleanManager::cleanComplete( bool lastcyclecheck, bool reachedMajorLimit ){
645 6682 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
646 :
647 3341 : int stopCode=0;
648 :
649 3341 : return access( stopCode,
650 6682 : std::function< int ( int, grpcInteractiveCleanState & )>(
651 3341 : [&]( int stop_code, grpcInteractiveCleanState &state ) -> int {
652 :
653 : float usePeakRes;
654 :
655 3341 : if( lastcyclecheck==True ) { usePeakRes = state.MinorCyclePeakResidual; }
656 2166 : else { usePeakRes = state.PeakResidual; }
657 :
658 : // for debugging, remove it later
659 3341 : os<<LogIO::DEBUG1<<"cleanComplete-- CycleThreshold without Threshold limit="<<state.CycleThreshold<<LogIO::POST;
660 :
661 5497 : if( state.PeakResidual > 0 && state.PrevPeakResidual>0 &&
662 2156 : fabs( state.PeakResidual - state.PrevPeakResidual)/fabs(state.PrevPeakResidual) > 2.0 ) {
663 8 : os << "[WARN] Peak residual (within the mask) increased from " << state.PrevPeakResidual << " to " << state.PeakResidual << LogIO::POST;
664 : }
665 : // for debugging, remove it later
666 3341 : os <<LogIO::DEBUG1<<"Threshold="<<state.Threshold<<" itsNsigmaThreshold===="<<state.NsigmaThreshold<<LogIO::POST;
667 3341 : os <<LogIO::DEBUG1<<"usePeakRes="<<usePeakRes<<" itsPeakResidual="<<state.PeakResidual<<" itsPrevPeakRes="<<state.PrevPeakResidual<<LogIO::POST;
668 3341 : os <<LogIO::DEBUG1<<"itsIterDone="<<state.IterDone<<" itsNiter="<<state.Niter<<LogIO::POST;
669 3341 : os <<LogIO::DEBUG1<<"FullSummary="<<state.FullSummary<<LogIO::POST;
670 :
671 : /// This may interfere with some other criterion... check.
672 3341 : float tol = 0.01; // threshold test torelance (CAS-11278)
673 3341 : if ( state.MajorDone==0 && state.IterDone==0 && state.MaskSum==0.0) {
674 3 : stopCode=7; // if zero mask is detected it should exit right away
675 9005 : } else if ( state.IterDone >= state.Niter ||
676 2329 : usePeakRes <= state.Threshold ||
677 1858 : state.PeakResidual <= state.NsigmaThreshold ||
678 1551 : fabs(usePeakRes-state.Threshold)/state.Threshold < tol ||
679 7217 : fabs(state.PeakResidual - state.NsigmaThreshold)/state.NsigmaThreshold < tol ||
680 1550 : state.StopFlag ) {
681 : // os << "Reached global stopping criteria : ";
682 :
683 1788 : if ( state.IterDone >= state.Niter ) { stopCode=1; }
684 : //os << "Numer of iterations. "; // (" << state.IterDone << ") >= limit (" << state.Niter << ")" ;
685 1788 : if( usePeakRes <= state.Threshold || (usePeakRes-state.Threshold)/state.Threshold < tol) {stopCode=2; }
686 1311 : else if ( usePeakRes <= state.NsigmaThreshold || (state.PeakResidual - state.NsigmaThreshold)/state.NsigmaThreshold < tol ) {
687 47 : if (state.NsigmaThreshold!=0.0) { stopCode=8; } // for nsigma=0.0 this mode is turned off
688 : }
689 :
690 : //os << "Peak residual (" << state.PeakResidual << ") <= threshold(" << state.Threshold << ")";
691 1788 : if( state.StopFlag ) {stopCode=3;}
692 : //os << "Forced stop. ";
693 : // os << LogIO::POST;
694 :
695 : //return true;
696 :
697 : } else { // not converged yet... but....if nothing has changed in this round... also stop
698 :
699 1550 : if (state.MaskSum==0.0) {
700 : //cout << "(7) Mask is all zero.Stopping" << endl;
701 3 : stopCode = 7;
702 : }
703 : // Nothing has changed across the last set of minor cycle iterations and major cycle.
704 1851 : else if( state.IterDone>0 && (state.MajorDone>state.PrevMajorCycleCount) &&
705 304 : fabs(state.PrevPeakResidual - state.PeakResidual)<1e-10)
706 0 : {stopCode = 4;}
707 :
708 : // another non-convergent condition: diverging (relative increase is more than 3 times across one major cycle)
709 1869 : else if ( state.IterDone > 0 &&
710 322 : fabs(state.PeakResidualNoMask-state.PrevPeakResidualNoMask)/fabs(state.PrevPeakResidualNoMask) > 3.0) {
711 : //cout << "(5) Peak res (no mask) : " << state.PeakResidualNoMask
712 : // << " Dev from prev peak res " << state.PrevPeakResidualNoMask << endl;
713 0 : stopCode = 5;}
714 :
715 : // divergence check, 3 times increase from the minimum peak residual so far (across all previous major cycles).
716 1869 : else if ( state.IterDone > 0 &&
717 322 : (fabs(state.PeakResidualNoMask)-state.MinPeakResidualNoMask)/state.MinPeakResidualNoMask > 3.0 )
718 : {
719 : //cout << "(6) Peak res (no mask): " << state.PeakResidualNoMask
720 : // << " Dev from min peak res " << state.MinPeakResidualNoMask << endl;
721 1 : stopCode = 6;
722 : }
723 :
724 : }
725 :
726 3341 : if (stopCode == 0 && reachedMajorLimit) {
727 4 : stopCode = 9;
728 : }
729 :
730 : /*
731 : if( lastcyclecheck==False)
732 : {
733 : cout << "*****" << endl;
734 : cout << "Peak residual : " << state.PeakResidual << " No Mask : " << state.PeakResidualNoMask << endl;
735 : cout << "Prev Peak residual : " << state.PrevPeakResidual << " No Mask : " << state.PrevPeakResidualNoMask << endl;
736 : cout << "Min Peak residual : " << state.MinPeakResidual << " No Mask : " << state.MinPeakResidualNoMask << endl;
737 : }
738 : */
739 :
740 : // os << "Peak residual : " << state.PeakResidual << " and " << state.IterDone << " iterations."<< LogIO::POST;
741 : //cout << "cleancomp : stopcode : " << stopCode << endl;
742 :
743 : //cout << "peak res : " << state.PeakResidual << " state.minPR : " << state.MinPeakResidual << endl;
744 :
745 3341 : if( lastcyclecheck==False)
746 : {
747 2166 : if( fabs(state.PeakResidual) < state.MinPeakResidual )
748 1553 : {state.MinPeakResidual = fabs(state.PeakResidual);}
749 :
750 2166 : state.PrevPeakResidual = state.PeakResidual;
751 :
752 :
753 2166 : if( fabs(state.PeakResidualNoMask) < state.MinPeakResidualNoMask )
754 1530 : {state.MinPeakResidualNoMask = fabs(state.PeakResidualNoMask);}
755 :
756 2166 : state.PrevPeakResidualNoMask = state.PeakResidualNoMask;
757 :
758 2166 : state.PrevMajorCycleCount = state.MajorDone;
759 :
760 : }
761 :
762 3341 : state.StopCode=stopCode;
763 10023 : return stopCode; } ) );
764 3341 : }
765 :
766 4270 : void grpcInteractiveCleanManager::resetMinorCycleInitInfo( grpcInteractiveCleanState &state ) {
767 : /* Get ready to do the minor cycle */
768 4270 : state.PeakResidual = 0;
769 4270 : state.PeakResidualNoMask = 0;
770 4270 : state.MaxPsfSidelobe = 0;
771 4270 : state.MaxCycleIterDone = 0;
772 4270 : state.MaskSum = -1.0;
773 4270 : }
774 :
775 3095 : void grpcInteractiveCleanManager::resetMinorCycleInitInfo( ) {
776 3095 : access( (void*) 0,
777 6190 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
778 3095 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
779 3095 : resetMinorCycleInitInfo(state);
780 3095 : return dummy; } ) );
781 3095 : }
782 :
783 1175 : void grpcInteractiveCleanManager::incrementMajorCycleCount( ) {
784 :
785 1175 : access( (void*) 0,
786 2350 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
787 1175 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
788 1175 : state.PrevMajorCycleCount = state.MajorDone;
789 1175 : state.MajorDone++;
790 :
791 : /* Interactive iteractions update */
792 1175 : state.InteractiveIterDone += state.MaxCycleIterDone;
793 :
794 1175 : resetMinorCycleInitInfo(state);
795 1175 : return dummy; } ) );
796 1175 : }
797 :
798 2199 : void grpcInteractiveCleanManager::mergeCycleInitializationRecord( Record &initRecord, casacore::Int immod ){
799 4398 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
800 :
801 2199 : access( (void*) 0,
802 4398 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
803 2199 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
804 :
805 2199 : mergeMinorCycleSummary( initRecord.asArrayDouble( RecordFieldId("summaryminor")), state, immod );
806 2199 : state.PeakResidual = max(state.PeakResidual, initRecord.asFloat(RecordFieldId("peakresidual")));
807 2199 : state.MaxPsfSidelobe = max(state.MaxPsfSidelobe, initRecord.asFloat(RecordFieldId("maxpsfsidelobe")));
808 :
809 2199 : state.PeakResidualNoMask = max( state.PeakResidualNoMask, initRecord.asFloat(RecordFieldId("peakresidualnomask")));
810 2199 : state.MadRMS = max(state.MadRMS, initRecord.asFloat(RecordFieldId("madrms")));
811 2199 : state.NsigmaThreshold = initRecord.asFloat(RecordFieldId("nsigmathreshold"));
812 :
813 : /*
814 : It has been reset to -1.0.
815 : If no masks have changed, it should remain at -1.0
816 : If any mask has changed, the sum will come in, and should be added to this.
817 : */
818 2199 : float thismasksum = initRecord.asFloat(RecordFieldId("masksum"));
819 2199 : if( thismasksum != -1.0 ) {
820 653 : if ( state.MaskSum == -1.0 ) state.MaskSum = thismasksum;
821 11 : else state.MaskSum += thismasksum;
822 : }
823 :
824 2199 : if ( state.PrevPeakResidual == -1.0 ) state.PrevPeakResidual = state.PeakResidual;
825 2199 : if ( state.PrevPeakResidualNoMask == -1.0 ) state.PrevPeakResidualNoMask = state.PeakResidualNoMask;
826 2199 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
827 :
828 2199 : return dummy; } ) );
829 2199 : }
830 :
831 :
832 3125 : void grpcInteractiveCleanManager::mergeMinorCycleSummary( const Array<Double> &summary, grpcInteractiveCleanState &state, Int immod ){
833 3125 : IPosition cShp = state.SummaryMinor.shape();
834 3125 : IPosition nShp = summary.shape();
835 :
836 : //bool uss = SIMinorCycleController::useSmallSummaryminor(); // temporary CAS-13683 workaround
837 : //int nSummaryFields = uss ? 6 : SIMinorCycleController::nSummaryFields;
838 3125 : int nSummaryFields = !state.FullSummary ? 6 : SIMinorCycleController::nSummaryFields;
839 :
840 6250 : if( cShp.nelements() != 2 || cShp[0] != nSummaryFields ||
841 6250 : nShp.nelements() != 2 || nShp[0] != nSummaryFields )
842 0 : throw(AipsError("Internal error in shape of global minor-cycle summary record"));
843 :
844 3125 : state.SummaryMinor.resize( IPosition( 2, nSummaryFields, cShp[1]+nShp[1] ) ,true );
845 :
846 6276 : for (unsigned int row = 0; row < nShp[1]; row++) {
847 : // iterations done
848 3151 : state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
849 : //if (state.FullSummary){
850 : // state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,0,row));
851 : //}
852 : //else{
853 : // state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
854 : //}
855 : //state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
856 : // peak residual
857 3151 : state.SummaryMinor( IPosition(2,1,cShp[1]+row) ) = summary(IPosition(2,1,row));
858 : // model flux
859 3151 : state.SummaryMinor( IPosition(2,2,cShp[1]+row) ) = summary(IPosition(2,2,row));
860 : // cycle threshold
861 3151 : state.SummaryMinor( IPosition(2,3,cShp[1]+row) ) = summary(IPosition(2,3,row));
862 : //if (uss) { // temporary CAS-13683 workaround
863 3151 : if (!state.FullSummary) { // temporary CAS-13683 workaround
864 : // swap out mapper id with multifield id
865 3105 : state.SummaryMinor( IPosition(2,4,cShp[1]+row) ) = immod;
866 : // chunk id (channel/stokes)
867 3105 : state.SummaryMinor( IPosition(2,5,cShp[1]+row) ) = summary(IPosition(2,5,row));
868 : } else {
869 : // mapper id
870 46 : state.SummaryMinor( IPosition(2,4,cShp[1]+row) ) = summary(IPosition(2,4,row));
871 : // channel id
872 46 : state.SummaryMinor( IPosition(2,5,cShp[1]+row) ) = summary(IPosition(2,5,row));
873 : // polarity id
874 46 : state.SummaryMinor( IPosition(2,6,cShp[1]+row) ) = summary(IPosition(2,6,row));
875 : // cycle start iterations done
876 46 : state.SummaryMinor( IPosition(2,7,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,7,row));
877 : // starting iterations done
878 46 : state.SummaryMinor( IPosition(2,8,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,8,row));
879 : // starting peak residual
880 46 : state.SummaryMinor( IPosition(2,9,cShp[1]+row) ) = summary(IPosition(2,9,row));
881 : // starting model flux
882 46 : state.SummaryMinor( IPosition(2,10,cShp[1]+row) ) = summary(IPosition(2,10,row));
883 : // starting peak residual, not limited to the user's mask
884 46 : state.SummaryMinor( IPosition(2,11,cShp[1]+row) ) = summary(IPosition(2,11,row));
885 : // peak residual, not limited to the user's mask
886 46 : state.SummaryMinor( IPosition(2,12,cShp[1]+row) ) = summary(IPosition(2,12,row));
887 : // number of pixels in the mask
888 46 : state.SummaryMinor( IPosition(2,13,cShp[1]+row) ) = summary(IPosition(2,13,row));
889 : // mpi server
890 46 : state.SummaryMinor( IPosition(2,14,cShp[1]+row) ) = summary(IPosition(2,14,row));
891 : // outlier field id
892 46 : state.SummaryMinor( IPosition(2,15,cShp[1]+row) ) = immod;
893 : // stopcode
894 46 : state.SummaryMinor( IPosition(2,16,cShp[1]+row) ) = summary(IPosition(2,16,row));
895 : }
896 :
897 : }
898 3125 : }
899 :
900 926 : void grpcInteractiveCleanManager::mergeCycleExecutionRecord( Record& execRecord, Int immod ){
901 1852 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
902 :
903 926 : access( (void*) 0,
904 1852 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
905 926 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
906 926 : mergeMinorCycleSummary( execRecord.asArrayDouble( RecordFieldId("summaryminor")), state, immod );
907 :
908 926 : state.IterDone += execRecord.asInt(RecordFieldId("iterdone"));
909 :
910 926 : state.MaxCycleIterDone = max( state.MaxCycleIterDone, execRecord.asInt(RecordFieldId("maxcycleiterdone")) );
911 :
912 926 : state.MinorCyclePeakResidual = max( state.PeakResidual, execRecord.asFloat(RecordFieldId("peakresidual")) );
913 :
914 926 : state.UpdatedModelFlag |=execRecord.asBool( RecordFieldId("updatedmodelflag") );
915 :
916 926 : os << "Completed " << state.IterDone << " iterations." << LogIO::POST;
917 : //with peak residual "<< state.PeakResidual << LogIO::POST;
918 926 : return dummy; } ) );
919 926 : }
920 :
921 0 : void grpcInteractiveCleanManager::changeStopFlag( bool stopEnabled ) {
922 0 : access( (void*) 0,
923 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
924 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
925 0 : state.StopFlag = stopEnabled;
926 0 : return dummy;
927 : } ) );
928 0 : }
929 :
930 : //====================================================================================================
931 :
932 0 : static bool isdir( const char *path ) {
933 : struct stat statbuf;
934 0 : int err = stat(path, &statbuf);
935 0 : if ( err == -1 ) return false;
936 0 : if ( S_ISDIR(statbuf.st_mode) ) return true;
937 0 : return false;
938 : }
939 :
940 0 : static std::string trim_trailing_slash( const char *str ) {
941 0 : char *temp = strdup(str);
942 0 : for ( int off = strlen(str) - 1; off >= 0; --off ) {
943 0 : if ( temp[off] == '/' ) temp[off] = '\0';
944 0 : else break;
945 : }
946 0 : std::string result = temp;
947 0 : free(temp);
948 0 : return result;
949 : }
950 :
951 1 : grpcInteractiveCleanGui::grpcInteractiveCleanGui( ) : viewer_pid(0), viewer_started(false) { }
952 1 : grpcInteractiveCleanGui::~grpcInteractiveCleanGui( ) {
953 1 : static const auto debug = getenv("GRPC_DEBUG");
954 :
955 1 : if ( ! viewer_started ) {
956 1 : if ( debug ) {
957 0 : std::cerr << "viewer shutdown required (" << viewer_uri << ")" <<
958 0 : " (process " << getpid( ) << ", thread " <<
959 0 : std::this_thread::get_id() << ")" << std::endl;
960 0 : fflush(stderr);
961 : }
962 : } else {
963 0 : if ( debug ) {
964 0 : std::cerr << "sending shutdown message to viewer (" << viewer_uri << ")" <<
965 0 : " (process " << getpid( ) << ", thread " <<
966 0 : std::this_thread::get_id() << ")" << std::endl;
967 0 : fflush(stderr);
968 : }
969 :
970 0 : bool stopped = stop_viewer( );
971 :
972 0 : if ( debug ) {
973 0 : if ( stopped ) {
974 0 : std::cerr << "viewer shutdown successful (" << viewer_uri << ")" <<
975 0 : " (process " << getpid( ) << ", thread " <<
976 0 : std::this_thread::get_id() << ")" << std::endl;
977 : } else {
978 0 : std::cerr << "viewer shutdown failed (" << viewer_uri << ")" <<
979 0 : " (process " << getpid( ) << ", thread " <<
980 0 : std::this_thread::get_id() << ")" << std::endl;
981 : }
982 0 : fflush(stderr);
983 : }
984 : }
985 1 : }
986 :
987 :
988 0 : bool grpcInteractiveCleanGui::alive( ) {
989 0 : static const auto debug = getenv("GRPC_DEBUG");
990 0 : if ( debug ) {
991 0 : std::cerr << "pinging viewer (" << viewer_uri << ")" <<
992 0 : " (process " << getpid( ) << ", thread " <<
993 0 : std::this_thread::get_id() << ")" << std::endl;
994 0 : fflush(stderr);
995 : }
996 0 : grpc::ClientContext context;
997 0 : ::google::protobuf::Empty resp;
998 0 : ::google::protobuf::Empty msg;
999 0 : auto ping = casatools::rpc::Ping::NewStub( grpc::CreateChannel( viewer_uri, grpc::InsecureChannelCredentials( ) ) );
1000 0 : ::grpc::Status status = ping->now( &context, msg, &resp );
1001 0 : bool ping_result = status.ok( );
1002 0 : if ( debug ) {
1003 : std::cerr << "ping result: " << (ping_result ? "OK" : "FAIL")<<
1004 0 : " (process " << getpid( ) << ", thread " <<
1005 0 : std::this_thread::get_id() << ")" << std::endl;
1006 0 : fflush(stderr);
1007 : }
1008 0 : if ( ping_result == false ) {
1009 : int proc_status;
1010 0 : waitpid( viewer_pid, &proc_status, WUNTRACED | WCONTINUED | WNOHANG );
1011 0 : viewer_pid = 0;
1012 0 : viewer_proxy.release( );
1013 0 : viewer_started = false;
1014 0 : if ( debug ) {
1015 : std::cerr << "ping failed resetting state" <<
1016 0 : " (process " << getpid( ) << ", thread " <<
1017 0 : std::this_thread::get_id() << ")" << std::endl;
1018 0 : fflush(stderr);
1019 : }
1020 : }
1021 0 : return ping_result;
1022 0 : }
1023 :
1024 0 : bool grpcInteractiveCleanGui::launch( ) {
1025 0 : static const auto debug = getenv("GRPC_DEBUG");
1026 0 : if ( viewer_started == false ) {
1027 : // start the viewer process if it is not already running...
1028 0 : if ( debug ) {
1029 : std::cerr << "spawning viewer process" <<
1030 0 : " (process " << getpid( ) << ", thread " <<
1031 0 : std::this_thread::get_id() << ")" << std::endl;
1032 0 : fflush(stderr);
1033 : }
1034 0 : return spawn_viewer( );
1035 : } else {
1036 0 : if ( alive( ) ) {
1037 0 : if ( debug ) {
1038 : std::cerr << "viewer process available" <<
1039 0 : " (process " << getpid( ) << ", thread " <<
1040 0 : std::this_thread::get_id() << ")" << std::endl;
1041 0 : fflush(stderr);
1042 : }
1043 0 : return true;
1044 : } else {
1045 0 : if ( debug ) {
1046 : std::cerr << "re-spawning viewer process" <<
1047 0 : " (process " << getpid( ) << ", thread " <<
1048 0 : std::this_thread::get_id() << ")" << std::endl;
1049 0 : fflush(stderr);
1050 : }
1051 0 : return launch( );
1052 : }
1053 : }
1054 : return false;
1055 : }
1056 :
1057 638 : void grpcInteractiveCleanGui::close_panel( int id ) {
1058 638 : static const auto debug = getenv("GRPC_DEBUG");
1059 638 : if ( debug ) {
1060 0 : std::cerr << "close_panel(" << id << ")" <<
1061 0 : " (process " << getpid( ) << ", thread " <<
1062 0 : std::this_thread::get_id() << ")" << std::endl;
1063 0 : fflush(stderr);
1064 : }
1065 638 : if ( id != -1 && alive( ) ) {
1066 0 : if ( debug ) {
1067 0 : std::cerr << "close_panel(" << id << ") -- closing panel" <<
1068 0 : " (process " << getpid( ) << ", thread " <<
1069 0 : std::this_thread::get_id() << ")" << std::endl;
1070 0 : fflush(stderr);
1071 : }
1072 : {
1073 : // unload panel's images
1074 0 : rpc::img::Id panel;
1075 0 : grpc::ClientContext context;
1076 0 : ::google::protobuf::Empty resp;
1077 0 : panel.set_id(id);
1078 0 : viewer_proxy->unload( &context, panel, &resp );
1079 0 : }
1080 : {
1081 : // close panel
1082 0 : rpc::img::Id panel;
1083 0 : grpc::ClientContext context;
1084 0 : ::google::protobuf::Empty resp;
1085 0 : panel.set_id(id);
1086 0 : viewer_proxy->close( &context, panel, &resp );
1087 0 : }
1088 : }
1089 638 : }
1090 :
1091 0 : int grpcInteractiveCleanGui::open_panel( std::list<std::tuple<std::string,bool,bool,int>> images ) {
1092 0 : static const auto debug = getenv("GRPC_DEBUG");
1093 0 : if ( viewer_started == false ) {
1094 0 : if ( launch( ) == false ) return -1;
1095 : }
1096 0 : if ( debug ) {
1097 : std::cerr << "opening viewer panel" <<
1098 0 : " (process " << getpid( ) << ", thread " <<
1099 0 : std::this_thread::get_id() << ")" << std::endl;
1100 0 : fflush(stderr);
1101 : }
1102 0 : grpc::ClientContext context;
1103 0 : ::rpc::img::NewPanel np;
1104 0 : rpc::img::Id resp;
1105 0 : np.set_type("clean2");
1106 0 : np.set_hidden(false);
1107 0 : viewer_proxy->panel( &context, np, &resp );
1108 0 : int result = resp.id( );
1109 :
1110 0 : if ( debug ) {
1111 0 : std::cerr << "opened viewer panel " << result <<
1112 0 : " (process " << getpid( ) << ", thread " <<
1113 0 : std::this_thread::get_id() << ")" << std::endl;
1114 0 : fflush(stderr);
1115 : }
1116 :
1117 : // state for interactive masking in the new viewer panel
1118 0 : clean_state.insert( std::pair<int,CleanState>(result, CleanState( )) );
1119 :
1120 0 : if ( debug ) {
1121 0 : std::cerr << "created panel " << result <<
1122 0 : " (process " << getpid( ) << ", thread " <<
1123 0 : std::this_thread::get_id() << ")" << std::endl;
1124 0 : fflush(stderr);
1125 : }
1126 0 : return result;
1127 0 : }
1128 :
1129 0 : void grpcInteractiveCleanGui::unload( int id ) {
1130 0 : grpc::ClientContext context;
1131 0 : ::rpc::img::Id data;
1132 0 : ::google::protobuf::Empty resp;
1133 0 : data.set_id(id);
1134 0 : viewer_proxy->unload( &context, data, &resp );
1135 0 : }
1136 :
1137 0 : bool grpcInteractiveCleanGui::clone( const std::string &imageName, const std::string &newImageName ) {
1138 0 : LogIO os(LogOrigin("grpcInteractiveCleanGui", __FUNCTION__, WHERE));
1139 :
1140 : try {
1141 0 : PagedImage<Float> oldImage( imageName );
1142 0 : PagedImage<Float> newImage( TiledShape( oldImage.shape(), oldImage.niceCursorShape()),
1143 0 : oldImage.coordinates(), newImageName );
1144 0 : newImage.set(0.0);
1145 0 : newImage.table().flush(true, true);
1146 0 : } catch (AipsError x) {
1147 0 : os << LogIO::SEVERE << "Exception: " << x.getMesg() << LogIO::POST;
1148 0 : return false;
1149 0 : }
1150 0 : return true;
1151 0 : }
1152 :
1153 0 : float grpcInteractiveCleanGui::maskSum(const std::string &maskname) {
1154 :
1155 0 : PagedImage<Float> mask( maskname );
1156 :
1157 0 : LatticeExprNode msum( sum( mask ) );
1158 0 : float maskSum = msum.getFloat( );
1159 :
1160 0 : mask.unlock();
1161 0 : mask.tempClose();
1162 :
1163 0 : return maskSum;
1164 0 : }
1165 :
1166 0 : int grpcInteractiveCleanGui::interactivemask( int panel, const std::string &image, const std::string &mask,
1167 : int &niter, int &cycleniter, std::string &thresh,
1168 : std::string &cyclethresh, const bool forceReload ) {
1169 :
1170 0 : static const auto debug = getenv("GRPC_DEBUG");
1171 0 : LogIO os( LogOrigin("grpcInteractiveCleanGui",__FUNCTION__,WHERE) );
1172 :
1173 0 : if ( debug ) {
1174 0 : std::cerr << "starting interactivemask( " <<
1175 0 : panel << ", " << image << ", " << mask << ", " <<
1176 0 : niter << ", " << cycleniter << ", " << thresh << ", " <<
1177 : cyclethresh << ", " << (forceReload ? "true" : "false") << ")" <<
1178 0 : " (process " << getpid( ) << ", thread " <<
1179 0 : std::this_thread::get_id() << ")" << std::endl;
1180 0 : fflush(stderr);
1181 : }
1182 :
1183 0 : if ( viewer_started == false ) {
1184 : // viewer should be started before calling interactivemask(...)
1185 0 : os << LogIO::WARN << "Viewer GUI Not Available" << LogIO::POST;
1186 0 : return 0;
1187 : }
1188 :
1189 0 : auto state = clean_state.find(panel);
1190 0 : if ( state == clean_state.end( ) ) {
1191 0 : os << LogIO::WARN << "Invalid clean panel id used for interactive masking" << LogIO::POST;
1192 0 : return 0;
1193 : }
1194 :
1195 0 : if( Table::isReadable(mask) ) {
1196 0 : if ( ! Table::isWritable(mask) ) {
1197 0 : os << LogIO::WARN << "Mask image is not modifiable " << LogIO::POST;
1198 0 : return 0;
1199 : }
1200 : // we should regrid here if image and mask do not match
1201 : } else {
1202 0 : clone(image, mask);
1203 : }
1204 :
1205 0 : double startmask = maskSum(mask);
1206 :
1207 0 : if ( state->second.image_id == 0 || state->second.mask_id == 0 || forceReload ) {
1208 :
1209 : //Make sure image left after a "no more" is pressed is cleared
1210 0 : if ( forceReload && state->second.image_id !=0 )
1211 0 : state->second.prev_image_id = state->second.image_id;
1212 0 : if ( forceReload && state->second.mask_id !=0 )
1213 0 : state->second.prev_mask_id = state->second.mask_id;
1214 :
1215 0 : if ( state->second.prev_image_id ){
1216 0 : if ( debug ) {
1217 0 : std::cerr << "preparing to unload prev_image_id " <<
1218 0 : state->second.prev_image_id << " (panel " << panel << ")" <<
1219 0 : " (process " << getpid( ) << ", thread " <<
1220 0 : std::this_thread::get_id() << ")" << std::endl;
1221 0 : fflush(stderr);
1222 : }
1223 0 : unload( state->second.prev_image_id );
1224 : }
1225 0 : if ( state->second.prev_mask_id ) {
1226 0 : if ( debug ) {
1227 0 : std::cerr << "preparing to unload prev_mask_id " <<
1228 0 : state->second.prev_mask_id << " (panel " << panel << ")" <<
1229 0 : " (process " << getpid( ) << ", thread " <<
1230 0 : std::this_thread::get_id() << ")" << std::endl;
1231 0 : fflush(stderr);
1232 : }
1233 0 : unload( state->second.prev_mask_id );
1234 : }
1235 :
1236 0 : state->second.prev_image_id = 0;
1237 0 : state->second.prev_mask_id = 0;
1238 :
1239 : {
1240 0 : grpc::ClientContext context;
1241 0 : ::rpc::img::NewData nd;
1242 0 : rpc::img::Id resp;
1243 0 : nd.mutable_panel( )->set_id(panel);
1244 0 : nd.set_path(image);
1245 0 : nd.set_type("raster");
1246 0 : nd.set_scale(0);
1247 0 : viewer_proxy->load( &context, nd, &resp );
1248 0 : state->second.image_id = resp.id( );
1249 0 : }
1250 : {
1251 0 : grpc::ClientContext context;
1252 0 : ::rpc::img::NewData nd;
1253 0 : rpc::img::Id resp;
1254 0 : nd.mutable_panel( )->set_id(panel);
1255 0 : nd.set_path(mask);
1256 0 : nd.set_type("contour");
1257 0 : nd.set_scale(0);
1258 0 : viewer_proxy->load( &context, nd, &resp );
1259 0 : state->second.mask_id = resp.id( );
1260 0 : }
1261 :
1262 : } else {
1263 0 : grpc::ClientContext context;
1264 0 : ::rpc::img::Id id;
1265 0 : ::google::protobuf::Empty resp;
1266 0 : id.set_id(state->second.image_id);
1267 0 : viewer_proxy->reload( &context, id, &resp );
1268 0 : id.set_id(state->second.mask_id);
1269 0 : viewer_proxy->reload( &context, id, &resp );
1270 0 : }
1271 :
1272 0 : grpc::ClientContext context;
1273 0 : ::rpc::img::InteractiveMaskOptions options;
1274 0 : options.mutable_panel( )->set_id(state->first);
1275 0 : options.set_niter(niter);
1276 0 : options.set_cycleniter(cycleniter);
1277 0 : options.set_threshold(thresh);
1278 0 : options.set_cyclethreshold(cyclethresh);
1279 0 : ::rpc::img::InteractiveMaskResult imresult;
1280 0 : ::grpc::Status s = viewer_proxy->interactivemask( &context, options, &imresult );
1281 :
1282 0 : if ( ! s.ok( ) ) {
1283 0 : std::cerr << "interactive mask failed: " << s.error_details( ) << std::endl;
1284 0 : fflush(stderr);
1285 : }
1286 :
1287 0 : niter = imresult.state( ).niter( );
1288 0 : cycleniter = imresult.state( ).cycleniter( );
1289 0 : thresh = imresult.state( ).threshold( );
1290 0 : cyclethresh = imresult.state( ).cyclethreshold( );
1291 0 : int result = 1;
1292 0 : std::string action = imresult.action( );
1293 :
1294 0 : if ( debug ) {
1295 0 : std::cerr << "-------------------------------------------" << std::endl;
1296 0 : std::cerr << " gui state from interactive masking" << std::endl;
1297 0 : std::cerr << "-------------------------------------------" << std::endl;
1298 0 : std::cerr << " action: " << action << std::endl;
1299 0 : std::cerr << " niter: " << niter << std::endl;
1300 0 : std::cerr << " cycle niter: " << cycleniter << std::endl;
1301 0 : std::cerr << " threshold: " << thresh << std::endl;
1302 0 : std::cerr << " cycle threshold: " << cyclethresh << std::endl;
1303 0 : std::cerr << "-------------------------------------------" << std::endl;
1304 : }
1305 :
1306 0 : if ( action == "stop" ) result = 3;
1307 0 : else if ( action == "no more" ) result = 2;
1308 0 : else if ( action == "continue" ) result = 1;
1309 : else {
1310 0 : os << "ill-formed action result (" << action << ")" << LogIO::WARN << LogIO::POST;
1311 0 : return 0;
1312 : }
1313 :
1314 0 : state->second.prev_image_id = state->second.image_id;
1315 0 : state->second.prev_mask_id = state->second.mask_id;
1316 :
1317 0 : state->second.image_id = 0;
1318 0 : state->second.mask_id = 0;
1319 :
1320 0 : if ( debug ) {
1321 0 : std::cerr << "set prev_image_id to " << state->second.prev_image_id << " (panel " << panel << ")" <<
1322 0 : " (process " << getpid( ) << ", thread " <<
1323 0 : std::this_thread::get_id() << ")" << std::endl;
1324 0 : std::cerr << "set prev_mask_id to " << state->second.prev_mask_id << " (panel " << panel << ")" <<
1325 0 : " (process " << getpid( ) << ", thread " <<
1326 0 : std::this_thread::get_id() << ")" << std::endl;
1327 0 : fflush(stderr);
1328 : }
1329 :
1330 0 : double endmask = maskSum(mask);
1331 :
1332 0 : if( startmask != endmask ) {
1333 0 : result = -1 * result;
1334 0 : LogIO os( LogOrigin("grpcInteractiveCleanGui",__FUNCTION__,WHERE) );
1335 0 : os << "[" << mask << "] Mask modified from " << startmask << " pixels to " << endmask << " pixels " << LogIO::POST;
1336 0 : }
1337 :
1338 0 : return result;
1339 0 : }
1340 :
1341 0 : bool grpcInteractiveCleanGui::stop_viewer( ) {
1342 : // viewer is not running...
1343 0 : if ( ! viewer_started ) return false;
1344 0 : static const auto debug = getenv("GRPC_DEBUG");
1345 0 : if ( debug ) {
1346 0 : std::cerr << "sending shutdown message to viewer (" << viewer_uri << ")" <<
1347 0 : " (process " << getpid( ) << ", thread " <<
1348 0 : std::this_thread::get_id() << ")" << std::endl;
1349 0 : fflush(stderr);
1350 : }
1351 :
1352 : // send shutdown message to viewer...
1353 0 : grpc::ClientContext context;
1354 0 : ::google::protobuf::Empty req;
1355 0 : ::google::protobuf::Empty resp;
1356 0 : auto shutdown = casatools::rpc::Shutdown::NewStub( grpc::CreateChannel( viewer_uri,
1357 0 : grpc::InsecureChannelCredentials( ) ) );
1358 0 : shutdown->now( &context, req, &resp );
1359 :
1360 : // wait on viewer (appimage) to exit...
1361 : int status;
1362 0 : pid_t w = waitpid( viewer_pid, &status, WUNTRACED | WCONTINUED );
1363 0 : if ( w == -1 ){
1364 0 : if ( debug ) {
1365 : std::cerr << "viewer process waitpid failed " <<
1366 0 : " (process " << getpid( ) << ", thread " <<
1367 0 : std::this_thread::get_id() << ")" << std::endl;
1368 0 : fflush(stderr);
1369 : }
1370 : // waitpid failed
1371 0 : return false;
1372 0 : } else if ( w == 0 ) {
1373 0 : if ( debug ) {
1374 : std::cerr << "viewer process not found " <<
1375 0 : " (process " << getpid( ) << ", thread " <<
1376 0 : std::this_thread::get_id() << ")" << std::endl;
1377 0 : fflush(stderr);
1378 : }
1379 0 : return false;
1380 : } else {
1381 0 : if ( debug ) {
1382 : std::cerr << "viewer process exited, status fetched " <<
1383 0 : " (process " << getpid( ) << ", thread " <<
1384 0 : std::this_thread::get_id() << ")" << std::endl;
1385 0 : fflush(stderr);
1386 : }
1387 0 : return true;
1388 : }
1389 :
1390 : viewer_pid = 0;
1391 : viewer_proxy.release( );
1392 : viewer_started = false;
1393 : return true;
1394 0 : }
1395 :
1396 0 : bool grpcInteractiveCleanGui::spawn_viewer( ) {
1397 0 : static const auto debug = getenv("GRPC_DEBUG");
1398 :
1399 0 : std::string viewer_path = get_viewer_path( );
1400 0 : if ( viewer_path.size( ) == 0 ) return false;
1401 :
1402 : // To minimize package size for distribution via pypi.org, the
1403 : // data repo has been moved out of the viewer appImage/app and
1404 : // into a separate package. The path to this needs to be specified
1405 : // when starting the viewer now...
1406 0 : std::string distro_data_path_arg = get_distro_data_path( );
1407 :
1408 : // sanity check on viewer path...
1409 : struct stat statbuf;
1410 0 : if ( stat( viewer_path.c_str( ), &statbuf ) < 0 ) {
1411 : // file (or dir) does not exist... e.g.
1412 : // >>>>>>registry available at 0.0.0.0:40939
1413 : // stopping registry<<<<<<
1414 0 : return false;
1415 : }
1416 :
1417 0 : std::string fifo = get_fifo( );
1418 0 : if ( fifo.size( ) == 0 ) return false;
1419 :
1420 : // here we start the viewer in a very basic manner... we do not bother
1421 : // with all of the theatrics needed to daemonize the launched process
1422 : // (see https://stackoverflow.com/questions/17954432/creating-a-daemon-in-linux)
1423 : // it could be that this should be done in the future, but for now we
1424 : // will adopt the simple...
1425 :
1426 0 : const int maxargc = 5;
1427 : char *arguments[maxargc];
1428 0 : for (int i = 0; i < maxargc; i++) { arguments[i] = (char*)""; };
1429 :
1430 0 : arguments[0] = strdup(viewer_path.c_str( ));
1431 0 : arguments[1] = (char*) malloc(sizeof(char) * (fifo.size( ) + 12));
1432 0 : sprintf( arguments[1], "--server=%s", fifo.c_str( ) );
1433 0 : arguments[2] = strdup("--oldregions");
1434 0 : int argc =3;
1435 0 : if ( distro_data_path_arg.size( ) > 0 ) {
1436 0 : distro_data_path_arg = std::string("--datapath=") + distro_data_path_arg;
1437 0 : arguments[argc] = strdup(distro_data_path_arg.c_str( ));
1438 0 : argc++;
1439 : }
1440 0 : std::string log_path = casatools::get_state( ).logPath( );
1441 0 : if ( log_path.size( ) > 0 ) {
1442 0 : arguments[argc] = (char*) malloc(sizeof(char) * (log_path.size( ) + 17));
1443 0 : sprintf( arguments[argc], "--casalogfile=%s", log_path.c_str( ) );
1444 0 : argc++;
1445 : }
1446 :
1447 0 : if ( debug ) {
1448 0 : std::cerr << "forking viewer process: ";
1449 0 : for (int i=0; i < argc; ++i) std::cout << arguments[i] << " ";
1450 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
1451 0 : std::this_thread::get_id() << ")" << std::endl;
1452 0 : fflush(stderr);
1453 : }
1454 0 : pid_t pid = fork( );
1455 :
1456 0 : if ( pid == 0 ) {
1457 0 : if ( debug ) {
1458 0 : std::cerr << "execing viewer process: ";
1459 0 : for (int i=0; i < argc; ++i) std::cout << arguments[i] << " ";
1460 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
1461 0 : std::this_thread::get_id() << ")" << std::endl;
1462 0 : fflush(stderr);
1463 : }
1464 0 : char **envp = getenv_sansmpi(); // bugfix: run the viewer without MPI CAS-13252
1465 0 : execle( arguments[0], arguments[0], arguments[1], arguments[2], arguments[3], arguments[4], NULL, envp );
1466 0 : perror( "grpcInteractiveCleanGui::launch(...) child process exec failed" );
1467 0 : exit(1);
1468 : }
1469 :
1470 0 : for ( int i=0; i < argc; ++i ) free(arguments[i]);
1471 :
1472 0 : if ( pid == -1 ) {
1473 0 : perror( "grpcInteractiveCleanGui::launch(...) child process fork failed" );
1474 0 : return false;
1475 : }
1476 :
1477 : // perform a health check, after a delay...
1478 : int status;
1479 0 : sleep(2);
1480 0 : pid_t w = waitpid( pid, &status, WUNTRACED | WCONTINUED | WNOHANG );
1481 0 : if ( w == -1 ){
1482 0 : if ( debug ) {
1483 : std::cerr << "viewer process failed " <<
1484 0 : " (process " << getpid( ) << ", thread " <<
1485 0 : std::this_thread::get_id() << ")" << std::endl;
1486 0 : fflush(stderr);
1487 : }
1488 : // waitpid failed
1489 0 : return false;
1490 0 : } else if ( w != 0 ) {
1491 0 : if ( debug ) {
1492 : std::cerr << "viewer process died " <<
1493 0 : " (process " << getpid( ) << ", thread " <<
1494 0 : std::this_thread::get_id() << ")" << std::endl;
1495 0 : fflush(stderr);
1496 : }
1497 : // process exited
1498 0 : if ( WIFEXITED(status) ) {
1499 0 : printf("exited, status=%d\n", WEXITSTATUS(status));
1500 0 : } else if (WIFSIGNALED(status)) {
1501 0 : printf("killed by signal %d\n", WTERMSIG(status));
1502 0 : } else if (WIFSTOPPED(status)) {
1503 0 : printf("stopped by signal %d\n", WSTOPSIG(status));
1504 : }
1505 0 : return false;
1506 : }
1507 :
1508 0 : if ( debug ) {
1509 : std::cerr << "fetching viewer uri from " << fifo <<
1510 0 : " (process " << getpid( ) << ", thread " <<
1511 0 : std::this_thread::get_id() << ")" << std::endl;
1512 0 : fflush(stderr);
1513 : }
1514 : char buffer[512];
1515 0 : std::string uri_buffer;
1516 0 : FILE *fp = fopen(fifo.c_str( ), "r");
1517 0 : while ( fgets( buffer, sizeof(buffer), fp ) ) { uri_buffer = uri_buffer + buffer; }
1518 0 : fclose(fp);
1519 0 : trim(uri_buffer);
1520 :
1521 : // validate viewer uri...
1522 0 : if ( ! std::regex_match( uri_buffer, std::regex("^([0-9]+\\.){3}[0-9]+:[0-9]+$") ) ) {
1523 : //rework of regex required for IPv6...
1524 0 : if ( debug ) {
1525 : std::cerr << "bad viewer uri " << uri_buffer <<
1526 0 : " (process " << getpid( ) << ", thread " <<
1527 0 : std::this_thread::get_id() << ")" << std::endl;
1528 0 : fflush(stderr);
1529 : }
1530 0 : return false;
1531 : }
1532 :
1533 0 : if ( debug ) {
1534 : std::cerr << "received viewer uri: " << uri_buffer <<
1535 0 : " (process " << getpid( ) << ", thread " <<
1536 0 : std::this_thread::get_id() << ")" << std::endl;
1537 0 : fflush(stderr);
1538 : }
1539 :
1540 0 : viewer_uri = uri_buffer;
1541 0 : viewer_pid = pid;
1542 0 : viewer_proxy = rpc::img::view::NewStub( grpc::CreateChannel( viewer_uri,
1543 0 : grpc::InsecureChannelCredentials( ) ) );
1544 0 : viewer_started = true;
1545 :
1546 0 : return true;
1547 0 : }
1548 :
1549 0 : std::string grpcInteractiveCleanGui::get_python_path( ) {
1550 0 : std::string ret = casatools::get_state( ).pythonPath( );
1551 0 : return ret;
1552 : }
1553 :
1554 0 : std::string grpcInteractiveCleanGui::get_distro_data_path( ) {
1555 : static bool initialized = false;
1556 0 : static std::string result;
1557 0 : if ( initialized == false ) {
1558 0 : initialized = true;
1559 0 : result = casatools::get_state( ).distroDataPath( );
1560 : struct stat statbuf;
1561 0 : if ( stat( result.c_str( ), &statbuf ) < 0 ) {
1562 : // file (or dir) does not exist...
1563 0 : result = "";
1564 : }
1565 : }
1566 0 : return result;
1567 : }
1568 :
1569 0 : std::string grpcInteractiveCleanGui::get_viewer_path( ) {
1570 : // Get the path to the casaviewer Qt application, to be called in spawn_viewer()
1571 0 : std::string python_path = get_python_path( );
1572 0 : if ( python_path.size( ) == 0 ) return std::string( );
1573 :
1574 : //*** python3 -m casaviewer --app-path
1575 : char buffer[1024];
1576 0 : std::string result;
1577 0 : char **envp = getenv_sansmpi(); // bugfix: run the viewer without MPI CAS-13252
1578 0 : char *python_args[] = { (char*)python_path.c_str(), (char*)"-m", (char*)"casaviewer", (char*)"--app-path", NULL };
1579 0 : execve_getstdout((char*)python_path.c_str(), python_args, envp, buffer, 1024);
1580 0 : result = buffer;
1581 0 : free(envp);
1582 :
1583 0 : trim(result);
1584 0 : if ( result.size( ) == 0 ) return std::string( );
1585 0 : return result;
1586 0 : }
1587 :
1588 0 : std::string grpcInteractiveCleanGui::get_fifo( ) {
1589 0 : static const char *env_tmpdir = getenv("TMPDIR");
1590 0 : static std::string fifo_template = trim_trailing_slash(env_tmpdir && isdir(env_tmpdir) ? env_tmpdir : P_tmpdir) + "/vwr-XXXXXXXXXX";
1591 0 : static int fifo_template_size = fifo_template.size( );
1592 0 : char fifo_path[fifo_template_size+1];
1593 0 : strncpy( fifo_path, fifo_template.c_str( ), fifo_template_size );
1594 0 : fifo_path[fifo_template_size] = '\0';
1595 0 : int fd = mkstemp(fifo_path);
1596 0 : if ( fd == -1 ) throw std::runtime_error("mkstemp failed...");
1597 0 : close( fd );
1598 0 : unlink(fifo_path);
1599 0 : mkfifo( fifo_path, 0666 );
1600 0 : return fifo_path;
1601 0 : }
1602 :
1603 0 : casacore::Record grpcInteractiveCleanManager::pauseForUserInteraction( ) {
1604 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
1605 0 : static const auto debug = getenv("GRPC_DEBUG");
1606 :
1607 0 : if ( clean_images.size( ) == 0 ) {
1608 : // cannot open clean panel in viewer if not images are available...
1609 0 : if ( debug ) {
1610 : std::cerr << "no clean images available" <<
1611 0 : " (process " << getpid( ) << ", thread " <<
1612 0 : std::this_thread::get_id() << ")" << std::endl;
1613 0 : fflush(stderr);
1614 : }
1615 0 : return Record( );
1616 : }
1617 :
1618 0 : if ( clean_panel_id == -1 || ! gui.alive( ) ) {
1619 : // open panel if it is not already open...
1620 0 : clean_panel_id = gui.open_panel( clean_images );
1621 : }
1622 :
1623 0 : int niter=0,cycleniter=0,iterdone;
1624 0 : float threshold=0.0, cyclethreshold=0.0;
1625 0 : access( (void*) 0,
1626 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1627 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1628 0 : niter = state.Niter;
1629 0 : cycleniter = state.CycleNiter;
1630 0 : threshold = state.Threshold;
1631 0 : cyclethreshold = state.CycleThreshold;
1632 0 : iterdone = state.IterDone;
1633 0 : return dummy;
1634 : } ) );
1635 :
1636 0 : std::string strthresh = std::to_string(threshold)+"Jy";
1637 0 : std::string strcycthresh = std::to_string(cyclethreshold)+"Jy";
1638 :
1639 0 : int iterleft = niter - iterdone;
1640 0 : if( iterleft<0 ) iterleft=0;
1641 :
1642 0 : casacore::Vector<int> itsActionCodes(clean_images.size( ));
1643 0 : itsActionCodes = 1.0;
1644 :
1645 0 : unsigned ind = 0;
1646 0 : for ( auto it = clean_images.begin( ); it != clean_images.end( ); ++it, ++ind ) {
1647 0 : if ( std::get<2>(*it) ) {
1648 0 : itsActionCodes[ind] = std::get<3>(*it);
1649 0 : continue;
1650 : }
1651 0 : if ( fabs(itsActionCodes[ind]) == 1.0 ) {
1652 0 : std::string imageName = std::get<0>(*it) + ".residual" + ( std::get<1>(*it) ? ".tt0" : "" );
1653 0 : std::string maskName = std::get<0>(*it) + ".mask";
1654 0 : std::string last_strcycthresh = strcycthresh;
1655 0 : itsActionCodes[ind] = gui.interactivemask( clean_panel_id, imageName, maskName, iterleft,
1656 : cycleniter, strthresh, strcycthresh );
1657 :
1658 0 : if ( strcycthresh != last_strcycthresh ) {
1659 0 : access( (void*) 0,
1660 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1661 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1662 : // if this is not set to false, the users cyclethreshold
1663 : // change are recomputed...
1664 0 : state.IsCycleThresholdAuto = false;
1665 0 : return dummy;
1666 : } ) );
1667 : }
1668 :
1669 0 : if( itsActionCodes[ind] < 0 ) os << "[" << std::get<0>(*it) <<"] Mask changed interactively." << LogIO::POST;
1670 0 : if( fabs(itsActionCodes[ind])==3 || fabs(itsActionCodes[ind])==2 ) {
1671 : // fabs(itsActionCodes[ind])==3 --> stop
1672 : // fabs(itsActionCodes[ind])==2 --> no more
1673 0 : std::get<2>(*it) = true;
1674 0 : std::get<3>(*it) = fabs(itsActionCodes[ind]);
1675 : }
1676 0 : }
1677 : }
1678 :
1679 :
1680 0 : Quantity qa;
1681 0 : casacore::Quantity::read(qa,strthresh);
1682 0 : threshold = qa.getValue(Unit("Jy"));
1683 :
1684 :
1685 0 : float oldcyclethreshold = cyclethreshold;
1686 0 : Quantity qb;
1687 0 : casacore::Quantity::read(qb,strcycthresh);
1688 0 : cyclethreshold = qb.getValue(Unit("Jy"));
1689 :
1690 0 : access( (void*) 0,
1691 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1692 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1693 0 : if ( debug ) {
1694 0 : std::cerr << "-------------------------------------------" << std::endl;
1695 0 : std::cerr << " exporting gui state: " << std::endl;
1696 0 : std::cerr << "-------------------------------------------" << std::endl;
1697 0 : std::cerr << " Niter " << state.Niter <<
1698 0 : " ---> " << iterdone+iterleft << std::endl;
1699 0 : std::cerr << " CycleNiter " << state.CycleNiter <<
1700 0 : " ---> " << cycleniter << std::endl;
1701 0 : std::cerr << " Threshold " << state.Threshold <<
1702 0 : " ---> " << threshold << std::endl;
1703 0 : std::cerr << " CycleThreshold " << oldcyclethreshold <<
1704 0 : ( fabs( cyclethreshold - oldcyclethreshold ) > 1e-06 &&
1705 0 : cyclethreshold != 0 && oldcyclethreshold != 0 ?
1706 0 : " ---> " : " -x-> ") << cyclethreshold << std::endl;
1707 0 : std::cerr << "-------------------------------------------" << std::endl;
1708 : }
1709 :
1710 0 : state.Niter = iterdone+iterleft;
1711 0 : state.CycleNiter = cycleniter;
1712 0 : state.Threshold = threshold;
1713 0 : if ( cyclethreshold != 0 && oldcyclethreshold != 0 &&
1714 0 : fabs( cyclethreshold - oldcyclethreshold ) > 1e-06 )
1715 0 : state.CycleThreshold = cyclethreshold;
1716 :
1717 0 : return dummy;
1718 : } ) );
1719 :
1720 0 : Bool alldone=true;
1721 0 : for ( ind = 0; ind < clean_images.size( ); ++ind ) {
1722 0 : alldone = alldone & ( fabs(itsActionCodes[ind])==3 );
1723 : }
1724 0 : if( alldone==true ) changeStopFlag( true );
1725 :
1726 0 : Record returnRec;
1727 0 : for( ind = 0; ind < clean_images.size( ); ind++ ){
1728 0 : returnRec.define( RecordFieldId( String::toString(ind)), itsActionCodes[ind] );
1729 : }
1730 :
1731 0 : return returnRec;
1732 0 : }
1733 :
1734 638 : void grpcInteractiveCleanManager::closePanel( ) {
1735 638 : gui.close_panel(clean_panel_id);
1736 638 : clean_panel_id = -1;
1737 638 : clean_images.clear( );
1738 638 : access( (void*) 0,
1739 1276 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1740 638 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1741 638 : state.reset( );
1742 638 : return dummy; } ) );
1743 638 : }
1744 :
1745 : } //# NAMESPACE CASA - END
|