casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
VisibilityIteratorImplAsync.h
Go to the documentation of this file.
00001 /*
00002  * VisibilityIteratorAsync.h
00003  *
00004  *  Created on: Nov 1, 2010
00005  *      Author: jjacobs
00006  */
00007 
00008 #if ! defined (VisibilityIteratorAsync_H_)
00009 #define VisibilityIteratorAsync_H_
00010 
00011 #include <set>
00012 using std::set;
00013 
00014 #include <synthesis/MSVis/VisibilityIteratorImpl.h>
00015 #include <synthesis/MSVis/AsynchronousInterface.h>
00016 #include "UtilJ.h"
00017 
00018 #define NotImplementedROVIA throw utilj::AipsErrorTrace (String ("Method not legal in ROVIA: ") + __PRETTY_FUNCTION__, __FILE__, __LINE__)
00019 #define NotPrefetched throw utilj::AipsErrorTrace (String ("Column not prefetched for async I/O: ") + __PRETTY_FUNCTION__, __FILE__, __LINE__)
00020 
00021 namespace casa {
00022 
00023 
00024 class VisBufferAsync;
00025 class VisBufferAsyncWrapper;
00026 
00027 namespace asyncio {
00028 
00029 class AsynchronousInterface;
00030 class VlaDatum;
00031 class VlaData;
00032 class VLAT;
00033 
00034 } // end namespace asyncio
00035 
00036 class ViReadImplAsync : public VisibilityIteratorReadImpl {
00037     // This needs to be changed back to ROVisibilityIterator at some point
00038     // after feasibility testing
00039 
00040     friend class Rovia_Test;
00041     friend class ROVisIterator;
00042     friend class VisIterator;
00043     friend class ViWriteImplAsync;
00044 
00045 public:
00046 
00047     typedef casa::asyncio::PrefetchColumns PrefetchColumns;
00048 
00049 //    static VisibilityIteratorReadImpl *
00050 //    create (const ROVisibilityIterator &,
00051 //            const PrefetchColumns & prefetchColumns,
00052 //            Int nReadAheadBuffers = -1);
00053 
00054     ViReadImplAsync (const Block<MeasurementSet> & mss,
00055                      const PrefetchColumns & prefetchColumns,
00056                      const Block<Int> & sortColumns,
00057                      const Bool addDefaultSortCols,
00058                      Double timeInterval,
00059                      Bool writable);
00060 
00061     ViReadImplAsync (const PrefetchColumns & prefetchColumns,
00062                      const VisibilityIteratorReadImpl & other,
00063                      Bool writable);
00064 
00065 
00066     ~ViReadImplAsync ();
00067 
00068     VisibilityIteratorReadImpl * clone () const;
00069 
00070     void attachVisBuffer (VisBuffer & vb);
00071     void detachVisBuffer (VisBuffer & vb);
00072     void getChannelSelection(Block< Vector<Int> >& ,
00073                              Block< Vector<Int> >& ,
00074                              Block< Vector<Int> >& ,
00075                              Block< Vector<Int> >& ,
00076                              Block< Vector<Int> >& );
00077     PrefetchColumns getPrefetchColumns () const;
00078     VisBuffer * getVisBuffer ();
00079 
00080 
00081 //    Int getDataDescriptionId () const;
00082 //    const MeasurementSet & getMeasurementSet();
00083 //    const Int getMeasurementSetId ();
00084 //    Int getNAntennas () const;
00085     MEpoch getEpoch () const;
00086 //    Vector<Float> getReceptor0Angle ();
00087 
00089 
00090     bool more () const;
00091     bool moreChunks () const;
00092     ViReadImplAsync & nextChunk ();
00093     void origin ();
00094     void originChunks ();
00095     virtual void advance ();
00096 
00097     void setPrefetchColumns (const PrefetchColumns & prefetchColumns);
00098 
00099     VisibilityIteratorReadImpl& selectChannel(Int nGroup=1,
00100                                         Int start=0,
00101                                         Int width=0,
00102                                         Int increment=1,
00103                                         Int spectralWindow=-1);
00104 
00105     VisibilityIteratorReadImpl& selectChannel(const Block< Vector<Int> >& blockNGroup,
00106                                         const Block< Vector<Int> >& blockStart,
00107                                         const Block< Vector<Int> >& blockWidth,
00108                                         const Block< Vector<Int> >& blockIncr,
00109                                         const Block< Vector<Int> >& blockSpw);
00110 
00111 
00112     VisibilityIteratorReadImpl&
00113     selectVelocity(Int ,
00114                    const MVRadialVelocity& ,
00115                    const MVRadialVelocity& ,
00116                    MRadialVelocity::Types ,
00117                    MDoppler::Types , Bool );
00118     void setInterval(Double timeInterval);
00119     void setRowBlocking(Int nRow);
00120 
00121     // These functions generate a list of the IDs (from PrefetchColumnIDs enum)
00122     // of the columns to prefetch.  For the variable arg calls, terminate with a
00123     // -1.
00124 
00125     static int getDefaultNBuffers ();
00126 
00127     // The functions below make no sense (at first glance) for asynchronous operation and are implemented
00128     // to throw an AipsError if called.  ROVIA is designed to have all the data accessed through the
00129     // associated VisBufferAsync.  Any method which tries to access data through the ROVIA makes no
00130     // sense.  Also anything that tries to change the characteristics of underlying ROVI is not currently
00131     // permitted.  During integration some methods may be found to be more important to the use of ROVIA
00132     // and a way may be found to implement them in ROVIA.
00133 
00134     Bool allBeamOffsetsZero() const; // { NotPrefetched; }
00135     void allSelectedSpectralWindows(Vector<Int>& , Vector<Int>& ) { NotPrefetched; }
00136     Vector<Int>& antenna1(Vector<Int>& ) const { NotPrefetched; }
00137     Vector<Int>& antenna2(Vector<Int>& ) const { NotPrefetched; }
00138     const Vector<String>& antennaMounts() const; // { NotPrefetched; }
00139     Vector<MDirection> azel(Double ) const { NotImplementedROVIA; }
00140     MDirection azel0(Double ) const { NotImplementedROVIA; }
00141     Vector<Int>& channel(Vector<Int>& ) const { NotPrefetched; }
00142     Int channelGroupSize() const { NotImplementedROVIA; }
00143     Int channelIndex() const { NotImplementedROVIA; }
00144     Vector<SquareMatrix<Complex,2> >& CJones(Vector<SquareMatrix<Complex,2> >& ) const { NotPrefetched; }
00145     Vector<Int>& corrType(Vector<Int>& ) const { NotPrefetched; }
00146     Int dataDescriptionId() const { NotPrefetched; }
00147     Bool existsWeightSpectrum() const { NotImplementedROVIA; }
00148     Vector<Double>& exposure(Vector<Double>& /*expo*/) const { NotPrefetched; }
00149     Vector<Int>& feed1(Vector<Int>& ) const { NotPrefetched; }
00150     Vector<Int>& feed2(Vector<Int>& ) const { NotPrefetched; }
00151     //Vector<Float> feed_pa(Double ) const { NotImplementedROVIA; }
00152     Int fieldId() const { NotPrefetched; }
00153     Array<Bool>& flagCategory(Array<Bool>& /*flagCategories*/) const { NotPrefetched; }
00154     Cube<Float>& floatData(Cube<Float>& /*fcube*/) const { NotPrefetched; }
00155     void getFloatDataColumn (Cube<Float>& /*data*/) const { NotImplementedROVIA; }
00156     void getFloatDataColumn(const Slicer& /*slicer*/, Cube<Float>& /*data*/) const { NotImplementedROVIA; }
00157     void getInterpolatedFloatDataFlagWeight() const { NotImplementedROVIA; }
00158     void getInterpolatedVisFlagWeight(DataColumn /*whichOne*/) const { NotImplementedROVIA; }
00159     Int arrayId() const { NotPrefetched; }
00160     String fieldName() const { NotImplementedROVIA; }
00161     String sourceName() const { NotImplementedROVIA; }
00162     Cube<Bool>& flag(Cube<Bool>& ) const { NotPrefetched; }
00163     Matrix<Bool>& flag(Matrix<Bool>& ) const { NotPrefetched; }
00164     Vector<Bool>& flagRow(Vector<Bool>& ) const { NotPrefetched; }
00165     Vector<Double>& frequency(Vector<Double>& ) const { NotPrefetched; }
00166     const Cube<RigidVector<Double, 2> >& getBeamOffsets() const;// { NotImplementedROVIA; }
00167     Int getDataDescriptionId () const { NotPrefetched; }
00168     MEpoch getMEpoch () const { NotImplementedROVIA; }
00169     const MeasurementSet & getMeasurementSet() const { NotImplementedROVIA; }
00170     Int getMeasurementSetId() const { NotImplementedROVIA; }
00171     Int getNAntennas () const { NotImplementedROVIA; }
00172     Vector<Float> getReceptor0Angle () { NotImplementedROVIA; }
00173     Vector<uInt> getRowIds () const { NotImplementedROVIA; }
00174     Double hourang(Double ) const { NotImplementedROVIA; }
00175     Vector<Double>& lsrFrequency(Vector<Double>& ) const { NotImplementedROVIA; }
00176     void lsrFrequency(const Int& , Vector<Double>& , Bool& ) { NotImplementedROVIA; }
00177     const MeasurementSet& ms() const  { NotImplementedROVIA; }
00178     const ROMSColumns& msColumns() const;
00179     Int msId() const;
00180     Int nCorr() const { NotPrefetched; }
00181     Int nRow() const { NotPrefetched; }
00182     Int nRowChunk() const;
00183     Int nSubInterval() const { NotImplementedROVIA; }
00184     Bool newArrayId() const { NotImplementedROVIA; }
00185     Bool newFieldId() const { NotImplementedROVIA; }
00186     Bool newMS() const   { return msIter_p.more();}
00187     Bool newSpectralWindow() const { NotImplementedROVIA; }
00188     Int numberCoh() { NotPrefetched; }
00189     Int numberDDId() { NotPrefetched; }
00190     Int numberPol() { NotPrefetched; }
00191     Int numberSpw();
00192     Vector<Int>& observationId(Vector<Int>& /*obsIDs*/) const { NotPrefetched; }
00193     Vector<Float> parang(Double ) const { NotImplementedROVIA; }
00194     const Float& parang0(Double ) const { NotImplementedROVIA; }
00195     const MDirection& phaseCenter() const { NotPrefetched; }
00196     Int polFrame() const  { NotPrefetched; }
00197     Vector<Int>& processorId(Vector<Int>& /*procIDs*/) const { NotPrefetched; }
00198     Int polarizationId() const { NotPrefetched; }
00199     const Cube<Double>& receptorAngles() const; // { NotImplementedROVIA; }
00200     Vector<uInt>& rowIds(Vector<uInt>& ) const { NotImplementedROVIA; }
00201     Vector<Int>& scan(Vector<Int>& ) const { NotPrefetched; }
00202     Vector<Float>& sigma(Vector<Float>& ) const { NotPrefetched; }
00203     Matrix<Float>& sigmaMat(Matrix<Float>& ) const { NotPrefetched; }
00204     void slurp() const { NotImplementedROVIA; }
00205     Int spectralWindow() const { NotPrefetched; }
00206     Vector<Int>& stateId(Vector<Int>& /*stateIds*/) const { NotPrefetched; }
00207     Vector<Double>& time(Vector<Double>& ) const { NotPrefetched; }
00208     Vector<Double>& timeCentroid(Vector<Double>& /*t*/) const { NotPrefetched; }
00209     Vector<Double>& timeInterval(Vector<Double>& ) const { NotPrefetched; }
00210     Vector<RigidVector<Double,3> >& uvw(Vector<RigidVector<Double,3> >& ) const { NotPrefetched; }
00211     Matrix<Double>& uvwMat(Matrix<Double>& ) const { NotPrefetched; }
00212     VisibilityIteratorReadImpl& velInterpolation(const String& ) { NotImplementedROVIA; }
00213     Cube<Complex>& visibility(Cube<Complex>& , DataColumn ) const { NotPrefetched; }
00214     Matrix<CStokesVector>& visibility(Matrix<CStokesVector>& , DataColumn ) const { NotPrefetched; }
00215     IPosition visibilityShape() const { NotImplementedROVIA; }
00216     Vector<Float>& weight(Vector<Float>& ) const { NotPrefetched; }
00217     Matrix<Float>& weightMat(Matrix<Float>& ) const { NotPrefetched; }
00218     Cube<Float>& weightSpectrum(Cube<Float>& ) const { NotPrefetched; }
00219 
00220     static String prefetchColumnName (Int id); // for debug only
00221 
00222 protected:
00223 
00224     // Use the factory method "create" instead of calling the constructor
00225     // directly.  This allows disabling the feature.
00226 
00227 //    ViReadImplAsync (const MeasurementSet & ms,
00228 //                     const PrefetchColumns & prefetchColumns,
00229 //                     const Block<Int> & sortColumns,
00230 //                     Double timeInterval=0,
00231 //                     Int nReadAheadBuffers = 2);
00232 //    ViReadImplAsync (const MeasurementSet & ms,
00233 //                     const PrefetchColumns & prefetchColumns,
00234 //                     const Block<Int> & sortColumns,
00235 //                     const Bool addDefaultSortCols,
00236 //                     Double timeInterval=0,
00237 //                     Int nReadAheadBuffers = 2);
00238 //
00239 //    // Same as previous constructor, but with multiple MSs to iterate over.
00240 //
00241 //    ViReadImplAsync (const Block<MeasurementSet> & mss,
00242 //                     const PrefetchColumns & prefetchColumns,
00243 //                     const Block<Int> & sortColumns,
00244 //                     Double timeInterval=0,
00245 //                     Int nReadAheadBuffers = 2);
00246 
00247 
00248     ViReadImplAsync (const ROVisibilityIterator & rovi,
00249                      const PrefetchColumns & prefetchColumns,
00250                      Int nReadAheadBuffers = -1);
00251 
00252     PrefetchColumns augmentPrefetchColumns (const PrefetchColumns & prefetchColumnsBase);
00253     void construct(const Block<MeasurementSet> & mss,
00254                    const PrefetchColumns & prefetchColumns,
00255                    const Block<Int> & sortColumns,
00256                    const Bool addDefaultSortCols,
00257                    Double timeInterval,
00258                    Bool writable);
00259 
00260 
00261     void fillVisBuffer();
00262     const MeasurementSet & getMs() const;
00263     void readComplete ();
00264     void saveMss (const Block<MeasurementSet> & mss);
00265     void saveMss (const MeasurementSet & ms);
00266     //void startVlat ();
00267 
00268 private:
00269 
00270     asyncio::AsynchronousInterface * interface_p; // [own]
00271     Int                              msId_p;
00272     PrefetchColumns                  prefetchColumns_p;
00273     Stack<VisBufferAsyncWrapper *>   vbaWrapperStack_p;
00274     VisBufferAsync *                 visBufferAsync_p;
00275     asyncio::VlaData *               vlaData_p; // [use]
00276     asyncio::VLAT *                  vlat_p; // [use]
00277 
00278     void dumpPrefetchColumns () const;
00279     void updateMsd ();
00280 
00281     ViReadImplAsync (const ViReadImplAsync & MSI);
00282     ViReadImplAsync & operator=(const ViReadImplAsync &MSI);
00283 
00284 }; // end class ViReadImplAsync
00285 
00286 namespace asyncio {
00287     class WriteData;
00288 } // end namespace asyncio
00289 
00290 class ViWriteImplAsync : public VisibilityIteratorWriteImpl {
00291 
00292 public:
00293 
00294     typedef casa::asyncio::PrefetchColumns PrefetchColumns;
00295 
00296     ViWriteImplAsync (VisibilityIterator * vi);
00297     ViWriteImplAsync (const PrefetchColumns & prefetchColumns,
00298                       const VisibilityIteratorWriteImpl & other,
00299                       VisibilityIterator * vi);
00300 
00301     ~ViWriteImplAsync ();
00302 
00303     VisibilityIteratorWriteImpl * clone () const;
00304 
00305     void putModel(const RecordInterface& rec, Bool iscomponentlist=True, Bool incremental=False);
00306 
00307 
00308     // Set/modify the flags in the data.
00309     // This will flag all channels in the original data that contributed to
00310     // the output channel in the case of channel averaging.
00311     // All polarizations have the same flag value.
00312     void setFlag(const Matrix<Bool>& flag);
00313     // Set/modify the flags in the data.
00314     // This sets the flags as found in the MS, Cube(npol,nchan,nrow),
00315     // where nrow is the number of rows in the current iteration (given by
00316     // nRow()).
00317     void setFlag(const Cube<Bool>& flag);
00318 
00319     void setFlagCategory (const Array<Bool> & flagCategory);
00320 
00321     // Set/modify the flag row column; dimension Vector(nrow)
00322     void setFlagRow(const Vector<Bool>& rowflags);
00323     // Set/modify the visibilities.
00324     // This is possibly only for a 'reference' MS which has a new DATA column.
00325     // The first axis of the matrix should equal the selected number of channels
00326     // in the original MS.
00327     // If the MS does not contain all polarizations, only the parallel
00328     // hand polarizations are used.
00329     void setVis(const Matrix<CStokesVector>& vis, DataColumn whichOne);
00330     // Set/modify the visibilities
00331     // This sets the data as found in the MS, Cube(npol,nchan,nrow).
00332     void setVis(const Cube<Complex>& vis, DataColumn whichOne);
00333     // Set the visibility and flags, and interpolate from velocities if needed
00334     void setVisAndFlag(const Cube<Complex>& vis, const Cube<Bool>& flag,
00335                        DataColumn whichOne);
00336     // Set/modify the weights
00337     void setWeight(const Vector<Float>& wt);
00338     // Set/modify the weightMat
00339     void setWeightMat(const Matrix<Float>& wtmat);
00340     // Set/modify the weightSpectrum
00341     void setWeightSpectrum(const Cube<Float>& wtsp);
00342     // Set/modify the Sigma
00343     void setSigma(const Vector<Float>& sig);
00344     // Set/modify the ncorr x nrow SigmaMat.
00345     void setSigmaMat(const Matrix<Float>& sigmat);
00346 
00347 protected:
00348 
00349     ViReadImplAsync * getReadImpl();
00350     void queueWriteData (const asyncio::WriteData & data);
00351 
00352 private:
00353 
00354 
00355 }; // end class ViWriteImplAsync
00356 
00357 
00358 typedef ViReadImplAsync ROVIA;
00359 
00360 } // end namespace casa
00361 
00362 #endif // ViReadImplAsync