Line data Source code
1 : //# MPITransport.cc: class which define an MPI parallel transport layer
2 : //# Copyright (C) 1998,1999,2000
3 : //# Associated Universities, Inc. Washington DC, USA.
4 : //#
5 : //# This library is free software; you can redistribute it and/or modify it
6 : //# under the terms of the GNU Library General Public License as published by
7 : //# the Free Software Foundation; either version 2 of the License, or (at your
8 : //# option) any later version.
9 : //#
10 : //# This library is distributed in the hope that it will be useful, but WITHOUT
11 : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 : //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13 : //# License for more details.
14 : //#
15 : //# You should have received a copy of the GNU Library General Public License
16 : //# along with this library; if not, write to the Free Software Foundation,
17 : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
18 : //#
19 : //# Correspondence concerning AIPS++ should be addressed as follows:
20 : //# Internet email: aips2-request@nrao.edu.
21 : //# Postal address: AIPS++ Project Office
22 : //# National Radio Astronomy Observatory
23 : //# 520 Edgemont Road
24 : //# Charlottesville, VA 22903-2475 USA
25 : //#
26 : //# $Id$
27 :
28 : #ifdef HAVE_MPI
29 :
30 : #include <memory>
31 :
32 : #include <casacore/casa/Containers/Record.h>
33 : #include <casacore/casa/IO/AipsIO.h>
34 : #include <casacore/casa/IO/MemoryIO.h>
35 :
36 : #include <synthesis/Parallel/MPITransport.h>
37 : #include <synthesis/Parallel/MPIError.h>
38 : #include <synthesis/Parallel/Algorithm.h>
39 :
40 : #include <mpi.h>
41 :
42 : using std::shared_ptr;
43 : using namespace casacore;
44 : namespace casa { //# NAMESPACE CASA - BEGIN
45 :
46 0 : MPITransport::MPITransport() : PTransport()
47 : {
48 : // Default constructor
49 : //
50 : // Set default tag and source/destination
51 0 : setAnyTag();
52 0 : connectAnySource();
53 0 : };
54 :
55 0 : MPITransport::MPITransport(Int argc, Char *argv[]) : PTransport()
56 : {
57 0 : if (debug_p) {
58 0 : cerr << "constructing MPITransport" << std::endl;
59 : }
60 : // Construct from argc, argv
61 : //
62 0 : int flag=0;
63 0 : MPI_Initialized(&flag);
64 : //cerr << "FLAG " << flag << endl;
65 0 : if((flag && isController()) || MPI_Init(&argc, &argv) == MPI_SUCCESS){
66 : //if(MPI_Init(&argc, &argv) == MPI_SUCCESS){
67 0 : MPI_Comm_rank(MPI_COMM_WORLD, &myCpu);
68 0 : MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
69 : // Set default tag and source/destination
70 0 : setAnyTag();
71 0 : connectAnySource();
72 : } else {
73 0 : throw MPIError("MPI Init failed!");
74 : }
75 0 : }
76 :
77 0 : MPITransport::~MPITransport(){
78 0 : if(!isFinalized())
79 0 : MPI_Finalize();
80 0 : }
81 :
82 0 : Bool MPITransport::isFinalized()
83 : {
84 : int flag;
85 0 : MPI_Finalized(&flag);
86 0 : return Bool(flag);
87 : }
88 :
89 0 : Int MPITransport::anyTag()
90 : {
91 : // Return the value which indicates an unset tag
92 : //
93 0 : return MPI_ANY_TAG;
94 : };
95 :
96 0 : Int MPITransport::anySource()
97 : {
98 : // Return the value which indicates an unset source
99 : //
100 0 : return MPI_ANY_SOURCE;
101 : };
102 :
103 : // produce a contiguous vector of per-dimension sizes from a shape/Iposition
104 0 : const std::vector<uInt> makeContiguousSizes(uInt ndim, const IPosition &ipos) {
105 0 : std::vector<uInt> sizes(ndim);
106 0 : for (uInt idx=0; idx<ndim; ++idx) {
107 0 : sizes[idx] = ipos[idx];
108 : }
109 0 : return sizes;
110 : }
111 :
112 0 : Int MPITransport::put(const Array<Float> &af){
113 0 : uInt ndim(af.ndim());
114 0 : setDestAndTag(sendTo, myOp);
115 0 : IPosition ipos = af.shape();
116 : // Send the number of dimensions
117 0 : MPI_Send((void *)&ndim, 1, MPI_UNSIGNED, sendTo, myOp, MPI_COMM_WORLD);
118 : // Send the shape vector
119 : // Don't feel tempted to send directly (void *)ipos.storage(). That doesn't give you
120 : // a contiguous sequence of size integers. You can get for example integers:
121 : // dim1, 0, dim2, 0, uninit, ...
122 : // etc.
123 : // Dangerous: MPI_Send((void *)ipos.storage(), ndim, MPI_INT, sendTo, myOp,
124 0 : const auto &sizes = makeContiguousSizes(ndim, ipos);
125 0 : MPI_Send((void *)sizes.data(), ndim, MPI_INT, sendTo, myOp,
126 : MPI_COMM_WORLD);
127 : // Send the data
128 : Bool deleteit;
129 0 : MPI_Send((void *)af.getStorage(deleteit), af.nelements(), MPI_FLOAT,
130 : sendTo, myOp, MPI_COMM_WORLD);
131 0 : return(0);
132 : }
133 :
134 0 : Int MPITransport::put(const Array<Double> &af){
135 0 : uInt ndim(af.ndim());
136 0 : const IPosition ipos = af.shape();
137 0 : setDestAndTag(sendTo, myOp);
138 0 : MPI_Send((void *)&ndim, 1, MPI_UNSIGNED, sendTo, myOp, MPI_COMM_WORLD);
139 : // Send the shape vector
140 0 : const auto &sizes = makeContiguousSizes(ndim, ipos);
141 0 : MPI_Send((const void *)sizes.data(), ndim, MPI_INT, sendTo, myOp,
142 : MPI_COMM_WORLD);
143 : // Send the data
144 : Bool deleteit;
145 0 : MPI_Send((void *)af.getStorage(deleteit), af.nelements(), MPI_DOUBLE,
146 : sendTo, myOp, MPI_COMM_WORLD);
147 0 : return(0);
148 : }
149 :
150 0 : Int MPITransport::put(const Array<Int> &af){
151 0 : uInt ndim(af.ndim());
152 0 : IPosition ipos = af.shape();
153 0 : setDestAndTag(sendTo, myOp);
154 0 : MPI_Send((void *)&ndim, 1, MPI_UNSIGNED, sendTo, myOp, MPI_COMM_WORLD);
155 : // Send the shape vector
156 0 : const auto &sizes = makeContiguousSizes(ndim, ipos);
157 0 : MPI_Send((void *)sizes.data(), ndim, MPI_INT, sendTo, myOp,
158 : MPI_COMM_WORLD);
159 : // Send the data
160 : Bool deleteit;
161 0 : MPI_Send((void *)af.getStorage(deleteit), af.nelements(), MPI_INT,
162 : sendTo, myOp, MPI_COMM_WORLD);
163 0 : return(0);
164 : }
165 :
166 0 : Int MPITransport::put(const Array<Complex> &af){
167 0 : uInt ndim(af.ndim());
168 0 : setDestAndTag(sendTo, myOp);
169 0 : IPosition ipos = af.shape();
170 : // Send the number of dimensions
171 0 : MPI_Send((void *)&ndim, 1, MPI_UNSIGNED, sendTo, myOp, MPI_COMM_WORLD);
172 : // Send the shape vector
173 0 : const auto &sizes = makeContiguousSizes(ndim, ipos);
174 0 : MPI_Send((const void *)sizes.data(), ndim, MPI_INT, sendTo, myOp,
175 : MPI_COMM_WORLD);
176 : // Send the data
177 : Bool deleteit;
178 0 : MPI_Send((void *)af.getStorage(deleteit), 2*af.nelements(), MPI_FLOAT,
179 : sendTo, myOp, MPI_COMM_WORLD);
180 0 : return(0);
181 : }
182 :
183 0 : Int MPITransport::put(const Array<DComplex> &af){
184 0 : uInt ndim(af.ndim());
185 0 : setDestAndTag(sendTo, myOp);
186 0 : IPosition ipos = af.shape();
187 : // Send the number of dimensions
188 0 : MPI_Send((void *)&ndim, 1, MPI_UNSIGNED, sendTo, myOp, MPI_COMM_WORLD);
189 : // Send the shape vector
190 0 : const auto &sizes = makeContiguousSizes(ndim, ipos);
191 0 : MPI_Send((void *)sizes.data(), ndim, MPI_INT, sendTo, myOp,
192 : MPI_COMM_WORLD);
193 : // Send the data
194 : Bool deleteit;
195 0 : MPI_Send((void *)af.getStorage(deleteit), 2*af.nelements(), MPI_DOUBLE,
196 : sendTo, myOp, MPI_COMM_WORLD);
197 0 : return(0);
198 : }
199 :
200 0 : Int MPITransport::put(const Float &f){
201 0 : setDestAndTag(sendTo, myOp);
202 0 : MPI_Send((void *)&f, 1, MPI_FLOAT, sendTo, myOp, MPI_COMM_WORLD);
203 0 : return(0);
204 : }
205 0 : Int MPITransport::put(const Complex &f){
206 0 : setDestAndTag(sendTo, myOp);
207 0 : MPI_Send((void *)&f, 2, MPI_FLOAT, sendTo, myOp, MPI_COMM_WORLD);
208 0 : return(0);
209 : }
210 0 : Int MPITransport::put(const DComplex &f){
211 0 : setDestAndTag(sendTo, myOp);
212 0 : MPI_Send((void *)&f, 2, MPI_DOUBLE, sendTo, myOp, MPI_COMM_WORLD);
213 0 : return(0);
214 : }
215 :
216 0 : Int MPITransport::put(const Double &d){
217 0 : setDestAndTag(sendTo, myOp);
218 0 : MPI_Send((void *)&d, 1, MPI_DOUBLE, sendTo, myOp, MPI_COMM_WORLD);
219 0 : return(0);
220 : }
221 0 : Int MPITransport::put(const Int &i){
222 0 : setDestAndTag(sendTo, myOp);
223 : // warning: sstat set but not used!
224 0 : Int sstat = MPI_Send((void *)&i, 1, MPI_INT, sendTo, myOp, MPI_COMM_WORLD);
225 : (void) sstat; // warning: unused sstat
226 0 : return(0);
227 : }
228 0 : Int MPITransport::put(const Bool &b){
229 0 : setDestAndTag(sendTo, myOp);
230 0 : Int i(b);
231 : // warning: sstat set but not used!
232 0 : Int sstat = MPI_Send((void *)&i, 1, MPI_INT, sendTo, myOp, MPI_COMM_WORLD);
233 : (void) sstat; // warning: unused sstat
234 0 : return(0);
235 : }
236 :
237 0 : Int MPITransport::put(const Record &r){
238 0 : setDestAndTag(sendTo, myOp);
239 0 : auto buffer = std::make_shared<MemoryIO>();
240 0 : AipsIO rBuf(buffer);
241 0 : rBuf.putstart("MPIRecord",1);
242 0 : rBuf << r;
243 0 : rBuf.putend();
244 0 : uInt bytes2send=rBuf.getpos();
245 : //cerr << "Bytes 2 send " << bytes2send << endl;
246 : // warning: sstat set but not used!
247 0 : Int sstat = MPI_Send((void *)&bytes2send, 1, MPI_UNSIGNED, sendTo, myOp, MPI_COMM_WORLD);
248 0 : sstat = MPI_Send((void *)buffer->getBuffer(), bytes2send, MPI_UNSIGNED_CHAR, sendTo, myOp,
249 : MPI_COMM_WORLD);
250 : (void) sstat; // warning: unused sstat
251 0 : return(0);
252 : }
253 :
254 0 : Int MPITransport::put(const String &s){
255 0 : uInt ndim(s.length());
256 0 : setDestAndTag(sendTo, myOp);
257 : // Send the length of the string
258 0 : MPI_Send((void *)&ndim, 1, MPI_UNSIGNED, sendTo, myOp, MPI_COMM_WORLD);
259 : // Send the characters
260 0 : MPI_Send((void *)s.chars(), ndim, MPI_CHAR, sendTo, myOp,
261 : MPI_COMM_WORLD);
262 0 : return(0);
263 : }
264 :
265 0 : Int MPITransport::get(Array<Float> &af){
266 : // Get the number of dimensions
267 0 : setSourceAndTag(getFrom, myOp);
268 : MPI_Status status;
269 : uInt ndim;
270 0 : MPI_Recv(&ndim, 1, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
271 : // Get the shape vector
272 0 : aTag = myOp = status.MPI_TAG;
273 0 : aWorker = getFrom = status.MPI_SOURCE;
274 0 : std::vector<uInt> ashape(ndim);
275 0 : MPI_Recv(ashape.data(), ndim, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
276 : // Get the data
277 0 : Int nelements(1);
278 : {
279 0 : for(uInt i=0;i<ndim;i++){
280 0 : nelements *= ashape[i];
281 : }
282 : }
283 0 : Float *data = new Float[nelements];
284 0 : MPI_Recv(data, nelements, MPI_FLOAT, getFrom, myOp, MPI_COMM_WORLD,
285 : &status);
286 0 : IPosition ipos(ndim, ndim);
287 0 : for(uInt i=0;i<ndim;i++)
288 0 : ipos(i) = ashape[i];
289 0 : af.takeStorage(ipos, data, TAKE_OVER);
290 :
291 0 : return(status.MPI_SOURCE);
292 : }
293 :
294 0 : Int MPITransport::get(Array<Double> &af){
295 : // Get the number of dimensions
296 : MPI_Status status;
297 0 : setSourceAndTag(getFrom, myOp);
298 : uInt ndim;
299 0 : MPI_Recv(&ndim, 1, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
300 : // Get the shape vector
301 0 : aTag = myOp = status.MPI_TAG;
302 0 : aWorker = getFrom = status.MPI_SOURCE;
303 0 : std::vector<uInt> ashape(ndim);
304 0 : MPI_Recv(ashape.data(), ndim, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
305 : // Get the data
306 0 : Int nelements(1);
307 : {
308 0 : for(uInt i=0;i<ndim;i++){
309 0 : nelements *= ashape[i];
310 : }
311 : }
312 0 : Double *data = new Double[nelements];
313 0 : MPI_Recv(data, nelements, MPI_DOUBLE, getFrom, myOp, MPI_COMM_WORLD,
314 : &status);
315 0 : IPosition ipos(ndim, ndim);
316 0 : for(uInt i=0;i<ndim;i++)
317 0 : ipos(i) = ashape[i];
318 0 : af.takeStorage(ipos, data, TAKE_OVER);
319 :
320 0 : return(status.MPI_SOURCE);
321 : }
322 :
323 0 : Int MPITransport::get(Array<Complex> &af){
324 : // Get the number of dimensions
325 : MPI_Status status;
326 0 : setSourceAndTag(getFrom, myOp);
327 : uInt ndim;
328 0 : MPI_Recv(&ndim, 1, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
329 : // Get the shape vector
330 0 : aTag = myOp = status.MPI_TAG;
331 0 : aWorker = getFrom = status.MPI_SOURCE;
332 0 : std::vector<uInt> ashape(ndim);
333 0 : MPI_Recv(ashape.data(), ndim, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
334 : // Get the data
335 0 : Int nelements(1);
336 : {
337 0 : for(uInt i=0;i<ndim;i++){
338 0 : nelements *= ashape[i];
339 : }
340 : }
341 0 : Complex *data = new Complex[nelements];
342 0 : MPI_Recv(data, 2*nelements, MPI_FLOAT, getFrom, myOp, MPI_COMM_WORLD,
343 : &status);
344 0 : IPosition ipos(ndim, ndim);
345 0 : for(uInt i=0;i<ndim;i++)
346 0 : ipos(i) = ashape[i];
347 0 : af.takeStorage(ipos, data, TAKE_OVER);
348 :
349 0 : return(status.MPI_SOURCE);
350 : }
351 :
352 0 : Int MPITransport::get(Array<DComplex> &af){
353 : // Get the number of dimensions
354 : MPI_Status status;
355 : uInt ndim;
356 0 : setSourceAndTag(getFrom, myOp);
357 0 : MPI_Recv(&ndim, 1, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
358 : // Get the shape vector
359 0 : aTag = myOp = status.MPI_TAG;
360 0 : aWorker = getFrom = status.MPI_SOURCE;
361 0 : std::vector<uInt> ashape(ndim);
362 0 : MPI_Recv(ashape.data(), ndim, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
363 : // Get the data
364 0 : Int nelements(1);
365 : {
366 0 : for(uInt i=0;i<ndim;i++){
367 0 : nelements *= ashape[i];
368 : }
369 : }
370 0 : DComplex *data = new DComplex[nelements];
371 0 : MPI_Recv(data, 2*nelements, MPI_DOUBLE, getFrom, myOp, MPI_COMM_WORLD,
372 : &status);
373 0 : IPosition ipos(ndim, ndim);
374 0 : for(uInt i=0;i<ndim;i++)
375 0 : ipos(i) = ashape[i];
376 0 : af.takeStorage(ipos, data, TAKE_OVER);
377 :
378 0 : return(status.MPI_SOURCE);
379 : }
380 :
381 0 : Int MPITransport::get(Array<Int> &af){
382 : // Get the number of dimensions
383 : MPI_Status status;
384 : uInt ndim;
385 0 : setSourceAndTag(getFrom, myOp);
386 0 : MPI_Recv(&ndim, 1, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
387 : // Get the shape vector
388 0 : aTag = myOp = status.MPI_TAG;
389 0 : aWorker = getFrom = status.MPI_SOURCE;
390 0 : std::vector<uInt> ashape(ndim);
391 0 : MPI_Recv(ashape.data(), ndim, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
392 : // Get the data
393 0 : Int nelements(1);
394 : {
395 0 : for(uInt i=0;i<ndim;i++){
396 0 : nelements *= ashape[i];
397 : }
398 : }
399 0 : Int *data = new Int[nelements];
400 0 : MPI_Recv(data, nelements, MPI_INT, getFrom, myOp, MPI_COMM_WORLD,
401 : &status);
402 0 : IPosition ipos(ndim, ndim);
403 0 : for(uInt i=0;i<ndim;i++)
404 0 : ipos(i) = ashape[i];
405 0 : af.takeStorage(ipos, data, TAKE_OVER);
406 :
407 0 : return(status.MPI_SOURCE);
408 : }
409 :
410 0 : Int MPITransport::get(Float &f){
411 : MPI_Status status;
412 0 : setSourceAndTag(getFrom, myOp);
413 0 : MPI_Recv(&f, 1, MPI_FLOAT, getFrom, myOp, MPI_COMM_WORLD, &status);
414 0 : return(status.MPI_SOURCE);
415 : }
416 :
417 0 : Int MPITransport::get(Double &d){
418 : MPI_Status status;
419 0 : setSourceAndTag(getFrom, myOp);
420 0 : MPI_Recv(&d, 1, MPI_DOUBLE, getFrom, myOp, MPI_COMM_WORLD, &status);
421 0 : return(status.MPI_SOURCE);
422 : }
423 :
424 0 : Int MPITransport::get(Complex &f){
425 : MPI_Status status;
426 0 : setSourceAndTag(getFrom, myOp);
427 0 : MPI_Recv(&f, 2, MPI_FLOAT, getFrom, myOp, MPI_COMM_WORLD, &status);
428 0 : return(status.MPI_SOURCE);
429 : }
430 :
431 0 : Int MPITransport::get(DComplex &d){
432 : MPI_Status status;
433 0 : setSourceAndTag(getFrom, myOp);
434 0 : MPI_Recv(&d, 2, MPI_DOUBLE, getFrom, myOp, MPI_COMM_WORLD, &status);
435 0 : return(status.MPI_SOURCE);
436 : }
437 :
438 0 : Int MPITransport::get(Int &i){
439 0 : Int r_status(1);
440 : MPI_Status status;
441 0 : setSourceAndTag(getFrom, myOp);
442 0 : r_status = MPI_Recv(&i, 1, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
443 : (void) r_status; // warning: unused r_status
444 0 : return(status.MPI_SOURCE);
445 : }
446 :
447 0 : Int MPITransport::get(Bool &b){
448 0 : Int r_status(1);
449 : MPI_Status status;
450 0 : setSourceAndTag(getFrom, myOp);
451 : Int i;
452 0 : r_status = MPI_Recv(&i, 1, MPI_INT, getFrom, myOp, MPI_COMM_WORLD, &status);
453 : (void) r_status; // warning: unused r_status
454 0 : if(i == 0)
455 0 : b = false;
456 : else
457 0 : b = true;
458 0 : return(status.MPI_SOURCE);
459 : }
460 :
461 0 : Int MPITransport::get(Record &r){
462 : MPI_Status status;
463 : (void) status; // warning: unused status
464 0 : setSourceAndTag(getFrom, myOp);
465 : // Get the size of the record in bytes
466 : uInt bytesSent;
467 0 : MPI_Recv(&bytesSent, 1, MPI_UNSIGNED, getFrom, myOp, MPI_COMM_WORLD, &status);
468 : // Now fill the buffer full of bytes from the record
469 0 : std::vector<uChar> buffer(bytesSent);
470 0 : MPI_Recv(buffer.data(), bytesSent, MPI_UNSIGNED_CHAR, getFrom, myOp, MPI_COMM_WORLD, &status);
471 0 : auto nBuf = std::make_shared<MemoryIO>(buffer.data(), bytesSent);
472 0 : AipsIO rBuf(nBuf);
473 0 : uInt version = rBuf.getstart("MPIRecord");
474 : (void)version; // warning: unused version
475 0 : rBuf >> r;
476 0 : rBuf.getend();
477 0 : return(0);
478 : }
479 :
480 0 : Int MPITransport::get(String &s){
481 : MPI_Status status;
482 : (void) status; // warning: unused status
483 0 : setSourceAndTag(getFrom, myOp);
484 : // Get the length of the string
485 : uInt i;
486 0 : MPI_Recv(&i, 1, MPI_UNSIGNED, getFrom, myOp, MPI_COMM_WORLD, &status);
487 : // Send the characters
488 0 : Char *theChars = new Char[i+1];
489 0 : MPI_Recv(theChars, i, MPI_CHAR, getFrom, myOp, MPI_COMM_WORLD, &status);
490 :
491 0 : *(theChars+i) = 0;
492 0 : s = theChars;
493 0 : delete [] theChars;
494 0 : return(status.MPI_SOURCE);
495 : }
496 :
497 0 : void MPITransport::setSourceAndTag(Int &source, Int &tag)
498 : {
499 : // Set source and tag for MPI_Recv commands
500 : //
501 : // Message tag
502 0 : tag = aTag;
503 :
504 : // Source
505 0 : if (isController()) {
506 0 : source = aWorker;
507 : } else {
508 0 : source = controllerRank();
509 : };
510 0 : return;
511 : };
512 :
513 0 : void MPITransport::setDestAndTag(Int &dest, Int &tag)
514 : {
515 : // Set destination and tag for MPI_Send commands
516 : //
517 : // Message tag
518 0 : if (aTag == anyTag()) {
519 0 : throw(AipsError("No tag set for MPI send command"));
520 : } else {
521 0 : tag = aTag;
522 : };
523 :
524 : // Destination
525 0 : if (isController()) {
526 0 : if (aWorker != anySource()) {
527 0 : dest = aWorker;
528 : } else {
529 0 : throw(AipsError("Invalid destination for MPI send command"));
530 : };
531 : } else {
532 0 : dest = controllerRank();
533 : };
534 0 : return;
535 : };
536 :
537 :
538 : } //# NAMESPACE CASA - END
539 :
540 : #endif
541 :
|