Tpetra_Details_DistributorPlan.cpp
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#include "Tpetra_Details_DistributorPlan.hpp"
11
12#include "Teuchos_StandardParameterEntryValidators.hpp"
13#include "Tpetra_Util.hpp"
14#include "Tpetra_Details_Behavior.hpp"
15#include <numeric>
16
17namespace Tpetra {
18namespace Details {
19
20std::string
21DistributorSendTypeEnumToString(EDistributorSendType sendType)
22{
23 if (sendType == DISTRIBUTOR_ISEND) {
24 return "Isend";
25 }
26 else if (sendType == DISTRIBUTOR_SEND) {
27 return "Send";
28 }
29 else if (sendType == DISTRIBUTOR_ALLTOALL) {
30 return "Alltoall";
31 }
32#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
33 else if (sendType == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
34 return "MpiAdvanceAlltoall";
35 }
36 else if (sendType == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
37 return "MpiAdvanceNbralltoallv";
38 }
39#endif
40 else {
41 TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
42 "EDistributorSendType enum value " << sendType << ".");
43 }
44}
45
46std::string
47DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
48{
49 switch (how) {
50 case Details::DISTRIBUTOR_NOT_INITIALIZED:
51 return "Not initialized yet";
52 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
53 return "By createFromSends";
54 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
55 return "By createFromRecvs";
56 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
57 return "By createFromSendsAndRecvs";
58 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
59 return "By createReverseDistributor";
60 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
61 return "By copy constructor";
62 default:
63 return "INVALID";
64 }
65}
66
67DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
68 : comm_(comm),
69#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
70 mpixComm_(Teuchos::null),
71#endif
72 howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
73 reversePlan_(Teuchos::null),
74 sendType_(DISTRIBUTOR_SEND),
75 sendMessageToSelf_(false),
76 numSendsToOtherProcs_(0),
77 maxSendLength_(0),
78 numReceives_(0),
79 totalReceiveLength_(0)
80{ }
81
82DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
83 : comm_(otherPlan.comm_),
84#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
85 mpixComm_(otherPlan.mpixComm_),
86#endif
87 howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
88 reversePlan_(otherPlan.reversePlan_),
89 sendType_(otherPlan.sendType_),
90 sendMessageToSelf_(otherPlan.sendMessageToSelf_),
91 numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
92 procIdsToSendTo_(otherPlan.procIdsToSendTo_),
93 startsTo_(otherPlan.startsTo_),
94 lengthsTo_(otherPlan.lengthsTo_),
95 maxSendLength_(otherPlan.maxSendLength_),
96 indicesTo_(otherPlan.indicesTo_),
97 numReceives_(otherPlan.numReceives_),
98 totalReceiveLength_(otherPlan.totalReceiveLength_),
99 lengthsFrom_(otherPlan.lengthsFrom_),
100 procsFrom_(otherPlan.procsFrom_),
101 startsFrom_(otherPlan.startsFrom_),
102 indicesFrom_(otherPlan.indicesFrom_)
103{ }
104
105size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
106 using Teuchos::outArg;
107 using Teuchos::REDUCE_MAX;
108 using Teuchos::reduceAll;
109 using std::endl;
110 const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends: ";
111
112 const size_t numExports = exportProcIDs.size();
113 const int myProcID = comm_->getRank();
114 const int numProcs = comm_->getSize();
115 const bool debug = Details::Behavior::debug("Distributor");
116
117 // exportProcIDs tells us the communication pattern for this
118 // distributor. It dictates the way that the export data will be
119 // interpreted in doPosts(). We want to perform at most one
120 // send per process in doPosts; this is for two reasons:
121 // * minimize latency / overhead in the comm routines (nice)
122 // * match the number of receives and sends between processes
123 // (necessary)
124 //
125 // Teuchos::Comm requires that the data for a send are contiguous
126 // in a send buffer. Therefore, if the data in the send buffer
127 // for doPosts() are not contiguous, they will need to be copied
128 // into a contiguous buffer. The user has specified this
129 // noncontiguous pattern and we can't do anything about it.
130 // However, if they do not provide an efficient pattern, we will
131 // warn them if the following compile-time option has been
132 // set:
133 // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
134 //
135 // If the data are contiguous, then we can post the sends in situ
136 // (i.e., without needing to copy them into a send buffer).
137 //
138 // Determine contiguity. There are a number of ways to do this:
139 // * If the export IDs are sorted, then all exports to a
140 // particular proc must be contiguous. This is what Epetra does.
141 // * If the export ID of the current export already has been
142 // listed, then the previous listing should correspond to the
143 // same export. This tests contiguity, but not sortedness.
144 //
145 // Both of these tests require O(n), where n is the number of
146 // exports. However, the latter will positively identify a greater
147 // portion of contiguous patterns. We use the latter method.
148 //
149 // Check to see if values are grouped by procs without gaps
150 // If so, indices_to -> 0.
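  // Illustrative example (editorial, not in the original source):
  // exportProcIDs = {0, 0, 2, 2, 2} is grouped by process, so the
  // exports can be sent in situ from the user's buffer, while
  // exportProcIDs = {0, 2, 0, 2, 2} is not, and the exports would
  // first be packed into a contiguous send buffer (indicesTo_ nonempty).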
151
152 if (debug) {
153 // Test whether any process in the communicator got an invalid
154 // process ID. If badID != -1 on this process, then it equals
155 // this process' rank. The max of all badID over all processes
156 // is the max rank which has an invalid process ID.
157 int badID = -1;
158 for (size_t i = 0; i < numExports; ++i) {
159 const int exportID = exportProcIDs[i];
160 if (exportID >= numProcs || exportID < 0) {
161 badID = myProcID;
162 break;
163 }
164 }
165 int gbl_badID;
166 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
167 TEUCHOS_TEST_FOR_EXCEPTION
168 (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
169 << gbl_badID << ", perhaps among other processes, got a bad "
170 "send process ID.");
171 }
172
173 // Set up data structures for quick traversal of arrays.
174 // This contains the number of sends for each process ID.
175 //
176 // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
177 // that create an array of length the number of processes in the
178 // communicator (plus one). Given how this code uses this array,
179 // it should be straightforward to replace it with a hash table or
180 // some other more space-efficient data structure. In practice,
181 // most of the entries of starts should be zero for a sufficiently
182 // large process count, unless the communication pattern is dense.
183 // Note that it's important to be able to iterate through keys (i
184 // for which starts[i] is nonzero) in increasing order.
185 Teuchos::Array<size_t> starts (numProcs + 1, 0);
186
187 // numActive is the number of exports with a valid (nonnegative) process ID
188 size_t numActive = 0;
189 int needSendBuff = 0; // Boolean
190
191 for (size_t i = 0; i < numExports; ++i) {
192 const int exportID = exportProcIDs[i];
193 if (exportID >= 0) {
194 // exportID is a valid process ID. Increment the number of
195 // messages this process will send to that process.
196 ++starts[exportID];
197
198 // If we're sending more than one message to process exportID,
199 // then it is possible that the data are not contiguous.
200 // Check by seeing if the previous process ID in the list
201 // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
202 // because if starts[exportID] > 1, then i must be > 1 (since
203 // the starts array was filled with zeros initially).
204
205 // null entries break continuity.
206 // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
207 if (needSendBuff == 0 && starts[exportID] > 1 &&
208 exportID != exportProcIDs[i-1]) {
209 needSendBuff = 1;
210 }
211 ++numActive;
212 }
213 }
214
215#if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
216 {
217 int global_needSendBuff;
218 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
219 outArg (global_needSendBuff));
220 TPETRA_EFFICIENCY_WARNING(
221 global_needSendBuff != 0,
222 "::createFromSends: Grouping export IDs together by process rank often "
223 "improves performance.");
224 }
225#endif
226
227 // Determine from the caller's data whether or not the current
228 // process should send (a) message(s) to itself.
229 if (starts[myProcID] != 0) {
230 sendMessageToSelf_ = true;
231 }
232 else {
233 sendMessageToSelf_ = false;
234 }
235
236 if (! needSendBuff) {
237 // grouped by proc, no send buffer or indicesTo_ needed
238 numSendsToOtherProcs_ = 0;
239 // Count total number of sends, i.e., total number of procs to
240 // which we are sending. This includes myself, if applicable.
241 for (int i = 0; i < numProcs; ++i) {
242 if (starts[i]) {
243 ++numSendsToOtherProcs_;
244 }
245 }
246
247 // Not only do we not need these, but we must clear them, as
248 // empty status of indicesTo is a flag used later.
249 indicesTo_.resize(0);
250 // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
251 // includes self sends. Set their values to zeros.
252 procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
253 startsTo_.assign(numSendsToOtherProcs_,0);
254 lengthsTo_.assign(numSendsToOtherProcs_,0);
255
256 // set startsTo to the offset for each send (i.e., each proc ID)
257 // set procsTo to the proc ID for each send
258 // in interpreting this code, remember that we are assuming contiguity
259 // that is why index skips through the ranks
260 {
261 size_t procIndex = 0;
262 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
263 while (exportProcIDs[procIndex] < 0) {
264 ++procIndex; // skip all negative proc IDs
265 }
266 startsTo_[i] = procIndex;
267 int procID = exportProcIDs[procIndex];
268 procIdsToSendTo_[i] = procID;
269 procIndex += starts[procID];
270 }
271 }
272 // sort the startsTo and proc IDs together, in ascending order, according
273 // to proc IDs
274 if (numSendsToOtherProcs_ > 0) {
275 sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
276 }
277 // compute the maximum send length
278 maxSendLength_ = 0;
279 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
280 int procID = procIdsToSendTo_[i];
281 lengthsTo_[i] = starts[procID];
282 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
283 maxSendLength_ = lengthsTo_[i];
284 }
285 }
286 }
287 else {
288 // not grouped by proc, need send buffer and indicesTo_
289
290 // starts[i] is the number of sends to proc i
291 // numActive equals number of sends total, \sum_i starts[i]
292
293 // this loop starts at starts[1], so explicitly check starts[0]
294 if (starts[0] == 0 ) {
295 numSendsToOtherProcs_ = 0;
296 }
297 else {
298 numSendsToOtherProcs_ = 1;
299 }
300 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
301 im1=starts.begin();
302 i != starts.end(); ++i)
303 {
304 if (*i != 0) ++numSendsToOtherProcs_;
305 *i += *im1;
306 im1 = i;
307 }
308 // starts[i] now contains the number of exports to procs 0 through i
309
310 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
311 i=starts.rbegin()+1;
312 i != starts.rend(); ++i)
313 {
314 *ip1 = *i;
315 ip1 = i;
316 }
317 starts[0] = 0;
318 // starts[i] now contains the number of exports to procs 0 through
319 // i-1, i.e., all procs before proc i
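    // Worked example (illustrative, not in the original source): with
    // numProcs = 3 and initial counts starts = {2, 0, 3, 0}, the forward
    // loop produces the inclusive scan {2, 2, 5, 5}; the reverse shift
    // plus starts[0] = 0 then yields the exclusive scan {0, 2, 2, 5}.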
320
321 indicesTo_.resize(numActive);
322
323 for (size_t i = 0; i < numExports; ++i) {
324 if (exportProcIDs[i] >= 0) {
325 // record the offset to the sendBuffer for this export
326 indicesTo_[starts[exportProcIDs[i]]] = i;
327 // now increment the offset for this proc
328 ++starts[exportProcIDs[i]];
329 }
330 }
331 // our send buffer will contain the export data for each of the procs
332 // we communicate with, in order by proc id
333 // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
334 // indicesTo_ now maps each position in the send buffer to the export
335 // that fills it:
336 // the data placed at sendBuffer[j] come from export indicesTo_[j]
337 //
338 // starts[i] once again contains the number of exports to
339 // procs 0 through i
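    // Worked example (illustrative, not in the original source): with
    // numProcs = 3 and exportProcIDs = {2, 2, 0}, the exclusive scan above
    // gives starts = {0, 1, 1, 3}; the fill loop then produces
    // indicesTo_ = {2, 0, 1}, i.e., the send buffer slots hold exports
    // {2 (to proc 0), 0 (to proc 2), 1 (to proc 2)}, and starts is restored
    // to the inclusive scan {1, 1, 3, 3}.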
340 for (int proc = numProcs-1; proc != 0; --proc) {
341 starts[proc] = starts[proc-1];
342 }
343 starts.front() = 0;
344 starts[numProcs] = numActive;
345 //
346 // starts[proc] once again contains the number of exports to
347 // procs 0 through proc-1
348 // i.e., the offset of proc's block of data in the sendBuffer
349
350 // this contains invalid data at procs we don't care about, that is okay
351 procIdsToSendTo_.resize(numSendsToOtherProcs_);
352 startsTo_.resize(numSendsToOtherProcs_);
353 lengthsTo_.resize(numSendsToOtherProcs_);
354
355 // for each group of sends/exports, record the destination proc,
356 // the length, and the offset for this send into the
357 // send buffer (startsTo_)
358 maxSendLength_ = 0;
359 size_t snd = 0;
360 for (int proc = 0; proc < numProcs; ++proc ) {
361 if (starts[proc+1] != starts[proc]) {
362 lengthsTo_[snd] = starts[proc+1] - starts[proc];
363 startsTo_[snd] = starts[proc];
364 // record max length for all off-proc sends
365 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
366 maxSendLength_ = lengthsTo_[snd];
367 }
368 procIdsToSendTo_[snd] = proc;
369 ++snd;
370 }
371 }
372 }
373
374 if (sendMessageToSelf_) {
375 --numSendsToOtherProcs_;
376 }
377
378 // Invert map to see what msgs are received and what length
379 computeReceives();
380
381#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
382 initializeMpiAdvance();
383#endif
384
385 // createFromRecvs() calls createFromSends(), but will set
386 // howInitialized_ again after calling createFromSends().
387 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
388
389 return totalReceiveLength_;
390}
391
392void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
393{
394 createFromSends(remoteProcIDs);
395 *this = *getReversePlan();
396 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
397}
398
399void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
400 const Teuchos::ArrayView<const int>& remoteProcIDs)
401{
402 // note the exportProcIDs and remoteProcIDs _must_ be lists that have
403 // an entry for each GID. If the export/remote proc IDs are taken from
404 // the getProcs{From|To} lists extracted from a previous distributor,
405 // this will produce a wrong answer, because those lists have a unique
406 // entry for each processor id. A version of this that takes lengthsTo
407 // and lengthsFrom should be made.
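  // Illustrative example (editorial, not in the original source): for
  // three exported GIDs going to ranks {0, 0, 1} and three remote GIDs
  // coming from ranks {1, 1, 2}, pass exactly those per-GID lists here,
  // not the deduplicated {0, 1} / {1, 2} lists that getProcsTo() and
  // getProcsFrom() would return.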
408
409 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
410
411 int myProcID = comm_->getRank ();
412 int numProcs = comm_->getSize();
413
414 const size_t numExportIDs = exportProcIDs.size();
415 Teuchos::Array<size_t> starts (numProcs + 1, 0);
416
417 size_t numActive = 0;
418 int needSendBuff = 0; // Boolean
419
420 for(size_t i = 0; i < numExportIDs; i++ )
421 {
422 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
423 needSendBuff = 1;
424 if( exportProcIDs[i] >= 0 )
425 {
426 ++starts[ exportProcIDs[i] ];
427 ++numActive;
428 }
429 }
430
431 sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
432
433 numSendsToOtherProcs_ = 0;
434
435 if( needSendBuff ) // not grouped by processor; a send buffer and indicesTo_ are needed
436 {
437 if (starts[0] == 0 ) {
438 numSendsToOtherProcs_ = 0;
439 }
440 else {
441 numSendsToOtherProcs_ = 1;
442 }
443 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
444 im1=starts.begin();
445 i != starts.end(); ++i)
446 {
447 if (*i != 0) ++numSendsToOtherProcs_;
448 *i += *im1;
449 im1 = i;
450 }
451 // starts[i] now contains the number of exports to procs 0 through i
452
453 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
454 i=starts.rbegin()+1;
455 i != starts.rend(); ++i)
456 {
457 *ip1 = *i;
458 ip1 = i;
459 }
460 starts[0] = 0;
461 // starts[i] now contains the number of exports to procs 0 through
462 // i-1, i.e., all procs before proc i
463
464 indicesTo_.resize(numActive);
465
466 for (size_t i = 0; i < numExportIDs; ++i) {
467 if (exportProcIDs[i] >= 0) {
468 // record the offset to the sendBuffer for this export
469 indicesTo_[starts[exportProcIDs[i]]] = i;
470 // now increment the offset for this proc
471 ++starts[exportProcIDs[i]];
472 }
473 }
474 for (int proc = numProcs-1; proc != 0; --proc) {
475 starts[proc] = starts[proc-1];
476 }
477 starts.front() = 0;
478 starts[numProcs] = numActive;
479 procIdsToSendTo_.resize(numSendsToOtherProcs_);
480 startsTo_.resize(numSendsToOtherProcs_);
481 lengthsTo_.resize(numSendsToOtherProcs_);
482 maxSendLength_ = 0;
483 size_t snd = 0;
484 for (int proc = 0; proc < numProcs; ++proc ) {
485 if (starts[proc+1] != starts[proc]) {
486 lengthsTo_[snd] = starts[proc+1] - starts[proc];
487 startsTo_[snd] = starts[proc];
488 // record max length for all off-proc sends
489 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
490 maxSendLength_ = lengthsTo_[snd];
491 }
492 procIdsToSendTo_[snd] = proc;
493 ++snd;
494 }
495 }
496 }
497 else {
498 // grouped by proc, no send buffer or indicesTo_ needed
499 numSendsToOtherProcs_ = 0;
500 // Count total number of sends, i.e., total number of procs to
501 // which we are sending. This includes myself, if applicable.
502 for (int i = 0; i < numProcs; ++i) {
503 if (starts[i]) {
504 ++numSendsToOtherProcs_;
505 }
506 }
507
508 // Not only do we not need these, but we must clear them, as
509 // empty status of indicesTo is a flag used later.
510 indicesTo_.resize(0);
511 // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
512 // includes self sends. Set their values to zeros.
513 procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
514 startsTo_.assign(numSendsToOtherProcs_,0);
515 lengthsTo_.assign(numSendsToOtherProcs_,0);
516
517 // set startsTo to the offset for each send (i.e., each proc ID)
518 // set procsTo to the proc ID for each send
519 // in interpreting this code, remember that we are assuming contiguity
520 // that is why index skips through the ranks
521 {
522 size_t procIndex = 0;
523 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
524 while (exportProcIDs[procIndex] < 0) {
525 ++procIndex; // skip all negative proc IDs
526 }
527 startsTo_[i] = procIndex;
528 int procID = exportProcIDs[procIndex];
529 procIdsToSendTo_[i] = procID;
530 procIndex += starts[procID];
531 }
532 }
533 // sort the startsTo and proc IDs together, in ascending order, according
534 // to proc IDs
535 if (numSendsToOtherProcs_ > 0) {
536 sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
537 }
538 // compute the maximum send length
539 maxSendLength_ = 0;
540 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
541 int procID = procIdsToSendTo_[i];
542 lengthsTo_[i] = starts[procID];
543 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
544 maxSendLength_ = lengthsTo_[i];
545 }
546 }
547 }
548
549
550 numSendsToOtherProcs_ -= sendMessageToSelf_;
551 std::vector<int> recv_list;
552 recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
553
554 int last_pid=-2;
555 for(int i=0; i<remoteProcIDs.size(); i++) {
556 if(remoteProcIDs[i]>last_pid) {
557 recv_list.push_back(remoteProcIDs[i]);
558 last_pid = remoteProcIDs[i];
559 }
560 else if (remoteProcIDs[i]<last_pid)
561 throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
562 }
563 numReceives_ = recv_list.size();
564 if(numReceives_) {
565 procsFrom_.assign(numReceives_,0);
566 lengthsFrom_.assign(numReceives_,0);
567 indicesFrom_.assign(numReceives_,0);
568 startsFrom_.assign(numReceives_,0);
569 }
570 for(size_t i=0,j=0; i<numReceives_; ++i) {
571 int jlast=j;
572 procsFrom_[i] = recv_list[i];
573 startsFrom_[i] = j;
574 for( ; j<(size_t)remoteProcIDs.size() &&
575 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
576 lengthsFrom_[i] = j-jlast;
577 }
578 totalReceiveLength_ = remoteProcIDs.size();
579 indicesFrom_.clear ();
580 numReceives_-=sendMessageToSelf_;
581
582#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
583 initializeMpiAdvance();
584#endif
585}
586
587Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
588 if (reversePlan_.is_null()) createReversePlan();
589 return reversePlan_;
590}
591
592void DistributorPlan::createReversePlan() const
593{
594 reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
595 reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
596 reversePlan_->sendType_ = sendType_;
597
598 // The total length of all the sends of this DistributorPlan. We
599 // calculate it because it's the total length of all the receives
600 // of the reverse DistributorPlan.
601 size_t totalSendLength =
602 std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);
603
604 // The maximum length of any of the receives of this DistributorPlan.
605 // We calculate it because it's the maximum length of any of the
606 // sends of the reverse DistributorPlan.
607 size_t maxReceiveLength = 0;
608 const int myProcID = comm_->getRank();
609 for (size_t i=0; i < numReceives_; ++i) {
610 if (procsFrom_[i] != myProcID) {
611 // Don't count receives for messages sent by myself to myself.
612 if (lengthsFrom_[i] > maxReceiveLength) {
613 maxReceiveLength = lengthsFrom_[i];
614 }
615 }
616 }
617
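  // Illustrative example (editorial, not in the original source): if this
  // plan sends 3 packets to proc 5 and receives 2 packets from proc 7, the
  // reverse plan receives 3 packets from proc 5 and sends 2 packets to
  // proc 7; hence the swapped assignments below.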
618 reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
619 reversePlan_->numSendsToOtherProcs_ = numReceives_;
620 reversePlan_->procIdsToSendTo_ = procsFrom_;
621 reversePlan_->startsTo_ = startsFrom_;
622 reversePlan_->lengthsTo_ = lengthsFrom_;
623 reversePlan_->maxSendLength_ = maxReceiveLength;
624 reversePlan_->indicesTo_ = indicesFrom_;
625 reversePlan_->numReceives_ = numSendsToOtherProcs_;
626 reversePlan_->totalReceiveLength_ = totalSendLength;
627 reversePlan_->lengthsFrom_ = lengthsTo_;
628 reversePlan_->procsFrom_ = procIdsToSendTo_;
629 reversePlan_->startsFrom_ = startsTo_;
630 reversePlan_->indicesFrom_ = indicesTo_;
631
632#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
633 // is there a smarter way to do this
634 reversePlan_->initializeMpiAdvance();
635#endif
636}
637
638void DistributorPlan::computeReceives()
639{
640 using Teuchos::Array;
641 using Teuchos::ArrayRCP;
642 using Teuchos::as;
643 using Teuchos::CommStatus;
644 using Teuchos::CommRequest;
645 using Teuchos::ireceive;
646 using Teuchos::RCP;
647 using Teuchos::rcp;
648 using Teuchos::REDUCE_SUM;
649 using Teuchos::receive;
650 using Teuchos::reduce;
651 using Teuchos::scatter;
652 using Teuchos::send;
653 using Teuchos::waitAll;
654
655 const int myRank = comm_->getRank();
656 const int numProcs = comm_->getSize();
657
658 const int mpiTag = DEFAULT_MPI_TAG;
659
660 // toProcsFromMe[i] == the number of messages sent by this process
661 // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
662 // concern the contiguous sends. Therefore, each process will be
663 // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
664 // either be 0 or 1.
665 {
666 Array<int> toProcsFromMe (numProcs, 0);
667#ifdef HAVE_TPETRA_DEBUG
668 bool counting_error = false;
669#endif // HAVE_TPETRA_DEBUG
670 for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
671#ifdef HAVE_TPETRA_DEBUG
672 if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
673 counting_error = true;
674 }
675#endif // HAVE_TPETRA_DEBUG
676 toProcsFromMe[procIdsToSendTo_[i]] = 1;
677 }
678#ifdef HAVE_TPETRA_DEBUG
679 // Note that SHARED_TEST_FOR_EXCEPTION does a global reduction
680 SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
681 "Tpetra::Distributor::computeReceives: There was an error on at least "
682 "one process in counting the number of messages sent by that process to "
683 "the other processes. Please report this bug to the Tpetra developers.",
684 *comm_);
685#endif // HAVE_TPETRA_DEBUG
686
687 // Compute the number of receives that this process needs to
688 // post. The number of receives includes any self sends (i.e.,
689 // messages sent by this process to itself).
690 //
691 // (We will use numReceives_ below to post exactly that
692 // number of receives, with MPI_ANY_SOURCE as the sending rank.
693 // This will tell us from which processes this process expects
694 // to receive, and how many packets of data we expect to receive
695 // from each process.)
696 //
697 // toProcsFromMe[i] is the number of messages sent by this
698 // process to process i. Compute the sum (elementwise) of all
699 // the toProcsFromMe arrays on all processes in the
700 // communicator. If the array x is that sum, then if this
701 // process has rank j, x[j] is the number of messages sent
702 // to process j, that is, the number of receives on process j
703 // (including any messages sent by process j to itself).
704 //
705 // Yes, this requires storing and operating on an array of
706 // length P, where P is the number of processes in the
707 // communicator. Epetra does this too. Avoiding this O(P)
708 // memory bottleneck would require some research.
709 //
710 // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
711 // implement this O(P) memory algorithm.
712 //
713 // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
714 // process (0) from toProcsFromMe, to numRecvsOnEachProc.
715 // Then, scatter the latter, so that each process p gets
716 // numRecvsOnEachProc[p].
717 //
718 // 2. Like #1, but use MPI_Reduce_scatter instead of
719 // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
720 // optimized to reduce the number of messages, but
721 // MPI_Reduce_scatter is more general than we need (it
722 // allows the equivalent of MPI_Scatterv). See Bug 6336.
723 //
724 // 3. Do an all-reduce on toProcsFromMe, and let my process
725 // (with rank myRank) get numReceives_ from
726 // toProcsFromMe[myRank]. The HPCCG miniapp uses the
727 // all-reduce method.
728 //
729 // Approaches 1 and 3 have the same critical path length.
730 // However, #3 moves more data. This is because the final
731 // result is just one integer, but #3 moves a whole array of
732 // results to all the processes. This is why we use Approach 1
733 // here.
734 //
735 // mfh 12 Apr 2013: See discussion in createFromSends() about
736 // how we could use this communication to propagate an error
737 // flag for "free" in a release build.
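    // Worked example (illustrative, not in the original source): with 3
    // processes, where proc 0 sends to {1, 2}, proc 1 sends to {0}, and
    // proc 2 sends to {0, 1}, the toProcsFromMe arrays are {0,1,1},
    // {1,0,0}, and {1,1,0}. Their elementwise sum is {2, 2, 1}, so after
    // the scatter, procs 0, 1, and 2 post 2, 2, and 1 receives, respectively.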
738
739 const int root = 0; // rank of root process of the reduction
740 Array<int> numRecvsOnEachProc; // temp; only needed on root
741 if (myRank == root) {
742 numRecvsOnEachProc.resize (numProcs);
743 }
744 int numReceivesAsInt = 0; // output
745 reduce<int, int> (toProcsFromMe.getRawPtr (),
746 numRecvsOnEachProc.getRawPtr (),
747 numProcs, REDUCE_SUM, root, *comm_);
748 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
749 &numReceivesAsInt, 1, root, *comm_);
750 numReceives_ = static_cast<size_t> (numReceivesAsInt);
751 }
752
753 // Now we know numReceives_, which is this process' number of
754 // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
755 // with this number of entries.
756 lengthsFrom_.assign (numReceives_, 0);
757 procsFrom_.assign (numReceives_, 0);
758
759 //
760 // Ask (via nonblocking receive) each process from which we are
761 // receiving how many packets we should expect from it in the
762 // communication pattern.
763 //
764
765 // At this point, numReceives_ includes any self message that
766 // there may be. At the end of this routine, we'll subtract off
767 // the self message (if there is one) from numReceives_. In this
768 // routine, we don't need to receive a message from ourselves in
769 // order to figure out our lengthsFrom_ and source process ID; we
770 // can just ask ourselves directly. Thus, the actual number of
771 // nonblocking receives we post here does not include the self
772 // message.
773 const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
774
775 // Teuchos' wrapper for nonblocking receives requires receive
776 // buffers that it knows won't go away. This is why we use RCPs,
777 // one RCP per nonblocking receive request. They get allocated in
778 // the loop below.
779 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
780 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
781 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
782
783 // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
784 // (receive data from any process).
785#ifdef HAVE_MPI
786 const int anySourceProc = MPI_ANY_SOURCE;
787#else
788 const int anySourceProc = -1;
789#endif
790
791 // Post the (nonblocking) receives.
792 for (size_t i = 0; i < actualNumReceives; ++i) {
793 // Once the receive completes, we can ask the corresponding
794 // CommStatus object (output by wait()) for the sending process'
795 // ID (which we'll assign to procsFrom_[i] -- don't forget to
796 // do that!).
797 lengthsFromBuffers[i].resize (1);
798 lengthsFromBuffers[i][0] = as<size_t> (0);
799 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
800 mpiTag, *comm_);
801 }
802
803 // Post the sends: Tell each process to which we are sending how
804 // many packets it should expect from us in the communication
805 // pattern. We could use nonblocking sends here, as long as we do
806 // a waitAll() on all the sends and receives at once.
807 //
808 // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
809 // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
810 // not include any message that it might send to itself.
811 for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
812 if (procIdsToSendTo_[i] != myRank) {
813 // Send a message to procIdsToSendTo_[i], telling that process that
814 // this communication pattern will send that process
815 // lengthsTo_[i] blocks of packets.
816 const size_t* const lengthsTo_i = &lengthsTo_[i];
817 send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
818 }
819 else {
820 // We don't need a send in the self-message case. If this
821 // process will send a message to itself in the communication
822 // pattern, then the last element of lengthsFrom_ and
823 // procsFrom_ corresponds to the self-message. Of course
824 // this process knows how long the message is, and the process
825 // ID is its own process ID.
826 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
827 procsFrom_[numReceives_-1] = myRank;
828 }
829 }
830
831 //
832 // Wait on all the receives. When they arrive, check the status
833 // output of wait() for the receiving process ID, unpack the
834 // request buffers into lengthsFrom_, and set procsFrom_ from the
835 // status.
836 //
837 waitAll (*comm_, requests (), statuses ());
838 for (size_t i = 0; i < actualNumReceives; ++i) {
839 lengthsFrom_[i] = *lengthsFromBuffers[i];
840 procsFrom_[i] = statuses[i]->getSourceRank ();
841 }
842
843 // Sort the procsFrom_ array, and apply the same permutation to
844 // lengthsFrom_. This ensures that procsFrom_[i] and
845 // lengthsFrom_[i] refer to the same thing.
846 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
847
848 // Compute indicesFrom_
849 totalReceiveLength_ =
850 std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
851 indicesFrom_.clear ();
852
853 startsFrom_.clear ();
854 startsFrom_.reserve (numReceives_);
855 for (size_t i = 0, j = 0; i < numReceives_; ++i) {
856 startsFrom_.push_back(j);
857 j += lengthsFrom_[i];
858 }
859
860 if (sendMessageToSelf_) {
861 --numReceives_;
862 }
863}
864
865void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
866{
867 using Teuchos::FancyOStream;
868 using Teuchos::getIntegralValue;
869 using Teuchos::ParameterList;
870 using Teuchos::parameterList;
871 using Teuchos::RCP;
872 using std::endl;
873
874 if (! plist.is_null()) {
875 RCP<const ParameterList> validParams = getValidParameters ();
876 plist->validateParametersAndSetDefaults (*validParams);
877
878 const Details::EDistributorSendType sendType =
879 getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
880
881 // Now that we've validated the input list, save the results.
882 sendType_ = sendType;
883
884 // ParameterListAcceptor semantics require pointer identity of the
885 // sublist passed to setParameterList(), so we save the pointer.
886 this->setMyParamList (plist);
887 }
888}
889
890Teuchos::Array<std::string> distributorSendTypes()
891{
892 Teuchos::Array<std::string> sendTypes;
893 sendTypes.push_back ("Isend");
894 sendTypes.push_back ("Send");
895 sendTypes.push_back ("Alltoall");
896#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
897 sendTypes.push_back ("MpiAdvanceAlltoall");
898 sendTypes.push_back ("MpiAdvanceNbralltoallv");
899#endif
900 return sendTypes;
901}
902
903Teuchos::RCP<const Teuchos::ParameterList>
904DistributorPlan::getValidParameters() const
905{
906 using Teuchos::Array;
907 using Teuchos::ParameterList;
908 using Teuchos::parameterList;
909 using Teuchos::RCP;
910 using Teuchos::setStringToIntegralParameter;
911
912 Array<std::string> sendTypes = distributorSendTypes ();
913 const std::string defaultSendType ("Send");
914 Array<Details::EDistributorSendType> sendTypeEnums;
915 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
916 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
917 sendTypeEnums.push_back (Details::DISTRIBUTOR_ALLTOALL);
918#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
919 sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL);
920 sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV);
921#endif
922
923 RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
924
925 setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
926 defaultSendType, "When using MPI, the variant of send to use in "
927 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
928 plist->set ("Timer Label","","Label for Time Monitor output");
929
930 return Teuchos::rcp_const_cast<const ParameterList> (plist);
931}
932
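// Illustrative usage (editorial sketch, not part of this file): the
// "Send type" parameter validated above can be chosen by handing a
// parameter list to setParameterList(), e.g. (assuming a caller holds
// a DistributorPlan& named plan):
//
//   Teuchos::RCP<Teuchos::ParameterList> plist =
//     Teuchos::parameterList ("Tpetra::Distributor");
//   plist->set ("Send type", "Isend"); // any value from distributorSendTypes()
//   plan.setParameterList (plist);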
933#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
934
935// Used by Teuchos::RCP to clean up an owned MPIX_Comm*
936struct MpixCommDeallocator {
937 void free(MPIX_Comm **comm) const {
938 MPIX_Comm_free(*comm);
939 }
940};
941
942void DistributorPlan::initializeMpiAdvance() {
943
944 // The MPIX communicator must not have been initialized yet; if it is non-null here, something has gone wrong.
945 TEUCHOS_ASSERT(mpixComm_.is_null());
946
947 // use the members to initialize the graph for neighborhood mode, or just the MPIX communicator for non-neighborhood mode
948 Teuchos::RCP<const Teuchos::MpiComm<int> > mpiComm = Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm_);
949 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawComm = mpiComm->getRawMpiComm();
950 int err = 0;
951 if (sendType_ == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
952 MPIX_Comm **mpixComm = new(MPIX_Comm*);
953 err = MPIX_Comm_init(mpixComm, (*rawComm)());
954 mpixComm_ = Teuchos::RCP(mpixComm,
955 MpixCommDeallocator(),
956 true /*take ownership*/
957 );
958 }
959 else if (sendType_ == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
960 int numRecvs = (int)(numReceives_ + (sendMessageToSelf_ ? 1 : 0));
961 int *sourceRanks = procsFrom_.data();
962
963 // int *sourceWeights = static_cast<int*>(lengthsFrom_.data());// lengthsFrom_ may not be int
964 const int *sourceWeights = MPI_UNWEIGHTED;
965 int numSends = (int)(numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0));
966 int *destRanks = procIdsToSendTo_.data();
967
968 // int *destWeights = static_cast<int*>(lengthsTo_.data()); // lengthsTo_ may not be int
969 const int *destWeights = MPI_UNWEIGHTED; // lengthsTo_ may not be int
970
971 MPIX_Comm **mpixComm = new(MPIX_Comm*);
972 err = MPIX_Dist_graph_create_adjacent((*rawComm)(), numRecvs, sourceRanks, sourceWeights, numSends, destRanks, destWeights, MPI_INFO_NULL, false, mpixComm);
973 mpixComm_ = Teuchos::RCP(mpixComm,
974 MpixCommDeallocator(),
975 true /*take ownership*/
976 );
977 }
978
979 TEUCHOS_ASSERT(err == 0);
980}
981#endif
982
983} // namespace Details
984} // namespace Tpetra