Tpetra parallel linear algebra Version of the Day
Loading...
Searching...
No Matches
Tpetra_Details_ReadTriples.hpp
Go to the documentation of this file.
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#ifndef TPETRA_DETAILS_READTRIPLES_HPP
11#define TPETRA_DETAILS_READTRIPLES_HPP
12
21
22#include "TpetraCore_config.h"
23#include "Tpetra_Details_PackTriples.hpp"
24#include "Kokkos_ArithTraits.hpp"
25#include "Teuchos_MatrixMarket_generic.hpp"
26#include "Teuchos_CommHelpers.hpp"
27#include <iostream>
28#include <typeinfo> // for debugging
29
30namespace Tpetra {
31namespace Details {
32
33//
34// Search for "SKIP DOWN TO HERE" (omit quotes) for the "public"
35// interface. I put "public" in quotes because it's public only for
36// Tpetra developers, NOT for Tpetra users.
37//
38
39namespace Impl {
40
41// mfh 01 Feb 2017: Unfortunately,
42// Teuchos::MatrixMarket::readComplexData requires Teuchos to have
43// complex arithmetic support enabled. To avoid this issue, I
44// reimplement the function here. It's not very long.
45
78template<class OrdinalType, class RealType>
79bool
80readComplexData (std::istream& istr,
81 OrdinalType& rowIndex,
82 OrdinalType& colIndex,
83 RealType& realPart,
84 RealType& imagPart,
85 const std::size_t lineNumber,
86 const bool tolerant)
87{
88 using ::Teuchos::MatrixMarket::readRealData;
89
90 RealType the_realPart, the_imagPart;
91 if (! readRealData (istr, rowIndex, colIndex, the_realPart, lineNumber, tolerant)) {
92 if (tolerant) {
93 return false;
94 }
95 else {
96 std::ostringstream os;
97 os << "Failed to read pattern data and/or real value from line "
98 << lineNumber << " of input";
99 throw std::invalid_argument(os.str());
100 }
101 }
102 if (istr.eof ()) {
103 if (tolerant) {
104 return false;
105 }
106 else {
107 std::ostringstream os;
108 os << "No more data after real value on line "
109 << lineNumber << " of input";
110 throw std::invalid_argument (os.str ());
111 }
112 }
113 istr >> the_imagPart;
114 if (istr.fail ()) {
115 if (tolerant) {
116 return false;
117 }
118 else {
119 std::ostringstream os;
120 os << "Failed to get imaginary value from line "
121 << lineNumber << " of input";
122 throw std::invalid_argument (os.str ());
123 }
124 }
125 realPart = the_realPart;
126 imagPart = the_imagPart;
127 return true;
128}
129
130
140template<class SC,
141 class GO,
142 const bool isComplex = ::Kokkos::ArithTraits<SC>::is_complex>
143struct ReadLine {
164 static int
165 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
166 const std::string& line,
167 const std::size_t lineNumber,
168 const bool tolerant = false,
169 std::ostream* errStrm = NULL,
170 const bool debug = false);
171};
172
180template<class SC, class GO>
181struct ReadLine<SC, GO, true> {
202 static int
203 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
204 const std::string& line,
205 const std::size_t lineNumber,
206 const bool tolerant = false,
207 std::ostream* errStrm = NULL,
208 const bool debug = false)
209 {
210 using ::Teuchos::MatrixMarket::checkCommentLine;
211 typedef typename ::Kokkos::ArithTraits<SC>::mag_type real_type;
212 using std::endl;
213
214 GO rowInd, colInd;
215 real_type realPart, imagPart;
216 std::istringstream istr (line);
217 bool success = true;
218 try {
219 // Use the version of this function in this file, not the
220 // version in Teuchos_MatrixMarket_generic.hpp, because the
221 // latter only exists if HAVE_TEUCHOS_COMPLEX is defined.
222 success = readComplexData (istr, rowInd, colInd, realPart, imagPart,
223 lineNumber, tolerant);
224 }
225 catch (std::exception& e) {
226 success = false;
227 if (errStrm != NULL) {
228 std::ostringstream os;
229 os << "readLine: readComplexData threw an exception: " << e.what ()
230 << endl;
231 *errStrm << os.str ();
232 }
233 }
234
235 if (success) {
236 // if (debug && errStrm != NULL) {
237 // std::ostringstream os;
238 // os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
239 // << ", realPart=" << realPart << ", imagPart=" << imagPart
240 // << std::endl;
241 // *errStrm << os.str ();
242 // }
243 // This line may have side effects.
244 const int errCode =
245 processTriple (rowInd, colInd, SC (realPart, imagPart));
246 if (errCode != 0 && errStrm != NULL) {
247 std::ostringstream os;
248 os << "readLine: processTriple returned " << errCode << " != 0."
249 << endl;
250 *errStrm << os.str ();
251 }
252 return errCode;
253 }
254 else {
255 return -1;
256 }
257 }
258};
259
267template<class SC, class GO>
268struct ReadLine<SC, GO, false> {
289 static int
290 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
291 const std::string& line,
292 const std::size_t lineNumber,
293 const bool tolerant = false,
294 std::ostream* errStrm = NULL,
295 const bool debug = false)
296 {
297 using ::Teuchos::MatrixMarket::checkCommentLine;
298 using ::Teuchos::MatrixMarket::readRealData;
299 using std::endl;
300
301 GO rowInd, colInd;
302 SC val;
303 std::istringstream istr (line);
304 bool success = true;
305 try {
306 success = readRealData (istr, rowInd, colInd, val,
307 lineNumber, tolerant);
308 }
309 catch (std::exception& e) {
310 success = false;
311 if (errStrm != NULL) {
312 std::ostringstream os;
313 os << "readLine: readRealData threw an exception: " << e.what ()
314 << endl;
315 *errStrm << os.str ();
316 }
317 }
318
319 if (success) {
320 if (debug && errStrm != NULL) {
321 std::ostringstream os;
322 os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
323 << ", val=" << val << std::endl;
324 *errStrm << os.str ();
325 }
326 // This line may have side effects.
327 const int errCode = processTriple (rowInd, colInd, val);
328 if (errCode != 0 && errStrm != NULL) {
329 std::ostringstream os;
330 os << "readLine: processTriple returned " << errCode << " != 0."
331 << endl;
332 *errStrm << os.str ();
333 }
334 return errCode;
335 }
336 else {
337 return -1;
338 }
339 }
340};
341
367template<class SC, class GO>
368int
369readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
370 const std::string& line,
371 const std::size_t lineNumber,
372 const bool tolerant = false,
373 std::ostream* errStrm = NULL,
374 const bool debug = false)
375{
376 return ReadLine<SC, GO>::readLine (processTriple, line, lineNumber,
377 tolerant, errStrm, debug);
378}
379
410template<class SC, class GO>
411int
412readTriples (std::istream& inputStream,
413 std::size_t& curLineNum,
414 std::size_t& numTriplesRead,
415 std::function<int (const GO, const GO, const SC&)> processTriple,
416 const std::size_t maxNumTriplesToRead,
417 const bool tolerant = false,
418 std::ostream* errStrm = NULL,
419 const bool debug = false)
420{
421 using Teuchos::MatrixMarket::checkCommentLine;
422 using std::endl;
423 using std::size_t;
424
425 numTriplesRead = 0; // output argument only
426 if (inputStream.eof ()) {
427 return 0; // no error, just nothing left to read
428 }
429 else if (inputStream.fail ()) {
430 if (errStrm != NULL) {
431 *errStrm << "Input stream reports a failure (not the same as "
432 "end-of-file)." << endl;
433 }
434 return -1;
435 }
436
437 std::string line;
438 std::vector<size_t> badLineNumbers;
439 int errCode = 0; // 0 means success
440
441 bool inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
442 ++curLineNum; // we read the line; we can't put it back
443 while (inputStreamCanStillBeRead && numTriplesRead < maxNumTriplesToRead) {
444 // if (debug && errStrm != NULL) {
445 // std::ostringstream os;
446 // os << "readTriples: Got line: \"" << line << "\"" << std::endl;
447 // *errStrm << os.str ();
448 // }
449 size_t start, size;
450
451 const bool isCommentLine =
452 checkCommentLine (line, start, size, curLineNum, tolerant);
453 if (isCommentLine) {
454 // Move on to the next line, if there is a next line.
455 inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
456 ++curLineNum; // we read another line; we can't put it back
457 continue; // move on to the next line
458 }
459 else { // not a comment line; should have a sparse matrix entry
460 const std::string theLine = line.substr (start, size);
461 // If the line has a valid sparse matrix entry, extract it and
462 // hand it off to the processTriple closure.
463 const int curErrCode =
464 readLine (processTriple, theLine, curLineNum, tolerant, errStrm, debug);
465 if (curErrCode != 0) {
466 errCode = curErrCode;
467 badLineNumbers.push_back (curLineNum);
468 }
469 else {
470 ++numTriplesRead;
471 }
472 if (numTriplesRead < maxNumTriplesToRead) {
473 inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
474 }
475 }
476 } // while there are lines to read and we need more triples
477
478 if (errCode != 0 && errStrm != NULL) {
479 const size_t numBadLines = badLineNumbers.size ();
480 *errStrm << "Encountered " << numBadLines << " bad line"
481 << (numBadLines != size_t (1) ? "s" : "")
482 << ": [";
483 for (size_t k = 0; k < numBadLines; ++k) {
484 *errStrm << badLineNumbers[k];
485 if (k + 1 < numBadLines) {
486 *errStrm << ", ";
487 }
488 }
489 *errStrm << "]" << endl;
490 }
491 if (! inputStream.eof () && inputStream.fail ()) {
492 if (errCode == 0) {
493 errCode = -1;
494 }
495 if (errStrm != NULL) {
496 *errStrm << "The input stream is not at end-of-file, "
497 "but is in a bad state." << endl;
498 }
499 }
500 return errCode;
501}
502
528template<class SC, class GO>
529int
530readAndSendOneBatchOfTriples (std::istream& inputStream,
531 std::size_t& curLineNum,
532 std::size_t& numEntRead,
533 ::Teuchos::ArrayRCP<int>& sizeBuf,
534 ::Teuchos::ArrayRCP<char>& msgBuf,
535 std::vector<GO>& rowInds,
536 std::vector<GO>& colInds,
537 std::vector<SC>& vals,
538 const std::size_t maxNumEntPerMsg,
539 const int destRank,
540 const ::Teuchos::Comm<int>& comm,
541 const bool tolerant = false,
542 std::ostream* errStrm = NULL,
543 const bool debug = false)
544{
549 using ::Teuchos::isend;
550 using std::endl;
551
552 using ::Kokkos::ArithTraits;
553 // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
554 // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
555 constexpr int sizeTag = 42;
556 constexpr int msgTag = 43;
557 //constexpr int srcRank = 0;
558 int errCode = 0;
559
560 // This doesn't actually deallocate memory; it just changes the size
561 // back to zero, so that push_back starts over from the beginning.
562 rowInds.resize (0);
563 colInds.resize (0);
564 vals.resize (0);
565 // Closure that adds the new matrix entry to the above temp arrays.
566 auto processTriple = [&rowInds, &colInds, &vals]
567 (const GO rowInd, const GO colInd, const SC& val) {
568 try {
569 rowInds.push_back (rowInd);
570 colInds.push_back (colInd);
571 vals.push_back (val);
572 }
573 catch (...) {
574 return -1;
575 }
576 return 0;
577 };
578 numEntRead = 0; // output argument
579 errCode = readTriples<SC, GO> (inputStream, curLineNum, numEntRead,
580 processTriple, maxNumEntPerMsg, tolerant,
581 errStrm, debug);
582 if (debug && errStrm != NULL) {
583 std::ostringstream os;
584 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
585 << ", GO=" << typeid (GO).name () << ": "
586 << "readAndSendOneBatchOfTriples: readTriples read "
587 << numEntRead << " matrix entries, and returned errCode="
588 << errCode << "." << std::endl;
589 *errStrm << os.str ();
590 }
591 if (numEntRead != rowInds.size () ||
592 numEntRead != colInds.size () ||
593 numEntRead != vals.size ()) {
594 if (errStrm != NULL) {
595 *errStrm << "readTriples size results are not consistent. "
596 << "numEntRead = " << numEntRead
597 << ", rowInds.size() = " << rowInds.size ()
598 << ", colInds.size() = " << colInds.size ()
599 << ", and vals.size() = " << vals.size () << "."
600 << std::endl;
601 }
602 if (errCode == 0) {
603 errCode = -1;
604 }
605 }
606
607 // We don't consider reading having "failed" if we've reached
608 // end-of-file before reading maxNumEntPerMsg entries. It's OK if
609 // we got fewer triples than that. Furthermore, we have to send at
610 // least one message to the destination process, even if the read
611 // from the file failed.
612
613 if (numEntRead == 0 || errCode != 0) {
614 // Send a message size of zero to the receiving process, to tell
615 // it that we have no triples to send, or that there was an error
616 // reading. The latter just means that we "go through the
617 // motions," then broadcast the error code.
618 sizeBuf[0] = 0;
619 if (debug && errStrm != NULL) {
620 std::ostringstream os;
621 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
622 << ", GO=" << typeid (GO).name () << ": "
623 << "Post send (size=0, errCode=" << errCode << ") "
624 << "to " << destRank << " with tag " << sizeTag << endl;
625 *errStrm << os.str ();
626 }
627 send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
628 return errCode;
629 }
630 else { // we read a nonzero # of triples, without error
631 const int numEnt = static_cast<int> (numEntRead);
632 int countSize = 0; // output argument
633 int triplesSize = 0; // output argument
634
635 errCode = countPackTriplesCount (comm, countSize, errStrm);
636 // countSize should never be nonpositive, since we have to pack an
637 // integer size.
638 if (countSize <= 0 && errCode == 0) {
639 errCode = -1;
640 }
641
642 if (errCode != 0) {
643 // Send zero to the receiving process, to tell it about the error.
644 sizeBuf[0] = 0;
645 if (debug && errStrm != NULL) {
646 std::ostringstream os;
647 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
648 << ", GO=" << typeid (GO).name () << ": "
649 << "Post send (size=0, error case) to " << destRank
650 << " with tag " << sizeTag << endl;
651 *errStrm << os.str ();
652 }
653 send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
654 return errCode;
655 }
656 else { // countPackTriplesCount succeeded
657 errCode = countPackTriples<SC, GO> (numEnt, comm, triplesSize, errStrm);
658 if (errCode != 0) {
659 // Send a message size of zero to the receiving process, to
660 // tell it that there was an error counting.
661 sizeBuf[0] = 0;
662 if (debug && errStrm != NULL) {
663 std::ostringstream os;
664 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
665 << ", GO=" << typeid (GO).name () << ": "
666 << "Post send (size=0, error case) to " << destRank
667 << " with tag " << sizeTag << endl;
668 *errStrm << os.str ();
669 }
670 send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
671 return errCode;
672 }
673 else { // countPackTriples succeeded; message packed & ready to send
674 // Send the message size (in bytes). We can use a nonblocking
675 // send here, and try to overlap with message packing.
676 const int outBufSize = countSize + triplesSize;
677 sizeBuf[0] = outBufSize;
678 if (debug && errStrm != NULL) {
679 std::ostringstream os;
680 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
681 << ", GO=" << typeid (GO).name () << ": "
682 << "Post isend (size=" << sizeBuf[0] << ") to " << destRank
683 << " with tag " << sizeTag << endl;
684 *errStrm << os.str ();
685 }
686 auto sizeReq = isend<int, int> (sizeBuf, destRank, sizeTag, comm);
687
688 msgBuf.resize (outBufSize);
689 char* outBuf = msgBuf.getRawPtr ();
690
691 // If anything goes wrong with packing, send the pack buffer
692 // anyway, since the receiving process expects a message.
693 int outBufCurPos = 0; // input/output argument
694 errCode = packTriplesCount (numEnt, outBuf, outBufSize,
695 outBufCurPos, comm, errStrm);
696 if (errCode == 0) {
697 errCode = packTriples<SC, GO> (rowInds.data (), colInds.data (),
698 vals.data (), numEnt, outBuf,
699 outBufSize, outBufCurPos, comm,
700 errStrm);
701 }
702 if (debug && errStrm != NULL) {
703 std::ostringstream os;
704 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
705 << ", GO=" << typeid (GO).name () << ": "
706 << "Post isend (packed data) to " << destRank
707 << " with tag " << msgTag << endl;
708 *errStrm << os.str ();
709 }
710 auto msgReq = isend<int, char> (msgBuf, destRank, msgTag, comm);
711
712 // Wait on the two messages. It doesn't matter in what order
713 // we send them, because they have different tags. The
714 // receiving process will wait on the first message first, in
715 // order to get the size of the second message.
716 if (debug && errStrm != NULL) {
717 std::ostringstream os;
718 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
719 << ", GO=" << typeid (GO).name () << ": "
720 << "Wait on isend (size)" << endl;
721 *errStrm << os.str ();
722 }
723 sizeReq->wait ();
724 if (debug && errStrm != NULL) {
725 std::ostringstream os;
726 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
727 << ", GO=" << typeid (GO).name () << ": "
728 << "Wait on isend (packed data)" << endl;
729 *errStrm << os.str ();
730 }
731 msgReq->wait ();
732
733 // This doesn't actually deallocate; it just resets sizes to zero.
734 rowInds.clear ();
735 colInds.clear ();
736 vals.clear ();
737 }
738 }
739 }
740 return errCode;
741}
742
750
778template<class SC, class GO, class CommRequestPtr>
779int
780recvOneBatchOfTriples (std::vector<GO>& rowInds,
781 std::vector<GO>& colInds,
782 std::vector<SC>& vals,
783 int& numEnt,
784 ::Teuchos::ArrayRCP<int>& sizeBuf,
785 ::Teuchos::ArrayRCP<char>& msgBuf,
786 CommRequestPtr& sizeReq,
787 const int srcRank,
788 const ::Teuchos::Comm<int>& comm,
789 const bool tolerant = false,
790 std::ostream* errStrm = NULL,
791 const bool debug = false)
792{
795 using ::Kokkos::ArithTraits;
796
798 //constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
799 //constexpr int sizeTag = 42;
800 constexpr int msgTag = 43;
801 int errCode = 0; // return value
802 numEnt = 0; // output argument
803
804 // Wait on the ireceive we preposted before calling this function.
805 if (debug && errStrm != NULL) {
806 std::ostringstream os;
807 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
808 << ", GO=" << typeid (GO).name () << ": "
809 << "Wait on irecv (size)" << std::endl;
810 *errStrm << os.str ();
811 }
812 sizeReq->wait ();
813 sizeReq = CommRequestPtr (NULL);
814 const int inBufSize = sizeBuf[0];
815 if (debug && errStrm != NULL) {
816 std::ostringstream os;
817 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
818 << ", GO=" << typeid (GO).name () << ": "
819 << "Received size: sizeBuf[0]=" << sizeBuf[0] << std::endl;
820 *errStrm << os.str ();
821 }
822
823 if (inBufSize == 0) {
824 numEnt = 0;
825 rowInds.resize (0);
826 colInds.resize (0);
827 vals.resize (0);
828 }
829 else {
830 msgBuf.resize (inBufSize);
831 char* inBuf = msgBuf.getRawPtr ();
832
833 if (debug && errStrm != NULL) {
834 std::ostringstream os;
835 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
836 << ", GO=" << typeid (GO).name () << ": "
837 << "Post irecv (packed data) " << "from " << srcRank
838 << " with tag " << msgTag << std::endl;
839 *errStrm << os.str ();
840 }
841 auto msgReq = ::Teuchos::ireceive (msgBuf, srcRank, msgTag, comm);
842 if (debug && errStrm != NULL) {
843 std::ostringstream os;
844 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
845 << ", GO=" << typeid (GO).name () << ": "
846 << "Wait on irecv (packed data)" << std::endl;
847 *errStrm << os.str ();
848 }
849 msgReq->wait ();
850
851 int inBufCurPos = 0; // output argument
852 errCode = unpackTriplesCount (inBuf, inBufSize, inBufCurPos,
853 numEnt, comm, errStrm);
854 if (errCode == 0) {
855 rowInds.resize (numEnt);
856 colInds.resize (numEnt);
857 vals.resize (numEnt);
858 errCode = unpackTriples<SC, GO> (inBuf, inBufSize, inBufCurPos,
859 rowInds.data (), colInds.data (),
860 vals.data (), numEnt, comm, errStrm);
861 }
862 }
863 return errCode;
864}
865
866} // namespace Impl
867
868//
869// SKIP DOWN TO HERE FOR "PUBLIC" INTERFACE
870//
871
905template<class SC, class GO>
906int
907readAndDealOutTriples (std::istream& inputStream, // only valid on Proc 0
908 std::size_t& curLineNum, // only valid on Proc 0
909 std::size_t& totalNumEntRead, // only valid on Proc 0
910 std::function<int (const GO, const GO, const SC&)> processTriple,
911 const std::size_t maxNumEntPerMsg,
912 const ::Teuchos::Comm<int>& comm,
913 const bool tolerant = false,
914 std::ostream* errStrm = NULL,
915 const bool debug = false)
916{
917 using Kokkos::ArithTraits;
918 using std::endl;
919 using std::size_t;
920
921 constexpr int srcRank = 0;
922 //constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
924 constexpr int sizeTag = 42;
925 //constexpr int msgTag = 43;
926 const int myRank = comm.getRank ();
927 const int numProcs = comm.getSize ();
928 int errCode = 0;
929
930 ::Teuchos::ArrayRCP<int> sizeBuf (1);
931 ::Teuchos::ArrayRCP<char> msgBuf; // to be resized as needed
932
933 // Temporary storage for reading & packing (on Process srcRank) or
934 // unpacking (every other process) triples.
935 std::vector<GO> rowInds;
936 std::vector<GO> colInds;
937 std::vector<SC> vals;
938 rowInds.reserve (maxNumEntPerMsg);
939 colInds.reserve (maxNumEntPerMsg);
940 vals.reserve (maxNumEntPerMsg);
941
942 totalNumEntRead = 0;
943 if (myRank == srcRank) {
944 // Loop around through all the processes, including this one, over
945 // and over until we reach the end of the file, or an error occurs.
946 int destRank = 0;
947 bool lastMessageWasLegitZero = false;
948 for ( ;
949 ! inputStream.eof () && errCode == 0;
950 destRank = (destRank + 1) % numProcs) {
951
952 size_t curNumEntRead = 0; // output argument of below
953 if (destRank == srcRank) {
954 // We can read and process the triples directly. We don't
955 // need to use intermediate storage, because we don't need to
956 // pack and send the triples.
957 const int readErrCode =
958 Impl::readTriples<SC, GO> (inputStream, curLineNum, curNumEntRead,
959 processTriple, maxNumEntPerMsg, tolerant,
960 errStrm, debug);
961 if (debug && errStrm != NULL) {
962 std::ostringstream os;
963 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
964 << ", GO=" << typeid (GO).name () << ": "
965 << "(dest=src) readTriples returned curNumEntRead="
966 << curNumEntRead << ", errCode=" << readErrCode << endl;
967 *errStrm << os.str ();
968 }
969 errCode = (readErrCode != 0) ? readErrCode : errCode;
970 }
971 else {
972 if (false && debug && errStrm != NULL) {
973 std::ostringstream os;
974 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
975 << ", GO=" << typeid (GO).name () << ": "
976 << "Calling readAndSend... with destRank=" << destRank << endl;
977 *errStrm << os.str ();
978 }
979 // Read, pack, and send the triples to destRank.
980 const int readAndSendErrCode =
981 Impl::readAndSendOneBatchOfTriples<SC, GO> (inputStream, curLineNum,
982 curNumEntRead,
983 sizeBuf, msgBuf,
984 rowInds, colInds, vals,
985 maxNumEntPerMsg, destRank,
986 comm, tolerant, errStrm,
987 debug);
988 totalNumEntRead += curNumEntRead;
989 if (debug && errStrm != NULL) {
990 std::ostringstream os;
991 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
992 << ", GO=" << typeid (GO).name () << ": "
993 << "readAndSend... with destRank=" << destRank
994 << " returned curNumEntRead=" << curNumEntRead
995 << ", errCode=" << readAndSendErrCode << endl;
996 *errStrm << os.str ();
997 }
998 errCode = (readAndSendErrCode != 0) ? readAndSendErrCode : errCode;
999 if (readAndSendErrCode == 0 && curNumEntRead == 0) {
1000 lastMessageWasLegitZero = true;
1001 if (debug && errStrm != NULL) {
1002 std::ostringstream os;
1003 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1004 << ", GO=" << typeid (GO).name () << ": "
1005 << "Last send to " << destRank << " with tag " << sizeTag
1006 << " was legit zero, counts as termination" << endl;
1007 *errStrm << os.str ();
1008 }
1009 }
1010 }
1011 } // loop around through processes until done reading file, or error
1012
1013 // Loop around through the remaining processes, and tell them that
1014 // we're done, by sending zero. If the last message we sent to
1015 // destRank was zero, then skip that process, since it only
1016 // expects one message of size zero. Note that destRank got
1017 // incremented mod numProcs at end of loop, so we have to
1018 // decrement it mod numProcs.
1019 destRank = (destRank - 1) % numProcs;
1020 if (destRank < 0) { // C mod operator does not promise positivity
1021 destRank = destRank + numProcs;
1022 }
1023
1024 const int startRank = lastMessageWasLegitZero ? (destRank+1) : destRank;
1025 for (int outRank = startRank; outRank < numProcs; ++outRank) {
1026 if (outRank != srcRank) {
1027 if (debug && errStrm != NULL) {
1028 std::ostringstream os;
1029 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1030 << ", GO=" << typeid (GO).name () << ": "
1031 << "Post send (size, termination msg) to " << outRank
1032 << " with tag " << sizeTag << "(was last message legit zero? "
1033 << (lastMessageWasLegitZero ? "true" : "false") << ")" << endl;
1034 *errStrm << os.str ();
1035 }
1036 sizeBuf[0] = 0;
1037 ::Teuchos::send (sizeBuf.getRawPtr (), 1, outRank, sizeTag, comm);
1038 }
1039 }
1040 }
1041 else {
1042 while (true) {
1043 // Prepost a message to receive the size (in bytes) of the
1044 // incoming packet.
1045 sizeBuf[0] = 0; // superfluous, but safe
1046 if (debug && errStrm != NULL) {
1047 std::ostringstream os;
1048 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1049 << ", GO=" << typeid (GO).name () << ": "
1050 << "Post irecv (size) from " << srcRank
1051 << " with tag " << sizeTag << std::endl;
1052 *errStrm << os.str ();
1053 }
1054 auto sizeReq = ::Teuchos::ireceive (sizeBuf, srcRank, sizeTag, comm);
1055
1056 int numEnt = 0; // output argument
1057 const int recvErrCode =
1058 Impl::recvOneBatchOfTriples (rowInds, colInds, vals, numEnt, sizeBuf,
1059 msgBuf, sizeReq, srcRank, comm, tolerant,
1060 errStrm, debug);
1061 if (debug && errStrm != NULL) {
1062 std::ostringstream os;
1063 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1064 << ", GO=" << typeid (GO).name () << ": "
1065 << "recvOneBatchOfTriples returned numEnt=" << numEnt
1066 << ", errCode=" << recvErrCode << endl;
1067 *errStrm << os.str ();
1068 }
1069 errCode = (recvErrCode != 0) ? recvErrCode : errCode;
1070
1071 if (numEnt != static_cast<int> (rowInds.size ()) ||
1072 numEnt != static_cast<int> (colInds.size ()) ||
1073 numEnt != static_cast<int> (vals.size ())) {
1074 errCode = (errCode == 0) ? -1 : errCode;
1075 if (errStrm != NULL) {
1076 *errStrm << "recvOneBatchOfTriples produced inconsistent data sizes. "
1077 << "numEnt = " << numEnt
1078 << ", rowInds.size() = " << rowInds.size ()
1079 << ", colInds.size() = " << colInds.size ()
1080 << ", vals.size() = " << vals.size () << "."
1081 << endl;
1082 }
1083 } // if sizes inconsistent
1084
1085 // Sending zero items is how Process srcRank tells this process
1086 // that it (Process srcRank) is done sending out data.
1087 if (numEnt == 0) {
1088 break;
1089 }
1090
1091 for (int k = 0; k < numEnt && errCode == 0; ++k) {
1092 const int curErrCode = processTriple (rowInds[k], colInds[k], vals[k]);
1093 errCode = (curErrCode == 0) ? errCode : curErrCode;
1094 }
1095 } // while we still get messages from srcRank
1096 }
1097
1098 if (debug && errStrm != NULL) {
1099 std::ostringstream os;
1100 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1101 << ", GO=" << typeid (GO).name () << ": "
1102 << "Done with send/recv loop" << endl;
1103 *errStrm << os.str ();
1104 }
1105 // Do a bitwise OR to get an error code that is nonzero if and only
1106 // if any process' local error code is nonzero.
1107 using ::Teuchos::outArg;
1108 using ::Teuchos::REDUCE_BOR;
1109 using ::Teuchos::reduceAll;
1110 const int lclErrCode = errCode;
1111 reduceAll<int, int> (comm, REDUCE_BOR, lclErrCode, outArg (errCode));
1112 return errCode;
1113}
1114
1115} // namespace Details
1116} // namespace Tpetra
1117
1118#endif // TPETRA_DETAILS_READTRIPLES_HPP
bool readComplexData(std::istream &istr, OrdinalType &rowIndex, OrdinalType &colIndex, RealType &realPart, RealType &imagPart, const std::size_t lineNumber, const bool tolerant)
Read "<rowIndex> <colIndex> <realPart> <imagPart>" from a line.
int readTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numTriplesRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumTriplesToRead, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumTriplesToRead triples from the given Matrix Market input stream, and pass each resulting matrix entry to the processTriple closure.
int recvOneBatchOfTriples(std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, int &numEnt, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, CommRequestPtr &sizeReq, const int srcRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Receive one batch of sparse matrix entries ("triples") from the process srcRank, and unpack them into rowInds, colInds, and vals.
int readAndSendOneBatchOfTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numEntRead, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, const std::size_t maxNumEntPerMsg, const int destRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumEntPerMsg sparse matrix entries from the input stream, and send them to the proces...
int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
Nonmember function that computes a residual. Computes R = B - A * X.
int packTriplesCount(const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Pack the count (number) of matrix triples.
int countPackTriplesCount(const ::Teuchos::Comm< int > &, int &size, std::ostream *errStrm)
Compute the buffer size required by packTriples for packing the number of matrix entries ("triples").
int packTriples(const OrdinalType[], const OrdinalType[], const ScalarType[], const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm=NULL)
Pack matrix entries ("triples" (i, j, A(i,j))) into the given output buffer.
int unpackTriplesCount(const char[], const int, int &, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Unpack just the count of triples from the given input buffer.
int unpackTriples(const char[], const int, int &, OrdinalType[], OrdinalType[], ScalarType[], const int, const ::Teuchos::Comm< int > &, std::ostream *errStrm=NULL)
Unpack matrix entries ("triples" (i, j, A(i,j))) from the given input buffer.
int readAndDealOutTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &totalNumEntRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumEntPerMsg, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
On Process 0 in the given communicator, read sparse matrix entries (in chunks of at most maxNumEntPer...
int countPackTriples(const int numEnt, const ::Teuchos::Comm< int > &comm, int &size, std::ostream *errStrm=NULL)
Compute the buffer size required by packTriples for packing numEnt number of (i,j,...
Namespace Tpetra contains the class and methods constituting the Tpetra library.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
Implementation of the readLine stand-alone function in this namespace (see below).
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...