diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C index b5181bccec94cbdeffd908884b86c33139a5b2d7..30e71a12c51c187a92a636c892783697fe9cefaa 100644 --- a/applications/test/parallel-chunks/Test-parallel-chunks.C +++ b/applications/test/parallel-chunks/Test-parallel-chunks.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -44,175 +44,62 @@ Description using namespace Foam; -// Looks like Pstream::exchangeBuf -template<class T> -void do_exchangeBuf +//- Number of elements corresponding to max byte transfer. +// Normal upper limit is INT_MAX since MPI sizes are limited to <int>. +template<class Type> +inline std::size_t PstreamDetail_maxTransferCount ( - const label sendSize, - const char* sendData, - const label recvSize, - char* recvData, - const int tag, - const label comm, - const bool wait -) + const std::size_t max_bytes = std::size_t(0) +) noexcept { - const label startOfRequests = UPstream::nRequests(); - - // Set up receives - // ~~~~~~~~~~~~~~~ - - // forAll(recvSizes, proci) - { - // if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) - if (!Pstream::master(comm) && recvSize > 0) - { - UIPstream::read - ( - UPstream::commsTypes::nonBlocking, - UPstream::myProcNo(comm), // proci, - recvData, - recvSize*sizeof(T), - tag, - comm - ); - } - } - - - // Set up sends - // ~~~~~~~~~~~~ - - // forAll(sendBufs, proci) - for (const int proci : Pstream::subProcs(comm)) - { - if (sendSize > 0) - // if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData, - sendSize*sizeof(T), - tag, - comm - ) - ) - { - 
FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << proci << " nBytes:" - << label(sendSize*sizeof(T)) - << Foam::abort(FatalError); - } - } - } - - - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ - - if (wait) - { - UPstream::waitRequests(startOfRequests); - } + return + ( + (max_bytes == 0) // ie, unlimited + ? (std::size_t(0)) // + : (max_bytes > std::size_t(INT_MAX)) // MPI limit is <int> + ? (std::size_t(INT_MAX) / sizeof(Type)) // + : (max_bytes > sizeof(Type)) // require an integral number + ? (max_bytes / sizeof(Type)) // + : (std::size_t(1)) // min of one element + ); } -// Looks like Pstream::exchangeContainer -template<class Container, class T> -void do_exchangeContainer +//- Upper limit on number of transfer bytes. +// Max bytes is normally INT_MAX since MPI sizes are limited to <int>. +// Negative values indicate a subtraction from INT_MAX. +inline std::size_t PstreamDetail_maxTransferBytes ( - const Container& sendData, - const label recvSize, - Container& recvData, - const int tag, - const label comm, - const bool wait -) + const int64_t max_bytes +) noexcept { - const label startOfRequests = UPstream::nRequests(); - - // Set up receives - // ~~~~~~~~~~~~~~~ - - // for (const int proci : Pstream::allProcs(comm)) - { - if (!Pstream::master(comm) && recvSize > 0) - // if (proci != Pstream::myProcNo(comm) && recvSize > 0) - { - UIPstream::read - ( - UPstream::commsTypes::nonBlocking, - UPstream::myProcNo(comm), // proci, - recvData.data_bytes(), - recvSize*sizeof(T), - tag, - comm - ); - } - } - - - // Set up sends - // ~~~~~~~~~~~~ - - if (Pstream::master(comm) && sendData.size() > 0) - { - for (const int proci : Pstream::subProcs(comm)) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData.cdata_bytes(), - sendData.size_bytes(), - tag, - comm - ) - ) - { - FatalErrorInFunction - << "Cannot send outgoing message. 
" - << "to:" << proci << " nBytes:" - << label(sendData.size_bytes()) - << Foam::abort(FatalError); - } - } - } - - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ - - if (wait) - { - UPstream::waitRequests(startOfRequests); - } + return + ( + (max_bytes < 0) // (numBytes fewer than INT_MAX) + ? std::size_t(INT_MAX + max_bytes) + : std::size_t(max_bytes) + ); } -template<class Container, class T> +template<class Container, class Type> void broadcast_chunks ( Container& sendData, const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, - const bool wait = true + const label comm = UPstream::worldComm, + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { // OR static_assert(is_contiguous<T>::value, "Contiguous data only!") - if (!is_contiguous<T>::value) + if (!is_contiguous<Type>::value) { FatalErrorInFunction - << "Contiguous data only." << sizeof(T) << Foam::abort(FatalError); + << "Contiguous data only." << sizeof(Type) + << Foam::abort(FatalError); } - if (UPstream::maxCommsSize <= 0) + if (maxComms_bytes == 0) { // Do in one go Info<< "send " << sendData.size() << " elements in one go" << endl; @@ -227,93 +114,90 @@ void broadcast_chunks sendData.resize_nocopy(recvSize); // A no-op on master - // Determine the number of chunks to send. Note that we - // only have to look at the sending data since we are - // guaranteed that some processor's sending size is some other - // processor's receive size. Also we can ignore any local comms. - // We need to send chunks so the number of iterations: - // maxChunkSize iterations - // ------------ ---------- - // 0 0 - // 1..maxChunkSize 1 - // maxChunkSize+1..2*maxChunkSize 2 - // ... - - const label maxChunkSize + // The chunk size (number of elements) corresponding to max byte transfer + // Is zero for non-chunked exchanges. 
+ const std::size_t chunkSize ( - max + PstreamDetail_maxTransferCount<Type> ( - static_cast<label>(1), - static_cast<label>(UPstream::maxCommsSize/sizeof(T)) + PstreamDetail_maxTransferBytes(maxComms_bytes) ) ); - label nChunks(0); - { - // Get max send count (elements) - // forAll(sendBufs, proci) - // { - // if (proci != Pstream::myProcNo(comm)) - // { - // nChunks = max(nChunks, sendBufs[proci].size()); - // } - // } - nChunks = sendSize; + if (chunkSize) + { // Convert from send count (elements) to number of chunks. // Can normally calculate with (count-1), but add some safety - if (nChunks) - { - nChunks = 1 + (nChunks/maxChunkSize); - } - reduce(nChunks, maxOp<label>(), tag, comm); + label nChunks = 1 + (sendSize/label(chunkSize)); Info << "send " << sendSize << " elements (" - << (sendSize*sizeof(T)) << " bytes) in " << nChunks - << " chunks of " << maxChunkSize << " elements (" - << (maxChunkSize*sizeof(T)) << " bytes) for maxCommsSize:" - << Pstream::maxCommsSize + << (sendSize*sizeof(Type)) << " bytes) in " << nChunks + << " chunks of " << label(chunkSize) << " elements (" + << label(chunkSize*sizeof(Type)) << " bytes) for maxCommsSize:" + << label(maxComms_bytes) << endl; } + // stress-test with shortened sendSize // will produce useless loops, but no calls // sendSize /= 2; - label nSend(0); - label startSend(0); - char* charPtrSend; + typedef stdFoam::span<Type> sendType; - for (label iter = 0; iter < nChunks; ++iter) + do { - nSend = min - ( - maxChunkSize, - sendSize-startSend - ); + sendType payload(sendData.data(), sendData.size()); - charPtrSend = - ( - nSend > 0 - ? 
reinterpret_cast<char*>(&(sendData[startSend])) - : nullptr - ); + if (!chunkSize) + { + UPstream::broadcast + ( + payload.data_bytes(), + payload.size_bytes(), + comm + ); + break; + } - Info<< "iter " << iter - << ": beg=" << startSend << " len=" << nSend - << " (" << (nSend*sizeof(T)) << " bytes)" << endl; + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - UPstream::broadcast(charPtrSend, nSend*sizeof(T), comm); + if (payload.size() <= beg) + { + // No more data windows + break; + } - // forAll(nSend, proci) - { - startSend += nSend; + sendType window + ( + (end < payload.size()) + ? payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); + + Info<< "iter " << iter + << ": beg=" << label(beg) << " len=" << label(window.size()) + << " (" << label(window.size_bytes()) << " bytes)" << endl; + + UPstream::broadcast + ( + window.data_bytes(), + window.size_bytes(), + comm + ); } } + while (false); - Info<< "final: " << startSend << endl; + Info<< "final" << endl; } @@ -333,7 +217,7 @@ int main(int argc, char *argv[]) } labelList input1; - if (Pstream::master()) + if (UPstream::master()) { input1 = identity(500); } @@ -348,7 +232,7 @@ int main(int argc, char *argv[]) // Mostly the same with PstreamBuffers if (false) { - PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking); + PstreamBuffers pBufs; labelList sendData; if (Pstream::master()) diff --git a/applications/test/parallel-comm3b/Test-parallel-comm3b.C b/applications/test/parallel-comm3b/Test-parallel-comm3b.C index 0bbcc5da5e1f381da4e6d9fdcd18907a1bde8787..ea64880146d0e46d41e94b1d95cff6be770d2500 100644 --- a/applications/test/parallel-comm3b/Test-parallel-comm3b.C +++ b/applications/test/parallel-comm3b/Test-parallel-comm3b.C @@ -130,7 +130,7 @@ int main(int argc, char *argv[]) for (bool barrier_active = 
false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -143,8 +143,8 @@ int main(int argc, char *argv[]) { // Message found and had size: receive it - const label proci = probed.first; - const label count = probed.second; + const label proci(probed.first); + const label count(probed.second); recvBufs(proci).resize_nocopy(count); recvFromProc(recvRequests.size()) = proci; diff --git a/applications/test/parallel-nbx2/Test-parallel-nbx2.C b/applications/test/parallel-nbx2/Test-parallel-nbx2.C index bd3a4a224ee36ef36adc26fca2ea50abe0aa7c37..9919c42c90b0379c7d88474834ab6d1a8b3bb14a 100644 --- a/applications/test/parallel-nbx2/Test-parallel-nbx2.C +++ b/applications/test/parallel-nbx2/Test-parallel-nbx2.C @@ -119,7 +119,7 @@ int main(int argc, char *argv[]) for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -132,8 +132,8 @@ int main(int argc, char *argv[]) { // Message found and had size: receive it - const label proci = probed.first; - const label count = probed.second; + const label proci(probed.first); + const label count(probed.second); if (optNonBlocking) { diff --git a/etc/controlDict b/etc/controlDict index d0ffb7655227b6cbec2ffb272bd0352f6c2c92be..552f12c520154ec761f55a04314914a921a8984a 100644 --- a/etc/controlDict +++ b/etc/controlDict @@ -1,7 +1,7 @@ /*--------------------------------*- C++ -*----------------------------------*\ | ========= | | | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox | -| \\ / O peration | Version: v2312 | +| \\ / O peration | Version: v2406 | | \\ / A nd | Website: www.openfoam.com | | \\/ M anipulation | | \*---------------------------------------------------------------------------*/ @@ -146,13 +146,18 @@ OptimisationSwitches // The default and minimum is (20000000). 
mpiBufferSize 0; - // Optional max size (bytes) for unstructured data exchanges. In some - // phases of OpenFOAM it can send over very large data chunks + // Optional max size (bytes) for unstructured data exchanges. + // In some phases of OpenFOAM it can send over very large data chunks // (e.g. in parallel load balancing) and some MPI implementations have - // problems with this. Setting this variable > 0 indicates that the - // data exchange needs to be done in multiple passes, each of maxCommsSize. - // This is not switched on by default since it requires an additional - // global reduction, even if multi-pass is not needed) + // problems with this. + // + // This tuning parameter specifies the max number of bytes before + // switching to a multi-pass send/recv + // (currently only affects PstreamBuffers exchanges). + // + // 0 : disabled + // >0 : limit exchanges to specified number of bytes + // <0 : limit exchanges to INT_MAX minus specified number of bytes maxCommsSize 0; // Optional (experimental) feature in lduMatrixUpdate diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C index d82a270238cd7804aa61ff2ca535a4f1fc59d49e..340f0a2f990813172778ad811721fc72e35fce46 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C @@ -77,7 +77,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) if (debug) { - Pout<< "UIPstream::UIPstream PstreamBuffers :" + Perr<< "UIPstream::UIPstream PstreamBuffers :" << " fromProcNo:" << fromProcNo_ << " tag:" << tag_ << " comm:" << comm_ << " receive buffer size:" << messageSize_ diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C index ac47fdef691e384c375831ae1322fd8c452bf425..4e37481e56b197eb65137b7ac98feaceadc514b2 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C +++ 
b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C @@ -78,7 +78,7 @@ void Foam::Pstream::combineGather if (debug & 2) { - Pout<< " received from " + Perr<< " received from " << belowID << " data:" << received << endl; } @@ -98,7 +98,7 @@ void Foam::Pstream::combineGather if (debug & 2) { - Pout<< " received from " + Perr<< " received from " << belowID << " data:" << received << endl; } @@ -111,7 +111,7 @@ void Foam::Pstream::combineGather { if (debug & 2) { - Pout<< " sending to " << myComm.above() + Perr<< " sending to " << myComm.above() << " data:" << value << endl; } @@ -190,7 +190,7 @@ void Foam::Pstream::listCombineGather if (debug & 2) { - Pout<< " received from " + Perr<< " received from " << belowID << " data:" << received << endl; } @@ -213,7 +213,7 @@ void Foam::Pstream::listCombineGather if (debug & 2) { - Pout<< " received from " + Perr<< " received from " << belowID << " data:" << received << endl; } @@ -229,7 +229,7 @@ void Foam::Pstream::listCombineGather { if (debug & 2) { - Pout<< " sending to " << myComm.above() + Perr<< " sending to " << myComm.above() << " data:" << values << endl; } @@ -306,7 +306,7 @@ void Foam::Pstream::mapCombineGather if (debug & 2) { - Pout<< " received from " + Perr<< " received from " << belowID << " data:" << received << endl; } @@ -337,7 +337,7 @@ void Foam::Pstream::mapCombineGather { if (debug & 2) { - Pout<< " sending to " << myComm.above() + Perr<< " sending to " << myComm.above() << " data:" << values << endl; } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C index 034f6d3c33e82e456145026406991ae4bfa7eae6..c3f271d643f780f00d1d9a915b1b5ac8c8b4e80e 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - 
Copyright (C) 2016-2023 OpenCFD Ltd. + Copyright (C) 2016-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -24,6 +24,21 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. +Note + The send/recv windows for chunk-wise transfers (half-open intervals): + + iter data window + ---- ----------- + 0 [0, 1*chunk) + 1 [1*chunk, 2*chunk) + 2 [2*chunk, 3*chunk) + ... + + In older versions (v2312 and earlier) the number of chunks was + determined by the sender sizes and used an extra MPI_Allreduce. + The current approach instead relies on the send/recv buffers having been + consistently sized, which avoids the additional reduction. + \*---------------------------------------------------------------------------*/ #include "Pstream.H" @@ -37,23 +52,82 @@ namespace Foam namespace PstreamDetail { -//- Setup sends and receives, each specified as [rank, span] tuple -// The serial list of tuples can be populated from other lists, from maps -// of data or subsets of lists/maps etc. +//- Number of elements corresponding to max byte transfer. +// Normal upper limit is INT_MAX since MPI sizes are limited to <int>. +template<class Type> +inline std::size_t maxTransferCount +( + const std::size_t max_bytes = std::size_t(0) +) noexcept +{ + return + ( + (max_bytes == 0) // ie, unlimited + ? (std::size_t(0)) // + : (max_bytes > std::size_t(INT_MAX)) // MPI limit is <int> + ? (std::size_t(INT_MAX) / sizeof(Type)) // + : (max_bytes > sizeof(Type)) // require an integral number + ? (max_bytes / sizeof(Type)) // + : (std::size_t(1)) // min of one element + ); +} + + +//- Upper limit on number of transfer bytes. +// Max bytes is normally INT_MAX since MPI sizes are limited to <int>. +// Negative values indicate a subtraction from INT_MAX. 
+inline std::size_t maxTransferBytes(const int64_t max_bytes) noexcept +{ + return + ( + (max_bytes < 0) // (numBytes fewer than INT_MAX) + ? std::size_t(INT_MAX + max_bytes) + : std::size_t(max_bytes) + ); +} + + +//- Exchange of \em contiguous data, with or without chunking. +//- Setup sends and receives, each specified as [rank, span] tuple. +// The serial list of tuples can be populated from other lists, from +// maps of data or subsets of lists/maps etc. +// +// Any waiting for requests is done outside by the caller template<class Type> -void exchangeBuf +void exchangeBuffers ( const UList<std::pair<int, stdFoam::span<const Type>>>& sends, const UList<std::pair<int, stdFoam::span<Type>>>& recvs, - const int tag, const label comm, - const bool wait + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { - const label startOfRequests = UPstream::nRequests(); + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + // Caller already checked for parRun + + if (sends.empty() && recvs.empty()) + { + // Nothing to do + return; + } + const int myProci = UPstream::myProcNo(comm); + + // The chunk size (number of elements) corresponding to max byte transfer. + // Is zero for non-chunked exchanges. 
+ const std::size_t chunkSize + ( + PstreamDetail::maxTransferCount<Type> + ( + PstreamDetail::maxTransferBytes(maxComms_bytes) + ) + ); + + // Set up receives // ~~~~~~~~~~~~~~~ @@ -63,8 +137,14 @@ void exchangeBuf const auto proci = slot.first; auto& payload = slot.second; - if (proci != myProci && !payload.empty()) + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) + { + continue; + } + else if (!chunkSize) { + // Dispatch without chunks UIPstream::read ( UPstream::commsTypes::nonBlocking, @@ -74,212 +154,124 @@ void exchangeBuf tag, comm ); + continue; } - } - // Set up sends - // ~~~~~~~~~~~~ + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - for (const auto& slot : sends) - { - // [rank, span] - const auto proci = slot.first; - const auto& payload = slot.second; + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); - if (proci != myProci && !payload.empty()) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - payload.cdata_bytes(), - payload.size_bytes(), - tag, - comm - ) - ) + if (payload.size() <= beg) { - FatalErrorInFunction - << "Cannot send outgoing message to:" - << proci << " nBytes:" - << label(payload.size_bytes()) - << Foam::abort(FatalError); + // No more data windows + break; } - } - } - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ + recvType window + ( + (end < payload.size()) + ? 
payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); - if (wait) - { - UPstream::waitRequests(startOfRequests); + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + window.data_bytes(), + window.size_bytes(), + msgTagChunk, + comm + ); + } } -} -//- Chunked exchange of \em contiguous data. -//- Setup sends and receives, each specified as [rank, span] tuple. -// The serial list of tuples can be populated from other lists, from -// maps of data or subsets of lists/maps etc. -template<class Type> -void exchangeChunkedBuf -( - const UList<std::pair<int, stdFoam::span<const Type>>>& sends, - const UList<std::pair<int, stdFoam::span<Type>>>& recvs, - - const int tag, - const label comm, - const bool wait -) -{ - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - // Caller already checked for parRun and maxChunkSize > 0 + // Set up sends + // ~~~~~~~~~~~~ + for (const auto& slot : sends) { - // Determine the number of chunks to send. Note that we - // only have to look at the sending data since we are - // guaranteed that some processor's sending size is some other - // processor's receive size. Also we can ignore any local comms. - // - // We need to send chunks so the number of iterations: - // maxChunkSize iterations - // ------------ ---------- - // 0 0 - // 1..maxChunkSize 1 - // maxChunkSize+1..2*maxChunkSize 2 - // ... 
- - const label maxChunkSize = - ( - max - ( - static_cast<label>(1), - static_cast<label>(UPstream::maxCommsSize/sizeof(Type)) - ) - ); - - const int myProci = UPstream::myProcNo(comm); + // [rank, span] + const auto proci = slot.first; + const auto& payload = slot.second; - label nChunks(0); + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) { - // Get max send count (elements) - auto maxCount = static_cast<stdFoam::span<char>::size_type>(0); - - for (const auto& slot : sends) - { - // [rank, span] - const auto proci = slot.first; - const auto count = slot.second.size(); - - if (proci != myProci && count > maxCount) - { - // Note: using max() can be ambiguous - maxCount = count; - } - } + continue; + } + else if (!chunkSize) + { + // Dispatch without chunks + bool ok = UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.cdata_bytes(), + payload.size_bytes(), + tag, + comm + ); - // Convert from send count (elements) to number of chunks. 
- // Can normally calculate with (count-1), but add some safety - if (maxCount) + if (!ok) { - nChunks = 1 + label(maxCount/maxChunkSize); + FatalErrorInFunction + << "Failure sending message to:" << proci + << " nBytes:" << label(payload.size_bytes()) << nl + << Foam::abort(FatalError); } - - // MPI reduce (message tag is irrelevant) - reduce(nChunks, maxOp<label>(), UPstream::msgType(), comm); + continue; } - - // Dispatch the exchanges chunk-wise - List<sendTuple> sendChunks(sends); - List<recvTuple> recvChunks(recvs); - - // Dispatch - for (label iter = 0; iter < nChunks; ++iter) + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) { // The begin/end for the data window - const auto beg = static_cast<std::size_t>(iter*maxChunkSize); - const auto end = static_cast<std::size_t>((iter+1)*maxChunkSize); + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - forAll(sendChunks, sloti) - { - const auto& baseline = sends[sloti].second; - auto& payload = sendChunks[sloti].second; - - // Window the data - if (beg < baseline.size()) - { - payload = - ( - (end < baseline.size()) - ? baseline.subspan(beg, end - beg) - : baseline.subspan(beg) - ); - } - else - { - payload = baseline.first(0); // zero-sized - } - } + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); - forAll(recvChunks, sloti) + if (payload.size() <= beg) { - const auto& baseline = recvs[sloti].second; - auto& payload = recvChunks[sloti].second; - - // Window the data - if (beg < baseline.size()) - { - payload = - ( - (end < baseline.size()) - ? baseline.subspan(beg, end - beg) - : baseline.subspan(beg) - ); - } - else - { - payload = baseline.first(0); // zero-sized - } + // No more data windows + break; } + sendType window + ( + (end < payload.size()) + ? 
payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); - // Exchange data chunks - PstreamDetail::exchangeBuf<Type> + bool ok = UOPstream::write ( - sendChunks, - recvChunks, - tag, - comm, - wait + UPstream::commsTypes::nonBlocking, + proci, + window.cdata_bytes(), + window.size_bytes(), + msgTagChunk, + comm ); - // Debugging output - can report on master only... - #if 0 // ifdef Foam_PstreamExchange_debug_chunks - do + if (!ok) { - labelList sendStarts(sends.size()); - labelList sendCounts(sends.size()); - - forAll(sendChunks, sloti) - { - const auto& baseline = sends[sloti].second; - const auto& payload = sendChunks[sloti].second; - - sendStarts[sloti] = (payload.data() - baseline.data()); - sendCounts[sloti] = (payload.size()); - } - - Info<< "iter " << iter - << ": beg=" << flatOutput(sendStarts) - << " len=" << flatOutput(sendCounts) << endl; - } while (false); - #endif + FatalErrorInFunction + << "Failure sending message to:" << proci + << " nBytes:" << label(window.size_bytes()) << nl + << Foam::abort(FatalError); + } } } } @@ -298,30 +290,99 @@ void exchangeContainer UList<Container>& recvBufs, const int tag, const label comm, - const bool wait //!< Wait for requests to complete + const bool wait, //!< Wait for requests to complete + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + // Caller already checked for parRun + + if (sendBufs.empty() && recvBufs.empty()) + { + // Nothing to do + return; + } + const label startOfRequests = UPstream::nRequests(); - const label myProci = UPstream::myProcNo(comm); + const int myProci = UPstream::myProcNo(comm); + + // The chunk size (number of elements) corresponding to max byte transfer + const std::size_t chunkSize + ( + PstreamDetail::maxTransferCount<Type> + ( + PstreamDetail::maxTransferBytes(maxComms_bytes) + ) + ); + // Set up receives // ~~~~~~~~~~~~~~~ forAll(recvBufs, proci) { - auto& recvData = 
recvBufs[proci]; + // [rank, span] + recvType payload + ( + recvBufs[proci].data(), + std::size_t(recvBufs[proci].size()) + ); - if (proci != myProci && !recvData.empty()) + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) { + continue; + } + else if (!chunkSize) + { + // Dispatch without chunks UIPstream::read ( UPstream::commsTypes::nonBlocking, proci, - recvData.data_bytes(), - recvData.size_bytes(), + payload.data_bytes(), + payload.size_bytes(), tag, comm ); + continue; + } + + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); + + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); + + if (payload.size() <= beg) + { + // No more data windows + break; + } + + recvType window + ( + (end < payload.size()) + ? 
payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); + + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + window.data_bytes(), + window.size_bytes(), + msgTagChunk, + comm + ); } } @@ -331,35 +392,96 @@ void exchangeContainer forAll(sendBufs, proci) { - const auto& sendData = sendBufs[proci]; + // [rank, span] + sendType payload + ( + sendBufs[proci].cdata(), + std::size_t(sendBufs[proci].size()) + ); + + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) + { + continue; + } + else if (!chunkSize) + { + // Dispatch without chunks + bool ok = UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.cdata_bytes(), + payload.size_bytes(), + tag, + comm + ); - if (proci != myProci && !sendData.empty()) + if (!ok) + { + FatalErrorInFunction + << "Failure sending message to:" << proci + << " nBytes:" << label(payload.size_bytes()) << nl + << Foam::abort(FatalError); + } + continue; + } + + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) { - if + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); + + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); + + if (payload.size() <= beg) + { + // No more data windows + break; + } + + sendType window + ( + (end < payload.size()) + ? payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); + + bool ok = UOPstream::write ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData.cdata_bytes(), - sendData.size_bytes(), - tag, - comm - ) - ) + UPstream::commsTypes::nonBlocking, + proci, + window.cdata_bytes(), + window.size_bytes(), + msgTagChunk, + comm + ); + + if (!ok) { FatalErrorInFunction - << "Cannot send outgoing message. 
" - << "to:" << proci << " nBytes:" - << label(sendData.size_bytes()) + << "Failure sending message to:" << proci + << " nBytes:" << label(window.size_bytes()) << nl << Foam::abort(FatalError); } } } + // Wait for all to finish // ~~~~~~~~~~~~~~~~~~~~~~ + if (UPstream::debug) + { + Perr<< "Pstream::exchange with " + << (UPstream::nRequests() - startOfRequests) + << " requests" << nl; + } + if (wait) { UPstream::waitRequests(startOfRequests); @@ -380,70 +502,111 @@ void exchangeContainer Map<Container>& recvBufs, const int tag, const label comm, - const bool wait //!< Wait for requests to complete + const bool wait, //!< Wait for requests to complete + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + typedef std::pair<int, sendType> sendTuple; + typedef std::pair<int, recvType> recvTuple; + const label startOfRequests = UPstream::nRequests(); const label myProci = UPstream::myProcNo(comm); - // Set up receives - // ~~~~~~~~~~~~~~~ - - forAllIters(recvBufs, iter) + // Serialize recv sequences + DynamicList<recvTuple> recvs(recvBufs.size()); { - const label proci = iter.key(); - auto& recvData = iter.val(); - - if (proci != myProci && !recvData.empty()) + forAllIters(recvBufs, iter) { - UIPstream::read + const auto proci = iter.key(); + auto& recvData = recvBufs[proci]; + + recvType payload ( - UPstream::commsTypes::nonBlocking, - proci, - recvData.data_bytes(), - recvData.size_bytes(), - tag, - comm + recvData.data(), + std::size_t(recvData.size()) ); + + // No self-communication or zero-size payload + if (proci != myProci && !payload.empty()) + { + recvs.emplace_back(proci, payload); + } } - } + std::sort + ( + recvs.begin(), + recvs.end(), + [=](const recvTuple& a, const recvTuple& b) + { + // Sorted processor order + return (a.first < b.first); - // Set up sends - // ~~~~~~~~~~~~ + // OR: // Shorter messages first + // return (a.second.size() < b.second.size()); + } 
+ ); + } - forAllConstIters(sendBufs, iter) + // Serialize send sequences + DynamicList<sendTuple> sends(sendBufs.size()); { - const label proci = iter.key(); - const auto& sendData = iter.val(); - - if (proci != myProci && !sendData.empty()) + forAllConstIters(sendBufs, iter) { - if + const auto proci = iter.key(); + const auto& sendData = iter.val(); + + sendType payload ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData.cdata_bytes(), - sendData.size_bytes(), - tag, - comm - ) - ) + sendData.cdata(), + std::size_t(sendData.size()) + ); + + // No self-communication or zero-size payload + if (proci != myProci && !payload.empty()) { - FatalErrorInFunction - << "Cannot send outgoing message to:" - << proci << " nBytes:" - << label(sendData.size_bytes()) - << Foam::abort(FatalError); + sends.emplace_back(proci, payload); } } + + std::sort + ( + sends.begin(), + sends.end(), + [=](const sendTuple& a, const sendTuple& b) + { + // Sorted processor order + return (a.first < b.first); + + // OR: // Shorter messages first + // return (a.second.size() < b.second.size()); + } + ); } + // Exchange buffers in chunk-wise transfers + PstreamDetail::exchangeBuffers<Type> + ( + sends, + recvs, + tag, + comm, + maxComms_bytes + ); + // Wait for all to finish // ~~~~~~~~~~~~~~~~~~~~~~ + if (UPstream::debug) + { + Perr<< "Pstream::exchange with " + << (UPstream::nRequests() - startOfRequests) + << " requests" << nl; + } + if (wait) { UPstream::waitRequests(startOfRequests); @@ -476,17 +639,17 @@ void Foam::Pstream::exchange } const label myProci = UPstream::myProcNo(comm); - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of list " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "List size " << sendBufs.size() + << " != number of ranks " << numProc << nl << 
Foam::abort(FatalError); } - recvBufs.resize_nocopy(numProcs); + recvBufs.resize_nocopy(numProc); if (UPstream::is_parallel(comm)) { @@ -505,72 +668,15 @@ void Foam::Pstream::exchange } } - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - if (UPstream::maxCommsSize <= 0) - { - // Do the exchanging in one go - PstreamDetail::exchangeContainer<Container, Type> - ( - sendBufs, - recvBufs, - tag, - comm, - wait - ); - } - else - { - // Dispatch using chunk-wise exchanges - // Populate send sequence - DynamicList<sendTuple> sends(sendBufs.size()); - forAll(sendBufs, proci) - { - const auto& sendData = sendBufs[proci]; - - if (proci != myProci && !sendData.empty()) - { - sends.push_back - ( - sendTuple - ( - proci, - { sendData.cdata(), std::size_t(sendData.size()) } - ) - ); - } - } - - // Populate recv sequence - DynamicList<recvTuple> recvs(recvBufs.size()); - forAll(recvBufs, proci) - { - auto& recvData = recvBufs[proci]; - - if (proci != myProci && !recvData.empty()) - { - recvs.push_back - ( - recvTuple - ( - proci, - { recvData.data(), std::size_t(recvData.size()) } - ) - ); - } - } - - // Exchange buffers in chunks - PstreamDetail::exchangeChunkedBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } + PstreamDetail::exchangeContainer<Container, Type> + ( + sendBufs, + recvBufs, + tag, + comm, + wait + // (default: UPstream::maxCommsSize) + ); } // Do myself. Already checked if in communicator @@ -617,100 +723,15 @@ void Foam::Pstream::exchange } } - // Define the exchange sequences as a flattened list. - // We add an additional step of ordering the send/recv list - // by message size, which can help with transfer speeds. 
- - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - // Populate send sequences - DynamicList<sendTuple> sends(sendBufs.size()); - forAllConstIters(sendBufs, iter) - { - const auto proci = iter.key(); - const auto& sendData = iter.val(); - - if (proci != myProci && !sendData.empty()) - { - sends.push_back - ( - sendTuple - ( - proci, - { sendData.cdata(), std::size_t(sendData.size()) } - ) - ); - } - } - - // Shorter messages first - std::sort + PstreamDetail::exchangeContainer<Container, Type> ( - sends.begin(), - sends.end(), - [=](const sendTuple& a, const sendTuple& b) - { - return (a.second.size() < b.second.size()); - } + sendBufs, + recvBufs, + tag, + comm, + wait + // (default: UPstream::maxCommsSize) ); - - // Populate recv sequences - DynamicList<recvTuple> recvs(recvBufs.size()); - forAllIters(recvBufs, iter) - { - const auto proci = iter.key(); - auto& recvData = recvBufs[proci]; - - if (proci != myProci && !recvData.empty()) - { - recvs.push_back - ( - recvTuple - ( - proci, - { recvData.data(), std::size_t(recvData.size()) } - ) - ); - } - } - - // Shorter messages first - std::sort - ( - recvs.begin(), - recvs.end(), - [=](const recvTuple& a, const recvTuple& b) - { - return (a.second.size() < b.second.size()); - } - ); - - - if (UPstream::maxCommsSize <= 0) - { - // Do the exchanging in a single go - PstreamDetail::exchangeBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } - else - { - // Exchange buffers in chunks - PstreamDetail::exchangeChunkedBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } } // Do myself (if actually in the communicator) @@ -758,23 +779,23 @@ void Foam::Pstream::exchangeSizes } const label myProci = UPstream::myProcNo(comm); - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of 
container " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "Container size " << sendBufs.size() + << " != number of ranks " << numProc << nl << Foam::abort(FatalError); } - labelList sendSizes(numProcs); - for (label proci = 0; proci < numProcs; ++proci) + labelList sendSizes(numProc); + for (label proci = 0; proci < numProc; ++proci) { sendSizes[proci] = sendBufs[proci].size(); } - recvSizes.resize_nocopy(numProcs); + recvSizes.resize_nocopy(numProc); recvSizes = 0; // Ensure non-received entries are properly zeroed // Preserve self-send, even if not described by neighbourhood @@ -856,7 +877,6 @@ void Foam::Pstream::exchangeSizes const label comm ) { - Map<label> sendSizes(2*sendBufs.size()); recvSizes.clear(); // Done in allToAllConsensus too, but be explicit here if (!UPstream::is_rank(comm)) @@ -864,6 +884,9 @@ void Foam::Pstream::exchangeSizes return; // Process not in communicator } + Map<label> sendSizes; + sendSizes.reserve(sendBufs.size()); + forAllConstIters(sendBufs, iter) { const label proci = iter.key(); @@ -899,17 +922,17 @@ void Foam::Pstream::exchangeSizes return; // Process not in communicator } - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of container " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "Container size " << sendBufs.size() + << " != number of ranks " << numProc << nl << Foam::abort(FatalError); } - labelList sendSizes(numProcs); + labelList sendSizes(numProc); forAll(sendBufs, proci) { sendSizes[proci] = sendBufs[proci].size(); @@ -919,7 +942,7 @@ void Foam::Pstream::exchangeSizes if ( UPstream::nProcsNonblockingExchange > 1 - && UPstream::nProcsNonblockingExchange <= numProcs + && UPstream::nProcsNonblockingExchange <= numProc ) { // Use algorithm NBX: Nonblocking Consensus Exchange @@ -955,9 +978,17 @@ 
void Foam::Pstream::exchange // non-zero sendcounts. Post all sends and wait. labelList recvSizes; - exchangeSizes(sendBufs, recvSizes, comm); + Pstream::exchangeSizes(sendBufs, recvSizes, comm); - exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait); + Pstream::exchange<Container, Type> + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + wait + ); } @@ -975,9 +1006,17 @@ void Foam::Pstream::exchange // but using nonblocking consensus exchange for the sizes Map<label> recvSizes; - exchangeSizes(sendBufs, recvSizes, tag, comm); + Pstream::exchangeSizes(sendBufs, recvSizes, tag, comm); - exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait); + Pstream::exchange<Container, Type> + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + wait + ); } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C index de7ce5fa4248f29237366d7874a85ff936e58b1c..b26b1be61d176c7f70702499385c1849cd8fea00 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2023 OpenCFD Ltd. + Copyright (C) 2023-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. 
@@ -82,7 +82,7 @@ void exchangeConsensus { buf.clear(); } - recvSizes = Zero; + recvSizes = Foam::zero{}; if (!UPstream::is_rank(comm)) { @@ -109,7 +109,7 @@ void exchangeConsensus recvBufs[myProci] = sendBufs[myProci]; if (myProci < recvSizes.size()) { - recvSizes[myProci] = recvBufs.size(); + recvSizes[myProci] = recvBufs[myProci].size(); } } @@ -175,7 +175,7 @@ void exchangeConsensus for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -189,8 +189,8 @@ void exchangeConsensus // Message found and had size. // - receive into dest buffer location - const label proci = probed.first; - const label count = (probed.second / sizeof(Type)); + const label proci(probed.first); + const label count(probed.second / sizeof(Type)); auto& recvData = recvBufs[proci]; recvData.resize(count); // OK with resize() instead of _nocopy() @@ -254,10 +254,10 @@ void exchangeConsensus { static_assert(is_contiguous<Type>::value, "Contiguous data only!"); - // TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0); + const bool initialBarrier = (UPstream::tuning_NBX_ > 0); const label myProci = UPstream::myProcNo(comm); - const label numProc = UPstream::myProcNo(comm); + const label numProc = UPstream::nProcs(comm); // Initial: clear all receive locations // Preferrable to clear out the map entries instead of the map itself @@ -300,7 +300,12 @@ void exchangeConsensus // Setup sends // ------------------------------------------------------------------------ - // TDB: initialBarrier ... 
+ // An initial barrier may help to avoid synchronisation problems + // caused elsewhere + if (initialBarrier) + { + UPstream::barrier(comm); + } // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers @@ -347,7 +352,7 @@ void exchangeConsensus for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -361,8 +366,8 @@ void exchangeConsensus // Message found and had size. // - receive into dest buffer location - const label proci = probed.first; - const label count = (probed.second / sizeof(Type)); + const label proci(probed.first); + const label count(probed.second / sizeof(Type)); auto& recvData = recvBufs(proci); recvData.resize(count); // OK with resize() instead of _nocopy() diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C index e3705feed57ea4028f69a4fa6e4f15ff32df3999..5b3719e96166b06c4733046545a1fcb78daecfb5 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C @@ -107,7 +107,7 @@ void Foam::Pstream::gatherList if (debug & 2) { - Pout<< " received through " + Perr<< " received through " << belowID << " data from:" << belowID << " data:" << values[belowID] << endl; } @@ -119,7 +119,7 @@ void Foam::Pstream::gatherList if (debug & 2) { - Pout<< " received through " + Perr<< " received through " << belowID << " data from:" << leafID << " data:" << values[leafID] << endl; } @@ -136,7 +136,7 @@ void Foam::Pstream::gatherList if (debug & 2) { - Pout<< " sending to " << myComm.above() + Perr<< " sending to " << myComm.above() << " data from me:" << myProci << " data:" << values[myProci] << endl; } @@ -177,7 +177,7 @@ void Foam::Pstream::gatherList { if (debug & 2) { - Pout<< " sending to " + Perr<< " sending to " << myComm.above() << " data from:" << leafID << " data:" 
<< values[leafID] << endl; } @@ -259,7 +259,7 @@ void Foam::Pstream::scatterList if (debug & 2) { - Pout<< " received through " + Perr<< " received through " << myComm.above() << " data for:" << leafID << " data:" << values[leafID] << endl; } @@ -310,7 +310,7 @@ void Foam::Pstream::scatterList if (debug & 2) { - Pout<< " sent through " + Perr<< " sent through " << belowID << " data for:" << leafID << " data:" << values[leafID] << endl; } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H index aff75a17c0a35104201f5af19fe6612a949b79f7..8bd8b474cf824afac7b4a68a2a55d14a301baec7 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H @@ -61,8 +61,8 @@ void reduce { if (UPstream::warnComm >= 0 && comm != UPstream::warnComm) { - Pout<< "** reducing:" << value << " with comm:" << comm << endl; - error::printStack(Pout); + Perr<< "** reducing:" << value << " with comm:" << comm << endl; + error::printStack(Perr); } Pstream::gather(value, bop, tag, comm); Pstream::broadcast(value, comm); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C index a8d2c9e532fc389780f5b15b0b9cfe74c22a49f3..cfbb324164619f066e1bbce0faac81897c83b74f 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C @@ -257,7 +257,7 @@ Foam::UIPstreamBase::~UIPstreamBase() { if (debug) { - Pout<< "UIPstreamBase Destructor : tag:" << tag_ + Perr<< "UIPstreamBase Destructor : tag:" << tag_ << " fromProcNo:" << fromProcNo_ << " clearing receive buffer of size " << recvBuf_.size() diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index cf531d1014e2da1ee1f25ee61e8d3c2e0c50a259..c0dae071f9c2081a76b8690af6389ec518f1afc0 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ 
b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -221,7 +221,7 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads) if (debug) { - Pout<< "UPstream::setParRun :" + Perr<< "UPstream::setParRun :" << " nProcs:" << nProcs << " haveThreads:" << haveThreads << endl; @@ -274,7 +274,7 @@ Foam::label Foam::UPstream::allocateCommunicator if (debug) { - Pout<< "Allocating communicator " << index << nl + Perr<< "Allocating communicator " << index << nl << " parent : " << parentIndex << nl << " procs : " << subRanks << nl << endl; @@ -335,7 +335,7 @@ Foam::label Foam::UPstream::allocateCommunicator if (debug) { - Pout<< "Allocating communicator " << index << nl + Perr<< "Allocating communicator " << index << nl << " parent : " << parentIndex << nl << " procs : " << flatOutput(subRanks) << nl << endl; @@ -492,7 +492,7 @@ bool Foam::UPstream::allocateHostCommunicatorPairs() if (debug) { - Pout<< "Allocating host communicators " + Perr<< "Allocating host communicators " << interHostComm_ << ", " << intraHostComm_ << nl << " parent : " << parentCommunicator << nl << endl; @@ -588,7 +588,7 @@ void Foam::UPstream::freeCommunicator if (debug) { - Pout<< "Communicators : Freeing communicator " << communicator + Perr<< "Communicators : Freeing communicator " << communicator << " parent: " << parentComm_[communicator] << " myProcNo: " << myProcNo_[communicator] << endl; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index ee446347708652eedd9148f2c6d1897c53762336..089d7b6e8288b00b504320d47d59971c86aa6d0e 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -565,14 +565,14 @@ public: //- Probe for an incoming message. 
// - // \param commsType Blocking or not + // \param commsType Non-blocking or not // \param fromProcNo The source rank (negative == ANY_SOURCE) // \param tag The source message tag // \param communicator The communicator index // // \returns source rank and message size (bytes) // and (-1, 0) on failure - static std::pair<int,int> probeMessage + static std::pair<int,int64_t> probeMessage ( const UPstream::commsTypes commsType, const int fromProcNo, diff --git a/src/OpenFOAM/db/error/messageStream.C b/src/OpenFOAM/db/error/messageStream.C index 0953f53d75eb5a344aa7b36bf7c25d2791ab1ca0..2401debdf44ebfd34f7a29164aa30653316b502c 100644 --- a/src/OpenFOAM/db/error/messageStream.C +++ b/src/OpenFOAM/db/error/messageStream.C @@ -145,8 +145,8 @@ Foam::OSstream& Foam::messageStream::masterStream(const label communicator) { if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm) { - Pout<< "** messageStream with comm:" << communicator << endl; - error::printStack(Pout); + Perr<< "** messageStream with comm:" << communicator << endl; + error::printStack(Perr); } if (communicator == UPstream::worldComm || UPstream::master(communicator)) diff --git a/src/OpenFOAM/global/argList/argList.C b/src/OpenFOAM/global/argList/argList.C index 05934efede7ebcaae7c341c20f21f0c6d9cb7e74..557d4614a5a95730c48deb845ad6a32dbb5b937e 100644 --- a/src/OpenFOAM/global/argList/argList.C +++ b/src/OpenFOAM/global/argList/argList.C @@ -2049,6 +2049,8 @@ void Foam::argList::parse Info<< "Pstream initialized with:" << nl << " floatTransfer : " << Switch::name(UPstream::floatTransfer) << nl + << " maxCommsSize : " + << UPstream::maxCommsSize << nl << " nProcsSimpleSum : " << UPstream::nProcsSimpleSum << nl << " nonBlockingExchange: " diff --git a/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C b/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C index f45eedbbd03871b62198312ddb30039b4e5fe165..454df5ffd2f8724996cb063b0615577adb1c4981 100644 --- 
a/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C +++ b/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C @@ -146,15 +146,17 @@ static void broadcastFile_single const uint64_t maxChunkSize = ( - UPstream::maxCommsSize > 0 + (UPstream::maxCommsSize > 0) ? uint64_t(UPstream::maxCommsSize) - : uint64_t(pTraits<int>::max) + : (UPstream::maxCommsSize < 0) // (numBytes fewer than INT_MAX) + ? uint64_t(INT_MAX + UPstream::maxCommsSize) + : uint64_t(INT_MAX) // MPI limit is <int> ); while (fileLength > 0) { - const uint64_t sendSize = min(fileLength, maxChunkSize); + const uint64_t sendSize = std::min(fileLength, maxChunkSize); fileLength -= sendSize; // Read file contents into a character buffer diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index 573c0944696d2dc7bdb56b7ae96444392666c085..c935914db63e1da9dd74ba905e662133cac2dcf8 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -91,7 +91,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) {} -std::pair<int,int> +std::pair<int,int64_t> Foam::UPstream::probeMessage ( const UPstream::commsTypes commsType, @@ -100,7 +100,7 @@ Foam::UPstream::probeMessage const label communicator ) { - return std::pair<int,int>(-1, 0); + return std::pair<int,int64_t>(-1, 0); } diff --git a/src/Pstream/mpi/UIPBstreamRead.C b/src/Pstream/mpi/UIPBstreamRead.C index 0ee1d63ce39a3a0f0309f392aac66f9831415749..5080583396cd19d300eae994d0617a803f4c19ea 100644 --- a/src/Pstream/mpi/UIPBstreamRead.C +++ b/src/Pstream/mpi/UIPBstreamRead.C @@ -60,7 +60,7 @@ void Foam::UIPBstream::bufferIPCrecv() if (UPstream::debug) { - Pout<< "UOPBstream IPC read buffer :" + Perr<< "UOPBstream IPC read buffer :" << " root:" << fromProcNo_ << " comm:" << comm_ << " probed size:" << label(bufSize) diff --git a/src/Pstream/mpi/UIPstreamRead.C b/src/Pstream/mpi/UIPstreamRead.C index 
2a690f4b9f8305bfe234f9a52f1315f88110823d..bec4a2e87f6cbd72c0e593d25ceff5174ed733d5 100644 --- a/src/Pstream/mpi/UIPstreamRead.C +++ b/src/Pstream/mpi/UIPstreamRead.C @@ -68,17 +68,17 @@ static std::streamsize UPstream_mpi_receive if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm) { - Pout<< "UIPstream::read : starting read from:" << fromProcNo + Perr<< "UIPstream::read : starting read from:" << fromProcNo << " size:" << label(bufSize) << " tag:" << tag << " comm:" << communicator << " commsType:" << UPstream::commsTypeNames[commsType] << " warnComm:" << UPstream::warnComm << Foam::endl; - error::printStack(Pout); + error::printStack(Perr); } else if (UPstream::debug) { - Pout<< "UIPstream::read : starting read from:" << fromProcNo + Perr<< "UIPstream::read : starting read from:" << fromProcNo << " size:" << label(bufSize) << " tag:" << tag << " comm:" << communicator << " commsType:" << UPstream::commsTypeNames[commsType] @@ -123,7 +123,7 @@ static std::streamsize UPstream_mpi_receive } else if (UPstream::debug) { - Pout<< "UIPstream::read : finished recv from:" + Perr<< "UIPstream::read : finished recv from:" << fromProcNo << " size:" << label(bufSize) << " tag:" << tag << Foam::endl; @@ -198,7 +198,7 @@ static std::streamsize UPstream_mpi_receive if (UPstream::debug) { - Pout<< "UIPstream::read : started non-blocking recv from:" + Perr<< "UIPstream::read : started non-blocking recv from:" << fromProcNo << " size:" << label(bufSize) << " tag:" << tag << " request:" << @@ -225,7 +225,7 @@ void Foam::UIPstream::bufferIPCrecv() // Called by constructor if (UPstream::debug) { - Pout<< "UIPstream IPC read buffer :" + Perr<< "UIPstream IPC read buffer :" << " from:" << fromProcNo_ << " tag:" << tag_ << " comm:" << comm_ << " wanted size:" << recvBuf_.capacity() @@ -291,7 +291,7 @@ void Foam::UIPstream::bufferIPCrecv() if (UPstream::debug) { - Pout<< "UIPstream::UIPstream : probed size:" + Perr<< "UIPstream::UIPstream : probed size:" << label(count) << 
Foam::endl; } diff --git a/src/Pstream/mpi/UOPstreamWrite.C b/src/Pstream/mpi/UOPstreamWrite.C index 8613c17dfcff42ac0aad9ecbe6269e3759a65b8d..a8f4f04385f8dc3439f3ea2d06d161dbf02a91fd 100644 --- a/src/Pstream/mpi/UOPstreamWrite.C +++ b/src/Pstream/mpi/UOPstreamWrite.C @@ -74,17 +74,17 @@ bool Foam::UOPstream::write if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm) { - Pout<< "UOPstream::write : starting write to:" << toProcNo + Perr<< "UOPstream::write : starting write to:" << toProcNo << " size:" << label(bufSize) << " tag:" << tag << " comm:" << communicator << " commType:" << UPstream::commsTypeNames[commsType] << " warnComm:" << UPstream::warnComm << Foam::endl; - error::printStack(Pout); + error::printStack(Perr); } else if (UPstream::debug) { - Pout<< "UOPstream::write : starting write to:" << toProcNo + Perr<< "UOPstream::write : starting write to:" << toProcNo << " size:" << label(bufSize) << " tag:" << tag << " comm:" << communicator << " commType:" << UPstream::commsTypeNames[commsType] @@ -114,7 +114,7 @@ bool Foam::UOPstream::write if (UPstream::debug) { - Pout<< "UOPstream::write : finished buffered send to:" + Perr<< "UOPstream::write : finished buffered send to:" << toProcNo << " size:" << label(bufSize) << " tag:" << tag << Foam::endl; @@ -152,7 +152,7 @@ bool Foam::UOPstream::write if (UPstream::debug) { - Pout<< "UOPstream::write : finished send to:" + Perr<< "UOPstream::write : finished send to:" << toProcNo << " size:" << label(bufSize) << " tag:" << tag << Foam::endl; @@ -191,7 +191,7 @@ bool Foam::UOPstream::write if (UPstream::debug) { - Pout<< "UOPstream::write : started non-blocking send to:" + Perr<< "UOPstream::write : started non-blocking send to:" << toProcNo << " size:" << label(bufSize) << " tag:" << tag << " request:" << diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index b23c85d66c26211ef081657db0c2b6ee8c9749f3..5314d8027aff1e82f0525f6ba06b3d3ea03cd8c2 100644 --- a/src/Pstream/mpi/UPstream.C 
+++ b/src/Pstream/mpi/UPstream.C @@ -90,13 +90,13 @@ static void attachOurBuffers() if (Foam::UPstream::debug) { - Foam::Pout<< "UPstream::init : buffer-size " << len << '\n'; + Foam::Perr<< "UPstream::init : buffer-size " << len << '\n'; } } else { delete[] buf; - Foam::Pout<< "UPstream::init : could not attach buffer\n"; + Foam::Perr<< "UPstream::init : could not attach buffer\n"; } #endif } @@ -171,7 +171,7 @@ bool Foam::UPstream::initNull() { if (UPstream::debug) { - Pout<< "UPstream::initNull : was already initialized\n"; + Perr<< "UPstream::initNull : was already initialized\n"; } } else @@ -229,7 +229,7 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread) } else if (UPstream::debug) { - Pout<< "UPstream::init : was already initialized\n"; + Perr<< "UPstream::init : was already initialized\n"; } } else @@ -283,7 +283,7 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread) if (UPstream::debug) { - Pout<< "UPstream::init :" + Perr<< "UPstream::init :" << " thread-support : requested:" << needsThread << " obtained:" << ( @@ -390,7 +390,7 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread) &subRank ); - Pout<< "UPstream::init : in world:" << world + Perr<< "UPstream::init : in world:" << world << " using local communicator:" << subComm << " rank " << subRank << " of " << subNumProcs @@ -436,7 +436,7 @@ void Foam::UPstream::shutdown(int errNo) } else if (UPstream::debug && errNo == 0) { - Pout<< "UPstream::shutdown : was already finalized\n"; + Perr<< "UPstream::shutdown : was already finalized\n"; } ourMpi = false; return; @@ -465,7 +465,7 @@ void Foam::UPstream::shutdown(int errNo) if (UPstream::debug) { - Pout<< "UPstream::shutdown\n"; + Perr<< "UPstream::shutdown\n"; } // Check for any outstanding requests @@ -691,7 +691,7 @@ void Foam::UPstream::freeCommunicatorComponents(const label index) { if (UPstream::debug) { - Pout<< "freeCommunicatorComponents: " << index + Perr<< 
"freeCommunicatorComponents: " << index << " from " << PstreamGlobals::MPICommunicators_.size() << endl; } @@ -766,7 +766,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) } -std::pair<int,int> +std::pair<int,int64_t> Foam::UPstream::probeMessage ( const UPstream::commsTypes commsType, @@ -775,7 +775,7 @@ Foam::UPstream::probeMessage const label communicator ) { - std::pair<int,int> result(-1, 0); + std::pair<int,int64_t> result(-1, 0); // No-op for non-parallel or not on communicator if (!UPstream::parRun() || !UPstream::is_rank(communicator)) @@ -869,7 +869,7 @@ Foam::UPstream::probeMessage result.first = status.MPI_SOURCE; - result.second = int(count); + result.second = int64_t(count); } return result; diff --git a/src/Pstream/mpi/UPstreamBroadcast.C b/src/Pstream/mpi/UPstreamBroadcast.C index f3b7fac7d4a8eb14c5341f978842da4b01d01efb..dcf9ca8ed9dc4567e7c311a6d04e06754b3f04a7 100644 --- a/src/Pstream/mpi/UPstreamBroadcast.C +++ b/src/Pstream/mpi/UPstreamBroadcast.C @@ -49,16 +49,16 @@ bool Foam::UPstream::broadcast if (UPstream::warnComm >= 0 && comm != UPstream::warnComm) { - Pout<< "UPstream::broadcast : root:" << rootProcNo + Perr<< "UPstream::broadcast : root:" << rootProcNo << " comm:" << comm << " size:" << label(bufSize) << " warnComm:" << UPstream::warnComm << Foam::endl; - error::printStack(Pout); + error::printStack(Perr); } else if (UPstream::debug) { - Pout<< "UPstream::broadcast : root:" << rootProcNo + Perr<< "UPstream::broadcast : root:" << rootProcNo << " comm:" << comm << " size:" << label(bufSize) << Foam::endl; diff --git a/src/Pstream/mpi/UPstreamRequest.C b/src/Pstream/mpi/UPstreamRequest.C index 04947e202822c9f1862687430debc654e315fa6a..3ac8b8ff8ab436803bd8e409284f8b6e79c192ff 100644 --- a/src/Pstream/mpi/UPstreamRequest.C +++ b/src/Pstream/mpi/UPstreamRequest.C @@ -291,7 +291,7 @@ void Foam::UPstream::waitRequests(const label pos, label len) if (UPstream::debug) { - Pout<< "UPstream::waitRequests : starting 
wait for " + Perr<< "UPstream::waitRequests : starting wait for " << count << " requests starting at " << pos << endl; } @@ -328,7 +328,7 @@ void Foam::UPstream::waitRequests(const label pos, label len) if (UPstream::debug) { - Pout<< "UPstream::waitRequests : finished wait." << endl; + Perr<< "UPstream::waitRequests : finished wait." << endl; } } @@ -409,7 +409,7 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len) if (UPstream::debug) { - Pout<< "UPstream::waitAnyRequest : starting wait for any of " + Perr<< "UPstream::waitAnyRequest : starting wait for any of " << count << " requests starting at " << pos << endl; } @@ -470,7 +470,7 @@ bool Foam::UPstream::waitSomeRequests if (UPstream::debug) { - Pout<< "UPstream:waitSomeRequest : starting wait for some of " + Perr<< "UPstream:waitSomeRequest : starting wait for some of " << count << " requests starting at " << pos << endl; } @@ -563,7 +563,7 @@ bool Foam::UPstream::waitSomeRequests if (UPstream::debug) { - Pout<< "UPstream:waitSomeRequest : starting wait for some of " + Perr<< "UPstream:waitSomeRequest : starting wait for some of " << requests.size() << " requests" << endl; } @@ -753,7 +753,7 @@ void Foam::UPstream::waitRequest(const label i) if (UPstream::debug) { - Pout<< "UPstream::waitRequest : starting wait for request:" + Perr<< "UPstream::waitRequest : starting wait for request:" << i << endl; } @@ -771,7 +771,7 @@ void Foam::UPstream::waitRequest(const label i) if (UPstream::debug) { - Pout<< "UPstream::waitRequest : finished wait for request:" + Perr<< "UPstream::waitRequest : finished wait for request:" << i << endl; } } @@ -823,7 +823,7 @@ bool Foam::UPstream::finishedRequest(const label i) if (UPstream::debug) { - Pout<< "UPstream::finishedRequest : check request:" + Perr<< "UPstream::finishedRequest : check request:" << i << endl; } @@ -898,7 +898,7 @@ bool Foam::UPstream::finishedRequests(const label pos, label len) if (UPstream::debug) { - Pout<< "UPstream::finishedRequests : check 
" << count + Perr<< "UPstream::finishedRequests : check " << count << " requests starting at " << pos << endl; } diff --git a/src/Pstream/mpi/UPstreamWrapping.txx b/src/Pstream/mpi/UPstreamWrapping.txx index 94fc3025e19ab291ec4d2ee2a9baab7e5bc81587..97b0a95b4c5622c0f21bdf3903d6de7a6695f042 100644 --- a/src/Pstream/mpi/UPstreamWrapping.txx +++ b/src/Pstream/mpi/UPstreamWrapping.txx @@ -80,18 +80,18 @@ void Foam::PstreamDetail::reduce0 if (UPstream::warnComm >= 0 && comm != UPstream::warnComm) { - Pout<< "** MPI_Reduce (blocking):"; + Perr<< "** MPI_Reduce (blocking):"; if (count == 1) { - Pout<< (*values); + Perr<< (*values); } else { - Pout<< UList<Type>(values, count); + Perr<< UList<Type>(values, count); } - Pout<< " with comm:" << comm + Perr<< " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } profilingPstream::beginTiming(); @@ -138,23 +138,23 @@ void Foam::PstreamDetail::allReduce { if (immediate) { - Pout<< "** MPI_Iallreduce (non-blocking):"; + Perr<< "** MPI_Iallreduce (non-blocking):"; } else { - Pout<< "** MPI_Allreduce (blocking):"; + Perr<< "** MPI_Allreduce (blocking):"; } if (count == 1) { - Pout<< (*values); + Perr<< (*values); } else { - Pout<< UList<Type>(values, count); + Perr<< UList<Type>(values, count); } - Pout<< " with comm:" << comm + Perr<< " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } @@ -245,18 +245,18 @@ void Foam::PstreamDetail::allToAll { if (immediate) { - Pout<< "** MPI_Ialltoall (non-blocking):"; + Perr<< "** MPI_Ialltoall (non-blocking):"; } else { - Pout<< "** MPI_Alltoall (blocking):"; + Perr<< "** MPI_Alltoall (blocking):"; } - Pout<< " numProc:" << numProc + Perr<< " numProc:" << numProc << " sendData:" << sendData.size() << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } if @@ -376,18 +376,18 @@ void 
Foam::PstreamDetail::allToAllv { if (immediate) { - Pout<< "** MPI_Ialltoallv (non-blocking):"; + Perr<< "** MPI_Ialltoallv (non-blocking):"; } else { - Pout<< "** MPI_Alltoallv (blocking):"; + Perr<< "** MPI_Alltoallv (blocking):"; } - Pout<< " sendCounts:" << sendCounts + Perr<< " sendCounts:" << sendCounts << " sendOffsets:" << sendOffsets << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } if @@ -515,13 +515,13 @@ void Foam::PstreamDetail::allToAllConsensus if (UPstream::warnComm >= 0 && comm != UPstream::warnComm) { - Pout<< "** non-blocking consensus Alltoall (list):"; - Pout<< " numProc:" << numProc + Perr<< "** non-blocking consensus Alltoall (list):"; + Perr<< " numProc:" << numProc << " sendData:" << sendData.size() << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } if (sendData.size() != numProc || recvData.size() != numProc) @@ -717,13 +717,13 @@ void Foam::PstreamDetail::allToAllConsensus if (UPstream::warnComm >= 0 && comm != UPstream::warnComm) { - Pout<< "** non-blocking consensus Alltoall (map):"; - Pout<< " numProc:" << numProc + Perr<< "** non-blocking consensus Alltoall (map):"; + Perr<< " numProc:" << numProc << " sendData:" << sendBufs.size() << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } // Initial: clear out everything @@ -917,18 +917,18 @@ void Foam::PstreamDetail::gather { if (immediate) { - Pout<< "** MPI_Igather (non-blocking):"; + Perr<< "** MPI_Igather (non-blocking):"; } else { - Pout<< "** MPI_Gather (blocking):"; + Perr<< "** MPI_Gather (blocking):"; } - Pout<< " numProc:" << numProc + Perr<< " numProc:" << numProc << " count:" << count << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } @@ -1024,18 +1024,18 @@ void 
Foam::PstreamDetail::scatter { if (immediate) { - Pout<< "** MPI_Iscatter (non-blocking):"; + Perr<< "** MPI_Iscatter (non-blocking):"; } else { - Pout<< "** MPI_Scatter (blocking):"; + Perr<< "** MPI_Scatter (blocking):"; } - Pout<< " numProc:" << numProc + Perr<< " numProc:" << numProc << " count:" << count << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } @@ -1132,19 +1132,19 @@ void Foam::PstreamDetail::gatherv { if (immediate) { - Pout<< "** MPI_Igatherv (non-blocking):"; + Perr<< "** MPI_Igatherv (non-blocking):"; } else { - Pout<< "** MPI_Gatherv (blocking):"; + Perr<< "** MPI_Gatherv (blocking):"; } - Pout<< " np:" << np + Perr<< " np:" << np << " recvCounts:" << recvCounts << " recvOffsets:" << recvOffsets << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } if @@ -1274,19 +1274,19 @@ void Foam::PstreamDetail::scatterv { if (immediate) { - Pout<< "** MPI_Iscatterv (non-blocking):"; + Perr<< "** MPI_Iscatterv (non-blocking):"; } else { - Pout<< "** MPI_Scatterv (blocking):"; + Perr<< "** MPI_Scatterv (blocking):"; } - Pout<< " np:" << np + Perr<< " np:" << np << " sendCounts:" << sendCounts << " sendOffsets:" << sendOffsets << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); } if @@ -1400,17 +1400,17 @@ void Foam::PstreamDetail::allGather { if (immediate) { - Pout<< "** MPI_Iallgather (non-blocking):"; + Perr<< "** MPI_Iallgather (non-blocking):"; } else { - Pout<< "** MPI_Allgather (blocking):"; + Perr<< "** MPI_Allgather (blocking):"; } - Pout<< " numProc:" << UPstream::nProcs(comm) + Perr<< " numProc:" << UPstream::nProcs(comm) << " with comm:" << comm << " warnComm:" << UPstream::warnComm << endl; - error::printStack(Pout); + error::printStack(Perr); }