Line data Source code
1 : /*
2 : * VisibilityProcessing.h
3 : *
4 : * Created on: Feb 8, 2011
5 : * Author: jjacobs
6 : */
7 :
8 : #ifndef VISIBILITYPROCESSING_H_
9 : #define VISIBILITYPROCESSING_H_
10 :
11 : #include <casacore/casa/aips.h>
12 : #include <casacore/casa/BasicSL/String.h>
13 : #include <casacore/casa/Exceptions/Error.h>
14 : #include <stdcasa/UtilJ.h>
15 : #include "VisBuffer.h"
16 : #include "VisibilityIterator.h"
17 :
18 : #include <memory>
19 : #include <tuple>
20 : #include <map>
21 : #include <set>
22 : #include <vector>
23 :
24 :
/*

    Visibility Processing Framework Class Summary
    =============================================

    SubchunkIndex - Index of a subchunk.  Consists of the chunk number,
                    the subchunk number and the iteration number.  All three
                    are zero-based.  The iteration number is nonzero if a chunk
                    is reprocessed.  A subchunk is used to identify a VisBuffer
                    relative to the VisibilityIterator managed by the VpEngine.
    VbPtr - Smart pointer to a VisBuffer.
    VisibilityProcessor - A visibility processing node in a data flow graph.
    VisibilityProcessorStub - A do-nothing node used for unit testing.
    VpContainer - A VP which contains a graph of VPs.  It handles moving
                  data between its input and output ports to the appropriate
                  input and output ports of the data flow graph it contains.
    SplitterVp - Has a single input port and outputs identical copies of it
                 through its output ports.
    WriterVp - Takes an input and writes it out to the VisibilityIterator provided
               when it was constructed.  Optionally passes the input data to its
               output port.
    VpEngine - Object that executes a data flow graph of VisibilityProcessors on data
               accessed via a VisibilityIterator.

    VpPort - A data port into or out of (or both) a VisibilityProcessor.
    VpPorts - A collection of VpPort objects.
    VpData - A collection of visibility data; it works like an associative array
             pairing a VpPort with a VisBuffer.

*/
55 :
56 : namespace casa {
57 :
// Forward declaration only: PrefetchColumns is used by name as a return type
// further down, so its full definition is not needed in this header.

namespace asyncio {
    class PrefetchColumns;
}
61 :
62 : namespace vpf {
63 :
64 : class VisibilityProcessor;
65 : class VpContainer;
66 : class VpEngine;
67 :
68 : class SubchunkIndex {
69 :
70 : friend class SubchunkIndex_Test;
71 :
72 : public:
73 :
74 : enum {Invalid = -1};
75 :
76 : SubchunkIndex (casacore::Int chunkNumber = Invalid, casacore::Int subChunkNumber = Invalid, casacore::Int iteration = Invalid);
77 :
78 : // Comparison Operators
79 : //
80 : // Comparison is in lexicographic order by chunk, subchunk and iteration.
81 :
82 : casacore::Bool operator< (const SubchunkIndex & other) const;
83 0 : casacore::Bool operator== (const SubchunkIndex & other) const { return ! (* this < other || other < * this);}
84 0 : casacore::Bool operator!= (const SubchunkIndex & other) const { return ! (* this == other);}
85 :
86 : casacore::Int getChunkNumber () const;
87 : casacore::Int getIteration () const;
88 : casacore::Int getSubchunkNumber () const;
89 :
90 : casacore::String toString () const;
91 :
92 : private:
93 :
94 : casacore::Int chunkNumber_p; // -1 for invalid
95 : casacore::Int iteration_p; // -1 for invalid
96 : casacore::Int subChunkNumber_p;
97 : };
98 :
99 : class VbPtr : public std::shared_ptr<casa::VisBuffer> {
100 :
101 : public:
102 :
103 0 : VbPtr () : std::shared_ptr<casa::VisBuffer> () {}
104 0 : explicit VbPtr (casa::VisBuffer * vb) : std::shared_ptr<casa::VisBuffer> (vb) {}
105 :
106 : // Assignment operator setting VbPtr to a normal pointer. Ownership is passed to the
107 : // VbPtr so caller must ensure that delete is not called on the VisBuffer.
108 :
109 : VbPtr & operator= (casa::VisBuffer * vb)
110 : {
111 : std::shared_ptr<casa::VisBuffer>::operator= (VbPtr (vb));
112 : return * this;
113 : }
114 : };
115 :
116 : class VpPort {
117 :
118 : friend class VpContainer;
119 : friend class VpPort_Test;
120 :
121 : public:
122 :
123 : // Normally ports are either input or output ports. However, the ports
124 : // of a VpContainer do double duty serving as an input to the container and
125 : // an outputted to the input of a contained VP, or vice versa.
126 :
127 : typedef enum {Unknown, Input = 1, Output = 2, InOut = Input | Output} Type;
128 :
129 : VpPort ();
130 : VpPort (VisibilityProcessor * vp, const casacore::String & name, Type type);
131 0 : ~VpPort () {}
132 :
133 : casacore::Bool operator< (const VpPort & other) const;
134 : casacore::Bool operator== (const VpPort & other) const;
135 :
136 : casacore::Bool empty () const;
137 : casacore::String getFullName () const; // returns Vp0.Vp1...VpN.portName
138 : casacore::String getName () const; // returns portName
139 : Type getType () const; // Returns the port's type as something from the Type enum
140 : casacore::Bool isConnectedInput () const; // true if port has been connected up as an input
141 : casacore::Bool isConnectedOutput () const; // true if port has been connected up as an output
142 :
143 : // Used to check the type of the port as defined in the Type enum. InOut ports
144 : // return true for both casacore::Input and Output types.
145 :
146 : bool isType (Type t) const;
147 :
148 : //casacore::String toString() const;
149 :
150 : protected:
151 :
152 : const VisibilityProcessor * getVp () const;
153 : VisibilityProcessor * getVp ();
154 : void setConnectedInput ();
155 : void setConnectedOutput ();
156 :
157 : private:
158 :
159 : casacore::Bool connectedInput_p;
160 : casacore::Bool connectedOutput_p;
161 : casacore::String name_p;
162 : VisibilityProcessor * visibilityProcessor_p; // [use]
163 : Type type_p;
164 :
165 : };
166 :
167 : class VpPorts : public std::vector<VpPort> {
168 :
169 : friend class VisibilityProcessor;
170 : friend class VpPorts_Test;
171 :
172 : public:
173 :
174 : casacore::Bool contains (const casacore::String & name) const;
175 : casacore::Bool contains (const VpPort & port) const;
176 : VpPort get (const casacore::String & name) const;
177 : casacore::String toString () const;
178 :
179 : protected:
180 :
181 : VpPort & getRef (const casacore::String & name);
182 :
183 : template <typename Itr>
184 : static
185 : Itr
186 0 : find(const casacore::String & name, Itr begin, Itr end)
187 : {
188 0 : Itr i;
189 :
190 0 : for (i = begin; i != end; i++){
191 0 : if (i->getName() == name){
192 0 : break;
193 : }
194 : }
195 :
196 0 : return i;
197 : }
198 :
199 : };
200 :
201 : namespace asyncio {
202 : class PrefetchColumns;
203 : }
204 :
205 :
206 : class VpData: public std::map<VpPort, VbPtr> {
207 :
208 : friend class VpData_Test;
209 :
210 : public:
211 :
212 : VpData ();
213 : VpData (const VpPort & port, VbPtr);
214 :
215 : void add (const VpPort & port, VbPtr); // Adds a (port,VbPtr) to the collection
216 :
217 : // Returns the (port,VbPtr) pairs for the requested set of ports. An execption
218 : // is thrown if a requested port is not present unless missingIsOk is set to true.
219 :
220 : VpData getSelection (const VpPorts &, bool missingIsOk = false) const;
221 : casacore::String getNames () const; // Returns a comma-separated list of the port names.
222 : };
223 :
224 :
225 : class VisibilityProcessor {
226 :
227 : friend class VpContainer;
228 : friend class WriterVp;
229 :
230 : public:
231 :
232 : VisibilityProcessor( const VisibilityProcessor& ) = delete;
233 : VisibilityProcessor& operator=( const VisibilityProcessor& ) = delete;
234 :
235 : typedef enum {
236 : Normal,
237 : RepeatChunk
238 : } ChunkCode;
239 :
240 : typedef enum {
241 : Subchunk, // casacore::Normal processing of a subchunk
242 : EndOfChunk, // Called after all subchunks of a chunk have been processed
243 : EndOfData // Called after all chunks have been processed
244 : } ProcessingType;
245 :
246 : typedef std::tuple <ChunkCode, VpData> ProcessingResult;
247 :
248 : VisibilityProcessor ();
249 : VisibilityProcessor (const casacore::String & name,
250 : const std::vector<casacore::String> & inputNames,
251 : const std::vector<casacore::String> & outputNames = std::vector<casacore::String>(),
252 : casacore::Bool makeIoPorts = false);
253 0 : virtual ~VisibilityProcessor () {}
254 :
255 : // chunkStart is called to inform the VP that a new chunk is starting.
256 :
257 : void chunkStart (const SubchunkIndex &);
258 :
259 : // Called to cause the VP to process the provided inputs. It will be called
260 : // in three different contexts as indicated by the ProcessingType.
261 :
262 : ProcessingResult doProcessing (ProcessingType processingType,
263 : VpData & inputData,
264 : VpEngine * vpEngine,
265 : const SubchunkIndex & subChunkIndex);
266 :
267 : // Returns a pointer to the containing VP or NULL if this VP is top-level.
268 :
269 0 : const VpContainer * getContainer () const { return NULL;}
270 :
271 : // The full name of a VP is a dotted list of the names of all the containing
272 : // VPs ending with the name of this VP (e.g., vp0.vp1...vpN.thisVp).
273 :
274 : casacore::String getFullName () const;
275 :
276 : // Returns the input port having the specified name. Exception if port is undefined.
277 :
278 : VpPort getInput (const casacore::String & name) const;
279 :
280 : // Returns a collection of the input ports for this VP; optionally only the
281 : // connected ports are returned.
282 :
283 : VpPorts getInputs (casacore::Bool connectedOnly = false) const;
284 :
285 : // Returns the name of this VP
286 :
287 : casacore::String getName () const;
288 :
289 : // Returns the number of Subchunks processed (mainly for testing)
290 :
291 : casacore::Int getNSubchunksProcessed () const;
292 :
293 : // Returns the number of unique Subchunks (i.e., iteration ignored) processed.
294 : // (mainly for testing)
295 :
296 : casacore::Int getNSubchunksUniqueProcessed () const;
297 :
298 : // Returns the output port having the specified name. Exception if port is undefined.
299 :
300 : VpPort getOutput (const casacore::String & name) const;
301 :
302 : // Returns a collection of the output ports for this VP; optionally only the
303 : // connected ports are returned.
304 :
305 : VpPorts getOutputs (casacore::Bool connectedOnly = false) const;
306 :
307 : // Returns the collection of columns that need to be prefetched if this node
308 : // is used with async I/O.
309 :
310 : virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
311 :
312 : // Called by the framework when the processing is about to begin (i.e., prior
313 : // to the first VisBuffer being fed into the graph.
314 :
315 : void processingStart ();
316 :
317 : // Called to ask the VP to check its validity (i.e., are all needed inputs connected,
318 : // etc.).
319 :
320 : void validate ();
321 :
322 : protected:
323 :
324 : // The public API contains many methods that are not virtual. However, where subclass-specific
325 : // behavior is potentially useful, a corresponding xxxImpl method is provided. This allows the
326 : // framework to perform certain required housekeeping options while allowing the subclass
327 : // to perform custom operations.
328 :
329 : // Called on the object when a new chunk is about to be started.
330 :
331 0 : virtual void chunkStartImpl (const SubchunkIndex &) {}
332 :
333 :
334 : // Defines the set of possible input ports for this VP
335 :
336 : VpPorts definePorts (const std::vector<casacore::String> & portNames, VpPort::Type type, const casacore::String & typeName);
337 :
338 : // Requests processing of the provided (possibly empty) input data. This is called on each
339 : // subchunk (then inputData will be nonempty) and at the end of a chunk and the end of the
340 : // entire data set. These last two call types allow the VP to output any data that it might have
341 : // been accumulating across multiple subchunks, etc.
342 :
343 : virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
344 : VpData & inputData,
345 : const SubchunkIndex & subChunkIndex) = 0;
346 :
347 : // Returns a collection of the ports that are not connected using the provided connection
348 : // method; some ports may also be excluded from this list by name.
349 :
350 : VpPorts portsUnconnected (const VpPorts & ports, casacore::Bool (VpPort::* isConnected) () const,
351 : const std::vector<casacore::String> & except = std::vector<casacore::String> ()) const;
352 :
353 : // Called when data processing is about to beging; this allows the VP to perform any
354 : // initialization that it desires now that it is completely connected into the graph.
355 :
356 0 : virtual void processingStartImpl () {}
357 :
358 : // Methods to ease the validation process.
359 :
360 : void throwIfAnyInputsUnconnected (const std::vector<casacore::String> & exceptThese = std::vector<casacore::String> ()) const;
361 : void throwIfAnyInputsUnconnectedExcept (const casacore::String & exceptThisOne) const;
362 : void throwIfAnyOutputsUnconnected (const std::vector<casacore::String> & exceptThese = std::vector<casacore::String> ()) const;
363 : void throwIfAnyOutputsUnconnectedExcept (const casacore::String & exceptThisOne) const;
364 : void throwIfAnyPortsUnconnected () const;
365 :
366 : // Called to allow the node to validate its initial state. An casacore::AipsError should be thrown if
367 : // this node decides that it is invalid.
368 :
369 : virtual void validateImpl () = 0;
370 :
371 : private:
372 :
373 : VpPort & getInputRef (const casacore::String & name);
374 : VpPort & getOutputRef (const casacore::String & name);
375 : void setContainer (const VpContainer *);
376 :
377 : ROVisibilityIterator * getVi (); // returns the VI used for this data set
378 : VpEngine * getVpEngine(); // returns the engine executing this VP
379 :
380 : const VpContainer * container_p; // [use]
381 : casacore::String name_p; // name of this VP
382 : casacore::Int nSubchunks_p; // number of subchunks processed
383 : casacore::Int nSubchunksUnique_p; // number of unique subchunks processed
384 : VpEngine * vpEngine_p; // pointer to VpEngine processing this VP (can be null)
385 : VpPorts vpInputs_p; // collection of input ports
386 : VpPorts vpOutputs_p; // collection of output ports
387 : };
388 :
389 : std::ostream & operator<< (std::ostream & os, const VisibilityProcessor::ProcessingType & processingType);
390 : casacore::String toString (VisibilityProcessor::ProcessingType p);
391 :
392 : class VisibilityProcessorStub : public VisibilityProcessor {
393 :
394 : // Used to allow definition of a VP variable for use in testing.
395 : // Should never be actually operated on.
396 :
397 : public:
398 :
399 : VisibilityProcessorStub (const casacore::String & name)
400 : : VisibilityProcessor (name, utilj::Strings(), utilj::Strings())
401 : {}
402 :
403 : ProcessingResult doProcessingImpl (ProcessingType /*processingType*/,
404 : VpData & /*inputData*/,
405 : const SubchunkIndex & /*subChunkIndex*/);
406 :
407 : void validateImpl ();
408 :
409 :
410 : };
411 :
412 : //class SimpleVp: public VisibilityProcessor {
413 : //
414 : //public:
415 : //
416 : // SimpleVp (const casacore::String & name, const casacore::String & input = "In", const casacore::String & output = "");
417 : // virtual ~SimpleVp ();
418 : //
419 : //protected:
420 : //
421 : // class SimpleResult : public std::tuple<ChunkCode, VisBuffer *> {};
422 : //
423 : // virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
424 : // VpData & inputData,
425 : // const SubchunkIndex & subChunkIndex);
426 : // virtual void validateImpl ();
427 : //
428 : //private:
429 : //
430 : //};
431 :
432 : class SplitterVp : public VisibilityProcessor {
433 :
434 : public:
435 :
436 : SplitterVp (const casacore::String & name,
437 : const casacore::String & inputName,
438 : const std::vector<casacore::String> & outputNames);
439 :
440 0 : ~SplitterVp () {}
441 :
442 : protected:
443 :
444 : ProcessingResult doProcessingImpl (ProcessingType processingType ,
445 : VpData & inputData,
446 : const SubchunkIndex & subChunkIndex);
447 :
448 : void validateImpl ();
449 : };
450 :
451 : class WriterVp: public VisibilityProcessor {
452 :
453 : public:
454 :
455 : // Creates a WriterVp node. If the vi argument is NULL then the
456 : // flow graph's VI is used. The advanceVi argument is used to
457 : // direct the node to advance the VI after each write (i.e., perform
458 : // a vi++ operation); advancing the flow graph's VI will cause a
459 : // run time exception.
460 :
461 : WriterVp (const casacore::String & name,
462 : VisibilityIterator * vi = NULL,
463 : casacore::Bool advanceVi = false,
464 : const casacore::String & input = "In",
465 : const casacore::String & output = "Out");
466 :
467 : // This paradoxical method allows the user to create a single data flow graph
468 : // and then programmatically decide at run time whether data should be actually
469 : // output on this particular run.
470 :
471 : casacore::Bool setDisableOutput (casacore::Bool disableIt);
472 :
473 : protected:
474 :
475 : ProcessingResult doProcessingImpl (ProcessingType processingType,
476 : VpData & inputData,
477 : const SubchunkIndex & subChunkIndex);
478 :
479 : void validateImpl ();
480 :
481 : private:
482 :
483 : casacore::Bool advanceVi_p; // true is VI is to be advanced after each write.
484 : // N.B., advancing the flow graphs VI is prohibited
485 : casacore::Bool disableOutput_p; // true if output is disabled.
486 : VisibilityIterator * vi_p; // VI to use for output.
487 : };
488 :
489 : class VpContainer : public VisibilityProcessor {
490 :
491 : friend class VisibilityProcessing;
492 :
493 : public:
494 :
495 : // Creates a VpContainer object providing the specified inputs and outputs.
496 : // These inputs and outputs will potentially be connected to the inputs and
497 : // outputs of the VPs that are contained in the container.
498 :
499 : VpContainer (const casacore::String & name,
500 : const std::vector<casacore::String> & inputs = std::vector<casacore::String> (1, "In"),
501 : const std::vector<casacore::String> & outputs = std::vector<casacore::String> ());
502 :
503 0 : virtual ~VpContainer () {}
504 :
505 : // Adds a VP to the container. Exception if VP is already in the container.
506 :
507 : virtual void add (VisibilityProcessor * processor);
508 :
509 : // Connects the specified output to the specified input. The VP pointer may be
510 : // omitted if the port belongs to the container.
511 :
512 : virtual void connect (VisibilityProcessor * sourceVp, const casacore::String & sourcePortName,
513 : VisibilityProcessor * sinkVp, const casacore::String & sinkPortName);
514 : virtual void connect (const casacore::String & sourcePortName,
515 : VisibilityProcessor * sinkVp, const casacore::String & sinkPortName);
516 : virtual void connect (VisibilityProcessor * sourceVp, const casacore::String & sourcePortName,
517 : const casacore::String & sinkPortName);
518 :
519 : virtual void chunkStart (const SubchunkIndex & sci);
520 :
521 : // Fills the container with the specified set of VPs. The container must be
522 : // empty prior to this call.
523 :
524 : virtual void fillWithSequence (VisibilityProcessor * first, ...); // Last one NULL
525 :
526 : // Returns the columns that are required to be prefetched if async I/O is used.
527 :
528 : virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
529 :
530 : protected:
531 :
532 : typedef std::vector<VisibilityProcessor *> VPs; // VPs are used (not owned)
533 : typedef VPs::const_iterator const_iterator;
534 : typedef VPs::iterator iterator;
535 :
536 : iterator begin();
537 : const_iterator begin() const;
538 :
539 : casacore::Bool contains (const VisibilityProcessor *) const;
540 : virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
541 : VpData & inputData,
542 : const SubchunkIndex & subChunkIndex);
543 : casacore::Bool empty () const;
544 : iterator end();
545 : const_iterator end() const;
546 : virtual void processingStartImpl ();
547 : size_t size() const;
548 : virtual void validateImpl ();
549 :
550 : private:
551 :
552 : typedef std::map<VpPort, VpPort> Network;
553 : typedef std::set<VpPort> NetworkReverse;
554 : typedef std::tuple<VisibilityProcessor *, VpData> ReadyVpAndData;
555 :
556 : class VpSet : public std::set<VisibilityProcessor *> {
557 : public:
558 :
559 : template <typename In>
560 0 : VpSet (In begin, In end) : std::set<VisibilityProcessor *> (begin, end) {}
561 : casacore::String getNames () const;
562 : };
563 :
564 : Network network_p; // connections between the ports of the connected nodes
565 : NetworkReverse networkReverse_p; // connections of contets except indexed in
566 : // backwards order.
567 : VPs vps_p; // the VPs contained by this container.
568 :
569 : ReadyVpAndData findReadyVp (VpSet & vpsWaiting, VpData & inputs, bool flushing) const;
570 : ReadyVpAndData findReadyVpFlushing (VpSet & vpsWaiting, VpData & inputs) const;
571 : ReadyVpAndData findReadyVpNormal (VpSet & vpsWaiting, VpData & inputs) const;
572 : bool follows (const VisibilityProcessor * a, const VisibilityProcessor * b) const;
573 : bool followsSet (const VisibilityProcessor * a, const VpSet & vpSet) const;
574 : void orderContents ();
575 : void remapPorts (VpData & data, const VisibilityProcessor *);
576 : std::pair<VpPort,VpPort> validateConnectionPorts (VisibilityProcessor * sourceVp,
577 : const casacore::String & sourcePortName,
578 : VisibilityProcessor * sinkVp,
579 : const casacore::String & sinkPortName);
580 : };
581 :
582 : class VpEngine {
583 :
584 : friend class VisibilityProcessor;
585 :
586 : public:
587 :
588 : VpEngine () : vi_p (NULL) {}
589 :
590 : // Process the data set swept by the VisibilityIterator using the
591 : // VisibilityProcessor provided with the optionally specified port
592 : // as the input.
593 :
594 : void process (VisibilityProcessor & processor,
595 : ROVisibilityIterator & vi,
596 : const casacore::String & inputPortName);
597 :
598 : void process (VisibilityProcessor & processor,
599 : ROVisibilityIterator & vi,
600 : const VpPort & inputPort = VpPort ());
601 :
602 : static casacore::Int getLogLevel ();
603 : static void log (const casacore::String & format, ...);
604 : static casacore::String getAipsRcBase ();
605 :
606 : private:
607 :
608 : ROVisibilityIterator * vi_p; // [use]
609 :
610 : static casacore::Int logLevel_p;
611 : static casacore::LogIO * logIo_p;
612 : static casacore::Bool loggingInitialized_p;
613 : static casacore::LogSink * logSink_p;
614 :
615 : static casacore::Bool initializeLogging ();
616 :
617 : ROVisibilityIterator * getVi ();
618 :
619 : };
620 :
} // end namespace vpf
622 :
623 : } // end namespace casa
624 :
625 :
/*

    Illustrative usage sketch (pseudocode; the connect and add calls below
    do not match the exact signatures declared in VpContainer):

    VisibilityProcessor vp1;
    VisibilityProcessor vp2;
    VpContainer vpc1;

    vpc1.add (vp1);
    vpc1.add (vp2);

    vpc1.connect (vp1.getOutput (Out), vp2.getInput (In));
    vpc1.connect (vpc1.getInput (In), vp1.getInput (In));
    vpc1.connect (vp2.getOutput (Out), vpc1.getOutput (Out));

    VpContainer vpc2;
    VpContainer vpc0;

    vpc0.add (vpc1, vpc2);
    vpc0.connect (vpc1.getOutput (Out), vpc2.getInput (In));
    vpc0.connect (vpc0.getInput (In), vpc1.getInput (In));
    vpc0.connect (vpc1.getOutput (Out), vpc0.getOutput (Out));

    vpc0.validate ();

*/
650 :
651 :
652 :
653 : #endif /* VISIBILITYPROCESSING_H_ */
|