casa
$Rev:20696$
|
00001 //# VLAT.h: Visibility lookahead concurrency definitions (classes VlaDatum, VlaData, VLAT) 00002 //# Copyright (C) 2011 00003 //# Associated Universities, Inc. Washington DC, USA. 00004 //# 00005 //# This library is free software; you can redistribute it and/or modify it 00006 //# under the terms of the GNU Library General Public License as published by 00007 //# the Free Software Foundation; either version 2 of the License, or (at your 00008 //# option) any later version. 00009 //# 00010 //# This library is distributed in the hope that it will be useful, but WITHOUT 00011 //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00013 //# License for more details. 00014 //# 00015 //# You should have received a copy of the GNU Library General Public License 00016 //# along with this library; if not, write to the Free Software Foundation, 00017 //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA. 00018 //# 00019 //# Correspondence concerning CASA should be addressed as follows: 00020 //# Internet email: CASA-request@nrao.edu. 
00021 //# Postal address: CASA Project Office 00022 //# National Radio Astronomy Observatory 00023 //# 520 Edgemont Road 00024 //# Charlottesville, VA 22903-2475 USA 00025 //# 00026 //# 00027 //# $Id$ 00028 00029 #ifndef VLAT_H_ 00030 #define VLAT_H_ 00031 00032 #include "AsynchronousTools.h" 00033 #include "UtilJ.h" 00034 00035 #include <boost/tuple/tuple.hpp> 00036 #include <synthesis/MSVis/AsynchronousInterface.h> 00037 #include <synthesis/MSVis/VisBuffer.h> 00038 #include <synthesis/MSVis/VisBufferAsync.h> 00039 #include <synthesis/MSVis/VisibilityIterator.h> 00040 #include <synthesis/MSVis/VisibilityIteratorImplAsync.h> 00041 00042 #include <map> 00043 00044 //using namespace casa::async; 00045 using casa::asyncio::RoviaModifiers; 00046 using casa::utilj::ThreadTimes; 00047 using casa::utilj::DeltaThreadTimes; 00048 00049 namespace casa { 00050 00051 template<typename T> class Block; 00052 class MeasurementSet; 00053 class VisBuffer; 00054 00055 namespace asyncio { 00056 00057 class AsynchronousInterface; 00058 class InterfaceController; 00059 00060 // VlatFunctor is an abstract class for functor objects used to encapsulate the various 00061 // filling methods (e.g., fillVis, fillAnt1, etc.). This allows the various functions 00062 // to be put into a list of fill methods that are used by the VLAT every time the VLAT's 00063 // visibility iterator is advanced. There are two subclasses VlatFunctor0 and VlatFunctor1 00064 // which support nullary and unary fill methods. The fillers for visibility-related data 00065 // (e.g., fillVis and fillVisCube) take a parameter to indicate which sort of visibility 00066 // (e.g., actual, model, corrected) is to be filled. 
00067 00068 class VlatFunctor { 00069 00070 public: 00071 00072 00073 VlatFunctor (const String & name, Int precedence = 0) 00074 : id_p (VisBufferComponents::N_VisBufferComponents), name_p (name), precedence_p (precedence) 00075 {} 00076 VlatFunctor (Int precedence = 0) 00077 : id_p (VisBufferComponents::N_VisBufferComponents), name_p ("NotSpecified"), precedence_p (precedence) 00078 {} 00079 virtual ~VlatFunctor () {} 00080 00081 virtual void operator() (VisBuffer *); // Throws an error if not overridden 00082 virtual VlatFunctor * clone () { return new VlatFunctor (* this);} 00083 00084 VisBufferComponents::EnumType getId () const { return id_p;} 00085 void setId (VisBufferComponents::EnumType id) { id_p = id;} 00086 void setPrecedence (Int precedence) { precedence_p = precedence; } 00087 00088 static Bool byDecreasingPrecedence (const VlatFunctor * a, const VlatFunctor * b) 00089 { // First by increasing precedence and then by decreasing id (make deterministic) 00090 Bool result = (a->precedence_p > b->precedence_p) || 00091 (a->precedence_p == b->precedence_p && a->id_p < b->id_p); 00092 return result; 00093 } 00094 private: 00095 00096 VisBufferComponents::EnumType id_p; 00097 String name_p; 00098 Int precedence_p; 00099 00100 }; 00101 00102 template <typename Ret, typename VbType> 00103 class VlatFunctor0 : public VlatFunctor { 00104 00105 public: 00106 00107 typedef Ret (VbType::* Nullary) (); 00108 00109 VlatFunctor0 (Nullary nullary, Int precedence = 0) : VlatFunctor (precedence), f_p (nullary) {} 00110 virtual ~VlatFunctor0 () {} 00111 00112 void operator() (VisBuffer * c) { (dynamic_cast<VbType *> (c)->*f_p)(); } 00113 virtual VlatFunctor * clone () { return new VlatFunctor0 (* this); } 00114 00115 private: 00116 00117 Nullary f_p; 00118 }; 00119 00120 template <typename Ret, typename VbType> 00121 VlatFunctor0<Ret, VbType> * vlatFunctor0 (Ret (VbType::* f) ()) 00122 { return new VlatFunctor0<Ret, VbType> (f);} 00123 00124 template <typename Ret, 
typename Arg> 00125 class VlatFunctor1 : public VlatFunctor { 00126 00127 public: 00128 00129 typedef Ret (VisBuffer::* Unary) (Arg); 00130 00131 VlatFunctor1 (Unary unary, Arg arg, Int precedence = 0) : VlatFunctor (precedence) { f_p = unary; arg_p = arg;} 00132 virtual ~VlatFunctor1 () {} 00133 00134 void operator() (VisBuffer * c) { (c->*f_p)(arg_p); } 00135 virtual VlatFunctor * clone () { return new VlatFunctor1 (* this); } 00136 00137 private: 00138 00139 Unary f_p; 00140 Arg arg_p; 00141 }; 00142 00143 template <typename Ret, typename Arg> 00144 VlatFunctor1<Ret, Arg> * vlatFunctor1 (Ret (VisBuffer::* f) (Arg), Arg i) 00145 { return new VlatFunctor1<Ret, Arg> (f, i);} 00146 00147 // <summary> 00148 // VLAT is the Visibility LookAhead Thread. This thread advances a visibility iterator 00149 // and fills the data indicated by the visibility iterator into the VlaData buffer ring. 00150 // </summary> 00151 00152 // <use visibility=local> 00153 00154 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos=""> 00155 // </reviewed> 00156 00157 // <prerequisite> 00158 // <li> VisBuffer 00159 // <li> VisBufferAsync 00160 // <li> ROVisibilityIteratorAsync 00161 // <li> VlaData 00162 // <li> VLAT 00163 // </prerequisite> 00164 // 00165 // <etymology> 00166 // VLAT is the Visibility LookAhead Thread. It is not related to the more common NRAO 00167 // acronym VLA. 00168 // </etymology> 00169 // 00170 // <synopsis> 00171 // 00172 // The VLAT is a thread object that buffers up data from successive visibility iterator positions 00173 // in a MeasurementSet. It is part of the backend to a ROVisibilityIteratorAsync (ROVIA) 00174 // object used by the main thread to replace the normal, synchronous ROVisibilityIterator. 00175 // When the user creates a ROVIA object the information normally used to create a ROVisibilityIterator 00176 // is passed to the VLAT which uses it to create a ROVisibilityIterator local to itself. 
The VLAT then 00177 // uses this ROVisibilityIterator to fill buffers in the VlaData-managed buffer ring (this interaction 00178 // is described in VlaData). Filling consists of attaching VLAT's ROVisibilityIterator to the 00179 // VisBufferAsync object associated with a buffer and then calling the fill operations for the data 00180 // items (e.g., visCube, Ant1, etc.) which the user has requested be prefetched. The fill operations 00181 // will likely result in synchronous I/O operations being performed by the column access methods 00182 // related to the Table system (memory-resident tables, columns, etc., may be able to satisfy a fill 00183 // operation without performing I/O). 00184 // 00185 // The thread may be terminated by calling the terminate method. This will cause the VLAT to terminate 00186 // when it notices the termination request. The termination may not be immediate since the VLAT may 00187 // be engaged in a syncrhonous I/O operation and is uanble to detect the termination request until 00188 // that I/O has completed. 00189 // 00190 // Normally the VLAT sweeps the VI to the end of the measurement set and then awaits further instructions. 00191 // The main thread may stop the sweep early by calling VlaData::terminateSweep which will eventually be 00192 // detected by the VLAT and result in a coordinated reset of the sweep. When the sweep reset is applied 00193 // the VLAT will also detect visibility iterator modification requests (e.g., setRowBlocking, selectChannel, 00194 // setInterval, etc.) that were queued up in VlaData; for the set of available VI modification requests 00195 // supported see ROVisibilityIteratorAsync. 00196 // 00197 // </synopsis> 00198 // 00199 // <example> 00200 // </example> 00201 // 00202 // <motivation> 00203 // </motivation> 00204 // 00205 // <thrown> 00206 // <li> AipsError for unrecoverable errors. These will not be caught (in C++ anyway) and cause 00207 // application termination. 
00208 // </thrown> 00209 // 00210 // <todo asof="yyyy/mm/dd"> 00211 // </todo> 00212 00213 class VLAT : public casa::async::Thread { 00214 00215 public: 00216 00217 VLAT (AsynchronousInterface *); 00218 ~VLAT (); 00219 00220 void clearFillTerminationRequest (); 00221 void initialize (const ROVisibilityIterator & rovi); 00222 void initialize (const Block<MeasurementSet> & mss, 00223 const Block<Int> & sortColumns, 00224 Bool addDefaultSortCols, 00225 Double timeInterval, 00226 Bool writable); 00227 Bool isTerminated () const; 00228 void setModifiers (RoviaModifiers & modifiers); 00229 void setPrefetchColumns (const PrefetchColumns & prefetchColumns); 00230 void requestSweepTermination (); 00231 void terminate (); 00232 00233 protected: 00234 00235 class FillerDictionary : public std::map<VisBufferComponents::EnumType, VlatFunctor *> { 00236 00237 public: 00238 00239 void add (VisBufferComponents::EnumType id, VlatFunctor * f) 00240 { 00241 f->setId (id); 00242 assert (find(id) == end()); // shouldn't already have one for this ID 00243 (* this)[id] = f; 00244 } 00245 00246 //void setPrecedences (const FillerDependencies & dependencies); 00247 }; 00248 typedef vector<VlatFunctor *> Fillers; 00249 00250 void applyModifiers (ROVisibilityIterator * rovi, VisibilityIterator * vi); 00251 void alignWriteIterator (SubChunkPair subchunk); 00252 void checkFiller (VisBufferComponents::EnumType id); 00253 void createFillerDictionary (); 00254 void fillDatum (VlaDatum * datum); 00255 void fillDatumMiscellanyAfter (VlaDatum * datum); 00256 void fillDatumMiscellanyBefore (VlaDatum * datum); 00257 void fillLsrInfo (VlaDatum * datum); 00258 void flushWrittenData (); 00259 void handleWrite (); 00260 void * run (); 00261 Bool sweepTerminationRequested () const; 00262 void sweepVi (); 00263 void throwIfSweepTerminated (); 00264 Bool waitForViReset (); 00265 void waitUntilFillCanStart (); 00266 00267 private: 00268 00269 class SweepTerminated : public std::exception {}; 00270 00271 // 
class NullaryPredicate { 00272 // public: 00273 // 00274 // virtual ~NullaryPredicate () {} 00275 // virtual Bool operator () () const = 0; 00276 // }; 00277 // 00278 // class FillCanStartOrSweepTermination : public NullaryPredicate { 00279 // 00280 // public: 00281 // 00282 // FillCanStartOrSweepTermination (VlaData * vlaData, AsynchronousInterface * interface) 00283 // : interface_p (interface), 00284 // vlaData_p (vlaData) 00285 // {} 00286 // 00287 // Bool operator() () const 00288 // { 00289 // return vlaData_p->fillCanStart () || interface_p->isSweepTerminationRequested (); 00290 // } 00291 // 00292 // private: 00293 // 00294 // AsynchronousInterface * interface_p; 00295 // VlaData * vlaData_p; 00296 // }; 00297 // 00298 // class ViResetOrLookaheadTermination : public NullaryPredicate { 00299 // 00300 // public: 00301 // 00302 // ViResetOrLookaheadTermination (AsynchronousInterface * interface) 00303 // : interface_p (interface) 00304 // {} 00305 // 00306 // Bool operator() () const 00307 // { 00308 // Bool viWasReset_p = interface_p->viResetRequested; 00309 // 00310 // return viWasReset_p || interface_p->isLookaheadTerminationRequested (); 00311 // } 00312 // 00313 // private: 00314 // 00315 // AsynchronousInterface * interface_p; 00316 // }; 00317 00318 const InterfaceController * controller_p; // [use] 00319 FillerDictionary fillerDictionary_p; 00320 Fillers fillers_p; 00321 AsynchronousInterface * interface_p; // [use] 00322 Block<MeasurementSet> measurementSets_p; 00323 SubChunkPair readSubchunk_p; 00324 RoviaModifiers roviaModifiers_p; 00325 volatile Bool sweepTerminationRequested_p; 00326 Bool threadTerminated_p; 00327 ROVisibilityIterator * visibilityIterator_p; // [own] 00328 VlaData * vlaData_p; // [use] 00329 VisibilityIterator * writeIterator_p; // [own] 00330 SubChunkPair writeSubchunk_p; 00331 00332 }; 00333 00334 class VlatAndData { 00335 00336 friend class ViReadImplAsync; 00337 00338 public: 00339 00340 protected: 00341 00342 VlatAndData (); 
00343 ~VlatAndData (){} 00344 00345 private: 00346 00347 VlaData * vlaData_p; 00348 VLAT * vlat_p; 00349 }; 00350 00351 } // end namespace asyncio 00352 00353 } // end namespace casa 00354 00355 00356 00357 #endif /* VLAT_H_ */