Line data Source code
1 : //# CASA - Common Astronomy Software Applications (http://casa.nrao.edu/)
2 : //# Copyright (C) Associated Universities, Inc. Washington DC, USA 2011, All
3 : //# rights reserved.
4 : //# Copyright (C) European Southern Observatory, 2011, All rights reserved.
5 : //#
6 : //# This library is free software; you can redistribute it and/or
7 : //# modify it under the terms of the GNU Lesser General Public
8 : //# License as published by the Free software Foundation; either
9 : //# version 2.1 of the License, or (at your option) any later version.
10 : //#
11 : //# This library is distributed in the hope that it will be useful,
12 : //# but WITHOUT ANY WARRANTY, without even the implied warranty of
13 : //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : //# Lesser General Public License for more details.
15 : //#
16 : //# You should have received a copy of the GNU Lesser General Public
17 : //# License along with this library; if not, write to the Free Software
18 : //# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19 : //# MA 02111-1307 USA
20 :
21 : #include <mstransform/TVI/StatWtFloatingWindowDataAggregator.h>
22 :
23 : #include <casacore/scimath/StatsFramework/ClassicalStatistics.h>
24 :
25 : #include <msvis/MSVis/ViImplementation2.h>
26 : #include <mstransform/TVI/StatWtTypes.h>
27 :
28 : #ifdef _OPENMP
29 : #include <omp.h>
30 : #endif
31 :
32 : using namespace casacore;
33 : using namespace std;
34 :
35 : namespace casa {
36 :
37 : namespace vi {
38 :
39 34 : StatWtFloatingWindowDataAggregator::StatWtFloatingWindowDataAggregator(
40 : ViImplementation2 *const vii,
41 : const std::map<casacore::Int, std::vector<StatWtTypes::ChanBin>>& chanBins,
42 : std::shared_ptr<map<uInt, pair<uInt, uInt>>>& samples,
43 : StatWtTypes::Column column, Bool noModel,
44 : const map<uInt, Cube<Bool>>& chanSelFlags, Bool combineCorr,
45 : shared_ptr<
46 : ClassicalStatistics<
47 : Double, Array<Float>::const_iterator,
48 : Array<Bool>::const_iterator
49 : >
50 : >& wtStats,
51 : shared_ptr<const pair<Double, Double>> wtrange,
52 : shared_ptr<const Double> binWidthInSeconds,
53 : shared_ptr<const Int> nTimeStampsInBin, const Bool timeBlockProcessing,
54 : shared_ptr<
55 : StatisticsAlgorithm<
56 : Double, Array<Float>::const_iterator,
57 : Array<Bool>::const_iterator, Array<Double>::const_iterator
58 : >
59 : >& statAlg, Int minSamp
60 34 : ) : StatWtDataAggregator(
61 : vii, chanBins, samples, column, noModel, chanSelFlags, /* mustComputeWtSp,*/
62 : wtStats, wtrange, combineCorr, statAlg, minSamp
63 : ), _binWidthInSeconds(binWidthInSeconds),
64 : _nTimeStampsInBin(nTimeStampsInBin),
65 34 : _timeBlockProcessing(timeBlockProcessing) {
66 34 : ThrowIf(
67 : ! (_binWidthInSeconds || _nTimeStampsInBin ),
68 : "Logic error: neither binWidthInSeconds "
69 : "nor nTimeStampsInBin has been specified"
70 : );
71 34 : }
72 :
73 68 : StatWtFloatingWindowDataAggregator::~StatWtFloatingWindowDataAggregator() {}
74 :
75 251 : void StatWtFloatingWindowDataAggregator::aggregate() {
76 251 : auto* vb = _vii->getVisBuffer();
77 : // map of rowID (the index of the vector) to rowIDs that should be used to
78 : // compute the weight for the key rowID
79 251 : std::vector<std::set<uInt>> rowMap;
80 251 : auto firstTime = True;
81 : // each subchunk is guaranteed to represent exactly one time stamp
82 251 : std::vector<Double> subChunkToTimeStamp;
83 : // Baseline-subchunk number pair to row index in that chunk
84 251 : std::map<std::pair<StatWtTypes::Baseline, uInt>, uInt> baselineSubChunkToIndex;
85 : // we build up the chunk data in the chunkData and chunkFlags cubes
86 251 : Cube<Complex> chunkData;
87 251 : Cube<Bool> chunkFlags;
88 251 : std::vector<Double> exposures;
89 251 : rownr_t subchunkStartRowNum = 0;
90 251 : auto initChanSelTemplate = True;
91 251 : Cube<Bool> chanSelFlagTemplate, chanSelFlags;
92 : // we cannot know the spw until inside the subchunk loop
93 251 : Int spw = -1;
94 251 : _rowIDInMSToRowIndexInChunk.clear();
95 502 : Slicer sl(IPosition(3, 0), IPosition(3, 1));
96 251 : auto slStart = sl.start();
97 251 : auto slEnd = sl.end();
98 251 : std::vector<std::pair<uInt, uInt>> idToChunksNeededByIDMap,
99 251 : chunkNeededToIDsThatNeedChunkIDMap;
100 251 : _limits(idToChunksNeededByIDMap, chunkNeededToIDsThatNeedChunkIDMap);
101 251 : uInt subChunkID = 0;
102 2450 : for (_vii->origin(); _vii->more(); _vii->next(), ++subChunkID) {
103 2233 : if (_checkFirstSubChunk(spw, firstTime, vb)) {
104 34 : return;
105 : }
106 2199 : if (! _mustComputeWtSp) {
107 54 : _mustComputeWtSp.reset(
108 : new Bool(
109 27 : vb->existsColumn(VisBufferComponent2::WeightSpectrum)
110 27 : )
111 : );
112 : }
113 2199 : _rowIDInMSToRowIndexInChunk[*vb->rowIds().begin()] = subchunkStartRowNum;
114 2199 : const auto& ant1 = vb->antenna1();
115 2199 : const auto& ant2 = vb->antenna2();
116 : // [nCorrs, nFreqs, nRows)
117 2199 : const auto nrows = vb->nRows();
118 : // there is no guarantee a previous subchunk will be included,
119 : // eg if the timewidth is small enough
120 : // This is the first subchunk ID that should be used for averaging
121 : // grouping data for weight computation of the current subchunk ID.
122 : const auto firstChunkNeededByCurrentID
123 2199 : = idToChunksNeededByIDMap[subChunkID].first;
124 : const auto lastChunkNeededByCurrentID
125 2199 : = idToChunksNeededByIDMap[subChunkID].second;
126 : const auto firstChunkThatNeedsCurrentID
127 2199 : = chunkNeededToIDsThatNeedChunkIDMap[subChunkID].first;
128 2199 : auto subchunkTime = vb->time()[0];
129 2199 : auto rowInChunk = subchunkStartRowNum;
130 2199 : pair<StatWtTypes::Baseline, uInt> mypair;
131 2199 : mypair.second = subChunkID;
132 107060 : for (rownr_t row=0; row<nrows; ++row, ++rowInChunk) {
133 : // loop over rows in sub chunk, grouping baseline specific data
134 : // together
135 104861 : const auto baseline = _baseline(ant1[row], ant2[row]);
136 104861 : mypair.first = baseline;
137 104861 : baselineSubChunkToIndex[mypair] = rowInChunk;
138 209722 : std::set<uInt> neededRowNums;
139 104861 : neededRowNums.insert(rowInChunk);
140 104861 : if (subChunkID > 0) {
141 93024 : auto s = min(
142 : firstChunkNeededByCurrentID, firstChunkThatNeedsCurrentID
143 : );
144 93024 : auto tpair = mypair;
145 132404 : for (; s < subChunkID; ++s) {
146 39380 : const auto myend = baselineSubChunkToIndex.end();
147 39380 : tpair.second = s;
148 39380 : const auto iter = baselineSubChunkToIndex.find(tpair);
149 39380 : auto found = iter != myend;
150 39380 : if (found) {
151 39380 : const auto existingRowNum = iter->second;
152 39380 : if (
153 : s >= firstChunkNeededByCurrentID
154 33715 : && s <= lastChunkNeededByCurrentID
155 : ) {
156 : // The subchunk data is needed for computation
157 : // of the current subchunkID's weights
158 33715 : neededRowNums.insert(existingRowNum);
159 : }
160 39380 : if (
161 39380 : idToChunksNeededByIDMap[s].first <= subChunkID
162 39380 : && idToChunksNeededByIDMap[s].second >= subChunkID
163 : ) {
164 32175 : rowMap[existingRowNum].insert(rowInChunk);
165 : }
166 : }
167 : }
168 : }
169 104861 : rowMap.push_back(neededRowNums);
170 : }
171 4398 : const auto dataCube = _dataCube(vb);
172 : const auto resultantFlags = _getResultantFlags(
173 : chanSelFlagTemplate, chanSelFlags, initChanSelTemplate,
174 2199 : spw, vb->flagCube()
175 4398 : );
176 4398 : const auto myExposures = vb->exposure().tovector();
177 : exposures.insert(
178 2199 : exposures.end(), myExposures.begin(), myExposures.end()
179 4398 : );
180 4398 : const auto cubeShape = dataCube.shape();
181 4398 : IPosition sliceStart(3, 0);
182 2199 : auto sliceEnd = cubeShape - 1;
183 : // build up chunkData and chunkFlags one subchunk at a time
184 2199 : if (chunkData.empty()) {
185 217 : chunkData = dataCube;
186 217 : chunkFlags = resultantFlags;
187 : }
188 : else {
189 1982 : auto newShape = chunkData.shape();
190 1982 : newShape[2] += nrows;
191 1982 : chunkData.resize(newShape, True);
192 1982 : chunkFlags.resize(newShape, True);
193 1982 : slStart[2] = subchunkStartRowNum;
194 1982 : sl.setStart(slStart);
195 1982 : slEnd = newShape - 1;
196 1982 : sl.setEnd(slEnd);
197 1982 : chunkData(sl) = dataCube;
198 1982 : chunkFlags(sl) = resultantFlags;
199 : }
200 2199 : subChunkToTimeStamp.push_back(subchunkTime);
201 2199 : subchunkStartRowNum += nrows;
202 : }
203 217 : _computeWeights(
204 434 : chunkData, chunkFlags, Vector<Double>(exposures), rowMap, spw
205 : );
206 : }
207 :
208 328 : void StatWtFloatingWindowDataAggregator::weightSingleChanBin(
209 : Matrix<Float>& wtmat, Int nrows
210 : ) const {
211 656 : Vector<rownr_t> rowIDs;
212 328 : _vii->getRowIds(rowIDs);
213 328 : const auto start = _rowIDInMSToRowIndexInChunk.find(*rowIDs.begin());
214 328 : ThrowIf(
215 : start == _rowIDInMSToRowIndexInChunk.end(),
216 : "Logic Error: Cannot find requested subchunk in stored chunk"
217 : );
218 : // this is the row index in the chunk
219 328 : auto chunkRowIndex = start->second;
220 328 : auto ncorr = wtmat.nrow();
221 2284 : for (Int i=0; i<nrows; ++i, ++chunkRowIndex) {
222 1956 : if (_combineCorr) {
223 0 : wtmat.column(i) = _weights(0, 0, chunkRowIndex);
224 : }
225 : else {
226 9780 : for (uInt corr=0; corr<ncorr; ++corr) {
227 7824 : wtmat(corr, i) = _weights(
228 : corr, 0, chunkRowIndex
229 7824 : );
230 : }
231 : }
232 : }
233 328 : }
234 :
235 217 : void StatWtFloatingWindowDataAggregator::_computeWeights(
236 : const Cube<Complex>& data, const Cube<Bool>& flags,
237 : const Vector<Double>& exposures, const std::vector<std::set<uInt>>& rowMap,
238 : uInt spw
239 : ) const {
240 217 : auto chunkShape = data.shape();
241 217 : const auto nActCorr = chunkShape[0];
242 217 : const auto ncorr = _combineCorr ? 1 : nActCorr;
243 217 : const auto& chanBins = _chanBins.find(spw)->second;
244 217 : _weights.resize(IPosition(3, ncorr, chanBins.size(), chunkShape[2]), False);
245 217 : const auto nRows = rowMap.size();
246 : #ifdef _OPENMP
247 217 : #pragma omp parallel for
248 : // cout << "DEBUG PARALLEL LOOPING IS OFF" << endl;
249 : #endif
250 :
251 : for (size_t iRow=0; iRow<nRows; ++iRow) {
252 : IPosition chunkSliceStart(3, 0);
253 : auto chunkSliceLength = chunkShape;
254 : chunkSliceLength[2] = 1;
255 : Slicer chunkSlice(
256 : chunkSliceStart, chunkSliceLength, Slicer::endIsLength
257 : );
258 : auto chunkSliceEnd = chunkSlice.end();
259 : auto appendingSlice = chunkSlice;
260 : auto appendingSliceStart = appendingSlice.start();
261 : auto appendingSliceEnd = appendingSlice.end();
262 : auto intraChunkSlice = appendingSlice;
263 : auto intraChunkSliceStart = intraChunkSlice.start();
264 : auto intraChunkSliceEnd = intraChunkSlice.end();
265 : intraChunkSliceEnd[0] = nActCorr - 1;
266 : const auto& rowsToInclude = rowMap[iRow];
267 : auto dataShape = chunkShape;
268 : dataShape[2] = rowsToInclude.size();
269 : Cube<Complex> dataArray(dataShape);
270 : Cube<Bool> flagArray(dataShape);
271 : auto siter = rowsToInclude.begin();
272 : auto send = rowsToInclude.end();
273 : Vector<Double> exposureVector(rowsToInclude.size(), 0);
274 : uInt n = 0;
275 : // create an array with only the rows that should
276 : // be used in the computation of weights for the
277 : // current row
278 : for (; siter!=send; ++siter, ++n) {
279 : exposureVector[n] = exposures[*siter];
280 : appendingSliceStart[2] = n;
281 : appendingSlice.setStart(appendingSliceStart);
282 : appendingSliceEnd[2] = n;
283 : appendingSlice.setEnd(appendingSliceEnd);
284 : chunkSliceStart[2] = *siter;
285 : chunkSlice.setStart(chunkSliceStart);
286 : chunkSliceEnd[2] = *siter;
287 : chunkSlice.setEnd(chunkSliceEnd);
288 : dataArray(appendingSlice) = data(chunkSlice);
289 : flagArray(appendingSlice) = flags(chunkSlice);
290 : }
291 : // slice up for correlations and channel binning
292 : intraChunkSliceEnd[2] = dataShape[2] - 1;
293 : for (uInt corr=0; corr<ncorr; ++corr) {
294 : if (! _combineCorr) {
295 : intraChunkSliceStart[0] = corr;
296 : intraChunkSliceEnd[0] = corr;
297 : }
298 : auto citer = chanBins.begin();
299 : auto cend = chanBins.end();
300 : auto iChanBin = 0;
301 : for (; citer!=cend; ++citer, ++iChanBin) {
302 : intraChunkSliceStart[1] = citer->start;
303 : intraChunkSliceEnd[1] = citer->end;
304 : intraChunkSlice.setStart(intraChunkSliceStart);
305 : intraChunkSlice.setEnd(intraChunkSliceEnd);
306 : _weights(corr, iChanBin, iRow)
307 : = _varianceComputer->computeWeight(
308 : dataArray(intraChunkSlice), flagArray(intraChunkSlice),
309 : exposureVector, spw, exposures[iRow]
310 : );
311 : }
312 : }
313 : }
314 217 : }
315 :
316 2199 : void StatWtFloatingWindowDataAggregator::weightSpectrumFlags(
317 : Cube<Float>& wtsp, Cube<Bool>& flagCube, Bool& checkFlags,
318 : const Vector<Int>& ant1, const Vector<Int>&,
319 : const Vector<Int>& spws, const Vector<Double>&,
320 : const Vector<rownr_t>& rowIDs
321 : ) const {
322 : // fish out the rows relevant to this subchunk
323 2199 : const auto start = _rowIDInMSToRowIndexInChunk.find(*rowIDs.begin());
324 2199 : ThrowIf(
325 : start == _rowIDInMSToRowIndexInChunk.end(),
326 : "Logic Error: Cannot find requested subchunk in stored chunk"
327 : );
328 : // this is the row index in the chunk
329 2199 : auto chunkRowIndex = start->second;
330 2199 : auto chunkRowEnd = chunkRowIndex + ant1.size();
331 4398 : Slicer slice(IPosition(3, 0), flagCube.shape(), Slicer::endIsLength);
332 4398 : auto sliceStart = slice.start();
333 4398 : auto sliceEnd = slice.end();
334 2199 : auto nCorrBins = _combineCorr ? 1 : flagCube.shape()[0];
335 2199 : auto spw = *spws.begin();
336 2199 : const auto& chanBins = _chanBins.find(spw)->second;
337 2199 : auto subChunkRowIndex = 0;
338 107060 : for (; chunkRowIndex < chunkRowEnd; ++chunkRowIndex, ++subChunkRowIndex) {
339 104861 : sliceStart[2] = subChunkRowIndex;
340 104861 : sliceEnd[2] = subChunkRowIndex;
341 104861 : auto iChanBin = 0;
342 371422 : for (const auto& chanBin: chanBins) {
343 266561 : sliceStart[1] = chanBin.start;
344 266561 : sliceEnd[1] = chanBin.end;
345 266561 : auto corr = 0;
346 694090 : for (; corr < nCorrBins; ++corr) {
347 427529 : if (! _combineCorr) {
348 318024 : sliceStart[0] = corr;
349 318024 : sliceEnd[0] = corr;
350 : }
351 427529 : slice.setStart(sliceStart);
352 427529 : slice.setEnd(sliceEnd);
353 855058 : _updateWtSpFlags(
354 : wtsp, flagCube, checkFlags, slice,
355 427529 : _weights(corr, iChanBin, chunkRowIndex)
356 : );
357 : }
358 266561 : ++iChanBin;
359 : }
360 : }
361 2199 : }
362 :
363 251 : void StatWtFloatingWindowDataAggregator::_limits(
364 : std::vector<std::pair<uInt, uInt>>& idToChunksNeededByIDMap,
365 : std::vector<std::pair<uInt, uInt>>& chunkNeededToIDsThatNeedChunkIDMap
366 : ) const {
367 251 : pair<uInt, uInt> p, q;
368 251 : const uInt nTimes = _vii->nTimes();
369 251 : if (_nTimeStampsInBin) {
370 : // fixed number of time stamps specified
371 243 : if (_timeBlockProcessing) {
372 : // integer division
373 227 : uInt nBlocks = nTimes/(*_nTimeStampsInBin);
374 227 : if (nTimes % *_nTimeStampsInBin > 0) {
375 8 : ++nBlocks;
376 : }
377 227 : uInt subChunkCount = 0;
378 2504 : for (uInt blockCount = 0; blockCount < nBlocks; ++blockCount) {
379 2277 : if ((subChunkCount + *_nTimeStampsInBin <= nTimes)) {
380 2269 : p.first = subChunkCount;
381 2269 : p.second = subChunkCount + *_nTimeStampsInBin - 1;
382 : }
383 : else {
384 : // chunk upper edge
385 8 : p.first = nTimes < (uInt)*_nTimeStampsInBin
386 8 : ? 0 : nTimes - *_nTimeStampsInBin;
387 8 : p.second = nTimes - 1;
388 : }
389 2277 : q = p;
390 4607 : for (uInt i=subChunkCount; i<=p.second; ++i, ++subChunkCount) {
391 2330 : idToChunksNeededByIDMap.push_back(p);
392 2330 : chunkNeededToIDsThatNeedChunkIDMap.push_back(q);
393 : }
394 : }
395 : }
396 : else {
397 : // sliding time window, fixed number of time stamps (timebin
398 : // specified as int
399 16 : const auto isEven = *_nTimeStampsInBin % 2 == 0;
400 : // integer division
401 16 : const uInt halfTimeBin = *_nTimeStampsInBin/2;
402 : const auto nBefore = isEven
403 16 : ? (halfTimeBin - 1) : (*_nTimeStampsInBin - 1)/2;
404 16 : const auto nAfter = isEven ? halfTimeBin : nBefore;
405 : // integer division
406 : // p.first is the first sub chunk needed by the current index.
407 : // p.second is the first sub chunk that needs the current index
408 144 : for (uInt i=0; i<nTimes; ++i) {
409 128 : Int begin = i - nBefore;
410 128 : if (begin < 0) {
411 32 : begin = 0;
412 : }
413 128 : p.second = begin + *_nTimeStampsInBin - 1;
414 128 : if (p.second >= nTimes) {
415 48 : p.second = nTimes - 1;
416 48 : begin = p.second - *_nTimeStampsInBin + 1;
417 48 : if (begin < 0) {
418 31 : begin = 0;
419 : }
420 : }
421 128 : ThrowIf(begin < 0, "Logic Error: begin < 0");
422 128 : p.first = begin;
423 128 : if ((Int)nTimes <= *_nTimeStampsInBin) {
424 36 : q = p;
425 : }
426 : else {
427 92 : if ((Int)i < *_nTimeStampsInBin) {
428 33 : q.first = 0;
429 : }
430 59 : else if (i > nTimes - *_nTimeStampsInBin) {
431 27 : q.second = nTimes - 1;
432 : }
433 32 : else if (isEven) {
434 13 : begin = i - nAfter;
435 13 : q.first = begin;
436 13 : q.second = i + nBefore;
437 : }
438 : else {
439 : // isOdd
440 19 : q = p;
441 : }
442 : }
443 128 : idToChunksNeededByIDMap.push_back(p);
444 128 : chunkNeededToIDsThatNeedChunkIDMap.push_back(q);
445 : }
446 : }
447 : }
448 : else {
449 8 : if (_timeBlockProcessing) {
450 : // shouldn't get in here
451 0 : ThrowCc("Logic error: shouldn't have gotten into this code block");
452 : }
453 : else {
454 8 : ThrowIf(
455 : ! _binWidthInSeconds,
456 : "Logic error: _binWidthInSeconds not defined"
457 : );
458 8 : const auto halfBinWidth = *_binWidthInSeconds/2;
459 8 : auto vb = _vii->getVisBuffer();
460 16 : vector<Double> times;
461 72 : for (_vii->origin(); _vii->more(); _vii->next()) {
462 64 : times.push_back(vb->time()[0]);
463 : }
464 72 : for (uInt i=0; i<nTimes; ++i) {
465 64 : auto mytime = times[i];
466 : auto loit = std::lower_bound(
467 0 : times.begin(), times.end(), mytime - halfBinWidth
468 64 : );
469 64 : ThrowIf(
470 : loit == times.end(),
471 : "Logic Error for std::lower_bound()"
472 : );
473 64 : p.first = std::distance(times.begin(), loit);
474 : auto upit = std::upper_bound(
475 0 : times.begin(), times.end(), mytime + halfBinWidth
476 64 : );
477 64 : p.second = upit == times.end()
478 34 : ? nTimes - 1 : std::distance(times.begin(), upit) - 1;
479 64 : q = p;
480 64 : idToChunksNeededByIDMap.push_back(p);
481 64 : chunkNeededToIDsThatNeedChunkIDMap.push_back(q);
482 : }
483 : }
484 : }
485 251 : }
486 :
487 : }
488 :
489 : }
|