From 00e2d3e4ef7e7b7e7c81386bc794d8130a2f5bc2 Mon Sep 17 00:00:00 2001 From: Mark Olesen <Mark.Olesen@esi-group.com> Date: Mon, 13 Feb 2023 14:43:20 +0100 Subject: [PATCH] WIP: mapped PstreamBuffers with NBX ENH: sparse storage and data exchange for PstreamBuffers - change the underlying storage from a numProcs list of buffers to a Map of buffers. The reduced memory footprint on large systems is on aspect but the primary motivation is to more easily support sparse data exchange patterns. The Map storage for PstreamBuffers corresponds to a non-blocking consensus exchange of sizes that automatically propagates through different parts of the code and avoids all-to-all. CONFIG: enable nonBlockingExchange as default (for testing) - this changes the Pstream::exchangeSizes to use NBX instead of all-to-all, even for List containers. --- etc/controlDict | 2 +- .../db/IOstreams/Pstreams/PstreamBuffers.C | 460 +++++++++++++++++- .../db/IOstreams/Pstreams/PstreamBuffers.H | 78 ++- .../fvMesh/zoneDistribute/zoneDistribute.C | 61 +++ .../fvMesh/zoneDistribute/zoneDistribute.H | 10 + .../fvMesh/zoneDistribute/zoneDistributeI.H | 46 +- 6 files changed, 626 insertions(+), 31 deletions(-) diff --git a/etc/controlDict b/etc/controlDict index ea88edf7d7f..164ada0005c 100644 --- a/etc/controlDict +++ b/etc/controlDict @@ -130,7 +130,7 @@ OptimisationSwitches // Number processors to change to tree communication nProcsSimpleSum 0; // Min numProc to use non-blocking exchange algorithm (Hoeffler: NBX) - nonBlockingExchange 0; + nonBlockingExchange 1; // MPI buffer size (bytes) // Can override with the MPI_BUFFER_SIZE env variable. diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index 90a17969a52..599f56edc8e 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -27,14 +27,52 @@ License \*---------------------------------------------------------------------------*/ #include "PstreamBuffers.H" +#ifdef Foam_PstreamBuffers_dense #include "bitSet.H" +#endif + +// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * // + +namespace Foam +{ + +#ifdef Foam_PstreamBuffers_map_storage +//- Retrieve size of specified buffer, first checking for existence +static inline label getBufferSize +( + const Map<DynamicList<char>>& buffers, + const label proci +) +{ + const auto iter = buffers.cfind(proci); + + return (iter.good() ? iter.val().size() : 0); +} +#else +//- Retrieve size of specified buffer, no access checking +static inline label getBufferSize +( + const UList<DynamicList<char>>& buffers, + const label proci +) +{ + return buffers[proci].size(); +} +#endif + +} // End namespace Foam + // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // void Foam::PstreamBuffers::finalExchange ( const bool wait, + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes + #else labelList& recvSizes + #endif ) { // Could also check that it is not called twice @@ -43,8 +81,35 @@ void Foam::PstreamBuffers::finalExchange if (commsType_ == UPstream::commsTypes::nonBlocking) { - // Dense storage uses all-to-all - Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_); + // Use PEX algorithm + + #ifdef Foam_PstreamBuffers_map_storage + // PEX stage 1: exchange sizes (non-blocking consensus) + Pstream::exchangeSizes + ( + sendBuffers_, + recvSizes, + tag_, + comm_ + ); + #else + // Like Pstream::exchangeSizes + labelList sendSizes(nProcs_); + forAll(sendBuffers_, proci) + { + sendSizes[proci] = sendBuffers_[proci].size(); + } + recvSizes.resize_nocopy(nProcs_); + + // PEX stage 1: exchange sizes (non-blocking consensus) + UPstream::allToAllConsensus + ( + sendSizes, + recvSizes, + (tag_ + 314159), // some unique tag? + comm_ + ); + #endif Pstream::exchange<DynamicList<char>, char> ( @@ -59,6 +124,7 @@ void Foam::PstreamBuffers::finalExchange } +#ifdef Foam_PstreamBuffers_dense void Foam::PstreamBuffers::finalExchange ( const labelUList& sendProcs, @@ -94,14 +160,19 @@ void Foam::PstreamBuffers::finalExchange ); } } +#endif -void Foam::PstreamBuffers::finalExchangeGatherScatter +void Foam::PstreamBuffers::finalGatherScatter ( const bool isGather, const bool wait, const bool needSizes, + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes + #else labelList& recvSizes + #endif ) { // Could also check that it is not called twice @@ -114,10 +185,20 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter // Only send to master [0]. Master is also allowed to 'send' to itself + #ifdef Foam_PstreamBuffers_map_storage + forAllIters(sendBuffers_, iter) + { + if (iter.key() != 0) + { + iter.val().clear(); + } + } + #else for (label proci=1; proci < sendBuffers_.size(); ++proci) { sendBuffers_[proci].clear(); } + #endif } else { @@ -133,39 +214,99 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter if (commsType_ == UPstream::commsTypes::nonBlocking) { + #ifdef Foam_PstreamBuffers_map_storage + labelList recvCount; + #else + labelList& recvCount = recvSizes; + #endif + if (isGather) { // gather mode (all-to-one): master [0] <- everyone - recvSizes = - UPstream::listGatherValues(sendBuffers_[0].size(), comm_); + const label nSend = getBufferSize(sendBuffers_, 0); + + recvCount = UPstream::listGatherValues(nSend, comm_); + #ifdef Foam_PstreamBuffers_map_storage + // Transcribe recv count from list to map + recvSizes.clear(); + if (UPstream::master(comm_)) + { + for (label proci=1; proci < recvCount.size(); ++proci) + { + if (recvCount[proci] > 0) + { + recvSizes.insert(proci, recvCount[proci]); + } + } + } + #else if (!UPstream::master(comm_)) { recvSizes.resize_nocopy(nProcs_); recvSizes = Zero; } + #endif } else { // scatter mode (one-to-all): master [0] -> everyone - recvSizes.resize_nocopy(nProcs_); - if (UPstream::master(comm_)) { + recvCount.resize(nProcs_, Zero); + + #ifdef Foam_PstreamBuffers_map_storage + forAllConstIters(sendBuffers_, iter) + { + recvCount[iter.key()] = iter.val().size(); + } + #else forAll(sendBuffers_, proci) { - recvSizes[proci] = sendBuffers_[proci].size(); + recvCount[proci] = sendBuffers_[proci].size(); } + #endif + } + else + { + // Scattering, so non-master sends nothing + recvCount = Zero; + + #ifdef Foam_PstreamBuffers_map_storage + recvSizes.clear(); + recvSizes.resize_nocopy(nProcs_); + #else + recvSizes = Zero; + #endif } - const label myRecv(UPstream::listScatterValues(recvSizes, comm_)); + const label nRecv(UPstream::listScatterValues(recvCount, comm_)); - recvSizes = Zero; - recvSizes[0] = myRecv; - } + if (UPstream::master(comm_)) + { + #ifdef Foam_PstreamBuffers_map_storage + recvSizes.clear(); + #else + recvSizes = Zero; + #endif + } + else + { + #ifdef Foam_PstreamBuffers_map_storage + recvSizes.clear(); + if (nRecv) + { + recvSizes.insert(0, nRecv); + } + #else + recvSizes = Zero; + recvSizes[0] = nRecv; + #endif + } + } Pstream::exchange<DynamicList<char>, char> ( @@ -197,9 +338,18 @@ Foam::PstreamBuffers::PstreamBuffers tag_(tag), comm_(communicator), nProcs_(UPstream::nProcs(comm_)), + + #ifdef Foam_PstreamBuffers_map_storage + // Default sizing (128) is probably OK. + // Meshes often have 16-20 neighbours (avg) and 100 neighbours (max) + sendBuffers_(), + recvBuffers_(), + recvPositions_() + #else sendBuffers_(nProcs_), recvBuffers_(nProcs_), recvPositions_(nProcs_, Zero) + #endif {} @@ -207,7 +357,23 @@ Foam::PstreamBuffers::PstreamBuffers Foam::PstreamBuffers::~PstreamBuffers() { - // Check that all data has been consumed. + // Check that all data has been consumed + #ifdef Foam_PstreamBuffers_map_storage + forAllConstIters(recvBuffers_, iter) + { + const label proci = iter.key(); + const label len = iter.val().size(); + const label pos = recvPositions_.lookup(proci, len); + + if (pos < len) + { + FatalErrorInFunction + << "Message from processor " << proci + << " Only consumed " << pos << " of " << len << " bytes" << nl + << Foam::abort(FatalError); + } + } + #else forAll(recvBuffers_, proci) { const label pos = recvPositions_[proci]; @@ -221,6 +387,7 @@ Foam::PstreamBuffers::~PstreamBuffers() << Foam::abort(FatalError); } } + #endif } @@ -231,7 +398,11 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer const label proci ) { + #ifdef Foam_PstreamBuffers_map_storage + return sendBuffers_(proci); // Created on demand if needed + #else return sendBuffers_[proci]; + #endif } @@ -240,13 +411,21 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer const label proci ) { + #ifdef Foam_PstreamBuffers_map_storage + return recvBuffers_(proci); // Created on demand if needed + #else return recvBuffers_[proci]; + #endif } Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci) { + #ifdef Foam_PstreamBuffers_map_storage + return recvPositions_(proci, 0); // Created on demand if needed + #else return recvPositions_[proci]; + #endif } @@ -254,20 +433,38 @@ Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci) void Foam::PstreamBuffers::clearSends() { + #ifdef Foam_PstreamBuffers_map_storage + forAllIters(sendBuffers_, iter) + { + iter.val().clear(); + } + #else for (DynamicList<char>& buf : sendBuffers_) { buf.clear(); } + #endif } void Foam::PstreamBuffers::clearRecvs() { + #ifdef Foam_PstreamBuffers_map_storage + forAllIters(recvBuffers_, iter) + { + iter.val().clear(); + } + forAllIters(recvPositions_, iter) + { + iter.val() = 0; + } + #else for (DynamicList<char>& buf : recvBuffers_) { buf.clear(); } recvPositions_ = Zero; + #endif } @@ -281,14 +478,41 @@ void Foam::PstreamBuffers::clear() void Foam::PstreamBuffers::clearSend(const label proci) { + #ifdef Foam_PstreamBuffers_map_storage + { + auto iter = sendBuffers_.find(proci); + if (iter.good()) + { + iter.val().clear(); + } + } + #else sendBuffers_[proci].clear(); + #endif } void Foam::PstreamBuffers::clearRecv(const label proci) { + #ifdef Foam_PstreamBuffers_map_storage + { + auto iter = recvBuffers_.find(proci); + if (iter.good()) + { + iter.val().clear(); + } + } + { + auto iter = recvPositions_.find(proci); + if (iter.good()) + { + iter.val() = 0; + } + } + #else recvBuffers_[proci].clear(); recvPositions_[proci] = 0; + #endif } @@ -296,6 +520,21 @@ void Foam::PstreamBuffers::clearStorage() { // Could also clear out entire sendBuffers_, recvBuffers_ and reallocate. // Not sure if it makes much difference + + #ifdef Foam_PstreamBuffers_map_storage + forAllIters(sendBuffers_, iter) + { + iter.val().clearStorage(); + } + forAllIters(recvBuffers_, iter) + { + iter.val().clearStorage(); + } + forAllIters(recvPositions_, iter) + { + iter.val() = 0; + } + #else for (DynamicList<char>& buf : sendBuffers_) { buf.clearStorage(); @@ -305,6 +544,7 @@ void Foam::PstreamBuffers::clearStorage() buf.clearStorage(); } recvPositions_ = Zero; + #endif finishedSendsCalled_ = false; } @@ -312,6 +552,15 @@ void Foam::PstreamBuffers::clearStorage() bool Foam::PstreamBuffers::hasSendData() const { + #ifdef Foam_PstreamBuffers_map_storage + forAllConstIters(sendBuffers_, iter) + { + if (!iter.val().empty()) + { + return true; + } + } + #else for (const DynamicList<char>& buf : sendBuffers_) { if (!buf.empty()) @@ -319,6 +568,7 @@ bool Foam::PstreamBuffers::hasSendData() const return true; } } + #endif return false; } @@ -327,6 +577,18 @@ bool Foam::PstreamBuffers::hasRecvData() const { if (finishedSendsCalled_) { + #ifdef Foam_PstreamBuffers_map_storage + forAllConstIters(recvBuffers_, iter) + { + const label proci = iter.key(); + const label len = iter.val().size(); + + if (recvPositions_.lookup(proci, 0) < len) + { + return true; + } + } + #else forAll(recvBuffers_, proci) { if (recvPositions_[proci] < recvBuffers_[proci].size()) @@ -334,6 +596,7 @@ bool Foam::PstreamBuffers::hasRecvData() const return true; } } + #endif } #ifdef FULLDEBUG else @@ -349,7 +612,7 @@ bool Foam::PstreamBuffers::hasRecvData() const Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const { - return sendBuffers_[proci].size(); + return getBufferSize(sendBuffers_, proci); } @@ -357,8 +620,16 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const { if (finishedSendsCalled_) { + #ifdef Foam_PstreamBuffers_map_storage + #else + const label len + ( + getBufferSize(recvBuffers_, proci) + - recvPositions_.lookup(proci, 0) + ); + #else const label len(recvBuffers_[proci].size() - recvPositions_[proci]); - + #endif if (len > 0) { return len; @@ -378,10 +649,25 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const Foam::labelList Foam::PstreamBuffers::recvDataCounts() const { - labelList counts(recvPositions_.size(), Zero); + labelList counts(nProcs_, Zero); if (finishedSendsCalled_) { + #ifdef Foam_PstreamBuffers_map_storage + forAllConstIters(recvBuffers_, iter) + { + const label proci = iter.key(); + const label len + ( + iter.val().size() - recvPositions_.lookup(proci, 0) + ); + + if (len > 0) + { + counts[proci] = len; + } + } + #else forAll(recvBuffers_, proci) { const label len(recvBuffers_[proci].size() - recvPositions_[proci]); @@ -391,6 +677,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const counts[proci] = len; } } + #endif } #ifdef FULLDEBUG else @@ -404,20 +691,35 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const } -Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const +Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount +( + const label excludeProci +) const { label maxLen = 0; if (finishedSendsCalled_) { - forAll(recvBuffers_, idx) + #ifdef Foam_PstreamBuffers_map_storage + forAllConstIters(recvBuffers_, iter) + { + const label proci = iter.key(); + if (excludeProci != proci) + { + label len(iter.val().size() - recvPositions_.lookup(proci, 0)); + maxLen = max(maxLen, len); + } + } + #else + forAll(recvBuffers_, proci) { - const label len(recvBuffers_[idx].size() - recvPositions_[idx]); - if (idx != proci) + if (excludeProci != proci) { + label len(recvBuffers_[proci].size() - recvPositions_[proci]); maxLen = max(maxLen, len); } } + #endif } #ifdef FULLDEBUG else @@ -449,6 +751,24 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const { if (finishedSendsCalled_) { + #ifdef Foam_PstreamBuffers_map_storage + const auto iter = recvBuffers_.cfind(proci); + + if (iter.good()) + { + const label pos = recvPositions_.lookup(proci, 0); + const label len = iter.val().size(); + + if (pos < len) + { + return UList<char> + ( + const_cast<char*>(iter.val().cbegin(pos)), + (len - pos) + ); + } + } + #else const label pos = recvPositions_[proci]; const label len = recvBuffers_[proci].size(); @@ -460,6 +780,7 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const (len - pos) ); } + #endif } #ifdef FULLDEBUG else @@ -483,14 +804,22 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept void Foam::PstreamBuffers::finishedSends(const bool wait) { + #ifdef Foam_PstreamBuffers_map_storage + Map<label> recvSizes; + #else labelList recvSizes; + #endif finalExchange(wait, recvSizes); } void Foam::PstreamBuffers::finishedSends ( + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else labelList& recvSizes, + #endif const bool wait ) { @@ -510,6 +839,7 @@ void Foam::PstreamBuffers::finishedSends } +#ifdef Foam_PstreamBuffers_dense void Foam::PstreamBuffers::finishedSends ( const labelUList& sendProcs, @@ -612,16 +942,58 @@ bool Foam::PstreamBuffers::finishedSends return changed; } +#endif void Foam::PstreamBuffers::finishedNeighbourSends ( const labelUList& neighProcs, + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else labelList& recvSizes, + #endif const bool wait ) { - finishedSends(neighProcs, neighProcs, recvSizes, wait); + #ifdef Foam_PstreamBuffers_map_storage + recvSizes.clear(); + + for (const label proci : neighProcs) + { + recvSizes.insert(proci, 0); + } + + // Prune any send buffers that are not neighbours + forAllIters(sendBuffers_, iter) + { + if (!recvSizes.contains(iter.key())) + { + iter.val().clear(); + } + } + + finalExchange(wait, recvSizes); + #else + // Resize for copying back + recvSizes.resize_nocopy(sendBuffers_.size()); + + // Prune send buffers that are not neighbours + { + labelHashSet keepProcs(neighProcs); + + // Prune send buffers that are not neighbours + forAll(sendBuffers_, proci) + { + if (!keepProcs.contains(proci)) + { + sendBuffers_[proci].clear(); + } + } + } + + finalExchange(wait, recvSizes); + #endif } @@ -631,31 +1003,65 @@ void Foam::PstreamBuffers::finishedNeighbourSends const bool wait ) { + #ifdef Foam_PstreamBuffers_map_storage finishedSends(neighProcs, neighProcs, wait); + #else + labelList recvSizes; + + // Prune send buffers that are not neighbours + { + labelHashSet keepProcs(neighProcs); + + // Prune send buffers that are not neighbours + forAll(sendBuffers_, proci) + { + if (!keepProcs.contains(proci)) + { + sendBuffers_[proci].clear(); + } + } + } + + finalExchange(wait, recvSizes); + #endif } void Foam::PstreamBuffers::finishedGathers(const bool wait) { + #ifdef Foam_PstreamBuffers_map_storage + Map<label> recvSizes; + finalGatherScatter(true, wait, false, recvSizes); + #else labelList recvSizes; - finalExchangeGatherScatter(true, wait, false, recvSizes); + finalGatherScatter(true, wait, false, recvSizes); + #endif } void Foam::PstreamBuffers::finishedScatters(const bool wait) { + #ifdef Foam_PstreamBuffers_map_storage + Map<label> recvSizes; + finalGatherScatter(false, wait, false, recvSizes); + #else labelList recvSizes; - finalExchangeGatherScatter(false, wait, false, recvSizes); + finalGatherScatter(false, wait, false, recvSizes); + #endif } void Foam::PstreamBuffers::finishedGathers ( + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else labelList& recvSizes, + #endif const bool wait ) { - finalExchangeGatherScatter(true, wait, true, recvSizes); + finalGatherScatter(true, wait, true, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -673,11 +1079,15 @@ void Foam::PstreamBuffers::finishedGathers void Foam::PstreamBuffers::finishedScatters ( + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else labelList& recvSizes, + #endif const bool wait ) { - finalExchangeGatherScatter(false, wait, true, recvSizes); + finalGatherScatter(false, wait, true, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index 05efab023b8..67d648293ac 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -100,16 +100,27 @@ SourceFiles #define Foam_PstreamBuffers_H #include "DynamicList.H" +#include "Map.H" #include "UPstream.H" #include "IOstream.H" +// Transitional +#define Foam_PstreamBuffers_map_storage +// #define Foam_PstreamBuffers_dense + +#ifdef Foam_PstreamBuffers_dense +#undef Foam_PstreamBuffers_map_storage +#endif + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // namespace Foam { // Forward Declarations +#ifdef Foam_PstreamBuffers_dense class bitSet; +#endif /*---------------------------------------------------------------------------*\ Class PstreamBuffers Declaration @@ -143,6 +154,19 @@ class PstreamBuffers // Buffer storage + #ifdef Foam_PstreamBuffers_map_storage + + //- Send buffers (sparse) + Map<DynamicList<char>> sendBuffers_; + + //- Receive buffers (sparse) + Map<DynamicList<char>> recvBuffers_; + + //- Current read positions within recvBuffers_ (sparse) + Map<label> recvPositions_; + + #else + //- Send buffers. Size is nProcs() List<DynamicList<char>> sendBuffers_; @@ -152,6 +176,8 @@ class PstreamBuffers //- Current read positions within recvBuffers_. Size is nProcs() labelList recvPositions_; + #endif + // Private Member Functions @@ -160,12 +186,17 @@ class PstreamBuffers void finalExchange ( const bool wait, + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes + #else labelList& recvSizes + #endif ); //- Mark sends as done. // Only exchange sizes using the sendProcs/recvProcs subset // (nonBlocking comms). + #ifdef Foam_PstreamBuffers_dense void finalExchange ( const labelUList& sendProcs, @@ -173,6 +204,7 @@ class PstreamBuffers const bool wait, labelList& recvSizes ); + #endif //- For all-to-one or one-to-all void finalExchangeGatherScatter @@ -180,7 +212,11 @@ class PstreamBuffers const bool isGather, const bool wait, const bool needSizes, // If recvSizes needed or scratch + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes + #else labelList& recvSizes + #endif ); @@ -340,7 +376,7 @@ public: //- Maximum receive size, excluding the specified processor rank //- Must call finishedSends() or other finished.. method first! - label maxNonLocalRecvCount(const label proci) const; + label maxNonLocalRecvCount(const label excludeProci) const; //- Number of unconsumed receive bytes for the specified processor. //- Must call finishedSends() or other finished.. method first! @@ -392,7 +428,15 @@ public: // \param wait wait for requests to complete (in nonBlocking mode) // // \warning currently only valid for nonBlocking comms. - void finishedSends(labelList& recvSizes, const bool wait = true); + void finishedSends + ( + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else + labelList& recvSizes, + #endif + const bool wait = true + ); // Functions with restricted neighbours @@ -406,12 +450,14 @@ public: // \param wait wait for requests to complete (in nonBlocking mode) // // \warning currently only valid for nonBlocking comms. + #ifdef Foam_PstreamBuffers_dense void finishedSends ( const labelUList& sendProcs, const labelUList& recvProcs, const bool wait = true ); + #endif //- Mark sends as done using subset of send/recv ranks //- to exchange data on. Recovers the sizes (bytes) received. @@ -424,6 +470,7 @@ public: // \param wait wait for requests to complete (in nonBlocking mode) // // \warning currently only valid for nonBlocking comms. + #ifdef Foam_PstreamBuffers_dense void finishedSends ( const labelUList& sendProcs, @@ -431,6 +478,7 @@ public: labelList& recvSizes, const bool wait = true ); + #endif //- A caching version that uses a limited send/recv connectivity. // @@ -443,6 +491,7 @@ public: // \return True if the send/recv connectivity changed // // \warning currently only valid for nonBlocking comms. + #ifdef Foam_PstreamBuffers_dense bool finishedSends ( bitSet& sendConnections, @@ -450,6 +499,7 @@ public: DynamicList<label>& recvProcs, const bool wait = true ); + #endif //- Mark sends as done using subset of send/recv ranks //- and recover the sizes (bytes) received. @@ -480,7 +530,11 @@ public: void finishedNeighbourSends ( const labelUList& neighProcs, + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else labelList& recvSizes, + #endif const bool wait = true ); @@ -505,7 +559,15 @@ public: // \param wait wait for requests to complete (in nonBlocking mode) // // \warning currently only valid for nonBlocking comms. - void finishedGathers(labelList& recvSizes, const bool wait = true); + void finishedGathers + ( + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else + labelList& recvSizes, + #endif + const bool wait = true + ); //- Mark all sends to sub-procs as done. // @@ -525,7 +587,15 @@ public: // \param wait wait for requests to complete (in nonBlocking mode) // // \warning currently only valid for nonBlocking comms. - void finishedScatters(labelList& recvSizes, const bool wait = true); + void finishedScatters + ( + #ifdef Foam_PstreamBuffers_map_storage + Map<label>& recvSizes, + #else + labelList& recvSizes, + #endif + const bool wait = true + ); }; diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C index c2064396d5d..bb1406717b7 100644 --- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C +++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C @@ -43,7 +43,12 @@ Foam::zoneDistribute::zoneDistribute(const fvMesh& mesh) MeshObject<fvMesh, Foam::TopologicalMeshObject, zoneDistribute>(mesh), stencil_(zoneCPCStencil::New(mesh)), globalNumbering_(stencil_.globalNumbering()), + #ifdef Foam_PstreamBuffers_dense send_(UPstream::nProcs()), + #else + // Default map sizing (as per PstreamBuffers) + send_(), + #endif pBufs_(UPstream::commsTypes::nonBlocking) { // Don't clear storage on persistent buffer @@ -90,6 +95,7 @@ void Foam::zoneDistribute::setUpCommforZone if (UPstream::parRun()) { + #ifdef Foam_PstreamBuffers_dense List<labelHashSet> needed(UPstream::nProcs()); // Bin according to originating (sending) processor @@ -136,6 +142,61 @@ void Foam::zoneDistribute::setUpCommforZone fromProc >> send_[proci]; } } + #else + + forAllIters(send_, iter) + { + iter.val().clear(); + } + + // Bin according to originating (sending) processor + for (const label celli : stencil.needsComm()) + { + if (zone[celli]) + { + for (const label gblIdx : stencil_[celli]) + { + const label proci = globalNumbering_.whichProcID(gblIdx); + + if (proci != Pstream::myProcNo()) + { + send_(proci).insert(gblIdx); + } + } + } + } + + // Stream the send data into PstreamBuffers, + + pBufs_.clear(); + + forAllIters(send_, iter) + { + const label proci = iter.key(); + auto& indices = iter.val(); + + if (proci != UPstream::myProcNo() && !indices.empty()) + { + // Serialize as List + UOPstream toProc(proci, pBufs_); + toProc << indices; + } + + // Clear out old contents + indices.clear(); + } + + pBufs_.finishedSends(); + + for (const int proci : pBufs_.allProcs()) + { + if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci)) + { + UIPstream fromProc(proci, pBufs_); + fromProc >> send_(proci); + } + } + #endif } } diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H index 3b0e182a582..ef8c05e75a2 100644 --- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H +++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H @@ -90,6 +90,8 @@ class zoneDistribute //- Global number into index of cells/faces const globalIndex& globalNumbering_; + #ifdef Foam_PstreamBuffers_dense + //- Global cell/face index to send for processor-to-processor comms List<labelList> send_; @@ -102,6 +104,14 @@ class zoneDistribute //- Parallel [cache]: recv data from these ranks DynamicList<label> recvProcs_; + #else + + //- Per proc the global cell/face index to send for + //- processor-to-processor comms + Map<labelHashSet> send_; + + #endif + //- Persistent set of exchange buffers PstreamBuffers pBufs_; diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H index 32261eee49d..0c4b82e0b47 100644 --- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H +++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2019-2020 DLR - Copyright (C) 2020-2022 OpenCFD Ltd. + Copyright (C) 2020-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -161,6 +161,8 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc if (UPstream::parRun()) { + #ifdef Foam_PstreamBuffers_dense + if (sendConnections_.empty()) { WarningInFunction @@ -208,6 +210,48 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc neiValues += tmpValues; } } + + #else + + pBufs_.clear(); + + forAllConstIters(send_, iter) + { + const label proci = iter.key(); + const auto& indices = iter.val(); + + if (proci != UPstream::myProcNo() && !indices.empty()) + { + // Serialize as Map + Map<Type> sendValues(2*indices.size()); + + for (const label sendIdx : indices) + { + sendValues.insert + ( + sendIdx, + getLocalValue(phi, globalNumbering_.toLocal(sendIdx)) + ); + } + + UOPstream toProc(proci, pBufs_); + toProc << sendValues; + } + } + + pBufs_.finishedSends(); + + for (const int proci : pBufs_.allProcs()) + { + if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci)) + { + UIPstream fromProc(proci, pBufs_); + Map<Type> tmpValues(fromProc); + + neiValues += tmpValues; + } + } + #endif } return neiValues; -- GitLab