Tpetra_Details_DistributorActor.hpp
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10// clang-format off
11
12#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
13#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
14
16#include "Tpetra_Util.hpp"
17
18#include "Teuchos_Array.hpp"
19#include "Teuchos_Comm.hpp"
21#include "Teuchos_RCP.hpp"
22#include "Teuchos_Time.hpp"
23
24#include "Kokkos_TeuchosCommAdapters.hpp"
25
26#ifdef HAVE_TPETRA_MPI
27#include "mpi.h"
28#endif
29
30namespace Tpetra {
31namespace Details {
32
33template <class View1, class View2>
34constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
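// areKokkosViews<V1, V2> is a convenience variable template that is true
// only when both arguments are Kokkos::View specializations.  The methods
// below use it in static_asserts, e.g. (illustrative sketch only):
//
//   static_assert(areKokkosViews<ExpView, ImpView>,
//                 "Data arrays must be Kokkos::Views");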
35
36class DistributorActor {
37 static constexpr int DEFAULT_MPI_TAG = 1;
38
39public:
40 DistributorActor();
41 DistributorActor(const DistributorActor& otherActor);
42
43 template <class ExpView, class ImpView>
44 void doPostsAndWaits(const DistributorPlan& plan,
45 const ExpView &exports,
46 size_t numPackets,
47 const ImpView &imports);
48
49 template <class ExpView, class ImpView>
50 void doPostsAndWaits(const DistributorPlan& plan,
51 const ExpView &exports,
52 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
53 const ImpView &imports,
54 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
55
56 template <class ExpView, class ImpView>
57 void doPosts(const DistributorPlan& plan,
58 const ExpView& exports,
59 size_t numPackets,
60 const ImpView& imports);
61
62 template <class ExpView, class ImpView>
63 void doPosts(const DistributorPlan& plan,
64 const ExpView &exports,
65 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
66 const ImpView &imports,
67 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
68
69 void doWaits(const DistributorPlan& plan);
70
71 bool isReady() const;
72
73private:
74// clang-format on
75#ifdef HAVE_TPETRA_MPI
76 template <class ExpView, class ImpView>
77 void doPostsAllToAll(const DistributorPlan &plan, const ExpView &exports,
78 size_t numPackets, const ImpView &imports);
79
80 template <class ExpView, class ImpView>
81 void doPostsAllToAll(
82 const DistributorPlan &plan, const ExpView &exports,
83 const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
84 const ImpView &imports,
85 const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
86
87#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
88 template <class ExpView, class ImpView>
89 void doPostsNbrAllToAllV(const DistributorPlan &plan, const ExpView &exports,
90 size_t numPackets, const ImpView &imports);
91
92 template <class ExpView, class ImpView>
93 void doPostsNbrAllToAllV(
94 const DistributorPlan &plan, const ExpView &exports,
95 const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
96 const ImpView &imports,
97 const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
98#endif // HAVE_TPETRACORE_MPI_ADVANCE
99#endif // HAVE_TPETRA_MPI
100 // clang-format off
101 int mpiTag_;
102
103 Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;
104
105#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
106 Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
107 Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
108 Teuchos::RCP<Teuchos::Time> timer_doWaits_;
109 Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
110 Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
111 Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
112 Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
113 Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
114 Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
115 Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
116 Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
117 Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
118 Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
119
121 void makeTimers();
122#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
123};
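// Typical call sequence (a minimal sketch; constructing the DistributorPlan
// and allocating the exports/imports Kokkos::Views are assumed to happen
// elsewhere):
//
//   DistributorActor actor;
//   actor.doPosts(plan, exports, numPackets, imports); // post recvs/sends
//   // ... optionally overlap communication with computation ...
//   actor.doWaits(plan);                               // complete them
//
// doPostsAndWaits() below simply performs both steps back to back.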
124
125template <class ExpView, class ImpView>
126void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
127 const ExpView& exports,
128 size_t numPackets,
129 const ImpView& imports)
130{
131 static_assert(areKokkosViews<ExpView, ImpView>,
132 "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
133 doPosts(plan, exports, numPackets, imports);
134 doWaits(plan);
135}
136
137template <class ExpView, class ImpView>
138void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
139 const ExpView& exports,
140 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
141 const ImpView& imports,
142 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
143{
144 static_assert(areKokkosViews<ExpView, ImpView>,
145 "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
146 doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
147 doWaits(plan);
148}
149
150template <typename ViewType>
151using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
152
153template <typename DstViewType, typename SrcViewType>
154using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
155 HostAccessibility<SrcViewType>::accessible>;
156
157template <typename DstViewType, typename SrcViewType>
158using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
159 !HostAccessibility<SrcViewType>::accessible>;
160
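// The two aliases above select exactly one of the packOffset() overloads
// below: the memcpy-based overload when both Views are host-accessible, and
// the Kokkos deep-copy-based overload otherwise.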
161template <typename DstViewType, typename SrcViewType>
162enableIfHostAccessible<DstViewType, SrcViewType>
163packOffset(const DstViewType& dst,
164 const SrcViewType& src,
165 const size_t dst_offset,
166 const size_t src_offset,
167 const size_t size)
168{
169 memcpy((void*) (dst.data()+dst_offset), src.data()+src_offset, size*sizeof(typename DstViewType::value_type));
170}
171
172template <typename DstViewType, typename SrcViewType>
173enableIfNotHostAccessible<DstViewType, SrcViewType>
174packOffset(const DstViewType& dst,
175 const SrcViewType& src,
176 const size_t dst_offset,
177 const size_t src_offset,
178 const size_t size)
179{
180 Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
181}
182
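// Minimal usage sketch for packOffset (hypothetical Views dst and src with
// the same value_type and sufficient extents):
//
//   // Copy src(10..14) into dst(0..4); overload resolution above picks
//   // memcpy or a device-aware Kokkos copy based on where the Views live.
//   packOffset(dst, src, /*dst_offset=*/0, /*src_offset=*/10, /*size=*/5);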
183// clang-format on
184#ifdef HAVE_TPETRA_MPI
185template <class ExpView, class ImpView>
186void DistributorActor::doPostsAllToAll(const DistributorPlan &plan,
187 const ExpView &exports,
188 size_t numPackets,
189 const ImpView &imports) {
190 using size_type = Teuchos::Array<size_t>::size_type;
191
192 TEUCHOS_TEST_FOR_EXCEPTION(
193 !plan.getIndicesTo().is_null(), std::runtime_error,
194 "Send Type=\"Alltoall\" only works for fast-path communication.");
195
196 auto comm = plan.getComm();
197 const int myRank = comm->getRank();
198 std::vector<int> sendcounts(comm->getSize(), 0);
199 std::vector<int> sdispls(comm->getSize(), 0);
200 std::vector<int> recvcounts(comm->getSize(), 0);
201 std::vector<int> rdispls(comm->getSize(), 0);
202
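  // MPI_Alltoallv expects one (count, displacement) entry per rank in the
  // communicator.  The loops below scatter the plan's compressed send/recv
  // lists into these dense per-rank arrays; ranks we do not exchange data
  // with keep a count of 0.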
203 size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
204 for (size_t p = 0; p < numBlocks; ++p) {
205 sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
206 size_t sendcount = plan.getLengthsTo()[p] * numPackets;
207 // sendcount is converted down to int, so make sure it can be represented
208 TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
209 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
210 "Send count for block "
211 << p << " (" << sendcount
212 << ") is too large "
213 "to be represented as int.");
214 sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
215 }
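  // Worked example (illustrative only): with numPackets = 2 and two send
  // blocks of lengths {3, 1} to ranks {2, 0} starting at offsets {0, 3},
  // the loop above yields sendcounts = {2, 0, 6, ...} and
  // sdispls = {6, 0, 0, ...} on a communicator with at least three ranks.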
216
217 const size_type actualNumReceives =
218 Teuchos::as<size_type>(plan.getNumReceives()) +
219 Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
220 size_t curBufferOffset = 0;
221 for (size_type i = 0; i < actualNumReceives; ++i) {
222 const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
223 TEUCHOS_TEST_FOR_EXCEPTION(
224 curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
225 std::logic_error,
226 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
227 "Exceeded size of 'imports' array in packing loop on Process "
228 << myRank << ". imports.size() = " << imports.size()
229 << " < "
230 "curBufferOffset("
231 << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
232 rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
233 // curBufLen is converted down to int, so make sure it can be represented
234 TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
235 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
236 "Recv count for receive "
237 << i << " (" << curBufLen
238 << ") is too large "
239 "to be represented as int.");
240 recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
241 curBufferOffset += curBufLen;
242 }
243
244 using T = typename ExpView::non_const_value_type;
245 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
246
247#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
248 if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
249 MPIX_Comm *mpixComm = *plan.getMPIXComm();
250 TEUCHOS_TEST_FOR_EXCEPTION(
251 !mpixComm, std::runtime_error,
252 "plan's MPIX_Comm null in doPostsAllToAll, but "
253 "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
254 << DistributorHowInitializedEnumToString(plan.howInitialized()));
255
256 const int err = MPIX_Alltoallv(
257 exports.data(), sendcounts.data(), sdispls.data(), rawType,
258 imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
259
260 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
261 "MPIX_Alltoallv failed with error \""
262 << Teuchos::mpiErrorCodeToString(err)
263 << "\".");
264
265 return;
266 }
267#endif
268 Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
269 Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
270 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
271 mpiComm->getRawMpiComm();
272
273 const int err = MPI_Alltoallv(
274 exports.data(), sendcounts.data(), sdispls.data(), rawType,
275 imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
276
277 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
278 "MPI_Alltoallv failed with error \""
279 << Teuchos::mpiErrorCodeToString(err)
280 << "\".");
281
282 return;
283}
284
285#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
286template <class ExpView, class ImpView>
287void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan &plan,
288 const ExpView &exports,
289 size_t numPackets,
290 const ImpView &imports) {
291 TEUCHOS_TEST_FOR_EXCEPTION(
292 !plan.getIndicesTo().is_null(), std::runtime_error,
293 "Send Type=\"Alltoall\" only works for fast-path communication.");
294
295 const int myRank = plan.getComm()->getRank();
296 MPIX_Comm *mpixComm = *plan.getMPIXComm();
297
298 const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
299 const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
300 std::vector<int> sendcounts(numSends, 0);
301 std::vector<int> sdispls(numSends, 0);
302 std::vector<int> recvcounts(numRecvs, 0);
303 std::vector<int> rdispls(numRecvs, 0);
304
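  // Unlike MPI_Alltoallv, the neighbor variant indexes counts/displacements
  // by position in the neighbor lists (the i-th send/recv partner) rather
  // than by rank, so these arrays are sized by numSends/numRecvs instead of
  // the communicator size.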
305 for (size_t p = 0; p < numSends; ++p) {
306 sdispls[p] = plan.getStartsTo()[p] * numPackets;
307 const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
308 // sendcount is converted down to int, so make sure it can be represented
309 TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
310 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
311 "Send count for block "
312 << p << " (" << sendcount
313 << ") is too large "
314 "to be represented as int.");
315 sendcounts[p] = static_cast<int>(sendcount);
316 }
317
318 size_t curBufferOffset = 0;
319 for (size_t i = 0; i < numRecvs; ++i) {
320 const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
321 TEUCHOS_TEST_FOR_EXCEPTION(
322 curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
323 std::logic_error,
324 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
325 "Exceeded size of 'imports' array in packing loop on Process "
326 << myRank << ". imports.size() = " << imports.size()
327 << " < "
328 "curBufferOffset("
329 << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
330 rdispls[i] = curBufferOffset;
331 // curBufLen is converted down to int, so make sure it can be represented
332 TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
333 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
334 "Recv count for receive "
335 << i << " (" << curBufLen
336 << ") is too large "
337 "to be represented as int.");
338 recvcounts[i] = static_cast<int>(curBufLen);
339 curBufferOffset += curBufLen;
340 }
341
342 using T = typename ExpView::non_const_value_type;
343 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
344
345 const int err = MPIX_Neighbor_alltoallv(
346 exports.data(), sendcounts.data(), sdispls.data(), rawType,
347 imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
348
349 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
350 "MPIX_Neighbor_alltoallv failed with error \""
351 << Teuchos::mpiErrorCodeToString(err)
352 << "\".");
353}
354#endif // HAVE_TPETRACORE_MPI_ADVANCE
355#endif // HAVE_TPETRA_MPI
356// clang-format off
357
358template <class ExpView, class ImpView>
359void DistributorActor::doPosts(const DistributorPlan& plan,
360 const ExpView& exports,
361 size_t numPackets,
362 const ImpView& imports)
363{
364 static_assert(areKokkosViews<ExpView, ImpView>,
365 "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
366 using Teuchos::Array;
367 using Teuchos::as;
368 using Teuchos::FancyOStream;
369 using Teuchos::includesVerbLevel;
370 using Teuchos::ireceive;
371 using Teuchos::isend;
372 using Teuchos::send;
373 using Teuchos::TypeNameTraits;
374 using Teuchos::typeName;
375 using std::endl;
376 using Kokkos::Compat::create_const_view;
377 using Kokkos::Compat::create_view;
378 using Kokkos::Compat::subview_offset;
379 using Kokkos::Compat::deep_copy_offset;
380 typedef Array<size_t>::size_type size_type;
381 typedef ExpView exports_view_type;
382 typedef ImpView imports_view_type;
383
384#ifdef KOKKOS_ENABLE_CUDA
385 static_assert
386 (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
387 ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
388 "Please do not use Tpetra::Distributor with UVM allocations. "
389 "See Trilinos GitHub issue #1088.");
390#endif // KOKKOS_ENABLE_CUDA
391
392#ifdef KOKKOS_ENABLE_SYCL
393 static_assert
394 (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
395 ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
396 "Please do not use Tpetra::Distributor with SharedUSM allocations. "
397 "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
398#endif // KOKKOS_ENABLE_SYCL
399
400#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
401 Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
402#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
403
404 const int myRank = plan.getComm()->getRank ();
405 // Run-time configurable parameters that come from the input
406 // ParameterList set by setParameterList().
407 const Details::EDistributorSendType sendType = plan.getSendType();
408
409// clang-format on
410#if defined(HAVE_TPETRA_MPI)
411 // All-to-all communication layout is quite different from
412 // point-to-point, so we handle it separately.
413
414 if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
415 doPostsAllToAll(plan, exports, numPackets, imports);
416 return;
417 }
418#ifdef HAVE_TPETRACORE_MPI_ADVANCE
419 else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
420 doPostsAllToAll(plan, exports, numPackets, imports);
421 return;
422 } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
423 doPostsNbrAllToAllV(plan, exports, numPackets, imports);
424 return;
425 }
426#endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
427// clang-format off
428
429#else // HAVE_TPETRA_MPI
430 if (plan.hasSelfMessage()) {
431 // This is how we "send a message to ourself": we copy from
432 // the export buffer to the import buffer. That saves
433 // Teuchos::Comm implementations other than MpiComm (in
434 // particular, SerialComm) the trouble of implementing self
435 // messages correctly. (To do this right, SerialComm would
436 // need internal buffer space for messages, keyed on the
437 // message's tag.)
438 size_t selfReceiveOffset = 0;
439 deep_copy_offset(imports, exports, selfReceiveOffset,
440 plan.getStartsTo()[0]*numPackets,
441 plan.getLengthsTo()[0]*numPackets);
442 }
443 // Should we just return here?  Likely not, as comm could be a
444 // SerialComm.
445#endif // HAVE_TPETRA_MPI
446
447 size_t selfReceiveOffset = 0;
448
449#ifdef HAVE_TPETRA_DEBUG
450 TEUCHOS_TEST_FOR_EXCEPTION
451 (requests_.size () != 0,
452 std::logic_error,
453 "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
454 << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
455#endif // HAVE_TPETRA_DEBUG
456
457 // Distributor uses requests_.size() as the number of outstanding
458 // nonblocking message requests, so we resize to zero to maintain
459 // this invariant.
460 //
461 // getNumReceives() does _not_ include the self message, if there is
462 // one. Here, we do actually send a message to ourselves, so we
463 // include any self message in the "actual" number of receives to
464 // post.
465 //
466 // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
467 // doesn't (re)allocate its array of requests. That happens in
468 // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
469 // demand), or Resize_().
470 const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
471 as<size_type> (plan.hasSelfMessage() ? 1 : 0);
472 requests_.resize (0);
473
474 // Post the nonblocking receives. It's common MPI wisdom to post
475 // receives before sends. In MPI terms, this means favoring
476 // adding to the "posted queue" (of receive requests) over adding
477 // to the "unexpected queue" (of arrived messages not yet matched
478 // with a receive).
479 {
480#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
481 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
482#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
483
484 size_t curBufferOffset = 0;
485 for (size_type i = 0; i < actualNumReceives; ++i) {
486 const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
487 if (plan.getProcsFrom()[i] != myRank) {
488 // If my process is receiving these packet(s) from another
489 // process (not a self-receive):
490 //
491 // 1. Set up the persisting view (recvBuf) of the imports
492 // array, given the offset and size (total number of
493 // packets from process getProcsFrom()[i]).
494 // 2. Start the Irecv and save the resulting request.
495 TEUCHOS_TEST_FOR_EXCEPTION(
496 curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
497 std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
498 "Exceeded size of 'imports' array in packing loop on Process " <<
499 myRank << ". imports.size() = " << imports.size () << " < "
500 "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
501 curBufLen << ").");
502 imports_view_type recvBuf =
503 subview_offset (imports, curBufferOffset, curBufLen);
504 requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
505 mpiTag_, *plan.getComm()));
506 }
507 else { // Receiving from myself
508 selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
509 }
510 curBufferOffset += curBufLen;
511 }
512 }
513
514#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
515 Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
516#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
517
518 // setup scan through getProcsTo() list starting with higher numbered procs
519 // (should help balance message traffic)
520 //
521 // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
522 // It doesn't depend on the input at all.
523 size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
524 size_t procIndex = 0;
525 while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
526 ++procIndex;
527 }
528 if (procIndex == numBlocks) {
529 procIndex = 0;
530 }
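  // Example (illustrative): with getProcsTo() = {0, 1, 3} and myRank = 2,
  // procIndex stops at the entry for rank 3, so sends are issued in the
  // order 3, 0, 1.  Starting just above one's own rank staggers traffic
  // across processes.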
531
532 size_t selfNum = 0;
533 size_t selfIndex = 0;
534
535 if (plan.getIndicesTo().is_null()) {
536
537#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
538 Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
539#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
540
541 // Data are already blocked (laid out) by process, so we don't
542 // need a separate send buffer (besides the exports array).
543 for (size_t i = 0; i < numBlocks; ++i) {
544 size_t p = i + procIndex;
545 if (p > (numBlocks - 1)) {
546 p -= numBlocks;
547 }
548
549 if (plan.getProcsTo()[p] != myRank) {
550 exports_view_type tmpSend = subview_offset(
551 exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);
552
553 if (sendType == Details::DISTRIBUTOR_ISEND) {
554 // NOTE: This looks very similar to the tmpSend above, but removing
555 // tmpSendBuf and using tmpSend instead leads to a performance hit on
556 // Arm SerialNode builds.
557 exports_view_type tmpSendBuf =
558 subview_offset (exports, plan.getStartsTo()[p] * numPackets,
559 plan.getLengthsTo()[p] * numPackets);
560 requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
561 mpiTag_, *plan.getComm()));
562 }
563 else { // DISTRIBUTOR_SEND
564 send<int> (tmpSend,
565 as<int> (tmpSend.size ()),
566 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
567 }
568 }
569 else { // "Sending" the message to myself
570 selfNum = p;
571 }
572 }
573
574 if (plan.hasSelfMessage()) {
575 // This is how we "send a message to ourself": we copy from
576 // the export buffer to the import buffer. That saves
577 // Teuchos::Comm implementations other than MpiComm (in
578 // particular, SerialComm) the trouble of implementing self
579 // messages correctly. (To do this right, SerialComm would
580 // need internal buffer space for messages, keyed on the
581 // message's tag.)
582 deep_copy_offset(imports, exports, selfReceiveOffset,
583 plan.getStartsTo()[selfNum]*numPackets,
584 plan.getLengthsTo()[selfNum]*numPackets);
585 }
586
587 }
588 else { // data are not blocked by proc, use send buffer
589
590#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
591 Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
592#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
593
594 typedef typename ExpView::non_const_value_type Packet;
595 typedef typename ExpView::array_layout Layout;
596 typedef typename ExpView::device_type Device;
597 typedef typename ExpView::memory_traits Mem;
598
599 // This buffer is long enough for only one message at a time.
600 // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
601 // of sendType requested by user.
602 // This code path formerly errored out with message:
603 // Tpetra::Distributor::doPosts(3 args, Kokkos):
604 // The "send buffer" code path
605 // doesn't currently work with nonblocking sends.
606 // Now, we opt to just do the communication in a way that works.
607#ifdef HAVE_TPETRA_DEBUG
608 if (sendType != Details::DISTRIBUTOR_SEND) {
609 if (plan.getComm()->getRank() == 0)
610 std::cout << "The requested Tpetra send type "
611 << DistributorSendTypeEnumToString(sendType)
612 << " requires Distributor data to be ordered by"
613 << " the receiving processor rank. Since these"
614 << " data are not ordered, Tpetra will use Send"
615 << " instead." << std::endl;
616 }
617#endif
618
619 Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
620 plan.getMaxSendLength() * numPackets);
621
622 for (size_t i = 0; i < numBlocks; ++i) {
623 size_t p = i + procIndex;
624 if (p > (numBlocks - 1)) {
625 p -= numBlocks;
626 }
627
628 if (plan.getProcsTo()[p] != myRank) {
629 size_t sendArrayOffset = 0;
630 size_t j = plan.getStartsTo()[p];
631 for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
632 packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
633 sendArrayOffset += numPackets;
634 }
635 typename ExpView::execution_space().fence();
636
637 ImpView tmpSend =
638 subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);
639
640 send<int> (tmpSend,
641 as<int> (tmpSend.size ()),
642 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
643 }
644 else { // "Sending" the message to myself
645 selfNum = p;
646 selfIndex = plan.getStartsTo()[p];
647 }
648 }
649
650 if (plan.hasSelfMessage()) {
651 for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
652 packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
653 ++selfIndex;
654 selfReceiveOffset += numPackets;
655 }
656 }
657 }
658}
659
660// clang-format on
661#ifdef HAVE_TPETRA_MPI
662template <class ExpView, class ImpView>
663void DistributorActor::doPostsAllToAll(
664 const DistributorPlan &plan, const ExpView &exports,
665 const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
666 const ImpView &imports,
667 const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
668 TEUCHOS_TEST_FOR_EXCEPTION(
669 !plan.getIndicesTo().is_null(), std::runtime_error,
670 "Send Type=\"Alltoall\" only works for fast-path communication.");
671
672 using size_type = Teuchos::Array<size_t>::size_type;
673
674 auto comm = plan.getComm();
675 std::vector<int> sendcounts(comm->getSize(), 0);
676 std::vector<int> sdispls(comm->getSize(), 0);
677 std::vector<int> recvcounts(comm->getSize(), 0);
678 std::vector<int> rdispls(comm->getSize(), 0);
679
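  // Unlike the fixed-numPackets overload above, the per-rank counts here are
  // sums of numExportPacketsPerLID (resp. numImportPacketsPerLID) over the
  // LIDs belonging to each send (resp. receive) block.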
680 size_t curPKToffset = 0;
681 for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
682 sdispls[plan.getProcsTo()[pp]] = curPKToffset;
683 size_t numPackets = 0;
684 for (size_t j = plan.getStartsTo()[pp];
685 j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
686 numPackets += numExportPacketsPerLID[j];
687 }
688 // numPackets is converted down to int, so make sure it can be represented
689 TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
690 "Tpetra::Distributor::doPosts(4 args, Kokkos): "
691 "Send count for send "
692 << pp << " (" << numPackets
693 << ") is too large "
694 "to be represented as int.");
695 sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
696 curPKToffset += numPackets;
697 }
698
699 const size_type actualNumReceives =
700 Teuchos::as<size_type>(plan.getNumReceives()) +
701 Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
702
703 size_t curBufferOffset = 0;
704 size_t curLIDoffset = 0;
705 for (size_type i = 0; i < actualNumReceives; ++i) {
706 size_t totalPacketsFrom_i = 0;
707 for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
708 totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
709 }
710 curLIDoffset += plan.getLengthsFrom()[i];
711
712 rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
713 // totalPacketsFrom_i is converted down to int, so make sure it can be
714 // represented
715 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
716 std::logic_error,
717 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
718 "Recv count for receive "
719 << i << " (" << totalPacketsFrom_i
720 << ") is too large "
721 "to be represented as int.");
722 recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
723 curBufferOffset += totalPacketsFrom_i;
724 }
725
726 Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
727 Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
728 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
729 mpiComm->getRawMpiComm();
730 using T = typename ExpView::non_const_value_type;
731 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
732
733#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
734 if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
735 MPIX_Comm *mpixComm = *plan.getMPIXComm();
736 TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
737 "MPIX_Comm is null in doPostsAllToAll \""
738 << __FILE__ << ":" << __LINE__);
739
740 const int err = MPIX_Alltoallv(
741 exports.data(), sendcounts.data(), sdispls.data(), rawType,
742 imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
743
744 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
745 "MPIX_Alltoallv failed with error \""
746 << Teuchos::mpiErrorCodeToString(err)
747 << "\".");
748
749 return;
750 }
751#endif // HAVE_TPETRACORE_MPI_ADVANCE
752
753 const int err = MPI_Alltoallv(
754 exports.data(), sendcounts.data(), sdispls.data(), rawType,
755 imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
756
757 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
758 "MPI_Alltoallv failed with error \""
759 << Teuchos::mpiErrorCodeToString(err)
760 << "\".");
761}
762
763#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
764template <class ExpView, class ImpView>
765void DistributorActor::doPostsNbrAllToAllV(
766 const DistributorPlan &plan, const ExpView &exports,
767 const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
768 const ImpView &imports,
769 const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
770 TEUCHOS_TEST_FOR_EXCEPTION(
771 !plan.getIndicesTo().is_null(), std::runtime_error,
772 "Send Type=\"Alltoall\" only works for fast-path communication.");
773
774 const Teuchos_Ordinal numSends = plan.getProcsTo().size();
775 const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();
776
777 auto comm = plan.getComm();
778 std::vector<int> sendcounts(numSends, 0);
779 std::vector<int> sdispls(numSends, 0);
780 std::vector<int> recvcounts(numRecvs, 0);
781 std::vector<int> rdispls(numRecvs, 0);
782
783 Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
784 Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
785 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
786 mpiComm->getRawMpiComm();
787 using T = typename ExpView::non_const_value_type;
788 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
789
790 // unlike standard alltoall, entry `i` in sdispls and sendcounts
791 // refer to the ith participating rank, rather than rank i
792 size_t curPKToffset = 0;
793 for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
794 sdispls[pp] = curPKToffset;
795 size_t numPackets = 0;
796 for (size_t j = plan.getStartsTo()[pp];
797 j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
798 numPackets += numExportPacketsPerLID[j];
799 }
800 // numPackets is converted down to int, so make sure it can be represented
801 TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
802 "Tpetra::Distributor::doPosts(4 args, Kokkos): "
803 "Send count for send "
804 << pp << " (" << numPackets
805 << ") is too large "
806 "to be represented as int.");
807 sendcounts[pp] = static_cast<int>(numPackets);
808 curPKToffset += numPackets;
809 }
810 size_t curBufferOffset = 0;
811 size_t curLIDoffset = 0;
812 for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
813 size_t totalPacketsFrom_i = 0;
814 for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
815 totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
816 }
817 curLIDoffset += plan.getLengthsFrom()[i];
818
819 rdispls[i] = curBufferOffset;
820 // totalPacketsFrom_i is converted down to int, so make sure it can be
821 // represented
822 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
823 std::logic_error,
824 "Tpetra::Distributor::doPosts(3 args, Kokkos): "
825 "Recv count for receive "
826 << i << " (" << totalPacketsFrom_i
827 << ") is too large "
828 "to be represented as int.");
829 recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
830 curBufferOffset += totalPacketsFrom_i;
831 }
832
833 MPIX_Comm *mpixComm = *plan.getMPIXComm();
834 const int err = MPIX_Neighbor_alltoallv(
835 exports.data(), sendcounts.data(), sdispls.data(), rawType,
836 imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
837
838 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
839 "MPIX_Neighbor_alltoallv failed with error \""
840 << Teuchos::mpiErrorCodeToString(err)
841 << "\".");
842}
843#endif // HAVE_TPETRACORE_MPI_ADVANCE
844#endif // HAVE_TPETRA_MPI
845 // clang-format off
846
847template <class ExpView, class ImpView>
848void DistributorActor::doPosts(const DistributorPlan& plan,
849 const ExpView &exports,
850 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
851 const ImpView &imports,
852 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
853{
854 static_assert(areKokkosViews<ExpView, ImpView>,
855 "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
856 using Teuchos::Array;
857 using Teuchos::as;
858 using Teuchos::ireceive;
859 using Teuchos::isend;
860 using Teuchos::send;
861 using Teuchos::TypeNameTraits;
862 using std::endl;
863 using Kokkos::Compat::create_const_view;
864 using Kokkos::Compat::create_view;
865 using Kokkos::Compat::subview_offset;
866 using Kokkos::Compat::deep_copy_offset;
867 typedef Array<size_t>::size_type size_type;
868 typedef ExpView exports_view_type;
869 typedef ImpView imports_view_type;
870
871#ifdef KOKKOS_ENABLE_CUDA
872 static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
873 ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
874 "Please do not use Tpetra::Distributor with UVM "
875 "allocations. See GitHub issue #1088.");
876#endif // KOKKOS_ENABLE_CUDA
877
878#ifdef KOKKOS_ENABLE_SYCL
879 static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
880 ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
881 "Please do not use Tpetra::Distributor with SharedUSM "
882 "allocations. See GitHub issue #1088 (corresponding to CUDA).");
883#endif // KOKKOS_ENABLE_SYCL
884
885#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
886 Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
887#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
888
889 // Run-time configurable parameters that come from the input
890 // ParameterList set by setParameterList().
891 const Details::EDistributorSendType sendType = plan.getSendType();
892
893#ifdef HAVE_TPETRA_MPI
894 // All-to-all communication layout is quite different from
895 // point-to-point, so we handle it separately.
896 if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
897 doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
898 return;
899 }
900#ifdef HAVE_TPETRACORE_MPI_ADVANCE
901 else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
902 {
903 doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
904 return;
905 } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
906 doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
907 return;
908 }
909#endif
910
911#else // HAVE_TPETRA_MPI
912 if (plan.hasSelfMessage()) {
913
914 size_t selfReceiveOffset = 0;
915
916 // setup arrays containing starting-offsets into exports for each send,
917 // and num-packets-to-send for each send.
918 Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
919 size_t maxNumPackets = 0;
920 size_t curPKToffset = 0;
921 for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
922 sendPacketOffsets[pp] = curPKToffset;
923 size_t numPackets = 0;
924 for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
925 numPackets += numExportPacketsPerLID[j];
926 }
927 if (numPackets > maxNumPackets) maxNumPackets = numPackets;
928 packetsPerSend[pp] = numPackets;
929 curPKToffset += numPackets;
930 }
931
932 deep_copy_offset(imports, exports, selfReceiveOffset,
933 sendPacketOffsets[0], packetsPerSend[0]);
934 }
935#endif // HAVE_TPETRA_MPI
936
937 const int myProcID = plan.getComm()->getRank ();
938 size_t selfReceiveOffset = 0;
939
940#ifdef HAVE_TPETRA_DEBUG
941 // Different messages may have different numbers of packets.
942 size_t totalNumImportPackets = 0;
943 for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
944 totalNumImportPackets += numImportPacketsPerLID[ii];
945 }
946 TEUCHOS_TEST_FOR_EXCEPTION(
947 imports.extent (0) < totalNumImportPackets, std::runtime_error,
948 "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
949 "enough entries to hold the expected number of import packets. "
950 "imports.extent(0) = " << imports.extent (0) << " < "
951 "totalNumImportPackets = " << totalNumImportPackets << ".");
952 TEUCHOS_TEST_FOR_EXCEPTION
953 (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
954 "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
955 << requests_.size () << " != 0.");
956#endif // HAVE_TPETRA_DEBUG
957 // Distributor uses requests_.size() as the number of outstanding
958 // nonblocking message requests, so we resize to zero to maintain
959 // this invariant.
960 //
961 // getNumReceives() does _not_ include the self message, if there is
962 // one. Here, we do actually send a message to ourselves, so we
963 // include any self message in the "actual" number of receives to
964 // post.
965 //
966 // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
967 // doesn't (re)allocate its array of requests. That happens in
968 // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
969 // demand), or Resize_().
970 const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
971 as<size_type> (plan.hasSelfMessage() ? 1 : 0);
972 requests_.resize (0);
973
974 // Post the nonblocking receives. It's common MPI wisdom to post
975 // receives before sends. In MPI terms, this means favoring
976 // adding to the "posted queue" (of receive requests) over adding
977 // to the "unexpected queue" (of arrived messages not yet matched
978 // with a receive).
979 {
980#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
981 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
982#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
983
984 size_t curBufferOffset = 0;
985 size_t curLIDoffset = 0;
986 for (size_type i = 0; i < actualNumReceives; ++i) {
987 size_t totalPacketsFrom_i = 0;
988 for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
989 totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
990 }
991 // totalPacketsFrom_i is converted down to int, so make sure it can be represented
992 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
993 std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
994 "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
995 "to be represented as int.");
996 curLIDoffset += plan.getLengthsFrom()[i];
997 if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
998 // If my process is receiving these packet(s) from another
999 // process (not a self-receive), and if there is at least
1000 // one packet to receive:
1001 //
1002 // 1. Set up the persisting view (recvBuf) into the imports
1003 // array, given the offset and size (total number of
1004 // packets from process getProcsFrom()[i]).
1005 // 2. Start the Irecv and save the resulting request.
1006 imports_view_type recvBuf =
1007 subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
1008 requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
1009 mpiTag_, *plan.getComm()));
1010 }
1011 else { // Receiving these packet(s) from myself
1012 selfReceiveOffset = curBufferOffset; // Remember the offset
1013 }
1014 curBufferOffset += totalPacketsFrom_i;
1015 }
1016 }
1017
1018#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1019 Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
1020#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1021
1022 // setup arrays containing starting-offsets into exports for each send,
1023 // and num-packets-to-send for each send.
1024 Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
1025 size_t maxNumPackets = 0;
1026 size_t curPKToffset = 0;
1027 for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
1028 sendPacketOffsets[pp] = curPKToffset;
1029 size_t numPackets = 0;
1030 for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
1031 numPackets += numExportPacketsPerLID[j];
1032 }
1033 if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1034 // numPackets will be used as a message length, so make sure it can be represented as int
1035 TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
1036 std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
1037 "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
1038 "to be represented as int.");
1039 packetsPerSend[pp] = numPackets;
1040 curPKToffset += numPackets;
1041 }
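  // maxNumPackets is only needed to size the single-message send buffer used
  // by the "slow path" (non-null getIndicesTo()) branch below.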
1042
1043 // setup scan through getProcsTo() list starting with higher numbered procs
1044 // (should help balance message traffic)
1045 size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
1046 size_t procIndex = 0;
1047 while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
1048 ++procIndex;
1049 }
1050 if (procIndex == numBlocks) {
1051 procIndex = 0;
1052 }
1053
1054 size_t selfNum = 0;
1055 size_t selfIndex = 0;
1056 if (plan.getIndicesTo().is_null()) {
1057
1058#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1059 Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
1060#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1061
1062 // Data are already blocked (laid out) by process, so we don't
1063 // need a separate send buffer (besides the exports array).
1064 for (size_t i = 0; i < numBlocks; ++i) {
1065 size_t p = i + procIndex;
1066 if (p > (numBlocks - 1)) {
1067 p -= numBlocks;
1068 }
1069
1070 if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
1071 exports_view_type tmpSend =
1072 subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
1073
1074 if (sendType == Details::DISTRIBUTOR_ISEND) {
1075 exports_view_type tmpSendBuf =
1076 subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
1077 requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
1078 mpiTag_, *plan.getComm()));
1079 }
1080 else { // DISTRIBUTOR_SEND
1081 send<int> (tmpSend,
1082 as<int> (tmpSend.size ()),
1083 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1084 }
1085 }
1086 else { // "Sending" the message to myself
1087 selfNum = p;
1088 }
1089 }
1090
1091 if (plan.hasSelfMessage()) {
1092 deep_copy_offset(imports, exports, selfReceiveOffset,
1093 sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
1094 }
1095 }
1096 else { // data are not blocked by proc, use send buffer
1097
1098#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1099 Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
1100#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1101
1102 // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
1103 typedef typename ExpView::non_const_value_type Packet;
1104 typedef typename ExpView::array_layout Layout;
1105 typedef typename ExpView::device_type Device;
1106 typedef typename ExpView::memory_traits Mem;
1107
1108 // This buffer is long enough for only one message at a time.
1109 // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
1110 // of sendType requested by user.
1111 // This code path formerly errored out with message:
1112 // Tpetra::Distributor::doPosts(4-arg, Kokkos):
1113 // The "send buffer" code path
1114 // doesn't currently work with nonblocking sends.
1115 // Now, we opt to just do the communication in a way that works.
1116#ifdef HAVE_TPETRA_DEBUG
1117 if (sendType != Details::DISTRIBUTOR_SEND) {
1118 if (plan.getComm()->getRank() == 0)
1119 std::cout << "The requested Tpetra send type "
1120 << DistributorSendTypeEnumToString(sendType)
1121 << " requires Distributor data to be ordered by"
1122 << " the receiving processor rank. Since these"
1123 << " data are not ordered, Tpetra will use Send"
1124 << " instead." << std::endl;
1125 }
1126#endif
1127
1128 Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
1129 maxNumPackets);
1130
1131 Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
1132 size_t ioffset = 0;
1133 for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
1134 indicesOffsets[j] = ioffset;
1135 ioffset += numExportPacketsPerLID[j];
1136 }
1137
1138 for (size_t i = 0; i < numBlocks; ++i) {
1139 size_t p = i + procIndex;
1140 if (p > (numBlocks - 1)) {
1141 p -= numBlocks;
1142 }
1143
1144 if (plan.getProcsTo()[p] != myProcID) {
1145 size_t sendArrayOffset = 0;
1146 size_t j = plan.getStartsTo()[p];
1147 size_t numPacketsTo_p = 0;
1148 for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
1149 numPacketsTo_p += numExportPacketsPerLID[j];
1150 deep_copy_offset(sendArray, exports, sendArrayOffset,
1151 indicesOffsets[j], numExportPacketsPerLID[j]);
1152 sendArrayOffset += numExportPacketsPerLID[j];
1153 }
1154 typename ExpView::execution_space().fence();
1155
1156 if (numPacketsTo_p > 0) {
1157 ImpView tmpSend =
1158 subview_offset(sendArray, size_t(0), numPacketsTo_p);
1159
1160 send<int> (tmpSend,
1161 as<int> (tmpSend.size ()),
1162 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1163 }
1164 }
1165 else { // "Sending" the message to myself
1166 selfNum = p;
1167 selfIndex = plan.getStartsTo()[p];
1168 }
1169 }
1170
1171 if (plan.hasSelfMessage()) {
1172 for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
1173 deep_copy_offset(imports, exports, selfReceiveOffset,
1174 indicesOffsets[selfIndex],
1175 numExportPacketsPerLID[selfIndex]);
1176 selfReceiveOffset += numExportPacketsPerLID[selfIndex];
1177 ++selfIndex;
1178 }
1179 }
1180 }
1181}
1182
1183} // namespace Details
1184} // namespace Tpetra
1185
1186#endif // TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP