Line data Source code
1 : /* -*- mode: c++ -*- */
2 : //# CubePartitionMixin.h: Parallel cube imaging data partitioning
3 : //# Copyright (C) 2016
4 : //# Associated Universities, Inc. Washington DC, USA.
5 : //#
6 : //# This library is free software; you can redistribute it and/or modify it
7 : //# under the terms of the GNU Library General Public License as published by
8 : //# the Free Software Foundation; either version 2 of the License, or (at your
9 : //# option) any later version.
10 : //#
11 : //# This library is distributed in the hope that it will be useful, but WITHOUT
12 : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 : //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
14 : //# License for more details.
15 : //#
16 : //# You should have received a copy of the GNU Library General Public License
17 : //# along with this library; if not, write to the Free Software Foundation,
18 : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
19 : //#
20 : //# Correspondence concerning AIPS++ should be addressed as follows:
21 : //# Internet email: aips2-request@nrao.edu.
22 : //# Postal address: AIPS++ Project Office
23 : //# National Radio Astronomy Observatory
24 : //# 520 Edgemont Road
25 : //# Charlottesville, VA 22903-2475 USA
26 : //#
27 : #ifndef CUBE_PARTITION_MIXIN_H_
28 : #define CUBE_PARTITION_MIXIN_H_
29 :
30 : #include <synthesis/ImagerObjects/SynthesisUtilMethods.h>
31 : #include <synthesis/ImagerObjects/ParallelImagerParams.h>
32 : #include <synthesis/ImagerObjects/ParamFieldIterator.h>
33 : #include <synthesis/ImagerObjects/MPIGlue.h>
34 : #include <algorithm>
35 : #include <vector>
36 : #include <string>
37 : #include <unistd.h>
38 :
39 : namespace casa {
40 :
41 : /**
42 : * Parameter and input data partitioning for parallel cube imaging (in
43 : * ParallelImagerMixin).
44 : */
45 : template <class T>
46 : class CubePartitionMixin
47 : : public T {
48 :
49 : public:
50 0 : void concat_images(const string &type) {
51 0 : casacore::LogIO log(casacore::LogOrigin("CubePartitionMixin", "concat_images", WHERE));
52 0 : if (worker_rank >= 0) {
53 0 : const std::string image_types[] =
54 : {"image", "psf", "model", "residual", "mask", "pb", "weight"};
55 0 : string cwd(getcwd(nullptr, 0));
56 : // wait until all ranks have completed file modifications
57 0 : MPI_Barrier(worker_comm);
58 : // round-robin allocation of image concatenation tasks to worker
59 : // ranks
60 0 : for (casacore::uInt i = (casacore::uInt)worker_rank;
61 0 : i < image_parameters.nfields();
62 0 : i += (casacore::uInt)num_workers) {
63 0 : std::string imagename =
64 0 : image_parameters.subRecord(i).asString("imagename");
65 0 : std::string image_prefix = cwd + "/" + imagename;
66 0 : std::vector<std::string> images;
67 0 : for (auto ext : image_types) {
68 0 : images.clear();
69 0 : std::string ext_suffix = "." + ext;
70 0 : std::string concat_imagename =
71 : image_prefix + ext_suffix;
72 0 : for (casacore::uInt j = 0; j < (casacore::uInt)num_workers; ++j) {
73 0 : std::string component_imagename =
74 : image_prefix + ".n" + std::to_string(j) + ext_suffix;
75 0 : if (access(component_imagename.c_str(), F_OK) == 0)
76 0 : images.push_back(component_imagename);
77 : }
78 0 : if (!images.empty()) {
79 0 : std::string cmd = "imageconcat inimages='";
80 0 : for (auto im : images) cmd += im + " ";
81 0 : cmd += "' outimage='" + concat_imagename + "' type=";
82 0 : cmd += type;
83 0 : int rc = std::system(cmd.c_str());
84 0 : if (rc == -1 || WEXITSTATUS(rc) != 0)
85 : log << casacore::LogIO::WARN
86 : << ("concatenation of " + concat_imagename
87 : + " failed")
88 0 : << casacore::LogIO::POST;
89 : }
90 : }
91 : }
92 : }
93 0 : };
94 :
95 : protected:
96 :
97 : MPI_Comm worker_comm;
98 :
99 : int num_workers;
100 :
101 : int worker_rank;
102 :
103 : casacore::Record image_parameters;
104 :
105 : ParallelImagerParams
106 0 : get_params(MPI_Comm wcomm, ParallelImagerParams &initial) {
107 :
108 : // Determine rank among parallel imaging worker processes.
109 0 : worker_comm = wcomm;
110 0 : if (worker_comm != MPI_COMM_NULL) {
111 0 : MPI_Comm_size(worker_comm, &num_workers);
112 0 : MPI_Comm_rank(worker_comm, &worker_rank);
113 : } else {
114 0 : num_workers = 0;
115 0 : worker_rank = -1;
116 : }
117 :
118 0 : std::string cwd(getcwd(nullptr, 0));
119 0 : std::vector<std::string> all_worker_suffixes;
120 0 : for (int r = 0; r < num_workers; ++r) {
121 0 : all_worker_suffixes.push_back(".n" + std::to_string(r));
122 : }
123 0 : std::string my_worker_suffix =
124 0 : ((worker_rank >= 0) ? all_worker_suffixes[worker_rank] : "");
125 :
126 0 : image_parameters = casacore::Record(initial.image);
127 :
128 0 : SynthesisUtilMethods util;
129 0 : ParallelImagerParams result;
130 :
131 0 : casacore::Record data_image_partition;
132 0 : if (worker_rank >= 0) {
133 0 : std::string worker_rank_str = std::to_string(worker_rank);
134 : // need a SynthesisImager instance to do cube partitioning
135 0 : std::unique_ptr<SynthesisImager> si(new SynthesisImager());
136 0 : for (casacore::uInt i = 0; i < initial.selection.nfields(); ++i) {
137 0 : SynthesisParamsSelect selection_pars;
138 0 : selection_pars.fromRecord(initial.selection.subRecord(i));
139 0 : si->selectData(selection_pars);
140 : }
141 0 : for (casacore::uInt i = 0; i < initial.image.nfields(); ++i) {
142 0 : casacore::String i_name = initial.image.name(i);
143 0 : if (initial.grid.isDefined(i_name)) {
144 0 : SynthesisParamsImage image_pars;
145 0 : image_pars.fromRecord(initial.image.subRecord(i_name));
146 0 : SynthesisParamsGrid grid_pars;
147 0 : grid_pars.fromRecord(initial.grid.subRecord(i_name));
148 0 : si->defineImage(image_pars, grid_pars);
149 0 : casacore::Record csys = si->getcsys();
150 0 : if (csys.nfields() > 0) {
151 0 : int nchan = ((image_pars.nchan == -1)
152 0 : ? si->updateNchan()
153 : : image_pars.nchan);;
154 0 : casacore::Vector<casacore::Int> numchans;
155 0 : casacore::Vector<casacore::CoordinateSystem> csystems;
156 : // save only that part of the record returned from
157 : // util.cubeDataImagePartition that is handled by the
158 : // current rank
159 0 : data_image_partition.defineRecord(
160 : i_name,
161 : util.cubeDataImagePartition(
162 0 : initial.selection,
163 0 : *casacore::CoordinateSystem::restore(csys, "coordsys"),
164 : num_workers, nchan, csystems, numchans).
165 0 : rwSubRecord(worker_rank_str));
166 : }
167 : }
168 : }
169 : }
170 :
171 : // selection_params
172 0 : if (worker_rank >= 0) {
173 0 : casacore::Record sel;
174 0 : for (casacore::uInt i = 0; i < data_image_partition.nfields(); ++i) {
175 0 : casacore::Record &di = data_image_partition.rwSubRecord(i);
176 0 : for (casacore::uInt f = 0; f < di.nfields(); ++f) {
177 0 : casacore::String name = di.name(f);
178 0 : if (name.find("ms") == 0) {
179 0 : casacore::Record &ms = di.rwSubRecord(f);
180 0 : if (ms.isDefined("spw") && ms.asString("spw") == "-1")
181 0 : ms.define("spw", "");
182 0 : sel.defineRecord(name, ms);
183 : }
184 : }
185 : }
186 0 : result.selection = sel;
187 : } else {
188 0 : result.selection = casacore::Record();
189 : }
190 :
191 : // image_params
192 0 : if (worker_rank >= 0) {
193 0 : result.image = initial.image;
194 0 : for (casacore::uInt i = 0; i < data_image_partition.nfields(); ++i) {
195 0 : const casacore::Record &di = data_image_partition.subRecord(i);
196 0 : casacore::String i_name = data_image_partition.name(i);
197 0 : casacore::Record &i_image = result.image.rwSubRecord(i_name);
198 0 : i_image.define("csys", di.asString("coordsys"));
199 0 : i_image.define("nchan", di.asString("nchan"));
200 0 : i_image.define(
201 : "imagename",
202 : i_image.asString("imagename") + casacore::String(my_worker_suffix));
203 : }
204 : } else {
205 0 : result.image = empty_fields(initial.image, "imagename");
206 : }
207 :
208 : // FIXME: are grid parameters partitioned by node?
209 : //
210 : // grid params
211 0 : if (worker_rank >= 0) {
212 0 : auto modify_cfcache = [&](const char *field_val) {
213 0 : return *field_val + my_worker_suffix;
214 : };
215 0 : result.grid =
216 0 : convert_fields(initial.grid, "cfcache", modify_cfcache);
217 : } else {
218 0 : result.grid = empty_fields(initial.grid, "cfcache");
219 : }
220 :
221 : // normalization_params
222 0 : result.normalization =
223 0 : ((worker_rank >= 0) ? initial.normalization : casacore::Record());
224 :
225 : // deconvolution params
226 0 : result.deconvolution =
227 0 : ((worker_rank >= 0) ? initial.deconvolution : casacore::Record());
228 :
229 : // weight params
230 0 : result.weight =
231 0 : ((worker_rank >= 0) ? initial.weight : casacore::Record());
232 :
233 : // iteration params
234 0 : result.iteration = initial.iteration;
235 :
236 0 : return result;
237 : }
238 :
239 : private:
240 :
241 : // Convenience method to transform certain record fields
242 0 : casacore::Record convert_fields(casacore::Record &rec, const char *field,
243 : std::function<std::string(const char *)> fn) {
244 0 : auto modify_field_val = [&](casacore::Record &msRec) {
245 0 : msRec.define(field, fn(msRec.asString(field).c_str()));
246 : };
247 0 : casacore::Record result(rec);
248 0 : for_each(ParamFieldIterator::begin(&result),
249 : ParamFieldIterator::end(&result),
250 : modify_field_val);
251 0 : return result;
252 : }
253 :
254 : // Convenience method to clear certain record fields
255 0 : casacore::Record empty_fields(casacore::Record &rec, const char *field) {
256 0 : auto modify_field_val = [&](casacore::Record &msRec) {
257 0 : msRec.defineRecord(field, casacore::Record());
258 : };
259 0 : casacore::Record result(rec);
260 0 : for_each(ParamFieldIterator::begin(&result),
261 : ParamFieldIterator::end(&result),
262 : modify_field_val);
263 0 : return result;
264 : }
265 : };
266 :
267 : } // namespace casa
268 :
269 : #endif // CUBE_PARTITION_MIXIN_H_
|