diff --git a/etc/controlDict b/etc/controlDict index 53059f9f43d547658e8cb542901e4639ad369935..fc3d73ae5444322f8ca41760710230f505493d15 100644 --- a/etc/controlDict +++ b/etc/controlDict @@ -1,7 +1,7 @@ /*--------------------------------*- C++ -*----------------------------------*\ | ========= | | | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox | -| \\ / O peration | Version: v2306 | +| \\ / O peration | Version: v2312 | | \\ / A nd | Website: www.openfoam.com | | \\/ M anipulation | | \*---------------------------------------------------------------------------*/ @@ -174,12 +174,13 @@ OptimisationSwitches nbx.tuning 0; // Additional PstreamBuffers tuning parameters (experimental) - // -1 : PEX with all-to-all for buffer sizes and point-to-point - // for contents (legacy approach) - // 0 : hybrid PEX with NBX for buffer sizes and point-to-point - // for contents (proposed new approach) - // 1 : full NBX for buffer sizes and contents (very experimental) - pbufs.tuning -1; + // 0 : (legacy PEX) + // * all-to-all for buffer sizes [legacy approach] + // * point-to-point for contents + // 1 : (hybrid PEX) + // * NBX for buffer sizes [new approach] + // * point-to-point for contents + pbufs.tuning 0; // ===== diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 32afe0be873115ec5acc547690da9ec67bf8e34f..dbcfe69c56d8614ebd17fe06cea35e614e977050 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -599,7 +599,7 @@ public: // Non-blocking exchange - //- Exchange \em contiguous data using non-blocking consensus + //- Exchange \em contiguous data using non-blocking consensus (NBX) //- Sends sendData, receives into recvData. // // Each entry of the recvBufs list is cleared before receipt. 
@@ -614,10 +614,10 @@ public: List<Container>& recvBufs, const int tag, const label comm, - const bool wait = true //!< Wait for requests to complete + const bool wait = true //!< (ignored) ); - //- Exchange \em contiguous data using non-blocking consensus + //- Exchange \em contiguous data using non-blocking consensus (NBX) //- Sends sendData, receives into recvData. // // Each \em entry of the recvBufs map is cleared before receipt, @@ -636,7 +636,23 @@ public: Map<Container>& recvBufs, const int tag, const label comm, - const bool wait = true //!< Wait for requests to complete + const bool wait = true //!< (ignored) + ); + + //- Exchange \em contiguous data using non-blocking consensus (NBX) + //- Sends sendData returns receive information. + // + // For \b non-parallel : copy own rank (if it exists and non-empty) + // + // \note The message tag should be chosen to be a unique value + // since the implementation uses probing with ANY_SOURCE !! + template<class Container, class Type> + static Map<Container> exchangeConsensus + ( + const Map<Container>& sendBufs, + const int tag, + const label comm, + const bool wait = true //!< (ignored) ); }; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index 213969bd1386650106280b45d893b8db8ddd9a6a..1949e7904c1495d855e1ff969a6d649d1129494d 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -36,7 +36,7 @@ License int Foam::PstreamBuffers::algorithm ( // Name may change in the future (JUN-2023) - Foam::debug::optimisationSwitch("pbufs.tuning", -1) + Foam::debug::optimisationSwitch("pbufs.tuning", 0) ); registerOptSwitch ( @@ -46,20 +46,19 @@ registerOptSwitch ); -// Simple enumerations -// ------------------- -static constexpr int algorithm_PEX_allToAll = -1; // Traditional PEX -//static constexpr int algorithm_PEX_hybrid = 0; // Possible new default? 
-static constexpr int algorithm_full_NBX = 1; // Very experimental +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // +inline void Foam::PstreamBuffers::setFinished(bool on) noexcept +{ + finishedSendsCalled_ = on; +} -// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // inline void Foam::PstreamBuffers::initFinalExchange() { // Could also check that it is not called twice // but that is used for overlapping send/recv (eg, overset) - finishedSendsCalled_ = true; + setFinished(true); clearUnregistered(); } @@ -67,65 +66,149 @@ inline void Foam::PstreamBuffers::initFinalExchange() void Foam::PstreamBuffers::finalExchange ( + enum modeOption mode, const bool wait, labelList& recvSizes ) { initFinalExchange(); - if (commsType_ == UPstream::commsTypes::nonBlocking) + // Pre-flight checks + switch (mode) { - if - ( - wait - && (algorithm >= algorithm_full_NBX) - && (UPstream::maxCommsSize <= 0) - ) + case modeOption::DEFAULT : { - // NBX algorithm (nonblocking exchange) - // - when requested and waiting, no data chunking etc - - PstreamDetail::exchangeConsensus<DynamicList<char>, char> + // Choose (ALL_TO_ALL | NBX_PEX) from static settings + mode = ( - sendBuffers_, - recvBuffers_, - recvSizes, - (tag_ + 271828), // some unique tag? - comm_, - wait + (algorithm <= 0) + ? modeOption::ALL_TO_ALL + : modeOption::NBX_PEX ); - - return; + break; } - - // PEX algorithm with two different flavours of exchanging sizes - - // Assemble the send sizes (cf. 
Pstream::exchangeSizes) - labelList sendSizes(nProcs_); - forAll(sendBuffers_, proci) + case modeOption::GATHER : { - sendSizes[proci] = sendBuffers_[proci].size(); + // gather mode (all-to-one) : master [0] <- everyone + // - only send to master [0] + // note: master [0] is also allowed to 'send' to itself + + for (label proci = 1; proci < sendBuffers_.size(); ++proci) + { + sendBuffers_[proci].clear(); + } + break; } - recvSizes.resize_nocopy(nProcs_); - if (algorithm == algorithm_PEX_allToAll) + case modeOption::SCATTER : { - // PEX stage 1: exchange sizes (all-to-all) - UPstream::allToAll(sendSizes, recvSizes, comm_); + // scatter mode (one-to-all) : master [0] -> everyone + + if (!UPstream::master(comm_)) + { + // Non-master: has no sends + clearSends(); + } + break; } - else + + default : + break; + } + + + if (commsType_ == UPstream::commsTypes::nonBlocking) + { + // PEX algorithm with different flavours of exchanging sizes + // PEX stage 1: exchange sizes + + labelList sendSizes; // Not used by gather/scatter + + switch (mode) { - // PEX stage 1: exchange sizes (non-blocking consensus) - UPstream::allToAllConsensus - ( - sendSizes, - recvSizes, - (tag_ + 314159), // some unique tag? 
- comm_ - ); + case modeOption::GATHER : + { + // gather mode (all-to-one): master [0] <- everyone + // - presumed that MPI_Gather will be the most efficient + + recvSizes = + UPstream::listGatherValues(sendBuffers_[0].size(), comm_); + + if (!UPstream::master(comm_)) + { + recvSizes.resize_nocopy(nProcs_); + recvSizes = Zero; + } + + break; + } + + case modeOption::SCATTER : + { + // scatter mode (one-to-all): master [0] -> everyone + // - presumed that MPI_Scatter will be the most efficient + + recvSizes.resize_nocopy(nProcs_); + + if (UPstream::master(comm_)) + { + forAll(sendBuffers_, proci) + { + recvSizes[proci] = sendBuffers_[proci].size(); + } + } + + const label myRecv + ( + UPstream::listScatterValues(recvSizes, comm_) + ); + + recvSizes = Zero; + recvSizes[0] = myRecv; + + break; + } + + case modeOption::NBX_PEX : + { + // Assemble the send sizes (cf. Pstream::exchangeSizes) + sendSizes.resize_nocopy(nProcs_); + forAll(sendBuffers_, proci) + { + sendSizes[proci] = sendBuffers_[proci].size(); + } + recvSizes.resize_nocopy(nProcs_); + + // Exchange sizes (non-blocking consensus) + UPstream::allToAllConsensus + ( + sendSizes, + recvSizes, + (tag_ + 314159), // some unique tag? + comm_ + ); + break; + } + + case modeOption::DEFAULT : + case modeOption::ALL_TO_ALL : + { + // Assemble the send sizes (cf. 
Pstream::exchangeSizes) + sendSizes.resize_nocopy(nProcs_); + forAll(sendBuffers_, proci) + { + sendSizes[proci] = sendBuffers_[proci].size(); + } + recvSizes.resize_nocopy(nProcs_); + + // Exchange sizes (all-to-all) + UPstream::allToAll(sendSizes, recvSizes, comm_); + break; + } } + // PEX stage 2: point-to-point data exchange Pstream::exchange<DynamicList<char>, char> ( @@ -166,7 +249,7 @@ void Foam::PstreamBuffers::finalExchange recvSizes[proci] = 1; // Connected } - for (label proci=0; proci < nProcs_; ++proci) + for (label proci = 0; proci < nProcs_; ++proci) { if (!recvSizes[proci]) // Not connected { @@ -200,93 +283,6 @@ void Foam::PstreamBuffers::finalExchange } -void Foam::PstreamBuffers::finalGatherScatter -( - const bool isGather, - const bool wait, - labelList& recvSizes -) -{ - initFinalExchange(); - - if (isGather) - { - // gather mode (all-to-one) - - // Only send to master [0]. Master is also allowed to 'send' to itself - - for (label proci=1; proci < sendBuffers_.size(); ++proci) - { - sendBuffers_[proci].clear(); - } - } - else - { - // scatter mode (one-to-all) - - if (!UPstream::master(comm_)) - { - // Non-master: has no sends - clearSends(); - } - } - - - if (commsType_ == UPstream::commsTypes::nonBlocking) - { - // Use PEX algorithm - // - for a non-sparse gather/scatter, it is presumed that - // MPI_Gather/MPI_Scatter will be the most efficient way to - // communicate the sizes. 
- - // PEX stage 1: exchange sizes (gather or scatter) - if (isGather) - { - // gather mode (all-to-one): master [0] <- everyone - - recvSizes = - UPstream::listGatherValues(sendBuffers_[0].size(), comm_); - - if (!UPstream::master(comm_)) - { - recvSizes.resize_nocopy(nProcs_); - recvSizes = Zero; - } - } - else - { - // scatter mode (one-to-all): master [0] -> everyone - - recvSizes.resize_nocopy(nProcs_); - - if (UPstream::master(comm_)) - { - forAll(sendBuffers_, proci) - { - recvSizes[proci] = sendBuffers_[proci].size(); - } - } - - const label myRecv(UPstream::listScatterValues(recvSizes, comm_)); - - recvSizes = Zero; - recvSizes[0] = myRecv; - } - - // PEX stage 2: point-to-point data exchange - Pstream::exchange<DynamicList<char>, char> - ( - sendBuffers_, - recvSizes, - recvBuffers_, - tag_, - comm_, - wait - ); - } -} - - // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // Foam::PstreamBuffers::PstreamBuffers @@ -382,7 +378,7 @@ void Foam::PstreamBuffers::clear() { clearSends(); clearRecvs(); - finishedSendsCalled_ = false; + setFinished(false); } @@ -431,13 +427,13 @@ void Foam::PstreamBuffers::clearStorage() } recvPositions_ = Zero; - finishedSendsCalled_ = false; + setFinished(false); } void Foam::PstreamBuffers::initRegisterSend() { - if (!finishedSendsCalled_) + if (!finished()) { for (label proci = 0; proci < nProcs_; ++proci) { @@ -474,7 +470,7 @@ bool Foam::PstreamBuffers::hasSendData() const bool Foam::PstreamBuffers::hasRecvData() const { - if (finishedSendsCalled_) + if (finished()) { forAll(recvBuffers_, proci) { @@ -504,7 +500,7 @@ Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const { - if (finishedSendsCalled_) + if (finished()) { const label len(recvBuffers_[proci].size() - recvPositions_[proci]); @@ -529,7 +525,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const { labelList counts(nProcs_, Zero); - if 
(finishedSendsCalled_) + if (finished()) { forAll(recvBuffers_, proci) { @@ -560,7 +556,7 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount { label maxLen = 0; - if (finishedSendsCalled_) + if (finished()) { forAll(recvBuffers_, proci) { @@ -599,7 +595,7 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount() const const Foam::UList<char> Foam::PstreamBuffers::peekRecvData(const label proci) const { - if (finishedSendsCalled_) + if (finished()) { const label pos = recvPositions_[proci]; const label len = recvBuffers_[proci].size(); @@ -625,18 +621,17 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const } -bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept +void Foam::PstreamBuffers::finishedSends(const bool wait) { - bool old(allowClearRecv_); - allowClearRecv_ = on; - return old; + labelList recvSizes; + finalExchange(modeOption::DEFAULT, wait, recvSizes); } -void Foam::PstreamBuffers::finishedSends(const bool wait) +void Foam::PstreamBuffers::finishedSendsNBX(const bool wait) { labelList recvSizes; - finalExchange(wait, recvSizes); + finalExchange(modeOption::NBX_PEX, wait, recvSizes); } @@ -649,7 +644,7 @@ void Foam::PstreamBuffers::finishedSends // Resize for copying back recvSizes.resize_nocopy(sendBuffers_.size()); - finalExchange(wait, recvSizes); + finalExchange(modeOption::DEFAULT, wait, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -717,8 +712,9 @@ bool Foam::PstreamBuffers::finishedSends if (changed) { // Update send/recv topology + labelList recvSizes; - finishedSends(recvSizes, wait); // eg, using all-to-all + finishedSends(recvSizes, wait); // modeOption::DEFAULT (eg all-to-all) // The send ranks sendProcs.clear(); @@ -754,14 +750,14 @@ bool Foam::PstreamBuffers::finishedSends void Foam::PstreamBuffers::finishedGathers(const bool wait) { labelList recvSizes; - finalGatherScatter(true, wait, recvSizes); + finalExchange(modeOption::GATHER, wait, recvSizes); } void 
Foam::PstreamBuffers::finishedScatters(const bool wait) { labelList recvSizes; - finalGatherScatter(false, wait, recvSizes); + finalExchange(modeOption::SCATTER, wait, recvSizes); } @@ -771,7 +767,7 @@ void Foam::PstreamBuffers::finishedGathers const bool wait ) { - finalGatherScatter(true, wait, recvSizes); + finalExchange(modeOption::GATHER, wait, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -793,7 +789,7 @@ void Foam::PstreamBuffers::finishedScatters const bool wait ) { - finalGatherScatter(false, wait, recvSizes); + finalExchange(modeOption::SCATTER, wait, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -809,4 +805,27 @@ void Foam::PstreamBuffers::finishedScatters } +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // +// Controls + +bool Foam::PstreamBuffers::finished() const noexcept +{ + return finishedSendsCalled_; +} + + +bool Foam::PstreamBuffers::allowClearRecv() const noexcept +{ + return allowClearRecv_; +} + + +bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept +{ + bool old(allowClearRecv_); + allowClearRecv_ = on; + return old; +} + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index fc7f6f3389051da724d3c853286ff84dc6ee1f4a..e61bf65e904cc4f1384692faf582cb679685b9c9 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -32,7 +32,7 @@ Description Use UOPstream to stream data into buffers, call finishedSends() to notify that data is in buffers and then use IUPstream to get data out - of received buffers. Works with both blocking and nonBlocking. Does + of received buffers. Works with both blocking and non-blocking. Does not make much sense with scheduled since there you would not need these explicit buffers. 
@@ -151,6 +151,19 @@ class bitSet; class PstreamBuffers { + // Private Data Types + + //- Private enumeration for handling PEX stage 1 (sizing) modes + enum class modeOption : unsigned char + { + DEFAULT, //!< Use statically configured algorithm + GATHER, //!< Use all-to-one (gather) of sizes + SCATTER, //!< Use one-to-all (scatter) of sizes + ALL_TO_ALL, //!< Use allToAll to obtain sizes + NBX_PEX //!< Use consensus exchange (NBX) to obtain sizes + }; + + // Private Data //- Track if sends are complete @@ -190,20 +203,24 @@ class PstreamBuffers // Private Member Functions + //- Change status of finished sends called + inline void setFinished(bool on) noexcept; + //- Clear 'unregistered' send buffers, tag as being send-ready inline void initFinalExchange(); //- Mark all sends as having been done. - // This will start receives (nonBlocking comms). + // This will start receives (non-blocking comms). void finalExchange ( + enum modeOption mode, const bool wait, labelList& recvSizes ); //- Mark sends as done. // Only exchange sizes using the neighbour ranks - // (nonBlocking comms). + // (non-blocking comms). void finalExchange ( const labelUList& sendProcs, @@ -212,14 +229,6 @@ class PstreamBuffers labelList& recvSizes ); - //- For all-to-one or one-to-all - void finalGatherScatter - ( - const bool isGather, - const bool wait, - labelList& recvSizes - ); - // Friendship Access @@ -343,17 +352,11 @@ public: // Queries //- True if finishedSends() or finishedNeighbourSends() has been called - bool finished() const noexcept - { - return finishedSendsCalled_; - } + bool finished() const noexcept; //- Is clearStorage of individual receive buffer by external hooks //- allowed? 
(default: true) - bool allowClearRecv() const noexcept - { - return allowClearRecv_; - } + bool allowClearRecv() const noexcept; //- True if any (local) send buffers have data bool hasSendData() const; @@ -436,74 +439,96 @@ public: // Regular Functions - //- Mark sends as done + //- Mark the send phase as being finished. // - // Non-blocking mode: populates receive buffers (all-to-all). - // \param wait wait for requests to complete (in nonBlocking mode) + // Non-blocking mode: populates receive buffers using all-to-all + // or NBX (depending on tuning parameters). + // \param wait wait for requests to complete (in non-blocking mode) void finishedSends(const bool wait = true); - //- Mark sends as done. + //- Mark the send phase as being finished. + // + // Non-blocking mode: populates receive buffers using NBX. + // \param wait wait for requests to complete (in non-blocking mode) + void finishedSendsNBX(const bool wait = true); + + //- Mark the send phase as being finished. //- Recovers the sizes (bytes) received. // - // Non-blocking mode: populates receive buffers (all-to-all). - // \param[out] recvSizes the sizes (bytes) received - // \param wait wait for requests to complete (in nonBlocking mode) + // Non-blocking mode: populates receive buffers using all-to-all + // or NBX (depending on tuning parameters). + // \warning currently only valid for non-blocking comms. + void finishedSends + ( + //! [out] the sizes (bytes) received + labelList& recvSizes, + //! wait for requests to complete (in non-blocking mode) + const bool wait = true + ); + + //- Mark the send phase as being finished. + //- Recovers the sizes (bytes) received. // - // \warning currently only valid for nonBlocking comms. - void finishedSends(labelList& recvSizes, const bool wait = true); + // Non-blocking mode: populates receive buffers using NBX. + // \warning currently only valid for non-blocking comms. + void finishedSendsNBX + ( + //! 
[out] the sizes (bytes) received + labelList& recvSizes, + //! wait for requests to complete (in non-blocking mode) + const bool wait = true + ); // Functions with restricted neighbours - //- Mark sends as done using subset of send/recv ranks - //- and recover the sizes (bytes) received. + //- Mark the send phase as being finished, with communication + //- being limited to a known subset of send/recv ranks. // // Non-blocking mode: populates receive buffers. // - // \param neighProcs ranks used for sends/recvs - // \param wait wait for requests to complete (in nonBlocking mode) - // - // \warning currently only valid for nonBlocking comms. + // \warning currently only valid for non-blocking comms. // \note Same as finishedSends with identical sendProcs/recvProcs void finishedNeighbourSends ( + //! ranks used for sends/recvs const labelUList& neighProcs, + //! wait for requests to complete (in non-blocking mode) const bool wait = true ); - //- Mark sends as done using subset of send/recv ranks - //- and recover the sizes (bytes) received. + //- Mark the send phase as being finished, with communication + //- being limited to a known subset of send/recv ranks. + //- Recovers the sizes (bytes) received. // // Non-blocking mode: it will populate receive buffers. // - // \param neighProcs ranks used for sends/recvs - // \param[out] recvSizes the sizes (bytes) received - // \param wait wait for requests to complete (in nonBlocking mode) - // - // \warning currently only valid for nonBlocking mode. + // \warning currently only valid for non-blocking mode. void finishedNeighbourSends ( + //! ranks used for sends/recvs const labelUList& neighProcs, + //! [out] the sizes (bytes) received labelList& recvSizes, + //! wait for requests to complete (in non-blocking mode) const bool wait = true ); //- A caching version that uses a limited send/recv connectivity. // // Non-blocking mode: populates receive buffers. 
- // \param sendConnections on/off for sending ranks - // \param sendProcs ranks used for sends - // \param recvProcs ranks used for recvs - // \param wait wait for requests to complete (in nonBlocking mode) - // // \return True if the send/recv connectivity changed // - // \warning currently only valid for nonBlocking comms. + // \warning currently only valid for non-blocking comms. bool finishedSends ( + //! inter-rank connections (on/off) for sending ranks bitSet& sendConnections, + //! ranks used for sends DynamicList<label>& sendProcs, + //! ranks used for recvs DynamicList<label>& recvProcs, + //! wait for requests to complete (in non-blocking mode) const bool wait = true ); @@ -515,40 +540,46 @@ public: // Non-blocking mode: populates receive buffers. // Can use recvDataCount, maxRecvCount etc to recover sizes received. // - // \param wait wait for requests to complete (in nonBlocking mode) + // \param wait wait for requests to complete (in non-blocking mode) // - // \warning currently only valid for nonBlocking comms. + // \warning currently only valid for non-blocking comms. void finishedGathers(const bool wait = true); //- Mark all sends to master as done. //- Recovers the sizes (bytes) received. // // Non-blocking mode: populates receive buffers (all-to-one). - // \param[out] recvSizes the sizes (bytes) received - // \param wait wait for requests to complete (in nonBlocking mode) - // - // \warning currently only valid for nonBlocking comms. - void finishedGathers(labelList& recvSizes, const bool wait = true); + // \warning currently only valid for non-blocking comms. + void finishedGathers + ( + //! [out] the sizes (bytes) received + labelList& recvSizes, + //! wait for requests to complete (in non-blocking mode) + const bool wait = true + ); //- Mark all sends to sub-procs as done. // // Non-blocking mode: populates receive buffers. // Can use recvDataCount, maxRecvCount etc to recover sizes received. 
// - // \param wait wait for requests to complete (in nonBlocking mode) + // \param wait wait for requests to complete (in non-blocking mode) // - // \warning currently only valid for nonBlocking comms. + // \warning currently only valid for non-blocking comms. void finishedScatters(const bool wait = true); //- Mark all sends to sub-procs as done. //- Recovers the sizes (bytes) received. // // Non-blocking mode: populates receive buffers (all-to-one). - // \param[out] recvSizes the sizes (bytes) received - // \param wait wait for requests to complete (in nonBlocking mode) - // - // \warning currently only valid for nonBlocking comms. - void finishedScatters(labelList& recvSizes, const bool wait = true); + // \warning currently only valid for non-blocking comms. + void finishedScatters + ( + //! [out] the sizes (bytes) received + labelList& recvSizes, + //! wait for requests to complete (in non-blocking mode) + const bool wait = true + ); }; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C index 53894e2ddf6f76e8187483623c2b560a4ae97a74..ccc9cb6c3f734559545a4b8625fb036e3bc7c0a2 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C @@ -46,21 +46,19 @@ namespace Foam namespace PstreamDetail { -//- Exchange \em contiguous data using non-blocking consensus exchange +//- Exchange \em contiguous data using non-blocking consensus exchange (NBX) //- with optional tracking of the receive sizes. // // No internal guards or resizing - data containers are all properly // sized before calling. // -// \param[in] sendBufs The send buffers list (size: numProcs) -// \param[out] recvBufs The recv buffers list (size: numProcs) -// \param[out] recvSizes The recv sizes (size: 0 or numProcs). 
+// \param[in] sendBufs The send buffers list (size: numProc) +// \param[out] recvBufs The recv buffers list (size: numProc) +// \param[out] recvSizes The recv sizes (size: 0 or numProc). // This parameter can be an empty list, in which case the receive sizes // are not returned. // \param tag The message tag // \param comm The communicator -// \param wait Wait for non-blocking receives to complete -// \param recvCommType If blocking or (default) non-blocking template<class Container, class Type> void exchangeConsensus @@ -69,20 +67,17 @@ void exchangeConsensus UList<Container>& recvBufs, labelUList& recvSizes, const int tag, - const label comm, - const bool wait = true, - const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking + const label comm ) { static_assert(is_contiguous<Type>::value, "Contiguous data only!"); const bool initialBarrier = (UPstream::tuning_NBX_ > 0); - const label startOfRequests = UPstream::nRequests(); const label myProci = UPstream::myProcNo(comm); const label numProc = UPstream::nProcs(comm); - // Initial: clear all receive information + // Initial: clear all receive locations for (auto& buf : recvBufs) { buf.clear(); @@ -98,28 +93,37 @@ void exchangeConsensus if (sendBufs.size() > numProc) { FatalErrorInFunction - << "Send buffers:" << sendBufs.size() << " > numProcs:" << numProc + << "Send buffers:" << sendBufs.size() << " > numProc:" << numProc << Foam::abort(FatalError); } if (recvBufs.size() < numProc) { FatalErrorInFunction - << "Recv buffers:" << recvBufs.size() << " < numProcs:" << numProc + << "Recv buffers:" << recvBufs.size() << " < numProc:" << numProc << Foam::abort(FatalError); } // #endif - if (!UPstream::is_parallel(comm)) + // Fake send/recv for myself - parallel or non-parallel { - // Do myself recvBufs[myProci] = sendBufs[myProci]; if (myProci < recvSizes.size()) { recvSizes[myProci] = recvBufs.size(); } + } + + if (!UPstream::is_parallel(comm)) + { + // Nothing left to do return; } + + // 
------------------------------------------------------------------------ + // Setup sends + // ------------------------------------------------------------------------ + // An initial barrier may help to avoid synchronisation problems // caused elsewhere if (initialBarrier) @@ -127,11 +131,12 @@ void exchangeConsensus UPstream::barrier(comm); } + // Algorithm NBX: Nonblocking consensus with List containers DynamicList<UPstream::Request> sendRequests(sendBufs.size()); - // Start nonblocking synchronous send to processor dest + // Start nonblocking synchronous send to destination ranks for (label proci = 0; proci < numProc; ++proci) { const auto& sendData = sendBufs[proci]; @@ -140,19 +145,8 @@ void exchangeConsensus { // Do not send/recv empty data } - else if (proci == myProci) - { - // Do myself - recvBufs[proci] = sendData; - if (proci < recvSizes.size()) - { - recvSizes[proci] = sendData.size(); - } - } - else + else if (proci != myProci) { - // Has data to send. - // The MPI send requests are tracked on a local list UOPstream::write ( sendRequests.emplace_back(), @@ -167,7 +161,15 @@ void exchangeConsensus } + // ------------------------------------------------------------------------ // Probe and receive + // ------------------------------------------------------------------------ + // + // When receiving can use resize() instead of resize_nocopy() since the + // slots were already initially cleared. 
+ // The resize() also works fine with FixedList since it will + // corresponds to a no-op: send and recv sizes will always be + // identical to its fixed size() / max_size() UPstream::Request barrierRequest; @@ -191,17 +193,16 @@ void exchangeConsensus const label count = (probed.second / sizeof(Type)); auto& recvData = recvBufs[proci]; - recvData.resize_nocopy(count); + recvData.resize(count); // OK with resize() instead of _nocopy() if (proci < recvSizes.size()) { recvSizes[proci] = count; } - // Any non-blocking MPI recv requests are tracked on internal stack UIPstream::read ( - recvCommType, + UPstream::commsTypes::blocking, proci, recvData.data_bytes(), recvData.size_bytes(), @@ -229,26 +230,18 @@ void exchangeConsensus } } } - - // Wait for non-blocking receives to finish - if (wait && recvCommType == UPstream::commsTypes::nonBlocking) - { - UPstream::waitRequests(startOfRequests); - } } -//- Exchange \em contiguous data using non-blocking consensus exchange. +//- Exchange \em contiguous data using non-blocking consensus exchange (NBX) // // No internal guards - the sending Map corresponds to a segment of -// 0-numProcs. +// 0-numProc. 
// -// \param[in] sendBufs The send buffers map (addr: 0-numProcs) +// \param[in] sendBufs The send buffers map (addr: 0-numProc) // \param[out] recvBufs The recv buffers map // \param tag The message tag // \param comm The communicator -// \param wait Wait for non-blocking receives to complete -// \param recvCommType If blocking or (default) non-blocking template<class Container, class Type> void exchangeConsensus @@ -256,17 +249,17 @@ void exchangeConsensus const Map<Container>& sendBufs, Map<Container>& recvBufs, const int tag, - const label comm, - const bool wait = true, - const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking + const label comm ) { static_assert(is_contiguous<Type>::value, "Contiguous data only!"); - const label startOfRequests = UPstream::nRequests(); + // TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0); + const label myProci = UPstream::myProcNo(comm); + const label numProc = UPstream::nProcs(comm); - // Initial: clear out receive 'slots' + // Initial: clear all receive locations // Preferrable to clear out the map entries instead of the map itself // since this can potentially preserve allocated space // (eg DynamicList entries) between calls @@ -276,9 +269,13 @@ void exchangeConsensus iter.val().clear(); } - if (!UPstream::is_parallel(comm)) + if (!UPstream::is_rank(comm)) + { + return; // Process not in communicator + } + + // Fake send/recv for myself - parallel or non-parallel { - // Do myself const auto iter = sendBufs.find(myProci); if (iter.good()) { @@ -290,43 +287,38 @@ void exchangeConsensus recvBufs(iter.key()) = sendData; } } + } + + if (!UPstream::is_parallel(comm)) + { + // Nothing left to do return; } + // ------------------------------------------------------------------------ + // Setup sends + // ------------------------------------------------------------------------ + + // TDB: initialBarrier ... 
+ + // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers DynamicList<UPstream::Request> sendRequests(sendBufs.size()); - // Start nonblocking synchronous send to process dest + // Start nonblocking synchronous send to destination ranks forAllConstIters(sendBufs, iter) { const label proci = iter.key(); const auto& sendData = iter.val(); - #ifdef FULLDEBUG - if (proci >= UPstream::nProcs(comm)) - { - FatalErrorInFunction - << "Send buffer:" << proci << " >= numProcs:" - << UPstream::nProcs(comm) - << Foam::abort(FatalError); - } - #endif - - if (sendData.empty()) - { - // Do not send/recv empty data - } - else if (proci == myProci) + if (sendData.empty() || proci < 0 || proci >= numProc) { - // Do myself: insert_or_assign - recvBufs(proci) = sendData; + // Do not send/recv empty data or invalid destinations } - else + else if (proci != myProci) { - // Has data to send. - // The MPI send requests are tracked on a local list UOPstream::write ( sendRequests.emplace_back(), @@ -341,7 +333,15 @@ void exchangeConsensus } + // ------------------------------------------------------------------------ // Probe and receive + // ------------------------------------------------------------------------ + // + // When receiving can use resize() instead of resize_nocopy() since the + // slots were already initially cleared. 
+ // The resize() also works fine with FixedList since it will + correspond to a no-op: send and recv sizes will always be + identical to its fixed size() / max_size() UPstream::Request barrierRequest; @@ -365,12 +365,11 @@ void exchangeConsensus const label count = (probed.second / sizeof(Type)); auto& recvData = recvBufs(proci); - recvData.resize_nocopy(count); + recvData.resize(count); // OK with resize() instead of _nocopy() - // Any non-blocking MPI recv requests are tracked on internal stack UIPstream::read ( - recvCommType, + UPstream::commsTypes::blocking, proci, recvData.data_bytes(), recvData.size_bytes(), @@ -397,12 +396,6 @@ void exchangeConsensus } } } - - // Wait for non-blocking receives to finish - if (wait && recvCommType == UPstream::commsTypes::nonBlocking) - { - UPstream::waitRequests(startOfRequests); - } } } // namespace PstreamDetail @@ -418,7 +411,7 @@ void Foam::Pstream::exchangeConsensus List<Container>& recvBufs, const int tag, const label comm, - const bool wait + const bool /* wait (ignored) */ ) { static_assert(is_contiguous<Type>::value, "Contiguous data only!"); @@ -427,7 +420,7 @@ void Foam::Pstream::exchangeConsensus { FatalErrorInFunction << "Send buffers size:" << sendBufs.size() - << " != numProcs:" << UPstream::nProcs(comm) + << " != numProc:" << UPstream::nProcs(comm) << Foam::abort(FatalError); } @@ -435,14 +428,13 @@ void Foam::Pstream::exchangeConsensus recvBufs.resize_nocopy(sendBufs.size()); labelList dummyRecvSizes; - PstreamDetail::exchangeConsensus + PstreamDetail::exchangeConsensus<Container, Type> ( sendBufs, recvBufs, dummyRecvSizes, tag, - comm, - wait + comm ); } @@ -454,20 +446,45 @@ void Foam::Pstream::exchangeConsensus Map<Container>& recvBufs, const int tag, const label comm, - const bool wait + const bool /* wait (ignored) */ ) { static_assert(is_contiguous<Type>::value, "Contiguous data only!"); - PstreamDetail::exchangeConsensus + PstreamDetail::exchangeConsensus<Container, Type> ( sendBufs, 
recvBufs, tag, - comm, - wait + comm ); } +template<class Container, class Type> +Foam::Map<Container> +Foam::Pstream::exchangeConsensus +( + const Map<Container>& sendBufs, + const int tag, + const label comm, + const bool /* wait (ignored) */ +) +{ + Map<Container> recvBufs; + + static_assert(is_contiguous<Type>::value, "Contiguous data only!"); + + PstreamDetail::exchangeConsensus<Container, Type> + ( + sendBufs, + recvBufs, + tag, + comm + ); + + return recvBufs; +} + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 00883bd9fa960ab871490129d2875da884095d3e..83b24683fe861ac025e80c4dbab2fce9c5b32309 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -44,6 +44,7 @@ SourceFiles #include "labelList.H" #include "DynamicList.H" #include "HashTable.H" +#include "Map.H" #include "Enum.H" #include "ListOps.H" @@ -55,9 +56,6 @@ namespace Foam //- Implementation details for UPstream/Pstream/MPI etc. namespace PstreamDetail {} -// Forward Declarations -template<class T> class Map; - /*---------------------------------------------------------------------------*\ Class UPstream Declaration \*---------------------------------------------------------------------------*/ @@ -968,119 +966,76 @@ public: //- Shutdown (finalize) MPI as required and exit program with errNo. static void exit(int errNo = 1); - //- Exchange integer data with all processors (in the communicator). - // \c sendData[proci] is the value to send to proci. - // After return recvData contains the data from the other processors. - // \n - // For \b non-parallel : does a simple copy of sendData to recvData - static void allToAll - ( - const UList<int32_t>& sendData, - UList<int32_t>& recvData, - const label communicator = worldComm - ); - - //- Exchange integer data with all processors (in the communicator). 
- // \c sendData[proci] is the value to send to proci. - // After return recvData contains the data from the other processors. - // \n - // For \b non-parallel : does a simple copy of sendData to recvData - static void allToAll - ( - const UList<int64_t>& sendData, - UList<int64_t>& recvData, - const label communicator = worldComm - ); - - //- Exchange \b non-zero integer data with all ranks in the communicator - //- using non-blocking consensus exchange. - // The \c sendData[proci] is the (non-zero) value to send to proci. - // After return recvData contains the non-zero values sent from the - // other processors. The recvData list is always assigned zero before - // receipt and values of zero are never transmitted. - // After return recvData contains the data from the other processors. - // \n - // For \b non-parallel : does a simple copy of sendData to recvData - // - // \note The message tag should be chosen to be a unique value - // since the implementation uses probing with ANY_SOURCE !! - // An initial barrier may help to avoid synchronisation problems - // caused elsewhere (See "nbx.tuning" opt switch) - static void allToAllConsensus - ( - const UList<int32_t>& sendData, - UList<int32_t>& recvData, - const int tag, - const label communicator = worldComm - ); - //- Exchange \b non-zero integer data with all ranks in the communicator - //- using non-blocking consensus exchange. - // The \c sendData[proci] is the (non-zero) value to send to proci. - // After return recvData contains the non-zero values sent from the - // other processors. The recvData list is always assigned zero before - // receipt and values of zero are never transmitted. - // After return recvData contains the data from the other processors. - // \n - // For \b non-parallel : does a simple copy of sendData to recvData - // - // \note The message tag should be chosen to be a unique value - // since the implementation uses probing with ANY_SOURCE !! 
- // An initial barrier may help to avoid synchronisation problems - // caused elsewhere (See "nbx.tuning" opt switch) - static void allToAllConsensus - ( - const UList<int64_t>& sendData, - UList<int64_t>& recvData, - const int tag, - const label communicator = worldComm - ); + #undef Pstream_CommonRoutines + #define Pstream_CommonRoutines(Native) \ + \ + /*!\brief Exchange \c Native data with all ranks in communicator */ \ + /*! \em non-parallel : simple copy of \p sendData to \p recvData */ \ + static void allToAll \ + ( \ + /*! [in] The value at [proci] is sent to proci */ \ + const UList<Native>& sendData, \ + /*! [out] The data received from the other ranks */ \ + UList<Native>& recvData, \ + const label communicator = worldComm \ + ); \ + \ + /*!\brief Exchange \em non-zero \c Native data between ranks [NBX] */ \ + /*! \p recvData is always initially assigned zero and no zero */ \ + /*! values are sent/received from other ranks. */ \ + /*! \em non-parallel : simple copy of \p sendData to \p recvData */ \ + /*! \note The message tag should be chosen to be a unique value */ \ + /*! since the implementation uses probing with ANY_SOURCE !! */ \ + /*! An initial barrier may help to avoid synchronisation problems */ \ + /*! caused elsewhere (See "nbx.tuning" opt switch) */ \ + static void allToAllConsensus \ + ( \ + /*! [in] The \em non-zero value at [proci] is sent to proci */ \ + const UList<Native>& sendData, \ + /*! [out] The non-zero value received from each rank */ \ + UList<Native>& recvData, \ + /*! Message tag for the communication */ \ + const int tag, \ + const label communicator = worldComm \ + ); \ + \ + /*!\brief Exchange \c Native data between ranks [NBX] */ \ + /*! \p recvData map is always cleared initially so a simple check */ \ + /*! of its keys is sufficient to determine connectivity. */ \ + /*! \em non-parallel : copy own rank (if it exists) */ \ + /*! 
See notes about message tags and "nbx.tuning" opt switch */ \ + static void allToAllConsensus \ + ( \ + /*! [in] The value at [proci] is sent to proci. */ \ + const Map<Native>& sendData, \ + /*! [out] The values received from given ranks. */ \ + Map<Native>& recvData, \ + /*! Message tag for the communication */ \ + const int tag, \ + const label communicator = worldComm \ + ); \ + \ + /*!\brief Exchange \c Native data between ranks [NBX] */ \ + /*! \returns any received data as a Map */ \ + static Map<Native> allToAllConsensus \ + ( \ + /*! [in] The value at [proci] is sent to proci. */ \ + const Map<Native>& sendData, \ + /*! Message tag for the communication */ \ + const int tag, \ + const label communicator = worldComm \ + ) \ + { \ + Map<Native> recvData; \ + allToAllConsensus(sendData, recvData, tag, communicator); \ + return recvData; \ + } - //- Exchange \b non-zero integer data with all ranks in the communicator - //- using non-blocking consensus exchange. - // The \c sendData[proci] is the (non-zero) value to send to proci. - // After return recvData contains the non-zero values sent from the - // other processors. Since the recvData map always cleared before - // receipt and values of zero are never transmitted, a simple check - // of its keys is sufficient to determine connectivity. - // \n - // For \b non-parallel : copy own rank (if it exists and non-zero) - // from sendData to recvData. - // - // \note The message tag should be chosen to be a unique value - // since the implementation uses probing with ANY_SOURCE !! 
- // An initial barrier may help to avoid synchronisation problems - // caused elsewhere (See "nbx.tuning" opt switch) - static void allToAllConsensus - ( - const Map<int32_t>& sendData, - Map<int32_t>& recvData, - const int tag, - const label communicator = worldComm - ); + Pstream_CommonRoutines(int32_t); + Pstream_CommonRoutines(int64_t); - //- Exchange \b non-zero integer data with all ranks in the communicator - //- using non-blocking consensus exchange. - // The \c sendData[proci] is the (non-zero) value to send to proci. - // After return recvData contains the non-zero values sent from the - // other processors. Since the recvData map always cleared before - // receipt and values of zero are never transmitted, a simple check - // of its keys is sufficient to determine connectivity. - // \n - // For \b non-parallel : copy own rank (if it exists and non-zero) - // from sendData to recvData. - // - // \note The message tag should be chosen to be a unique value - // since the implementation uses probing with ANY_SOURCE !! - // An initial barrier may help to avoid synchronisation problems - // caused elsewhere (See "nbx.tuning" opt switch) - static void allToAllConsensus - ( - const Map<int64_t>& sendData, - Map<int64_t>& recvData, - const int tag, - const label communicator = worldComm - ); + #undef Pstream_CommonRoutines // Low-level gather/scatter routines @@ -1121,13 +1076,7 @@ public: /*! Number of send/recv data per rank. Globally consistent! */ \ int count, \ const label communicator = worldComm \ - ); - - Pstream_CommonRoutines(char); - - - #undef Pstream_CommonRoutines - #define Pstream_CommonRoutines(Native) \ + ); \ \ /*! 
\brief Receive variable length \c Native data from all ranks */ \ static void gather \ diff --git a/src/Pstream/dummy/UPstreamGatherScatter.C b/src/Pstream/dummy/UPstreamGatherScatter.C index 2d1f125f3040dfb312774a84242f610514411d6f..4f772b4c1d1d8f865e04713ba999b4616ca3ce21 100644 --- a/src/Pstream/dummy/UPstreamGatherScatter.C +++ b/src/Pstream/dummy/UPstreamGatherScatter.C @@ -63,17 +63,9 @@ void Foam::UPstream::mpiAllGather \ int count, \ const label comm \ ) \ -{} - -Pstream_CommonRoutines(char); - -#undef Pstream_CommonRoutines - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -#undef Pstream_CommonRoutines -#define Pstream_CommonRoutines(Native) \ +{} \ + \ + \ void Foam::UPstream::gather \ ( \ const Native* sendData, \ diff --git a/src/Pstream/mpi/UPstreamGatherScatter.C b/src/Pstream/mpi/UPstreamGatherScatter.C index 4ed86d685a76ecee69c4ad7defec5cff3da678e7..4bd49dcedb505b4867164ea0819173fe41065526 100644 --- a/src/Pstream/mpi/UPstreamGatherScatter.C +++ b/src/Pstream/mpi/UPstreamGatherScatter.C @@ -79,17 +79,8 @@ void Foam::UPstream::mpiAllGather \ allData, count, \ TaggedType, comm \ ); \ -} - -Pstream_CommonRoutines(char, MPI_BYTE); - -#undef Pstream_CommonRoutines - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -#undef Pstream_CommonRoutines -#define Pstream_CommonRoutines(Native, TaggedType) \ +} \ + \ void Foam::UPstream::gather \ ( \ const Native* sendData, \ diff --git a/src/Pstream/mpi/UPstreamWrapping.txx b/src/Pstream/mpi/UPstreamWrapping.txx index 818d5072114b97a8907e6b7bd4d3069933a3e956..db3f095136ae7dcfed011e95ca733b3ae7ef6e72 100644 --- a/src/Pstream/mpi/UPstreamWrapping.txx +++ b/src/Pstream/mpi/UPstreamWrapping.txx @@ -507,7 +507,7 @@ void Foam::PstreamDetail::allToAllConsensus if (!UPstream::is_rank(comm)) { - return; + return; // Process not in communicator } const label myProci = UPstream::myProcNo(comm); @@ -539,11 +539,15 @@ void 
Foam::PstreamDetail::allToAllConsensus if (!UPstream::is_parallel(comm)) { - // deep copy + // Non-parallel : deep copy recvData.deepCopy(sendData); return; } + // Fake send/recv for myself + { + recvData[myProci] = sendData[myProci]; + } // Implementation description // -------------------------- @@ -562,7 +566,10 @@ void Foam::PstreamDetail::allToAllConsensus // This is because we are dealing with a flat list of entries to // send and not a sparse Map etc. - DynamicList<MPI_Request> sendRequests(sendData.size()); + + // ------------------------------------------------------------------------ + // Setup sends + // ------------------------------------------------------------------------ profilingPstream::beginTiming(); @@ -573,20 +580,16 @@ void Foam::PstreamDetail::allToAllConsensus MPI_Barrier(PstreamGlobals::MPICommunicators_[comm]); } + DynamicList<MPI_Request> sendRequests(sendData.size()); - // Start nonblocking synchronous send to process dest + // Start nonblocking synchronous send to destination rank for (label proci = 0; proci < numProc; ++proci) { if (sendData[proci] == zeroValue) { // Do not send/recv empty data } - else if (proci == myProci) - { - // Do myself - recvData[proci] = sendData[proci]; - } - else + else if (proci != myProci) { // Has data to send @@ -604,7 +607,9 @@ void Foam::PstreamDetail::allToAllConsensus } + // ------------------------------------------------------------------------ // Probe and receive + // ------------------------------------------------------------------------ MPI_Request barrierRequest; @@ -721,22 +726,29 @@ void Foam::PstreamDetail::allToAllConsensus } // Initial: clear out everything - const Type zeroValue = pTraits<Type>::zero; recvBufs.clear(); - if (!UPstream::is_parallel(comm)) + // Fake send/recv for myself - parallel or non-parallel { - // Do myself const auto iter = sendBufs.find(myProci); - if (iter.good() && (iter.val() != zeroValue)) + if (iter.good()) { // Do myself: insert_or_assign 
recvBufs(iter.key()) = iter.val(); } + } + + if (!UPstream::is_parallel(comm)) + { + // Nothing left to do return; } + // ------------------------------------------------------------------------ + // Setup sends + // ------------------------------------------------------------------------ + // Algorithm NBX: Nonblocking consensus // Implementation like above, but sending map data. @@ -752,7 +764,7 @@ void Foam::PstreamDetail::allToAllConsensus } - // Start nonblocking synchronous send to process dest + // Start nonblocking synchronous send to destination ranks // Same as forAllConstIters() const auto endIter = sendBufs.cend(); @@ -761,19 +773,8 @@ void Foam::PstreamDetail::allToAllConsensus const label proci = iter.key(); const auto& sendData = iter.val(); - if (sendData == zeroValue) - { - // Do not send/recv empty/zero data - } - else if (proci == myProci) + if (proci != myProci && proci >= 0 && proci < numProc) { - // Do myself: insert_or_assign - recvBufs(proci) = sendData; - } - else - { - // Has data to send - MPI_Issend ( &sendData, @@ -788,7 +789,9 @@ void Foam::PstreamDetail::allToAllConsensus } + // ------------------------------------------------------------------------ // Probe and receive + // ------------------------------------------------------------------------ MPI_Request barrierRequest;