From d157793043a2964a89a33e6e8617c82e1c4eb662 Mon Sep 17 00:00:00 2001
From: Mark Olesen <Mark.Olesen@esi-group.com>
Date: Mon, 29 Apr 2024 09:09:50 +0200
Subject: [PATCH] ENH: improve handling of multi-pass send/recv (#3152)

- the maxCommsSize variable is used to 'chunk' large data transfers
  (eg, with PstreamBuffers) into a multi-pass send/recv sequence.

  The send/recv windows for chunk-wise transfers:

      iter    data window
      ----    -----------
      0       [0, 1*chunk]
      1       [1*chunk, 2*chunk]
      2       [2*chunk, 3*chunk]
      ...

  Since we mostly send/recv in bytes, the current internal limit
  for MPI counts (INT_MAX) can be hit rather quickly.
  The chunking limit should thus also be INT_MAX, but since it is
  rather tedious to specify such large numbers, one can instead use

      maxCommsSize = -1

  to specify (INT_MAX-1) as the limit.

  The default value is maxCommsSize = 0 (ie, no chunking).

Note
~~~~
  In previous versions, the number of chunks was determined by the
  sender sizes. This required an additional MPI_Allreduce to establish
  an overall consistent number of chunks to walk. This additional
  overhead each time meant that maxCommsSize was rarely actually
  enabled.

  We can, however, instead rely on the send/recv buffers having been
  consistently sized and simply walk through the local send/recvs until
  no further chunks need to be exchanged.

  As an additional enhancement, the message tags are connected to the
  chunking iteration, which allows the setup of all send/recvs without
  an intermediate Allwait.

ENH: extend UPstream::probeMessage to use int64 instead of int for sizes
---
 .../parallel-chunks/Test-parallel-chunks.C    | 304 ++----
 .../parallel-comm3b/Test-parallel-comm3b.C    |   6 +-
 .../test/parallel-nbx2/Test-parallel-nbx2.C   |   6 +-
 etc/controlDict                               |  19 +-
 .../db/IOstreams/Pstreams/PstreamExchange.C   | 891 +++++++++---------
 .../Pstreams/PstreamExchangeConsensus.C       |  29 +-
 src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H |   4 +-
 src/OpenFOAM/global/argList/argList.C         |   2 +
 .../fileOperation/fileOperationBroadcast.C    |   8 +-
 src/Pstream/dummy/UPstream.C                  |   4 +-
 src/Pstream/mpi/UPstream.C                    |   6 +-
 11 files changed, 608 insertions(+), 671 deletions(-)

diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C
index b5181bccec9..30e71a12c51 100644
--- a/applications/test/parallel-chunks/Test-parallel-chunks.C
+++ b/applications/test/parallel-chunks/Test-parallel-chunks.C
@@ -5,7 +5,7 @@
     \\  /    A nd           | www.openfoam.com
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
-    Copyright (C) 2022 OpenCFD Ltd.
+    Copyright (C) 2022-2024 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -44,175 +44,62 @@ Description
 
 using namespace Foam;
 
-// Looks like Pstream::exchangeBuf
-template<class T>
-void do_exchangeBuf
+//- Number of elements corresponding to max byte transfer.
+//  Normal upper limit is INT_MAX since MPI sizes are limited to <int>.
+template<class Type>
+inline std::size_t PstreamDetail_maxTransferCount
 (
-    const label sendSize,
-    const char* sendData,
-    const label recvSize,
-    char* recvData,
-    const int tag,
-    const label comm,
-    const bool wait
-)
+    const std::size_t max_bytes = std::size_t(0)
+) noexcept
 {
-    const label startOfRequests = UPstream::nRequests();
-
-    // Set up receives
-    // ~~~~~~~~~~~~~~~
-
-    // forAll(recvSizes, proci)
-    {
-        // if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
-        if (!Pstream::master(comm) && recvSize > 0)
-        {
-            UIPstream::read
-            (
-                UPstream::commsTypes::nonBlocking,
-                UPstream::myProcNo(comm),   // proci,
-                recvData,
-                recvSize*sizeof(T),
-                tag,
-                comm
-            );
-        }
-    }
-
-
-    // Set up sends
-    // ~~~~~~~~~~~~
-
-    // forAll(sendBufs, proci)
-    for (const int proci : Pstream::subProcs(comm))
-    {
-        if (sendSize > 0)
-        // if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0)
-        {
-            if
-            (
-                !UOPstream::write
-                (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    sendData,
-                    sendSize*sizeof(T),
-                    tag,
-                    comm
-                )
-            )
-            {
-                FatalErrorInFunction
-                    << "Cannot send outgoing message. "
-                    << "to:" << proci << " nBytes:"
-                    << label(sendSize*sizeof(T))
-                    << Foam::abort(FatalError);
-            }
-        }
-    }
-
-
-    // Wait for all to finish
-    // ~~~~~~~~~~~~~~~~~~~~~~
-
-    if (wait)
-    {
-        UPstream::waitRequests(startOfRequests);
-    }
+    return
+    (
+        (max_bytes == 0)                       // ie, unlimited
+      ? (std::size_t(0))                       //
+      : (max_bytes > std::size_t(INT_MAX))     // MPI limit is <int>
+      ? (std::size_t(INT_MAX) / sizeof(Type))  //
+      : (max_bytes > sizeof(Type))             // require an integral number
+      ? (max_bytes / sizeof(Type))             //
+      : (std::size_t(1))                       // min of one element
+    );
 }
 
 
-// Looks like Pstream::exchangeContainer
-template<class Container, class T>
-void do_exchangeContainer
+//- Upper limit on number of transfer bytes.
+//  Max bytes is normally INT_MAX since MPI sizes are limited to <int>.
+//  Negative values indicate a subtraction from INT_MAX.
+inline std::size_t PstreamDetail_maxTransferBytes
 (
-    const Container& sendData,
-    const label recvSize,
-    Container& recvData,
-    const int tag,
-    const label comm,
-    const bool wait
-)
+    const int64_t max_bytes
+) noexcept
 {
-    const label startOfRequests = UPstream::nRequests();
-
-    // Set up receives
-    // ~~~~~~~~~~~~~~~
-
-    // for (const int proci : Pstream::allProcs(comm))
-    {
-        if (!Pstream::master(comm) && recvSize > 0)
-        // if (proci != Pstream::myProcNo(comm) && recvSize > 0)
-        {
-            UIPstream::read
-            (
-                UPstream::commsTypes::nonBlocking,
-                UPstream::myProcNo(comm),   // proci,
-                recvData.data_bytes(),
-                recvSize*sizeof(T),
-                tag,
-                comm
-            );
-        }
-    }
-
-
-    // Set up sends
-    // ~~~~~~~~~~~~
-
-    if (Pstream::master(comm) && sendData.size() > 0)
-    {
-        for (const int proci : Pstream::subProcs(comm))
-        {
-            if
-            (
-                !UOPstream::write
-                (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    sendData.cdata_bytes(),
-                    sendData.size_bytes(),
-                    tag,
-                    comm
-                )
-            )
-            {
-                FatalErrorInFunction
-                    << "Cannot send outgoing message. "
-                    << "to:" << proci << " nBytes:"
-                    << label(sendData.size_bytes())
-                    << Foam::abort(FatalError);
-            }
-        }
-    }
-
-    // Wait for all to finish
-    // ~~~~~~~~~~~~~~~~~~~~~~
-
-    if (wait)
-    {
-        UPstream::waitRequests(startOfRequests);
-    }
+    return
+    (
+        (max_bytes < 0)  // (numBytes fewer than INT_MAX)
+      ? 
std::size_t(INT_MAX + max_bytes)
+      : std::size_t(max_bytes)
+    );
 }
 
 
-template<class Container, class T>
+template<class Container, class Type>
 void broadcast_chunks
 (
     Container& sendData,
     const int tag = UPstream::msgType(),
-    const label comm = UPstream::worldComm,
-    const bool wait = true
+    const label comm = UPstream::worldComm,
+    const int64_t maxComms_bytes = UPstream::maxCommsSize
 )
 {
     // OR  static_assert(is_contiguous<T>::value, "Contiguous data only!")
-    if (!is_contiguous<T>::value)
+    if (!is_contiguous<Type>::value)
     {
         FatalErrorInFunction
-            << "Contiguous data only." << sizeof(T) << Foam::abort(FatalError);
+            << "Contiguous data only." << sizeof(Type)
+            << Foam::abort(FatalError);
     }
 
-    if (UPstream::maxCommsSize <= 0)
+    if (maxComms_bytes == 0)
     {
         // Do in one go
         Info<< "send " << sendData.size() << " elements in one go" << endl;
@@ -227,93 +114,90 @@ void broadcast_chunks
 
     sendData.resize_nocopy(recvSize);  // A no-op on master
 
-    // Determine the number of chunks to send. Note that we
-    // only have to look at the sending data since we are
-    // guaranteed that some processor's sending size is some other
-    // processor's receive size. Also we can ignore any local comms.
-    // We need to send chunks so the number of iterations:
-    //     maxChunkSize                        iterations
-    //     ------------                        ----------
-    //     0                                   0
-    //     1..maxChunkSize                     1
-    //     maxChunkSize+1..2*maxChunkSize      2
-    //     ...
-
-    const label maxChunkSize
+    // The chunk size (number of elements) corresponding to max byte transfer
+    // Is zero for non-chunked exchanges.
+    const std::size_t chunkSize
     (
-        max
+        PstreamDetail_maxTransferCount<Type>
         (
-            static_cast<label>(1),
-            static_cast<label>(UPstream::maxCommsSize/sizeof(T))
+            PstreamDetail_maxTransferBytes(maxComms_bytes)
         )
     );
 
-    label nChunks(0);
-    {
-        // Get max send count (elements)
-        // forAll(sendBufs, proci)
-        // {
-        //     if (proci != Pstream::myProcNo(comm))
-        //     {
-        //         nChunks = max(nChunks, sendBufs[proci].size());
-        //     }
-        // }
-        nChunks = sendSize;
+    if (chunkSize)
+    {
         // Convert from send count (elements) to number of chunks.
         // Can normally calculate with (count-1), but add some safety
-        if (nChunks)
-        {
-            nChunks = 1 + (nChunks/maxChunkSize);
-        }
-        reduce(nChunks, maxOp<label>(), tag, comm);
+        label nChunks = 1 + (sendSize/label(chunkSize));
 
         Info
             << "send " << sendSize << " elements ("
-            << (sendSize*sizeof(T)) << " bytes) in " << nChunks
-            << " chunks of " << maxChunkSize << " elements ("
-            << (maxChunkSize*sizeof(T)) << " bytes) for maxCommsSize:"
-            << Pstream::maxCommsSize
+            << (sendSize*sizeof(Type)) << " bytes) in " << nChunks
+            << " chunks of " << label(chunkSize) << " elements ("
+            << label(chunkSize*sizeof(Type)) << " bytes) for maxCommsSize:"
+            << label(maxComms_bytes)
             << endl;
     }
 
+    // stress-test with shortened sendSize
     // will produce useless loops, but no calls
     // sendSize /= 2;
 
-    label nSend(0);
-    label startSend(0);
-    char* charPtrSend;
+    typedef stdFoam::span<Type> sendType;
 
-    for (label iter = 0; iter < nChunks; ++iter)
+    do
     {
-        nSend = min
-        (
-            maxChunkSize,
-            sendSize-startSend
-        );
+        sendType payload(sendData.data(), sendData.size());
 
-        charPtrSend =
-        (
-            nSend > 0
-          ? 
reinterpret_cast<char*>(&(sendData[startSend])) - : nullptr - ); + if (!chunkSize) + { + UPstream::broadcast + ( + payload.data_bytes(), + payload.size_bytes(), + comm + ); + break; + } - Info<< "iter " << iter - << ": beg=" << startSend << " len=" << nSend - << " (" << (nSend*sizeof(T)) << " bytes)" << endl; + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - UPstream::broadcast(charPtrSend, nSend*sizeof(T), comm); + if (payload.size() <= beg) + { + // No more data windows + break; + } - // forAll(nSend, proci) - { - startSend += nSend; + sendType window + ( + (end < payload.size()) + ? payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); + + Info<< "iter " << iter + << ": beg=" << label(beg) << " len=" << label(window.size()) + << " (" << label(window.size_bytes()) << " bytes)" << endl; + + UPstream::broadcast + ( + window.data_bytes(), + window.size_bytes(), + comm + ); } } + while (false); - Info<< "final: " << startSend << endl; + Info<< "final" << endl; } @@ -333,7 +217,7 @@ int main(int argc, char *argv[]) } labelList input1; - if (Pstream::master()) + if (UPstream::master()) { input1 = identity(500); } @@ -348,7 +232,7 @@ int main(int argc, char *argv[]) // Mostly the same with PstreamBuffers if (false) { - PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking); + PstreamBuffers pBufs; labelList sendData; if (Pstream::master()) diff --git a/applications/test/parallel-comm3b/Test-parallel-comm3b.C b/applications/test/parallel-comm3b/Test-parallel-comm3b.C index 0bbcc5da5e1..ea64880146d 100644 --- a/applications/test/parallel-comm3b/Test-parallel-comm3b.C +++ b/applications/test/parallel-comm3b/Test-parallel-comm3b.C @@ -130,7 +130,7 @@ int main(int argc, char *argv[]) for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -143,8 +143,8 @@ int main(int argc, char *argv[]) { // Message found and had size: receive it - const label proci = probed.first; - const label count = probed.second; + const label proci(probed.first); + const label count(probed.second); recvBufs(proci).resize_nocopy(count); recvFromProc(recvRequests.size()) = proci; diff --git a/applications/test/parallel-nbx2/Test-parallel-nbx2.C b/applications/test/parallel-nbx2/Test-parallel-nbx2.C index bd3a4a224ee..9919c42c90b 100644 --- a/applications/test/parallel-nbx2/Test-parallel-nbx2.C +++ b/applications/test/parallel-nbx2/Test-parallel-nbx2.C @@ -119,7 +119,7 @@ int main(int argc, char *argv[]) for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -132,8 +132,8 @@ int main(int argc, char *argv[]) { // Message found and had size: receive it - const label proci = probed.first; - const label count = probed.second; + const label proci(probed.first); + const label count(probed.second); if (optNonBlocking) { diff --git a/etc/controlDict b/etc/controlDict index d0ffb765522..552f12c5201 100644 --- a/etc/controlDict +++ b/etc/controlDict @@ -1,7 +1,7 @@ /*--------------------------------*- C++ -*----------------------------------*\ | ========= | | | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox | -| \\ / O peration | Version: v2312 | +| \\ 
/   O peration     | Version:  v2406                                 |
 |   \\  /    A nd           | Website:  www.openfoam.com                      |
 |    \\/     M anipulation  |                                                 |
 \*---------------------------------------------------------------------------*/
@@ -146,13 +146,18 @@ OptimisationSwitches
     // The default and minimum is (20000000).
     mpiBufferSize   0;
 
-    // Optional max size (bytes) for unstructured data exchanges. In some
-    // phases of OpenFOAM it can send over very large data chunks
+    // Optional max size (bytes) for unstructured data exchanges.
+    // In some phases of OpenFOAM it can send over very large data chunks
     // (e.g. in parallel load balancing) and some MPI implementations have
-    // problems with this. Setting this variable > 0 indicates that the
-    // data exchange needs to be done in multiple passes, each of maxCommsSize.
-    // This is not switched on by default since it requires an additional
-    // global reduction, even if multi-pass is not needed)
+    // problems with this.
+    //
+    // This tuning parameter specifies the max number of bytes before
+    // switching to a multi-pass send/recv
+    // (currently only affects PstreamBuffers exchanges).
+    //
+    //   0 : disabled
+    //  >0 : limit exchanges to specified number of bytes
+    //  <0 : limit exchanges to INT_MAX minus specified number of bytes
     maxCommsSize    0;
 
     // Optional (experimental) feature in lduMatrixUpdate
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
index 034f6d3c33e..c3f271d643f 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
@@ -6,7 +6,7 @@
     \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2011-2016 OpenFOAM Foundation
-    Copyright (C) 2016-2023 OpenCFD Ltd.
+    Copyright (C) 2016-2024 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -24,6 +24,21 @@ License
     You should have received a copy of the GNU General Public License
     along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.
 
+Note
+    The send/recv windows for chunk-wise transfers:
+
+        iter    data window
+        ----    -----------
+        0       [0, 1*chunk]
+        1       [1*chunk, 2*chunk]
+        2       [2*chunk, 3*chunk]
+        ...
+
+    In older versions (v2312 and earlier) the number of chunks was
+    determined from the sender sizes, which required an extra MPI_Allreduce.
+    We now instead rely on the send/recv buffers having been consistently
+    sized, which avoids the additional reduction.
+
 \*---------------------------------------------------------------------------*/
 
 #include "Pstream.H"
@@ -37,23 +52,82 @@ namespace Foam
 namespace PstreamDetail
 {
 
-//- Setup sends and receives, each specified as [rank, span] tuple
-//  The serial list of tuples can be populated from other lists, from maps
-//  of data or subsets of lists/maps etc.
+//- Number of elements corresponding to max byte transfer.
+//  Normal upper limit is INT_MAX since MPI sizes are limited to <int>.
+template<class Type>
+inline std::size_t maxTransferCount
+(
+    const std::size_t max_bytes = std::size_t(0)
+) noexcept
+{
+    return
+    (
+        (max_bytes == 0)                       // ie, unlimited
+      ? (std::size_t(0))                       //
+      : (max_bytes > std::size_t(INT_MAX))     // MPI limit is <int>
+      ? (std::size_t(INT_MAX) / sizeof(Type))  //
+      : (max_bytes > sizeof(Type))             // require an integral number
+      ? (max_bytes / sizeof(Type))             //
+      : (std::size_t(1))                       // min of one element
+    );
+}
+
+
+//- Upper limit on number of transfer bytes.
+// Max bytes is normally INT_MAX since MPI sizes are limited to <int>. +// Negative values indicate a subtraction from INT_MAX. +inline std::size_t maxTransferBytes(const int64_t max_bytes) noexcept +{ + return + ( + (max_bytes < 0) // (numBytes fewer than INT_MAX) + ? std::size_t(INT_MAX + max_bytes) + : std::size_t(max_bytes) + ); +} + + +//- Exchange of \em contiguous data, with or without chunking. +//- Setup sends and receives, each specified as [rank, span] tuple. +// The serial list of tuples can be populated from other lists, from +// maps of data or subsets of lists/maps etc. +// +// Any waiting for requests is done outside by the caller template<class Type> -void exchangeBuf +void exchangeBuffers ( const UList<std::pair<int, stdFoam::span<const Type>>>& sends, const UList<std::pair<int, stdFoam::span<Type>>>& recvs, - const int tag, const label comm, - const bool wait + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { - const label startOfRequests = UPstream::nRequests(); + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + // Caller already checked for parRun + + if (sends.empty() && recvs.empty()) + { + // Nothing to do + return; + } + const int myProci = UPstream::myProcNo(comm); + + // The chunk size (number of elements) corresponding to max byte transfer. + // Is zero for non-chunked exchanges. + const std::size_t chunkSize + ( + PstreamDetail::maxTransferCount<Type> + ( + PstreamDetail::maxTransferBytes(maxComms_bytes) + ) + ); + + // Set up receives // ~~~~~~~~~~~~~~~ @@ -63,8 +137,14 @@ void exchangeBuf const auto proci = slot.first; auto& payload = slot.second; - if (proci != myProci && !payload.empty()) + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) + { + continue; + } + else if (!chunkSize) { + // Dispatch without chunks UIPstream::read ( UPstream::commsTypes::nonBlocking, @@ -74,212 +154,124 @@ void exchangeBuf tag, comm ); + continue; } - } - // Set up sends - // ~~~~~~~~~~~~ + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - for (const auto& slot : sends) - { - // [rank, span] - const auto proci = slot.first; - const auto& payload = slot.second; + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); - if (proci != myProci && !payload.empty()) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - payload.cdata_bytes(), - payload.size_bytes(), - tag, - comm - ) - ) + if (payload.size() <= beg) { - FatalErrorInFunction - << "Cannot send outgoing message to:" - << proci << " nBytes:" - << label(payload.size_bytes()) - << Foam::abort(FatalError); + // No more data windows + break; } - } - } - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ + recvType window + ( + (end < payload.size()) + ? payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); - if (wait) - { - UPstream::waitRequests(startOfRequests); + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + window.data_bytes(), + window.size_bytes(), + msgTagChunk, + comm + ); + } } -} -//- Chunked exchange of \em contiguous data. -//- Setup sends and receives, each specified as [rank, span] tuple. 
-// The serial list of tuples can be populated from other lists, from -// maps of data or subsets of lists/maps etc. -template<class Type> -void exchangeChunkedBuf -( - const UList<std::pair<int, stdFoam::span<const Type>>>& sends, - const UList<std::pair<int, stdFoam::span<Type>>>& recvs, - - const int tag, - const label comm, - const bool wait -) -{ - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - // Caller already checked for parRun and maxChunkSize > 0 + // Set up sends + // ~~~~~~~~~~~~ + for (const auto& slot : sends) { - // Determine the number of chunks to send. Note that we - // only have to look at the sending data since we are - // guaranteed that some processor's sending size is some other - // processor's receive size. Also we can ignore any local comms. - // - // We need to send chunks so the number of iterations: - // maxChunkSize iterations - // ------------ ---------- - // 0 0 - // 1..maxChunkSize 1 - // maxChunkSize+1..2*maxChunkSize 2 - // ... - - const label maxChunkSize = - ( - max - ( - static_cast<label>(1), - static_cast<label>(UPstream::maxCommsSize/sizeof(Type)) - ) - ); - - const int myProci = UPstream::myProcNo(comm); + // [rank, span] + const auto proci = slot.first; + const auto& payload = slot.second; - label nChunks(0); + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) { - // Get max send count (elements) - auto maxCount = static_cast<stdFoam::span<char>::size_type>(0); - - for (const auto& slot : sends) - { - // [rank, span] - const auto proci = slot.first; - const auto count = slot.second.size(); - - if (proci != myProci && count > maxCount) - { - // Note: using max() can be ambiguous - maxCount = count; - } - } + continue; + } + else if (!chunkSize) + { + // Dispatch without chunks + bool ok = UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.cdata_bytes(), + payload.size_bytes(), + tag, + comm + ); - // Convert from send count (elements) to number of chunks. - // Can normally calculate with (count-1), but add some safety - if (maxCount) + if (!ok) { - nChunks = 1 + label(maxCount/maxChunkSize); + FatalErrorInFunction + << "Failure sending message to:" << proci + << " nBytes:" << label(payload.size_bytes()) << nl + << Foam::abort(FatalError); } - - // MPI reduce (message tag is irrelevant) - reduce(nChunks, maxOp<label>(), UPstream::msgType(), comm); + continue; } - - // Dispatch the exchanges chunk-wise - List<sendTuple> sendChunks(sends); - List<recvTuple> recvChunks(recvs); - - // Dispatch - for (label iter = 0; iter < nChunks; ++iter) + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) { // The begin/end for the data window - const auto beg = static_cast<std::size_t>(iter*maxChunkSize); - const auto end = static_cast<std::size_t>((iter+1)*maxChunkSize); + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - forAll(sendChunks, sloti) - { - const auto& baseline = sends[sloti].second; - auto& payload = sendChunks[sloti].second; - - // Window the data - if (beg < baseline.size()) - { - payload = - ( - (end < baseline.size()) - ? 
baseline.subspan(beg, end - beg) - : baseline.subspan(beg) - ); - } - else - { - payload = baseline.first(0); // zero-sized - } - } + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); - forAll(recvChunks, sloti) + if (payload.size() <= beg) { - const auto& baseline = recvs[sloti].second; - auto& payload = recvChunks[sloti].second; - - // Window the data - if (beg < baseline.size()) - { - payload = - ( - (end < baseline.size()) - ? baseline.subspan(beg, end - beg) - : baseline.subspan(beg) - ); - } - else - { - payload = baseline.first(0); // zero-sized - } + // No more data windows + break; } + sendType window + ( + (end < payload.size()) + ? payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); - // Exchange data chunks - PstreamDetail::exchangeBuf<Type> + bool ok = UOPstream::write ( - sendChunks, - recvChunks, - tag, - comm, - wait + UPstream::commsTypes::nonBlocking, + proci, + window.cdata_bytes(), + window.size_bytes(), + msgTagChunk, + comm ); - // Debugging output - can report on master only... - #if 0 // ifdef Foam_PstreamExchange_debug_chunks - do + if (!ok) { - labelList sendStarts(sends.size()); - labelList sendCounts(sends.size()); - - forAll(sendChunks, sloti) - { - const auto& baseline = sends[sloti].second; - const auto& payload = sendChunks[sloti].second; - - sendStarts[sloti] = (payload.data() - baseline.data()); - sendCounts[sloti] = (payload.size()); - } - - Info<< "iter " << iter - << ": beg=" << flatOutput(sendStarts) - << " len=" << flatOutput(sendCounts) << endl; - } while (false); - #endif + FatalErrorInFunction + << "Failure sending message to:" << proci + << " nBytes:" << label(window.size_bytes()) << nl + << Foam::abort(FatalError); + } } } } @@ -298,30 +290,99 @@ void exchangeContainer UList<Container>& recvBufs, const int tag, const label comm, - const bool wait //!< Wait for requests to complete + const bool wait, //!< Wait for requests to complete + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + // Caller already checked for parRun + + if (sendBufs.empty() && recvBufs.empty()) + { + // Nothing to do + return; + } + const label startOfRequests = UPstream::nRequests(); - const label myProci = UPstream::myProcNo(comm); + const int myProci = UPstream::myProcNo(comm); + + // The chunk size (number of elements) corresponding to max byte transfer + const std::size_t chunkSize + ( + PstreamDetail::maxTransferCount<Type> + ( + PstreamDetail::maxTransferBytes(maxComms_bytes) + ) + ); + // Set up receives // ~~~~~~~~~~~~~~~ forAll(recvBufs, proci) { - auto& recvData = recvBufs[proci]; + // [rank, span] + recvType payload + ( + recvBufs[proci].data(), + std::size_t(recvBufs[proci].size()) + ); - if (proci != myProci && !recvData.empty()) + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) { + continue; + } + else if (!chunkSize) + { + // Dispatch without chunks UIPstream::read ( UPstream::commsTypes::nonBlocking, proci, - recvData.data_bytes(), - recvData.size_bytes(), + payload.data_bytes(), + payload.size_bytes(), tag, comm ); + continue; + } + + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); + + // The message tag augmented 
by the iteration number
+            // - avoids message collisions between different chunks
+            const int msgTagChunk = (tag + iter);
+
+            if (payload.size() <= beg)
+            {
+                // No more data windows
+                break;
+            }
+
+            recvType window
+            (
+                (end < payload.size())
+              ? payload.subspan(beg, end - beg)
+              : payload.subspan(beg)
+            );
+
+            UIPstream::read
+            (
+                UPstream::commsTypes::nonBlocking,
+                proci,
+                window.data_bytes(),
+                window.size_bytes(),
+                msgTagChunk,
+                comm
+            );
+        }
     }
 
@@ -331,35 +392,96 @@ void exchangeContainer
 
     forAll(sendBufs, proci)
     {
-        const auto& sendData = sendBufs[proci];
+        // [rank, span]
+        sendType payload
+        (
+            sendBufs[proci].cdata(),
+            std::size_t(sendBufs[proci].size())
+        );
+
+        // No self-communication or zero-size payload
+        if (proci == myProci || payload.empty())
+        {
+            continue;
+        }
+        else if (!chunkSize)
+        {
+            // Dispatch without chunks
+            bool ok = UOPstream::write
+            (
+                UPstream::commsTypes::nonBlocking,
+                proci,
+                payload.cdata_bytes(),
+                payload.size_bytes(),
+                tag,
+                comm
+            );
 
-        if (proci != myProci && !sendData.empty())
+            if (!ok)
+            {
+                FatalErrorInFunction
+                    << "Failure sending message to:" << proci
+                    << " nBytes:" << label(payload.size_bytes()) << nl
+                    << Foam::abort(FatalError);
+            }
+            continue;
+        }
+
+        // Dispatch chunk-wise until there is nothing left
+        for (int iter = 0; /*true*/; ++iter)
         {
-            if
+            // The begin/end for the data window
+            const std::size_t beg = (std::size_t(iter)*chunkSize);
+            const std::size_t end = (std::size_t(iter+1)*chunkSize);
+
+            // The message tag augmented by the iteration number
+            // - avoids message collisions between different chunks
+            const int msgTagChunk = (tag + iter);
+
+            if (payload.size() <= beg)
+            {
+                // No more data windows
+                break;
+            }
+
+            sendType window
+            (
+                (end < payload.size())
+              ? payload.subspan(beg, end - beg)
+              : payload.subspan(beg)
+            );
+
+            bool ok = UOPstream::write
             (
-                !UOPstream::write
-                (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    sendData.cdata_bytes(),
-                    sendData.size_bytes(),
-                    tag,
-                    comm
-                )
-            )
+                UPstream::commsTypes::nonBlocking,
+                proci,
+                window.cdata_bytes(),
+                window.size_bytes(),
+                msgTagChunk,
+                comm
+            );
+
+            if (!ok)
             {
                 FatalErrorInFunction
-                    << "Cannot send outgoing message. 
" - << "to:" << proci << " nBytes:" - << label(sendData.size_bytes()) + << "Failure sending message to:" << proci + << " nBytes:" << label(window.size_bytes()) << nl << Foam::abort(FatalError); } } } + // Wait for all to finish // ~~~~~~~~~~~~~~~~~~~~~~ + if (UPstream::debug) + { + Perr<< "Pstream::exchange with " + << (UPstream::nRequests() - startOfRequests) + << " requests" << nl; + } + if (wait) { UPstream::waitRequests(startOfRequests); @@ -380,70 +502,111 @@ void exchangeContainer Map<Container>& recvBufs, const int tag, const label comm, - const bool wait //!< Wait for requests to complete + const bool wait, //!< Wait for requests to complete + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + typedef std::pair<int, sendType> sendTuple; + typedef std::pair<int, recvType> recvTuple; + const label startOfRequests = UPstream::nRequests(); const label myProci = UPstream::myProcNo(comm); - // Set up receives - // ~~~~~~~~~~~~~~~ - - forAllIters(recvBufs, iter) + // Serialize recv sequences + DynamicList<recvTuple> recvs(recvBufs.size()); { - const label proci = iter.key(); - auto& recvData = iter.val(); - - if (proci != myProci && !recvData.empty()) + forAllIters(recvBufs, iter) { - UIPstream::read + const auto proci = iter.key(); + auto& recvData = recvBufs[proci]; + + recvType payload ( - UPstream::commsTypes::nonBlocking, - proci, - recvData.data_bytes(), - recvData.size_bytes(), - tag, - comm + recvData.data(), + std::size_t(recvData.size()) ); + + // No self-communication or zero-size payload + if (proci != myProci && !payload.empty()) + { + recvs.emplace_back(proci, payload); + } } - } + std::sort + ( + recvs.begin(), + recvs.end(), + [=](const recvTuple& a, const recvTuple& b) + { + // Sorted processor order + return (a.first < b.first); - // Set up sends - // ~~~~~~~~~~~~ + // OR: // Shorter messages first + // return (a.second.size() < b.second.size()); + } + ); + } - forAllConstIters(sendBufs, iter) + // Serialize send sequences + DynamicList<sendTuple> sends(sendBufs.size()); { - const label proci = iter.key(); - const auto& sendData = iter.val(); - - if (proci != myProci && !sendData.empty()) + forAllConstIters(sendBufs, iter) { - if + const auto proci = iter.key(); + const auto& sendData = iter.val(); + + sendType payload ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData.cdata_bytes(), - sendData.size_bytes(), - tag, - comm - ) - ) + sendData.cdata(), + std::size_t(sendData.size()) + ); + + // No self-communication or zero-size payload + if (proci != myProci && !payload.empty()) { - FatalErrorInFunction - << "Cannot send outgoing message to:" - << proci << " nBytes:" - << label(sendData.size_bytes()) - << Foam::abort(FatalError); + sends.emplace_back(proci, payload); } } + + std::sort + ( + sends.begin(), + sends.end(), + [=](const sendTuple& a, const sendTuple& b) + { + // Sorted processor order + return (a.first < b.first); + + // OR: // Shorter messages first + // return (a.second.size() < b.second.size()); + } + ); } + // Exchange buffers in chunk-wise transfers + PstreamDetail::exchangeBuffers<Type> + ( + sends, + recvs, + tag, + comm, + maxComms_bytes + ); + // Wait for all to finish // ~~~~~~~~~~~~~~~~~~~~~~ + if (UPstream::debug) + { + Perr<< "Pstream::exchange with " + << (UPstream::nRequests() - startOfRequests) + << " requests" << nl; + } + if (wait) { UPstream::waitRequests(startOfRequests); @@ -476,17 +639,17 @@ void 
Foam::Pstream::exchange } const label myProci = UPstream::myProcNo(comm); - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of list " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "List size " << sendBufs.size() + << " != number of ranks " << numProc << nl << Foam::abort(FatalError); } - recvBufs.resize_nocopy(numProcs); + recvBufs.resize_nocopy(numProc); if (UPstream::is_parallel(comm)) { @@ -505,72 +668,15 @@ void Foam::Pstream::exchange } } - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - if (UPstream::maxCommsSize <= 0) - { - // Do the exchanging in one go - PstreamDetail::exchangeContainer<Container, Type> - ( - sendBufs, - recvBufs, - tag, - comm, - wait - ); - } - else - { - // Dispatch using chunk-wise exchanges - // Populate send sequence - DynamicList<sendTuple> sends(sendBufs.size()); - forAll(sendBufs, proci) - { - const auto& sendData = sendBufs[proci]; - - if (proci != myProci && !sendData.empty()) - { - sends.push_back - ( - sendTuple - ( - proci, - { sendData.cdata(), std::size_t(sendData.size()) } - ) - ); - } - } - - // Populate recv sequence - DynamicList<recvTuple> recvs(recvBufs.size()); - forAll(recvBufs, proci) - { - auto& recvData = recvBufs[proci]; - - if (proci != myProci && !recvData.empty()) - { - recvs.push_back - ( - recvTuple - ( - proci, - { recvData.data(), std::size_t(recvData.size()) } - ) - ); - } - } - - // Exchange buffers in chunks - PstreamDetail::exchangeChunkedBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } + PstreamDetail::exchangeContainer<Container, Type> + ( + sendBufs, + recvBufs, + tag, + comm, + wait + // (default: UPstream::maxCommsSize) + ); } // Do myself. Already checked if in communicator @@ -617,100 +723,15 @@ void Foam::Pstream::exchange } } - // Define the exchange sequences as a flattened list. - // We add an additional step of ordering the send/recv list - // by message size, which can help with transfer speeds. 
- - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - // Populate send sequences - DynamicList<sendTuple> sends(sendBufs.size()); - forAllConstIters(sendBufs, iter) - { - const auto proci = iter.key(); - const auto& sendData = iter.val(); - - if (proci != myProci && !sendData.empty()) - { - sends.push_back - ( - sendTuple - ( - proci, - { sendData.cdata(), std::size_t(sendData.size()) } - ) - ); - } - } - - // Shorter messages first - std::sort + PstreamDetail::exchangeContainer<Container, Type> ( - sends.begin(), - sends.end(), - [=](const sendTuple& a, const sendTuple& b) - { - return (a.second.size() < b.second.size()); - } + sendBufs, + recvBufs, + tag, + comm, + wait + // (default: UPstream::maxCommsSize) ); - - // Populate recv sequences - DynamicList<recvTuple> recvs(recvBufs.size()); - forAllIters(recvBufs, iter) - { - const auto proci = iter.key(); - auto& recvData = recvBufs[proci]; - - if (proci != myProci && !recvData.empty()) - { - recvs.push_back - ( - recvTuple - ( - proci, - { recvData.data(), std::size_t(recvData.size()) } - ) - ); - } - } - - // Shorter messages first - std::sort - ( - recvs.begin(), - recvs.end(), - [=](const recvTuple& a, const recvTuple& b) - { - return (a.second.size() < b.second.size()); - } - ); - - - if (UPstream::maxCommsSize <= 0) - { - // Do the exchanging in a single go - PstreamDetail::exchangeBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } - else - { - // Exchange buffers in chunks - PstreamDetail::exchangeChunkedBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } } // Do myself (if actually in the communicator) @@ -758,23 +779,23 @@ void Foam::Pstream::exchangeSizes } const label myProci = UPstream::myProcNo(comm); - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of container " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "Container size " << sendBufs.size() + << " != number of ranks " << numProc << nl << Foam::abort(FatalError); } - labelList sendSizes(numProcs); - for (label proci = 0; proci < numProcs; ++proci) + labelList sendSizes(numProc); + for (label proci = 0; proci < numProc; ++proci) { sendSizes[proci] = sendBufs[proci].size(); } - recvSizes.resize_nocopy(numProcs); + recvSizes.resize_nocopy(numProc); recvSizes = 0; // Ensure non-received entries are properly zeroed // Preserve self-send, even if not described by neighbourhood @@ -856,7 +877,6 @@ void Foam::Pstream::exchangeSizes const label comm ) { - Map<label> sendSizes(2*sendBufs.size()); recvSizes.clear(); // Done in allToAllConsensus too, but be explicit here if (!UPstream::is_rank(comm)) @@ -864,6 +884,9 @@ void Foam::Pstream::exchangeSizes return; // Process not in communicator } + Map<label> sendSizes; + sendSizes.reserve(sendBufs.size()); + forAllConstIters(sendBufs, iter) { const label proci = iter.key(); @@ -899,17 +922,17 @@ void Foam::Pstream::exchangeSizes return; // Process not in communicator } - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of container " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "Container size " << sendBufs.size() + << " != number of ranks " << numProc << nl << 
Foam::abort(FatalError); } - labelList sendSizes(numProcs); + labelList sendSizes(numProc); forAll(sendBufs, proci) { sendSizes[proci] = sendBufs[proci].size(); @@ -919,7 +942,7 @@ void Foam::Pstream::exchangeSizes if ( UPstream::nProcsNonblockingExchange > 1 - && UPstream::nProcsNonblockingExchange <= numProcs + && UPstream::nProcsNonblockingExchange <= numProc ) { // Use algorithm NBX: Nonblocking Consensus Exchange @@ -955,9 +978,17 @@ void Foam::Pstream::exchange // non-zero sendcounts. Post all sends and wait. labelList recvSizes; - exchangeSizes(sendBufs, recvSizes, comm); + Pstream::exchangeSizes(sendBufs, recvSizes, comm); - exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait); + Pstream::exchange<Container, Type> + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + wait + ); } @@ -975,9 +1006,17 @@ void Foam::Pstream::exchange // but using nonblocking consensus exchange for the sizes Map<label> recvSizes; - exchangeSizes(sendBufs, recvSizes, tag, comm); + Pstream::exchangeSizes(sendBufs, recvSizes, tag, comm); - exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait); + Pstream::exchange<Container, Type> + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + wait + ); } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C index de7ce5fa424..b26b1be61d1 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2023 OpenCFD Ltd. + Copyright (C) 2023-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -82,7 +82,7 @@ void exchangeConsensus { buf.clear(); } - recvSizes = Zero; + recvSizes = Foam::zero{}; if (!UPstream::is_rank(comm)) { @@ -109,7 +109,7 @@ void exchangeConsensus recvBufs[myProci] = sendBufs[myProci]; if (myProci < recvSizes.size()) { - recvSizes[myProci] = recvBufs.size(); + recvSizes[myProci] = recvBufs[myProci].size(); } } @@ -175,7 +175,7 @@ void exchangeConsensus for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -189,8 +189,8 @@ void exchangeConsensus // Message found and had size. // - receive into dest buffer location - const label proci = probed.first; - const label count = (probed.second / sizeof(Type)); + const label proci(probed.first); + const label count(probed.second / sizeof(Type)); auto& recvData = recvBufs[proci]; recvData.resize(count); // OK with resize() instead of _nocopy() @@ -254,10 +254,10 @@ void exchangeConsensus { static_assert(is_contiguous<Type>::value, "Contiguous data only!"); - // TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0); + const bool initialBarrier = (UPstream::tuning_NBX_ > 0); const label myProci = UPstream::myProcNo(comm); - const label numProc = UPstream::myProcNo(comm); + const label numProc = UPstream::nProcs(comm); // Initial: clear all receive locations // Preferrable to clear out the map entries instead of the map itself @@ -300,7 +300,12 @@ void exchangeConsensus // Setup sends // ------------------------------------------------------------------------ - // TDB: initialBarrier ... 
+ // An initial barrier may help to avoid synchronisation problems + // caused elsewhere + if (initialBarrier) + { + UPstream::barrier(comm); + } // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers @@ -347,7 +352,7 @@ void exchangeConsensus for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -361,8 +366,8 @@ void exchangeConsensus // Message found and had size. // - receive into dest buffer location - const label proci = probed.first; - const label count = (probed.second / sizeof(Type)); + const label proci(probed.first); + const label count(probed.second / sizeof(Type)); auto& recvData = recvBufs(proci); recvData.resize(count); // OK with resize() instead of _nocopy() diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index ee446347708..089d7b6e828 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -565,14 +565,14 @@ public: //- Probe for an incoming message. // - // \param commsType Blocking or not + // \param commsType Non-blocking or not // \param fromProcNo The source rank (negative == ANY_SOURCE) // \param tag The source message tag // \param communicator The communicator index // // \returns source rank and message size (bytes) // and (-1, 0) on failure - static std::pair<int,int> probeMessage + static std::pair<int,int64_t> probeMessage ( const UPstream::commsTypes commsType, const int fromProcNo, diff --git a/src/OpenFOAM/global/argList/argList.C b/src/OpenFOAM/global/argList/argList.C index 05934efede7..557d4614a5a 100644 --- a/src/OpenFOAM/global/argList/argList.C +++ b/src/OpenFOAM/global/argList/argList.C @@ -2049,6 +2049,8 @@ void Foam::argList::parse Info<< "Pstream initialized with:" << nl << " floatTransfer : " << Switch::name(UPstream::floatTransfer) << nl + << " maxCommsSize : " + << UPstream::maxCommsSize << nl << " nProcsSimpleSum : " << UPstream::nProcsSimpleSum << nl << " nonBlockingExchange: " diff --git a/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C b/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C index f45eedbbd03..454df5ffd2f 100644 --- a/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C +++ b/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C @@ -146,15 +146,17 @@ static void broadcastFile_single const uint64_t maxChunkSize = ( - UPstream::maxCommsSize > 0 + (UPstream::maxCommsSize > 0) ? uint64_t(UPstream::maxCommsSize) - : uint64_t(pTraits<int>::max) + : (UPstream::maxCommsSize < 0) // (numBytes fewer than INT_MAX) + ? 
uint64_t(INT_MAX + UPstream::maxCommsSize) + : uint64_t(INT_MAX) // MPI limit is <int> ); while (fileLength > 0) { - const uint64_t sendSize = min(fileLength, maxChunkSize); + const uint64_t sendSize = std::min(fileLength, maxChunkSize); fileLength -= sendSize; // Read file contents into a character buffer diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index 573c0944696..c935914db63 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -91,7 +91,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) {} -std::pair<int,int> +std::pair<int,int64_t> Foam::UPstream::probeMessage ( const UPstream::commsTypes commsType, @@ -100,7 +100,7 @@ Foam::UPstream::probeMessage const label communicator ) { - return std::pair<int,int>(-1, 0); + return std::pair<int,int64_t>(-1, 0); } diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index b3771d7d1ed..5314d8027af 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -766,7 +766,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) } -std::pair<int,int> +std::pair<int,int64_t> Foam::UPstream::probeMessage ( const UPstream::commsTypes commsType, @@ -775,7 +775,7 @@ Foam::UPstream::probeMessage const label communicator ) { - std::pair<int,int> result(-1, 0); + std::pair<int,int64_t> result(-1, 0); // No-op for non-parallel or not on communicator if (!UPstream::parRun() || !UPstream::is_rank(communicator)) @@ -869,7 +869,7 @@ Foam::UPstream::probeMessage result.first = status.MPI_SOURCE; - result.second = int(count); + result.second = int64_t(count); } return result; -- GitLab
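
Editor's note: the maxCommsSize semantics used throughout this patch (0 disabled, >0 a byte limit, <0 a subtraction from INT_MAX) can be exercised in isolation. Below is a minimal standalone sketch in plain C++17; the helper names mirror those in the patch, but the assertions and main() are illustrative additions, not part of OpenFOAM.

    // Standalone sketch of the maxCommsSize -> chunk-size mapping.
    #include <cassert>
    #include <climits>
    #include <cstdint>
    #include <cstddef>

    // Negative limits mean "that many bytes fewer than INT_MAX";
    // zero means unlimited (no chunking)
    inline std::size_t maxTransferBytes(int64_t max_bytes) noexcept
    {
        return
        (
            (max_bytes < 0)
          ? std::size_t(INT_MAX + max_bytes)
          : std::size_t(max_bytes)
        );
    }

    // Convert a byte limit to a per-chunk element count (0 == non-chunked)
    template<class Type>
    inline std::size_t maxTransferCount(std::size_t max_bytes) noexcept
    {
        return
        (
            (max_bytes == 0)                       // unlimited
          ? std::size_t(0)
          : (max_bytes > std::size_t(INT_MAX))     // MPI counts are <int>
          ? (std::size_t(INT_MAX) / sizeof(Type))
          : (max_bytes > sizeof(Type))             // integral element count
          ? (max_bytes / sizeof(Type))
          : std::size_t(1)                         // at least one element
        );
    }

    int main()
    {
        // maxCommsSize = 0 : no chunking
        assert(maxTransferCount<double>(maxTransferBytes(0)) == 0);

        // maxCommsSize = -1 : (INT_MAX-1) bytes per pass
        assert
        (
            maxTransferCount<double>(maxTransferBytes(-1))
         == std::size_t(INT_MAX - 1) / sizeof(double)
        );

        // maxCommsSize = 1000 : 125 doubles per chunk
        assert(maxTransferCount<double>(maxTransferBytes(1000)) == 125);

        return 0;
    }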
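The core of the multi-pass scheme is that sender and receiver walk identical data windows without negotiating a chunk count, relying on the buffers having been consistently sized beforehand; the per-iteration tag offset keeps in-flight chunks from different passes from being matched against each other. A standalone sketch of that walk follows, where transferChunk() is a hypothetical stand-in for UOPstream::write / UIPstream::read and only the begin/end arithmetic and tag scheme are taken from the patch.

    // Sketch (plain C++17) of the chunk-window walk used by the patched loops
    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Pretend transfer of one window (prints instead of sending)
    void transferChunk(const double* data, std::size_t count, int tag)
    {
        std::cout << "tag:" << tag << " count:" << count << '\n';
        (void)data;
    }

    void transferInChunks
    (
        const std::vector<double>& payload,
        const std::size_t chunkSize,   // elements per pass (0 == single pass)
        const int tag                  // base message tag
    )
    {
        if (!chunkSize)
        {
            transferChunk(payload.data(), payload.size(), tag);
            return;
        }

        // Dispatch chunk-wise until there is nothing left.
        // iter 0 -> [0, chunk), iter 1 -> [chunk, 2*chunk), ...
        for (int iter = 0; /*true*/; ++iter)
        {
            const std::size_t beg = std::size_t(iter)*chunkSize;
            const std::size_t end = std::size_t(iter+1)*chunkSize;

            if (payload.size() <= beg) break;   // no more data windows

            const std::size_t len =
                (end < payload.size() ? chunkSize : payload.size() - beg);

            // Augment the tag by the iteration so that chunks from
            // different passes cannot collide
            transferChunk(payload.data() + beg, len, tag + iter);
        }
    }

    int main()
    {
        const std::vector<double> buf(10, 1.0);
        transferInChunks(buf, 4, 100);   // -> tags 100,101,102 (4+4+2 elements)
        return 0;
    }

Because both sides derive beg/end from the same chunk size and the same (consistently sized) buffer length, no Allreduce over chunk counts is needed, which is exactly the overhead this commit removes.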
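On the probeMessage widening: as the mpi/UPstream.C hunk shows, MPI_Get_count still reports an <int> byte count, which is then widened to int64_t for the caller. A rough sketch of that probe-then-widen pattern using the raw MPI API is given below; probeAnyMessage is an illustrative name (not the OpenFOAM implementation) and error handling is elided.

    // Sketch of a non-blocking probe returning (source, byte count) with
    // the count widened to int64_t. Assumes an MPI environment (mpicxx).
    #include <cstdint>
    #include <utility>
    #include <mpi.h>

    std::pair<int, int64_t> probeAnyMessage(int tag, MPI_Comm comm)
    {
        std::pair<int, int64_t> result(-1, 0);

        int flag = 0;
        MPI_Status status;

        // Non-blocking probe for any source with the given tag
        MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &flag, &status);

        if (flag)
        {
            int count = 0;
            MPI_Get_count(&status, MPI_BYTE, &count);  // still an <int>

            result.first = status.MPI_SOURCE;
            result.second = int64_t(count);   // widened for the caller
        }

        return result;
    }

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);

        // Poll once for any pending tag-0 message on the world communicator
        auto probed = probeAnyMessage(0, MPI_COMM_WORLD);
        if (probed.first >= 0)
        {
            // message from rank probed.first with probed.second bytes waiting
        }

        MPI_Finalize();
        return 0;
    }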