casa  5.7.0-16
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CubePartitionMixin.h
Go to the documentation of this file.
1 /* -*- mode: c++ -*- */
2 //# CubePartitionMixin.h: Parallel cube imaging data partitioning
3 //# Copyright (C) 2016
4 //# Associated Universities, Inc. Washington DC, USA.
5 //#
6 //# This library is free software; you can redistribute it and/or modify it
7 //# under the terms of the GNU Library General Public License as published by
8 //# the Free Software Foundation; either version 2 of the License, or (at your
9 //# option) any later version.
10 //#
11 //# This library is distributed in the hope that it will be useful, but WITHOUT
12 //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
14 //# License for more details.
15 //#
16 //# You should have received a copy of the GNU Library General Public License
17 //# along with this library; if not, write to the Free Software Foundation,
18 //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
19 //#
20 //# Correspondence concerning AIPS++ should be addressed as follows:
21 //# Internet email: aips2-request@nrao.edu.
22 //# Postal address: AIPS++ Project Office
23 //# National Radio Astronomy Observatory
24 //# 520 Edgemont Road
25 //# Charlottesville, VA 22903-2475 USA
26 //#
27 #ifndef CUBE_PARTITION_MIXIN_H_
28 #define CUBE_PARTITION_MIXIN_H_
29 
34 #include <algorithm>
35 #include <vector>
36 #include <string>
37 #include <unistd.h>
38 
39 namespace casa {
40 
45 template <class T>
47  : public T {
48 
49 public:
50  void concat_images(const string &type) {
51  casacore::LogIO log(casacore::LogOrigin("CubePartitionMixin", "concat_images", WHERE));
52  if (worker_rank >= 0) {
53  const std::string image_types[] =
54  {"image", "psf", "model", "residual", "mask", "pb", "weight"};
55  string cwd(getcwd(nullptr, 0));
56  // wait until all ranks have completed file modifications
58  // round-robin allocation of image concatenation tasks to worker
59  // ranks
63  std::string imagename =
64  image_parameters.subRecord(i).asString("imagename");
65  std::string image_prefix = cwd + "/" + imagename;
66  std::vector<std::string> images;
67  for (auto ext : image_types) {
68  images.clear();
69  std::string ext_suffix = "." + ext;
70  std::string concat_imagename =
71  image_prefix + ext_suffix;
72  for (casacore::uInt j = 0; j < (casacore::uInt)num_workers; ++j) {
73  std::string component_imagename =
74  image_prefix + ".n" + std::to_string(j) + ext_suffix;
75  if (access(component_imagename.c_str(), F_OK) == 0)
76  images.push_back(component_imagename);
77  }
78  if (!images.empty()) {
79  std::string cmd = "imageconcat inimages='";
80  for (auto im : images) cmd += im + " ";
81  cmd += "' outimage='" + concat_imagename + "' type=";
82  cmd += type;
83  int rc = std::system(cmd.c_str());
84  if (rc == -1 || WEXITSTATUS(rc) != 0)
86  << ("concatenation of " + concat_imagename
87  + " failed")
89  }
90  }
91  }
92  }
93  };
94 
95 protected:
96 
98 
100 
102 
104 
107 
108  // Determine rank among parallel imaging worker processes.
109  worker_comm = wcomm;
110  if (worker_comm != MPI_COMM_NULL) {
113  } else {
114  num_workers = 0;
115  worker_rank = -1;
116  }
117 
118  std::string cwd(getcwd(nullptr, 0));
119  std::vector<std::string> all_worker_suffixes;
120  for (int r = 0; r < num_workers; ++r) {
121  all_worker_suffixes.push_back(".n" + std::to_string(r));
122  }
123  std::string my_worker_suffix =
124  ((worker_rank >= 0) ? all_worker_suffixes[worker_rank] : "");
125 
127 
129  ParallelImagerParams result;
130 
131  casacore::Record data_image_partition;
132  if (worker_rank >= 0) {
133  std::string worker_rank_str = std::to_string(worker_rank);
134  // need a SynthesisImager instance to do cube partitioning
135  std::unique_ptr<SynthesisImager> si(new SynthesisImager());
136  for (casacore::uInt i = 0; i < initial.selection.nfields(); ++i) {
137  SynthesisParamsSelect selection_pars;
138  selection_pars.fromRecord(initial.selection.subRecord(i));
139  si->selectData(selection_pars);
140  }
141  for (casacore::uInt i = 0; i < initial.image.nfields(); ++i) {
142  casacore::String i_name = initial.image.name(i);
143  if (initial.grid.isDefined(i_name)) {
144  SynthesisParamsImage image_pars;
145  image_pars.fromRecord(initial.image.subRecord(i_name));
146  SynthesisParamsGrid grid_pars;
147  grid_pars.fromRecord(initial.grid.subRecord(i_name));
148  si->defineImage(image_pars, grid_pars);
149  casacore::Record csys = si->getcsys();
150  if (csys.nfields() > 0) {
151  int nchan = ((image_pars.nchan == -1)
152  ? si->updateNchan()
153  : image_pars.nchan);;
156  // save only that part of the record returned from
157  // util.cubeDataImagePartition that is handled by the
158  // current rank
159  data_image_partition.defineRecord(
160  i_name,
162  initial.selection,
163  *casacore::CoordinateSystem::restore(csys, "coordsys"),
164  num_workers, nchan, csystems, numchans).
165  rwSubRecord(worker_rank_str));
166  }
167  }
168  }
169  }
170 
171  // selection_params
172  if (worker_rank >= 0) {
173  casacore::Record sel;
174  for (casacore::uInt i = 0; i < data_image_partition.nfields(); ++i) {
175  casacore::Record &di = data_image_partition.rwSubRecord(i);
176  for (casacore::uInt f = 0; f < di.nfields(); ++f) {
177  casacore::String name = di.name(f);
178  if (name.find("ms") == 0) {
179  casacore::Record &ms = di.rwSubRecord(f);
180  if (ms.isDefined("spw") && ms.asString("spw") == "-1")
181  ms.define("spw", "");
182  sel.defineRecord(name, ms);
183  }
184  }
185  }
186  result.selection = sel;
187  } else {
188  result.selection = casacore::Record();
189  }
190 
191  // image_params
192  if (worker_rank >= 0) {
193  result.image = initial.image;
194  for (casacore::uInt i = 0; i < data_image_partition.nfields(); ++i) {
195  const casacore::Record &di = data_image_partition.subRecord(i);
196  casacore::String i_name = data_image_partition.name(i);
197  casacore::Record &i_image = result.image.rwSubRecord(i_name);
198  i_image.define("csys", di.asString("coordsys"));
199  i_image.define("nchan", di.asString("nchan"));
200  i_image.define(
201  "imagename",
202  i_image.asString("imagename") + casacore::String(my_worker_suffix));
203  }
204  } else {
205  result.image = empty_fields(initial.image, "imagename");
206  }
207 
208  // FIXME: are grid parameters partitioned by node?
209  //
210  // grid params
211  if (worker_rank >= 0) {
212  auto modify_cfcache = [&](const char *field_val) {
213  return *field_val + my_worker_suffix;
214  };
215  result.grid =
216  convert_fields(initial.grid, "cfcache", modify_cfcache);
217  } else {
218  result.grid = empty_fields(initial.grid, "cfcache");
219  }
220 
221  // normalization_params
222  result.normalization =
223  ((worker_rank >= 0) ? initial.normalization : casacore::Record());
224 
225  // deconvolution params
226  result.deconvolution =
227  ((worker_rank >= 0) ? initial.deconvolution : casacore::Record());
228 
229  // weight params
230  result.weight =
231  ((worker_rank >= 0) ? initial.weight : casacore::Record());
232 
233  // iteration params
234  result.iteration = initial.iteration;
235 
236  return result;
237  }
238 
239 private:
240 
241  // Convenience method to transform certain record fields
243  std::function<std::string(const char *)> fn) {
244  auto modify_field_val = [&](casacore::Record &msRec) {
245  msRec.define(field, fn(msRec.asString(field).c_str()));
246  };
247  casacore::Record result(rec);
248  for_each(ParamFieldIterator::begin(&result),
249  ParamFieldIterator::end(&result),
250  modify_field_val);
251  return result;
252  }
253 
254  // Convenience method to clear certain record fields
255  casacore::Record empty_fields(casacore::Record &rec, const char *field) {
256  auto modify_field_val = [&](casacore::Record &msRec) {
257  msRec.defineRecord(field, casacore::Record());
258  };
259  casacore::Record result(rec);
260  for_each(ParamFieldIterator::begin(&result),
261  ParamFieldIterator::end(&result),
262  modify_field_val);
263  return result;
264  }
265 };
266 
267 } // namespace casa
268 
269 #endif // CUBE_PARTITION_MIXIN_H_
const Record & subRecord(const RecordFieldId &) const
Get the subrecord from the given field.
#define MPI_COMM_NULL
Definition: MPIGlue.h:67
Record & rwSubRecord(const RecordFieldId &)
void fromRecord(const casacore::Record &inrec)
void concat_images(const string &type)
LatticeExprNode log(const LatticeExprNode &expr)
void defineRecord(const RecordFieldId &, const Record &value, RecordType type=Variable)
Define a value for the given field containing a subrecord.
int MPI_Comm
Definition: MPIGlue.h:59
static CoordinateSystem * restore(const RecordInterface &container, const String &fieldName)
Restore the CoordinateSystem from a record.
casacore::Int nchan
Spectral coordinates (TT : Add other params here)
virtual Type type()
Return the type enum.
Class that contains functions needed for imager.
ostream-like interface to creating log messages.
Definition: LogIO.h:167
ABSTRACT CLASSES Abstract class for colors Any implementation of color should be able to provide a hexadecimal form of the if a human readable name(i.e."black").In many places throughout the plotter
Change the message priority to WARN.
Definition: LogIO.h:181
#define MPI_Comm_size(c, sp)
Definition: MPIGlue.h:71
Collection of parameters for parallel imaging, categorized roughly by imaging component.
virtual uInt nfields() const
How many fields does this structure have? A convenient synonym for description().nfields().
static ParamFieldIterator end(casacore::Record *rec, const string &prefix="")
static ParamFieldIterator begin(casacore::Record *rec, const string &prefix="")
#define MPI_Comm_rank(c, rp)
Definition: MPIGlue.h:75
void clear()
Clear the string Warning: clear() executed as erase() due to missing clear() in gcc ...
Definition: String.h:372
casacore::Record image_parameters
A hierarchical collection of named fields of various types.
Definition: Record.h:180
void fromRecord(const casacore::Record &inrec)
Bool isDefined(const String &fieldName) const
Test if a field name exists.
void fromRecord(const casacore::Record &inrec)
#define WHERE
Definition: LogOrigin.h:211
casacore::Record empty_fields(casacore::Record &rec, const char *field)
Convenience method to clear certain record fields.
std::string to_string(int i)
Definition: StringUtil.h:36
Post the accumulated message.
Definition: LogIO.h:173
LogOrigin: The source code location of the originator of a LogMessage.
Definition: LogOrigin.h:94
size_type find(const string &str, size_type pos=0) const
Search functions.
Definition: String.h:644
const String & asString(const RecordFieldId &) const
String: the storage and methods of handling collections of characters.
Definition: String.h:223
static casacore::Record cubeDataImagePartition(const casacore::Record &selpars, const casacore::CoordinateSystem &incsys, const casacore::Int npart, const casacore::Int nchannel, casacore::Vector< casacore::CoordinateSystem > &outCsys, casacore::Vector< casacore::Int > &outnChan)
casacore::CoordinateSystem and number of channels of original cube is passed in Output record is the ...
ParallelImagerParams get_params(MPI_Comm wcomm, ParallelImagerParams &initial)
casacore::Record convert_fields(casacore::Record &rec, const char *field, std::function< std::string(const char *)> fn)
Convenience method to transform certain record fields.
void define(const RecordFieldId &, Bool value)
Define a value for the given field.
Parameter and input data partitioning for parallel cube imaging (in ParallelImagerMixin).
String name(const RecordFieldId &) const
Get the name of this field.
unsigned int uInt
Definition: aipstype.h:51
#define MPI_Barrier(c)
Definition: MPIGlue.h:114