diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C index b5181bccec94cbdeffd908884b86c33139a5b2d7..30e71a12c51c187a92a636c892783697fe9cefaa 100644 --- a/applications/test/parallel-chunks/Test-parallel-chunks.C +++ b/applications/test/parallel-chunks/Test-parallel-chunks.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -44,175 +44,62 @@ Description using namespace Foam; -// Looks like Pstream::exchangeBuf -template<class T> -void do_exchangeBuf +//- Number of elements corresponding to max byte transfer. +// Normal upper limit is INT_MAX since MPI sizes are limited to <int>. +template<class Type> +inline std::size_t maxTransferCount ( - const label sendSize, - const char* sendData, - const label recvSize, - char* recvData, - const int tag, - const label comm, - const bool wait -) + const std::size_t max_bytes = std::size_t(0) +) noexcept { - const label startOfRequests = UPstream::nRequests(); - - // Set up receives - // ~~~~~~~~~~~~~~~ - - // forAll(recvSizes, proci) - { - // if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) - if (!Pstream::master(comm) && recvSize > 0) - { - UIPstream::read - ( - UPstream::commsTypes::nonBlocking, - UPstream::myProcNo(comm), // proci, - recvData, - recvSize*sizeof(T), - tag, - comm - ); - } - } - - - // Set up sends - // ~~~~~~~~~~~~ - - // forAll(sendBufs, proci) - for (const int proci : Pstream::subProcs(comm)) - { - if (sendSize > 0) - // if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData, - sendSize*sizeof(T), - tag, - comm - ) - ) - { - FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << proci << " nBytes:" - << label(sendSize*sizeof(T)) - << Foam::abort(FatalError); - } - } - } - - - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ - - if (wait) - { - UPstream::waitRequests(startOfRequests); - } + return + ( + (max_bytes == 0) // ie, unlimited + ? (std::size_t(0)) // + : (max_bytes > std::size_t(INT_MAX)) // MPI limit is <int> + ? (std::size_t(INT_MAX) / sizeof(Type)) // + : (max_bytes > sizeof(Type)) // require an integral number + ? (max_bytes / sizeof(Type)) // + : (std::size_t(1)) // min of one element + ); } -// Looks like Pstream::exchangeContainer -template<class Container, class T> -void do_exchangeContainer +//- Upper limit on number of transfer bytes. +// Max bytes is normally INT_MAX since MPI sizes are limited to <int>. +// Negative values indicate a subtraction from INT_MAX. 
+inline std::size_t maxTransferBytes
 (
-    const Container& sendData,
-    const label recvSize,
-    Container& recvData,
-    const int tag,
-    const label comm,
-    const bool wait
-)
+    const int64_t max_bytes
+) noexcept
 {
-    const label startOfRequests = UPstream::nRequests();
-
-    // Set up receives
-    // ~~~~~~~~~~~~~~~
-
-    // for (const int proci : Pstream::allProcs(comm))
-    {
-        if (!Pstream::master(comm) && recvSize > 0)
-        // if (proci != Pstream::myProcNo(comm) && recvSize > 0)
-        {
-            UIPstream::read
-            (
-                UPstream::commsTypes::nonBlocking,
-                UPstream::myProcNo(comm),   // proci,
-                recvData.data_bytes(),
-                recvSize*sizeof(T),
-                tag,
-                comm
-            );
-        }
-    }
-
-
-    // Set up sends
-    // ~~~~~~~~~~~~
-
-    if (Pstream::master(comm) && sendData.size() > 0)
-    {
-        for (const int proci : Pstream::subProcs(comm))
-        {
-            if
-            (
-                !UOPstream::write
-                (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    sendData.cdata_bytes(),
-                    sendData.size_bytes(),
-                    tag,
-                    comm
-                )
-            )
-            {
-                FatalErrorInFunction
-                    << "Cannot send outgoing message. "
-                    << "to:" << proci << " nBytes:"
-                    << label(sendData.size_bytes())
-                    << Foam::abort(FatalError);
-            }
-        }
-    }
-
-    // Wait for all to finish
-    // ~~~~~~~~~~~~~~~~~~~~~~
-
-    if (wait)
-    {
-        UPstream::waitRequests(startOfRequests);
-    }
+    return
+    (
+        (max_bytes < 0)  // (numBytes fewer than INT_MAX)
+      ? std::size_t(INT_MAX + max_bytes)
+      : std::size_t(max_bytes)
+    );
 }


-template<class Container, class T>
+template<class Container, class Type>
 void broadcast_chunks
 (
     Container& sendData,
     const int tag = UPstream::msgType(),
-    const label comm = UPstream::worldComm,
-    const bool wait = true
+    const label comm = UPstream::worldComm,
+    const int64_t maxComms_bytes = UPstream::maxCommsSize
 )
 {
     // OR  static_assert(is_contiguous<T>::value, "Contiguous data only!")
-    if (!is_contiguous<T>::value)
+    if (!is_contiguous<Type>::value)
     {
         FatalErrorInFunction
-            << "Contiguous data only." << sizeof(T) << Foam::abort(FatalError);
+            << "Contiguous data only." << sizeof(Type)
+            << Foam::abort(FatalError);
     }
 
-    if (UPstream::maxCommsSize <= 0)
+    if (maxComms_bytes == 0)
     {
         // Do in one go
         Info<< "send " << sendData.size() << " elements in one go" << endl;
@@ -227,93 +114,90 @@ void broadcast_chunks
 
     sendData.resize_nocopy(recvSize);  // A no-op on master
 
-    // Determine the number of chunks to send. Note that we
-    // only have to look at the sending data since we are
-    // guaranteed that some processor's sending size is some other
-    // processor's receive size. Also we can ignore any local comms.
-    // We need to send chunks so the number of iterations:
-    //  maxChunkSize                        iterations
-    //  ------------                        ----------
-    //  0                                   0
-    //  1..maxChunkSize                     1
-    //  maxChunkSize+1..2*maxChunkSize      2
-    //      ...
-
-    const label maxChunkSize
+    // The chunk size (number of elements) corresponding to max byte transfer
+    // Is zero for non-chunked exchanges.
+    const std::size_t chunkSize
     (
-        max
+        maxTransferCount<Type>
         (
-            static_cast<label>(1),
-            static_cast<label>(UPstream::maxCommsSize/sizeof(T))
+            maxTransferBytes(maxComms_bytes)
         )
     );
 
-    label nChunks(0);
-    {
-        // Get max send count (elements)
-        // forAll(sendBufs, proci)
-        // {
-        //     if (proci != Pstream::myProcNo(comm))
-        //     {
-        //         nChunks = max(nChunks, sendBufs[proci].size());
-        //     }
-        // }
-        nChunks = sendSize;
 
+    if (chunkSize)
+    {
        // Convert from send count (elements) to number of chunks. 
// Can normally calculate with (count-1), but add some safety - if (nChunks) - { - nChunks = 1 + (nChunks/maxChunkSize); - } - reduce(nChunks, maxOp<label>(), tag, comm); + label nChunks = 1 + (sendSize/label(chunkSize)); Info << "send " << sendSize << " elements (" - << (sendSize*sizeof(T)) << " bytes) in " << nChunks - << " chunks of " << maxChunkSize << " elements (" - << (maxChunkSize*sizeof(T)) << " bytes) for maxCommsSize:" - << Pstream::maxCommsSize + << (sendSize*sizeof(Type)) << " bytes) in " << nChunks + << " chunks of " << label(chunkSize) << " elements (" + << label(chunkSize*sizeof(Type)) << " bytes) for maxCommsSize:" + << label(maxComms_bytes) << endl; } + // stress-test with shortened sendSize // will produce useless loops, but no calls // sendSize /= 2; - label nSend(0); - label startSend(0); - char* charPtrSend; + typedef stdFoam::span<Type> sendType; - for (label iter = 0; iter < nChunks; ++iter) + do { - nSend = min - ( - maxChunkSize, - sendSize-startSend - ); + sendType payload(sendData.data(), sendData.size()); - charPtrSend = - ( - nSend > 0 - ? reinterpret_cast<char*>(&(sendData[startSend])) - : nullptr - ); + if (!chunkSize) + { + UPstream::broadcast + ( + payload.data_bytes(), + payload.size_bytes(), + comm + ); + break; + } - Info<< "iter " << iter - << ": beg=" << startSend << " len=" << nSend - << " (" << (nSend*sizeof(T)) << " bytes)" << endl; + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - UPstream::broadcast(charPtrSend, nSend*sizeof(T), comm); + if (payload.size() <= beg) + { + // No more data windows + break; + } - // forAll(nSend, proci) - { - startSend += nSend; + sendType window + ( + (end < payload.size()) + ? 
payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); + + Info<< "iter " << iter + << ": beg=" << label(beg) << " len=" << label(window.size()) + << " (" << label(window.size_bytes()) << " bytes)" << endl; + + UPstream::broadcast + ( + window.data_bytes(), + window.size_bytes(), + comm + ); } } + while (false); - Info<< "final: " << startSend << endl; + Info<< "final" << endl; } @@ -333,7 +217,7 @@ int main(int argc, char *argv[]) } labelList input1; - if (Pstream::master()) + if (UPstream::master()) { input1 = identity(500); } @@ -348,7 +232,7 @@ int main(int argc, char *argv[]) // Mostly the same with PstreamBuffers if (false) { - PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking); + PstreamBuffers pBufs; labelList sendData; if (Pstream::master()) diff --git a/applications/test/parallel-comm3b/Test-parallel-comm3b.C b/applications/test/parallel-comm3b/Test-parallel-comm3b.C index 0bbcc5da5e1f381da4e6d9fdcd18907a1bde8787..ea64880146d0e46d41e94b1d95cff6be770d2500 100644 --- a/applications/test/parallel-comm3b/Test-parallel-comm3b.C +++ b/applications/test/parallel-comm3b/Test-parallel-comm3b.C @@ -130,7 +130,7 @@ int main(int argc, char *argv[]) for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -143,8 +143,8 @@ int main(int argc, char *argv[]) { // Message found and had size: receive it - const label proci = probed.first; - const label count = probed.second; + const label proci(probed.first); + const label count(probed.second); recvBufs(proci).resize_nocopy(count); recvFromProc(recvRequests.size()) = proci; diff --git a/applications/test/parallel-nbx2/Test-parallel-nbx2.C b/applications/test/parallel-nbx2/Test-parallel-nbx2.C index bd3a4a224ee36ef36adc26fca2ea50abe0aa7c37..9919c42c90b0379c7d88474834ab6d1a8b3bb14a 100644 --- a/applications/test/parallel-nbx2/Test-parallel-nbx2.C +++ b/applications/test/parallel-nbx2/Test-parallel-nbx2.C @@ -119,7 +119,7 @@ int main(int argc, char *argv[]) for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -132,8 +132,8 @@ int main(int argc, char *argv[]) { // Message found and had size: receive it - const label proci = probed.first; - const label count = probed.second; + const label proci(probed.first); + const label count(probed.second); if (optNonBlocking) { diff --git a/etc/controlDict b/etc/controlDict index d0ffb7655227b6cbec2ffb272bd0352f6c2c92be..552f12c520154ec761f55a04314914a921a8984a 100644 --- a/etc/controlDict +++ b/etc/controlDict @@ -1,7 +1,7 @@ /*--------------------------------*- C++ -*----------------------------------*\ | ========= | | | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox | -| \\ / O peration | Version: v2312 | +| \\ / O peration | Version: v2406 | | \\ / A nd | Website: www.openfoam.com | | \\/ M anipulation | | \*---------------------------------------------------------------------------*/ @@ -146,13 +146,18 @@ OptimisationSwitches // The default and minimum is (20000000). mpiBufferSize 0; - // Optional max size (bytes) for unstructured data exchanges. In some - // phases of OpenFOAM it can send over very large data chunks + // Optional max size (bytes) for unstructured data exchanges. + // In some phases of OpenFOAM it can send over very large data chunks // (e.g. 
in parallel load balancing) and some MPI implementations have
-    // problems with this. Setting this variable > 0 indicates that the
-    // data exchange needs to be done in multiple passes, each of maxCommsSize.
-    // This is not switched on by default since it requires an additional
-    // global reduction, even if multi-pass is not needed)
+    // problems with this.
+    //
+    // This tuning parameter specifies the max number of bytes before
+    // switching to a multi-pass send/recv
+    // (currently only affects PstreamBuffers exchanges).
+    //
+    //   0 : disabled
+    //  >0 : limit exchanges to specified number of bytes
+    //  <0 : limit exchanges to INT_MAX minus the specified number of bytes
     maxCommsSize    0;
 
     // Optional (experimental) feature in lduMatrixUpdate
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
index 034f6d3c33e82e456145026406991ae4bfa7eae6..c3f271d643f780f00d1d9a915b1b5ac8c8b4e80e 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
@@ -6,7 +6,7 @@
     \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2011-2016 OpenFOAM Foundation
-    Copyright (C) 2016-2023 OpenCFD Ltd.
+    Copyright (C) 2016-2024 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -24,6 +24,21 @@ License
     You should have received a copy of the GNU General Public License
     along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.
 
+Note
+    The send/recv windows for chunk-wise transfers:
+
+        iter    data window
+        ----    -----------
+        0       [0, 1*chunk]
+        1       [1*chunk, 2*chunk]
+        2       [2*chunk, 3*chunk]
+        ...
+
+    In older versions (v2312 and earlier) the number of chunks was
+    determined by the sender sizes and used an extra MPI_Allreduce.
+    The current code instead relies on the send/recv buffers having been
+    consistently sized, which avoids the additional reduction.
+
 \*---------------------------------------------------------------------------*/
 
 #include "Pstream.H"
@@ -37,23 +52,82 @@ namespace Foam
 namespace PstreamDetail
 {
 
-//- Setup sends and receives, each specified as [rank, span] tuple
-//  The serial list of tuples can be populated from other lists, from maps
-//  of data or subsets of lists/maps etc.
+//- Number of elements corresponding to max byte transfer.
+//  Normal upper limit is INT_MAX since MPI sizes are limited to <int>.
+template<class Type>
+inline std::size_t maxTransferCount
+(
+    const std::size_t max_bytes = std::size_t(0)
+) noexcept
+{
+    return
+    (
+        (max_bytes == 0)                        // ie, unlimited
+      ? (std::size_t(0))                        //
+      : (max_bytes > std::size_t(INT_MAX))      // MPI limit is <int>
+      ? (std::size_t(INT_MAX) / sizeof(Type))   //
+      : (max_bytes > sizeof(Type))              // require an integral number
+      ? (max_bytes / sizeof(Type))              //
+      : (std::size_t(1))                        // min of one element
+    );
+}
+
+
+//- Upper limit on number of transfer bytes.
+//  Max bytes is normally INT_MAX since MPI sizes are limited to <int>.
+//  Negative values indicate a subtraction from INT_MAX.
+inline std::size_t maxTransferBytes(const int64_t max_bytes) noexcept
+{
+    return
+    (
+        (max_bytes < 0)  // (numBytes fewer than INT_MAX)
+      ? std::size_t(INT_MAX + max_bytes)
+      : std::size_t(max_bytes)
+    );
+}
+
+
+//- Exchange of \em contiguous data, with or without chunking.
+//- Setup sends and receives, each specified as [rank, span] tuple. 
+// The serial list of tuples can be populated from other lists, from +// maps of data or subsets of lists/maps etc. +// +// Any waiting for requests is done outside by the caller template<class Type> -void exchangeBuf +void exchangeBuffers ( const UList<std::pair<int, stdFoam::span<const Type>>>& sends, const UList<std::pair<int, stdFoam::span<Type>>>& recvs, - const int tag, const label comm, - const bool wait + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { - const label startOfRequests = UPstream::nRequests(); + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + // Caller already checked for parRun + + if (sends.empty() && recvs.empty()) + { + // Nothing to do + return; + } + const int myProci = UPstream::myProcNo(comm); + + // The chunk size (number of elements) corresponding to max byte transfer. + // Is zero for non-chunked exchanges. + const std::size_t chunkSize + ( + PstreamDetail::maxTransferCount<Type> + ( + PstreamDetail::maxTransferBytes(maxComms_bytes) + ) + ); + + // Set up receives // ~~~~~~~~~~~~~~~ @@ -63,8 +137,14 @@ void exchangeBuf const auto proci = slot.first; auto& payload = slot.second; - if (proci != myProci && !payload.empty()) + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) + { + continue; + } + else if (!chunkSize) { + // Dispatch without chunks UIPstream::read ( UPstream::commsTypes::nonBlocking, @@ -74,212 +154,124 @@ void exchangeBuf tag, comm ); + continue; } - } - // Set up sends - // ~~~~~~~~~~~~ + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - for (const auto& slot : sends) - { - // [rank, span] - const auto proci = slot.first; - const auto& payload = slot.second; + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); - if (proci != myProci && !payload.empty()) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - payload.cdata_bytes(), - payload.size_bytes(), - tag, - comm - ) - ) + if (payload.size() <= beg) { - FatalErrorInFunction - << "Cannot send outgoing message to:" - << proci << " nBytes:" - << label(payload.size_bytes()) - << Foam::abort(FatalError); + // No more data windows + break; } - } - } - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ + recvType window + ( + (end < payload.size()) + ? payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); - if (wait) - { - UPstream::waitRequests(startOfRequests); + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + window.data_bytes(), + window.size_bytes(), + msgTagChunk, + comm + ); + } } -} -//- Chunked exchange of \em contiguous data. -//- Setup sends and receives, each specified as [rank, span] tuple. -// The serial list of tuples can be populated from other lists, from -// maps of data or subsets of lists/maps etc. 
-template<class Type> -void exchangeChunkedBuf -( - const UList<std::pair<int, stdFoam::span<const Type>>>& sends, - const UList<std::pair<int, stdFoam::span<Type>>>& recvs, - - const int tag, - const label comm, - const bool wait -) -{ - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - // Caller already checked for parRun and maxChunkSize > 0 + // Set up sends + // ~~~~~~~~~~~~ + for (const auto& slot : sends) { - // Determine the number of chunks to send. Note that we - // only have to look at the sending data since we are - // guaranteed that some processor's sending size is some other - // processor's receive size. Also we can ignore any local comms. - // - // We need to send chunks so the number of iterations: - // maxChunkSize iterations - // ------------ ---------- - // 0 0 - // 1..maxChunkSize 1 - // maxChunkSize+1..2*maxChunkSize 2 - // ... - - const label maxChunkSize = - ( - max - ( - static_cast<label>(1), - static_cast<label>(UPstream::maxCommsSize/sizeof(Type)) - ) - ); - - const int myProci = UPstream::myProcNo(comm); + // [rank, span] + const auto proci = slot.first; + const auto& payload = slot.second; - label nChunks(0); + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) { - // Get max send count (elements) - auto maxCount = static_cast<stdFoam::span<char>::size_type>(0); - - for (const auto& slot : sends) - { - // [rank, span] - const auto proci = slot.first; - const auto count = slot.second.size(); - - if (proci != myProci && count > maxCount) - { - // Note: using max() can be ambiguous - maxCount = count; - } - } + continue; + } + else if (!chunkSize) + { + // Dispatch without chunks + bool ok = UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.cdata_bytes(), + payload.size_bytes(), + tag, + comm + ); - // Convert from send count (elements) to number of chunks. - // Can normally calculate with (count-1), but add some safety - if (maxCount) + if (!ok) { - nChunks = 1 + label(maxCount/maxChunkSize); + FatalErrorInFunction + << "Failure sending message to:" << proci + << " nBytes:" << label(payload.size_bytes()) << nl + << Foam::abort(FatalError); } - - // MPI reduce (message tag is irrelevant) - reduce(nChunks, maxOp<label>(), UPstream::msgType(), comm); + continue; } - - // Dispatch the exchanges chunk-wise - List<sendTuple> sendChunks(sends); - List<recvTuple> recvChunks(recvs); - - // Dispatch - for (label iter = 0; iter < nChunks; ++iter) + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) { // The begin/end for the data window - const auto beg = static_cast<std::size_t>(iter*maxChunkSize); - const auto end = static_cast<std::size_t>((iter+1)*maxChunkSize); + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); - forAll(sendChunks, sloti) - { - const auto& baseline = sends[sloti].second; - auto& payload = sendChunks[sloti].second; - - // Window the data - if (beg < baseline.size()) - { - payload = - ( - (end < baseline.size()) - ? 
baseline.subspan(beg, end - beg) - : baseline.subspan(beg) - ); - } - else - { - payload = baseline.first(0); // zero-sized - } - } + // The message tag augmented by the iteration number + // - avoids message collisions between different chunks + const int msgTagChunk = (tag + iter); - forAll(recvChunks, sloti) + if (payload.size() <= beg) { - const auto& baseline = recvs[sloti].second; - auto& payload = recvChunks[sloti].second; - - // Window the data - if (beg < baseline.size()) - { - payload = - ( - (end < baseline.size()) - ? baseline.subspan(beg, end - beg) - : baseline.subspan(beg) - ); - } - else - { - payload = baseline.first(0); // zero-sized - } + // No more data windows + break; } + sendType window + ( + (end < payload.size()) + ? payload.subspan(beg, end - beg) + : payload.subspan(beg) + ); - // Exchange data chunks - PstreamDetail::exchangeBuf<Type> + bool ok = UOPstream::write ( - sendChunks, - recvChunks, - tag, - comm, - wait + UPstream::commsTypes::nonBlocking, + proci, + window.cdata_bytes(), + window.size_bytes(), + msgTagChunk, + comm ); - // Debugging output - can report on master only... - #if 0 // ifdef Foam_PstreamExchange_debug_chunks - do + if (!ok) { - labelList sendStarts(sends.size()); - labelList sendCounts(sends.size()); - - forAll(sendChunks, sloti) - { - const auto& baseline = sends[sloti].second; - const auto& payload = sendChunks[sloti].second; - - sendStarts[sloti] = (payload.data() - baseline.data()); - sendCounts[sloti] = (payload.size()); - } - - Info<< "iter " << iter - << ": beg=" << flatOutput(sendStarts) - << " len=" << flatOutput(sendCounts) << endl; - } while (false); - #endif + FatalErrorInFunction + << "Failure sending message to:" << proci + << " nBytes:" << label(window.size_bytes()) << nl + << Foam::abort(FatalError); + } } } } @@ -298,30 +290,99 @@ void exchangeContainer UList<Container>& recvBufs, const int tag, const label comm, - const bool wait //!< Wait for requests to complete + const bool wait, //!< Wait for requests to complete + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + // Caller already checked for parRun + + if (sendBufs.empty() && recvBufs.empty()) + { + // Nothing to do + return; + } + const label startOfRequests = UPstream::nRequests(); - const label myProci = UPstream::myProcNo(comm); + const int myProci = UPstream::myProcNo(comm); + + // The chunk size (number of elements) corresponding to max byte transfer + const std::size_t chunkSize + ( + PstreamDetail::maxTransferCount<Type> + ( + PstreamDetail::maxTransferBytes(maxComms_bytes) + ) + ); + // Set up receives // ~~~~~~~~~~~~~~~ forAll(recvBufs, proci) { - auto& recvData = recvBufs[proci]; + // [rank, span] + recvType payload + ( + recvBufs[proci].data(), + std::size_t(recvBufs[proci].size()) + ); - if (proci != myProci && !recvData.empty()) + // No self-communication or zero-size payload + if (proci == myProci || payload.empty()) { + continue; + } + else if (!chunkSize) + { + // Dispatch without chunks UIPstream::read ( UPstream::commsTypes::nonBlocking, proci, - recvData.data_bytes(), - recvData.size_bytes(), + payload.data_bytes(), + payload.size_bytes(), tag, comm ); + continue; + } + + // Dispatch chunk-wise until there is nothing left + for (int iter = 0; /*true*/; ++iter) + { + // The begin/end for the data window + const std::size_t beg = (std::size_t(iter)*chunkSize); + const std::size_t end = (std::size_t(iter+1)*chunkSize); + + // The message tag augmented 
by the iteration number
+            // - avoids message collisions between different chunks
+            const int msgTagChunk = (tag + iter);
+
+            if (payload.size() <= beg)
+            {
+                // No more data windows
+                break;
+            }
+
+            recvType window
+            (
+                (end < payload.size())
+              ? payload.subspan(beg, end - beg)
+              : payload.subspan(beg)
+            );
+
+            UIPstream::read
+            (
+                UPstream::commsTypes::nonBlocking,
+                proci,
+                window.data_bytes(),
+                window.size_bytes(),
+                msgTagChunk,
+                comm
+            );
         }
     }
 
@@ -331,35 +392,96 @@
 
     forAll(sendBufs, proci)
     {
-        const auto& sendData = sendBufs[proci];
+        // [rank, span]
+        sendType payload
+        (
+            sendBufs[proci].cdata(),
+            std::size_t(sendBufs[proci].size())
+        );
+
+        // No self-communication or zero-size payload
+        if (proci == myProci || payload.empty())
+        {
+            continue;
+        }
+        else if (!chunkSize)
+        {
+            // Dispatch without chunks
+            bool ok = UOPstream::write
+            (
+                UPstream::commsTypes::nonBlocking,
+                proci,
+                payload.cdata_bytes(),
+                payload.size_bytes(),
+                tag,
+                comm
+            );
 
-        if (proci != myProci && !sendData.empty())
+            if (!ok)
+            {
+                FatalErrorInFunction
+                    << "Failure sending message to:" << proci
+                    << " nBytes:" << label(payload.size_bytes()) << nl
+                    << Foam::abort(FatalError);
+            }
+            continue;
+        }
+
+        // Dispatch chunk-wise until there is nothing left
+        for (int iter = 0; /*true*/; ++iter)
         {
-            if
+            // The begin/end for the data window
+            const std::size_t beg = (std::size_t(iter)*chunkSize);
+            const std::size_t end = (std::size_t(iter+1)*chunkSize);
+
+            // The message tag augmented by the iteration number
+            // - avoids message collisions between different chunks
+            const int msgTagChunk = (tag + iter);
+
+            if (payload.size() <= beg)
+            {
+                // No more data windows
+                break;
+            }
+
+            sendType window
+            (
+                (end < payload.size())
+              ? payload.subspan(beg, end - beg)
+              : payload.subspan(beg)
+            );
+
+            bool ok = UOPstream::write
             (
-                !UOPstream::write
-                (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    sendData.cdata_bytes(),
-                    sendData.size_bytes(),
-                    tag,
-                    comm
-                )
-            )
+                UPstream::commsTypes::nonBlocking,
+                proci,
+                window.cdata_bytes(),
+                window.size_bytes(),
+                msgTagChunk,
+                comm
+            );
+
+            if (!ok)
             {
                 FatalErrorInFunction
-                    << "Cannot send outgoing message. 
" - << "to:" << proci << " nBytes:" - << label(sendData.size_bytes()) + << "Failure sending message to:" << proci + << " nBytes:" << label(window.size_bytes()) << nl << Foam::abort(FatalError); } } } + // Wait for all to finish // ~~~~~~~~~~~~~~~~~~~~~~ + if (UPstream::debug) + { + Perr<< "Pstream::exchange with " + << (UPstream::nRequests() - startOfRequests) + << " requests" << nl; + } + if (wait) { UPstream::waitRequests(startOfRequests); @@ -380,70 +502,111 @@ void exchangeContainer Map<Container>& recvBufs, const int tag, const label comm, - const bool wait //!< Wait for requests to complete + const bool wait, //!< Wait for requests to complete + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { + typedef stdFoam::span<const Type> sendType; + typedef stdFoam::span<Type> recvType; + + typedef std::pair<int, sendType> sendTuple; + typedef std::pair<int, recvType> recvTuple; + const label startOfRequests = UPstream::nRequests(); const label myProci = UPstream::myProcNo(comm); - // Set up receives - // ~~~~~~~~~~~~~~~ - - forAllIters(recvBufs, iter) + // Serialize recv sequences + DynamicList<recvTuple> recvs(recvBufs.size()); { - const label proci = iter.key(); - auto& recvData = iter.val(); - - if (proci != myProci && !recvData.empty()) + forAllIters(recvBufs, iter) { - UIPstream::read + const auto proci = iter.key(); + auto& recvData = recvBufs[proci]; + + recvType payload ( - UPstream::commsTypes::nonBlocking, - proci, - recvData.data_bytes(), - recvData.size_bytes(), - tag, - comm + recvData.data(), + std::size_t(recvData.size()) ); + + // No self-communication or zero-size payload + if (proci != myProci && !payload.empty()) + { + recvs.emplace_back(proci, payload); + } } - } + std::sort + ( + recvs.begin(), + recvs.end(), + [=](const recvTuple& a, const recvTuple& b) + { + // Sorted processor order + return (a.first < b.first); - // Set up sends - // ~~~~~~~~~~~~ + // OR: // Shorter messages first + // return (a.second.size() < b.second.size()); + } + ); + } - forAllConstIters(sendBufs, iter) + // Serialize send sequences + DynamicList<sendTuple> sends(sendBufs.size()); { - const label proci = iter.key(); - const auto& sendData = iter.val(); - - if (proci != myProci && !sendData.empty()) + forAllConstIters(sendBufs, iter) { - if + const auto proci = iter.key(); + const auto& sendData = iter.val(); + + sendType payload ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData.cdata_bytes(), - sendData.size_bytes(), - tag, - comm - ) - ) + sendData.cdata(), + std::size_t(sendData.size()) + ); + + // No self-communication or zero-size payload + if (proci != myProci && !payload.empty()) { - FatalErrorInFunction - << "Cannot send outgoing message to:" - << proci << " nBytes:" - << label(sendData.size_bytes()) - << Foam::abort(FatalError); + sends.emplace_back(proci, payload); } } + + std::sort + ( + sends.begin(), + sends.end(), + [=](const sendTuple& a, const sendTuple& b) + { + // Sorted processor order + return (a.first < b.first); + + // OR: // Shorter messages first + // return (a.second.size() < b.second.size()); + } + ); } + // Exchange buffers in chunk-wise transfers + PstreamDetail::exchangeBuffers<Type> + ( + sends, + recvs, + tag, + comm, + maxComms_bytes + ); + // Wait for all to finish // ~~~~~~~~~~~~~~~~~~~~~~ + if (UPstream::debug) + { + Perr<< "Pstream::exchange with " + << (UPstream::nRequests() - startOfRequests) + << " requests" << nl; + } + if (wait) { UPstream::waitRequests(startOfRequests); @@ -476,17 +639,17 @@ void 
Foam::Pstream::exchange } const label myProci = UPstream::myProcNo(comm); - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of list " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "List size " << sendBufs.size() + << " != number of ranks " << numProc << nl << Foam::abort(FatalError); } - recvBufs.resize_nocopy(numProcs); + recvBufs.resize_nocopy(numProc); if (UPstream::is_parallel(comm)) { @@ -505,72 +668,15 @@ void Foam::Pstream::exchange } } - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - if (UPstream::maxCommsSize <= 0) - { - // Do the exchanging in one go - PstreamDetail::exchangeContainer<Container, Type> - ( - sendBufs, - recvBufs, - tag, - comm, - wait - ); - } - else - { - // Dispatch using chunk-wise exchanges - // Populate send sequence - DynamicList<sendTuple> sends(sendBufs.size()); - forAll(sendBufs, proci) - { - const auto& sendData = sendBufs[proci]; - - if (proci != myProci && !sendData.empty()) - { - sends.push_back - ( - sendTuple - ( - proci, - { sendData.cdata(), std::size_t(sendData.size()) } - ) - ); - } - } - - // Populate recv sequence - DynamicList<recvTuple> recvs(recvBufs.size()); - forAll(recvBufs, proci) - { - auto& recvData = recvBufs[proci]; - - if (proci != myProci && !recvData.empty()) - { - recvs.push_back - ( - recvTuple - ( - proci, - { recvData.data(), std::size_t(recvData.size()) } - ) - ); - } - } - - // Exchange buffers in chunks - PstreamDetail::exchangeChunkedBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } + PstreamDetail::exchangeContainer<Container, Type> + ( + sendBufs, + recvBufs, + tag, + comm, + wait + // (default: UPstream::maxCommsSize) + ); } // Do myself. Already checked if in communicator @@ -617,100 +723,15 @@ void Foam::Pstream::exchange } } - // Define the exchange sequences as a flattened list. - // We add an additional step of ordering the send/recv list - // by message size, which can help with transfer speeds. 
- - typedef std::pair<int, stdFoam::span<const Type>> sendTuple; - typedef std::pair<int, stdFoam::span<Type>> recvTuple; - - // Populate send sequences - DynamicList<sendTuple> sends(sendBufs.size()); - forAllConstIters(sendBufs, iter) - { - const auto proci = iter.key(); - const auto& sendData = iter.val(); - - if (proci != myProci && !sendData.empty()) - { - sends.push_back - ( - sendTuple - ( - proci, - { sendData.cdata(), std::size_t(sendData.size()) } - ) - ); - } - } - - // Shorter messages first - std::sort + PstreamDetail::exchangeContainer<Container, Type> ( - sends.begin(), - sends.end(), - [=](const sendTuple& a, const sendTuple& b) - { - return (a.second.size() < b.second.size()); - } + sendBufs, + recvBufs, + tag, + comm, + wait + // (default: UPstream::maxCommsSize) ); - - // Populate recv sequences - DynamicList<recvTuple> recvs(recvBufs.size()); - forAllIters(recvBufs, iter) - { - const auto proci = iter.key(); - auto& recvData = recvBufs[proci]; - - if (proci != myProci && !recvData.empty()) - { - recvs.push_back - ( - recvTuple - ( - proci, - { recvData.data(), std::size_t(recvData.size()) } - ) - ); - } - } - - // Shorter messages first - std::sort - ( - recvs.begin(), - recvs.end(), - [=](const recvTuple& a, const recvTuple& b) - { - return (a.second.size() < b.second.size()); - } - ); - - - if (UPstream::maxCommsSize <= 0) - { - // Do the exchanging in a single go - PstreamDetail::exchangeBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } - else - { - // Exchange buffers in chunks - PstreamDetail::exchangeChunkedBuf<Type> - ( - sends, - recvs, - tag, - comm, - wait - ); - } } // Do myself (if actually in the communicator) @@ -758,23 +779,23 @@ void Foam::Pstream::exchangeSizes } const label myProci = UPstream::myProcNo(comm); - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of container " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "Container size " << sendBufs.size() + << " != number of ranks " << numProc << nl << Foam::abort(FatalError); } - labelList sendSizes(numProcs); - for (label proci = 0; proci < numProcs; ++proci) + labelList sendSizes(numProc); + for (label proci = 0; proci < numProc; ++proci) { sendSizes[proci] = sendBufs[proci].size(); } - recvSizes.resize_nocopy(numProcs); + recvSizes.resize_nocopy(numProc); recvSizes = 0; // Ensure non-received entries are properly zeroed // Preserve self-send, even if not described by neighbourhood @@ -856,7 +877,6 @@ void Foam::Pstream::exchangeSizes const label comm ) { - Map<label> sendSizes(2*sendBufs.size()); recvSizes.clear(); // Done in allToAllConsensus too, but be explicit here if (!UPstream::is_rank(comm)) @@ -864,6 +884,9 @@ void Foam::Pstream::exchangeSizes return; // Process not in communicator } + Map<label> sendSizes; + sendSizes.reserve(sendBufs.size()); + forAllConstIters(sendBufs, iter) { const label proci = iter.key(); @@ -899,17 +922,17 @@ void Foam::Pstream::exchangeSizes return; // Process not in communicator } - const label numProcs = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (sendBufs.size() != numProcs) + if (sendBufs.size() != numProc) { FatalErrorInFunction - << "Size of container " << sendBufs.size() - << " does not equal the number of processors " << numProcs + << "Container size " << sendBufs.size() + << " != number of ranks " << numProc << nl << 
Foam::abort(FatalError); } - labelList sendSizes(numProcs); + labelList sendSizes(numProc); forAll(sendBufs, proci) { sendSizes[proci] = sendBufs[proci].size(); @@ -919,7 +942,7 @@ void Foam::Pstream::exchangeSizes if ( UPstream::nProcsNonblockingExchange > 1 - && UPstream::nProcsNonblockingExchange <= numProcs + && UPstream::nProcsNonblockingExchange <= numProc ) { // Use algorithm NBX: Nonblocking Consensus Exchange @@ -955,9 +978,17 @@ void Foam::Pstream::exchange // non-zero sendcounts. Post all sends and wait. labelList recvSizes; - exchangeSizes(sendBufs, recvSizes, comm); + Pstream::exchangeSizes(sendBufs, recvSizes, comm); - exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait); + Pstream::exchange<Container, Type> + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + wait + ); } @@ -975,9 +1006,17 @@ void Foam::Pstream::exchange // but using nonblocking consensus exchange for the sizes Map<label> recvSizes; - exchangeSizes(sendBufs, recvSizes, tag, comm); + Pstream::exchangeSizes(sendBufs, recvSizes, tag, comm); - exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait); + Pstream::exchange<Container, Type> + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + wait + ); } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C index de7ce5fa4248f29237366d7874a85ff936e58b1c..b26b1be61d176c7f70702499385c1849cd8fea00 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2023 OpenCFD Ltd. + Copyright (C) 2023-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -82,7 +82,7 @@ void exchangeConsensus { buf.clear(); } - recvSizes = Zero; + recvSizes = Foam::zero{}; if (!UPstream::is_rank(comm)) { @@ -109,7 +109,7 @@ void exchangeConsensus recvBufs[myProci] = sendBufs[myProci]; if (myProci < recvSizes.size()) { - recvSizes[myProci] = recvBufs.size(); + recvSizes[myProci] = recvBufs[myProci].size(); } } @@ -175,7 +175,7 @@ void exchangeConsensus for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -189,8 +189,8 @@ void exchangeConsensus // Message found and had size. 
// - receive into dest buffer location - const label proci = probed.first; - const label count = (probed.second / sizeof(Type)); + const label proci(probed.first); + const label count(probed.second / sizeof(Type)); auto& recvData = recvBufs[proci]; recvData.resize(count); // OK with resize() instead of _nocopy() @@ -254,10 +254,10 @@ void exchangeConsensus { static_assert(is_contiguous<Type>::value, "Contiguous data only!"); - // TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0); + const bool initialBarrier = (UPstream::tuning_NBX_ > 0); const label myProci = UPstream::myProcNo(comm); - const label numProc = UPstream::myProcNo(comm); + const label numProc = UPstream::nProcs(comm); // Initial: clear all receive locations // Preferrable to clear out the map entries instead of the map itself @@ -300,7 +300,12 @@ void exchangeConsensus // Setup sends // ------------------------------------------------------------------------ - // TDB: initialBarrier ... + // An initial barrier may help to avoid synchronisation problems + // caused elsewhere + if (initialBarrier) + { + UPstream::barrier(comm); + } // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers @@ -347,7 +352,7 @@ void exchangeConsensus for (bool barrier_active = false, done = false; !done; /*nil*/) { - std::pair<int, int> probed = + std::pair<int, int64_t> probed = UPstream::probeMessage ( UPstream::commsTypes::nonBlocking, @@ -361,8 +366,8 @@ void exchangeConsensus // Message found and had size. // - receive into dest buffer location - const label proci = probed.first; - const label count = (probed.second / sizeof(Type)); + const label proci(probed.first); + const label count(probed.second / sizeof(Type)); auto& recvData = recvBufs(proci); recvData.resize(count); // OK with resize() instead of _nocopy() diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index ee446347708652eedd9148f2c6d1897c53762336..089d7b6e8288b00b504320d47d59971c86aa6d0e 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -565,14 +565,14 @@ public: //- Probe for an incoming message. 
// - // \param commsType Blocking or not + // \param commsType Non-blocking or not // \param fromProcNo The source rank (negative == ANY_SOURCE) // \param tag The source message tag // \param communicator The communicator index // // \returns source rank and message size (bytes) // and (-1, 0) on failure - static std::pair<int,int> probeMessage + static std::pair<int,int64_t> probeMessage ( const UPstream::commsTypes commsType, const int fromProcNo, diff --git a/src/OpenFOAM/global/argList/argList.C b/src/OpenFOAM/global/argList/argList.C index 05934efede7ebcaae7c341c20f21f0c6d9cb7e74..557d4614a5a95730c48deb845ad6a32dbb5b937e 100644 --- a/src/OpenFOAM/global/argList/argList.C +++ b/src/OpenFOAM/global/argList/argList.C @@ -2049,6 +2049,8 @@ void Foam::argList::parse Info<< "Pstream initialized with:" << nl << " floatTransfer : " << Switch::name(UPstream::floatTransfer) << nl + << " maxCommsSize : " + << UPstream::maxCommsSize << nl << " nProcsSimpleSum : " << UPstream::nProcsSimpleSum << nl << " nonBlockingExchange: " diff --git a/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C b/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C index f45eedbbd03871b62198312ddb30039b4e5fe165..454df5ffd2f8724996cb063b0615577adb1c4981 100644 --- a/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C +++ b/src/OpenFOAM/global/fileOperations/fileOperation/fileOperationBroadcast.C @@ -146,15 +146,17 @@ static void broadcastFile_single const uint64_t maxChunkSize = ( - UPstream::maxCommsSize > 0 + (UPstream::maxCommsSize > 0) ? uint64_t(UPstream::maxCommsSize) - : uint64_t(pTraits<int>::max) + : (UPstream::maxCommsSize < 0) // (numBytes fewer than INT_MAX) + ? uint64_t(INT_MAX + UPstream::maxCommsSize) + : uint64_t(INT_MAX) // MPI limit is <int> ); while (fileLength > 0) { - const uint64_t sendSize = min(fileLength, maxChunkSize); + const uint64_t sendSize = std::min(fileLength, maxChunkSize); fileLength -= sendSize; // Read file contents into a character buffer diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index 573c0944696d2dc7bdb56b7ae96444392666c085..c935914db63e1da9dd74ba905e662133cac2dcf8 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -91,7 +91,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) {} -std::pair<int,int> +std::pair<int,int64_t> Foam::UPstream::probeMessage ( const UPstream::commsTypes commsType, @@ -100,7 +100,7 @@ Foam::UPstream::probeMessage const label communicator ) { - return std::pair<int,int>(-1, 0); + return std::pair<int,int64_t>(-1, 0); } diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index b3771d7d1ed3b9ab45155221b15e8c38e7bbbad8..5314d8027aff1e82f0525f6ba06b3d3ea03cd8c2 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -766,7 +766,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) } -std::pair<int,int> +std::pair<int,int64_t> Foam::UPstream::probeMessage ( const UPstream::commsTypes commsType, @@ -775,7 +775,7 @@ Foam::UPstream::probeMessage const label communicator ) { - std::pair<int,int> result(-1, 0); + std::pair<int,int64_t> result(-1, 0); // No-op for non-parallel or not on communicator if (!UPstream::parRun() || !UPstream::is_rank(communicator)) @@ -869,7 +869,7 @@ Foam::UPstream::probeMessage result.first = status.MPI_SOURCE; - result.second = int(count); + result.second = int64_t(count); } return result;
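Reviewer note (not part of the patch): the following is a minimal, standalone
sketch of the chunking arithmetic introduced above. It mirrors the
PstreamDetail::maxTransferCount / maxTransferBytes helpers and the
[iter*chunk, (iter+1)*chunk) data windows used by the send/recv loops, but it
uses plain std::vector and std::cout rather than OpenFOAM types, so every name
below (chunkElements, chunkBytes, sketch.cpp) is local to the sketch and not
an OpenFOAM API.

    // sketch.cpp : standalone illustration; compile with g++ -std=c++11
    #include <climits>
    #include <cstdint>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Elements per chunk (0 == unlimited), mirroring maxTransferCount
    template<class Type>
    std::size_t chunkElements(const std::size_t max_bytes) noexcept
    {
        return
        (
            (max_bytes == 0)                      // ie, unlimited
          ? std::size_t(0)
          : (max_bytes > std::size_t(INT_MAX))    // MPI sizes are <int>
          ? (std::size_t(INT_MAX) / sizeof(Type))
          : (max_bytes > sizeof(Type))            // integral number of elements
          ? (max_bytes / sizeof(Type))
          : std::size_t(1)                        // min of one element
        );
    }

    // Byte limit, mirroring maxTransferBytes: a negative input means
    // "INT_MAX minus that many bytes"
    std::size_t chunkBytes(const std::int64_t max_bytes) noexcept
    {
        return
        (
            (max_bytes < 0)
          ? std::size_t(INT_MAX + max_bytes)
          : std::size_t(max_bytes)
        );
    }

    int main()
    {
        const std::vector<double> payload(1000);   // stand-in send buffer
        const std::int64_t maxComms_bytes = 1024;  // like UPstream::maxCommsSize

        const std::size_t chunkSize =
            chunkElements<double>(chunkBytes(maxComms_bytes));

        if (!chunkSize)   // non-chunked: a single transfer, as in the patch
        {
            std::cout << "single transfer of " << payload.size()
                      << " elements\n";
            return 0;
        }

        // Same data windows as the patched loops: [iter*chunk, (iter+1)*chunk)
        for (int iter = 0; ; ++iter)
        {
            const std::size_t beg = std::size_t(iter)*chunkSize;
            const std::size_t end = std::size_t(iter+1)*chunkSize;

            if (payload.size() <= beg) break;   // no more data windows

            const std::size_t len =
                ((end < payload.size()) ? end : payload.size()) - beg;

            std::cout << "iter " << iter << ": beg=" << beg
                      << " len=" << len << '\n';
        }

        return 0;
    }

With maxComms_bytes = 1024 and 1000 doubles, chunkSize is 128 elements and the
sketch prints eight windows, the last being "iter 7: beg=896 len=104", which is
the same truncated final window the patched send/recv loops produce.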