diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 6ae66c7c0a4dc54689603b49662a6896026a4865..e811fce1b033f7fd9ccabc3c8866aea29762a660 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - Copyright (C) 2016-2021 OpenCFD Ltd. + Copyright (C) 2016-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -39,8 +39,8 @@ SourceFiles \*---------------------------------------------------------------------------*/ -#ifndef Pstream_H -#define Pstream_H +#ifndef Foam_Pstream_H +#define Foam_Pstream_H #include "UPstream.H" #include "DynamicList.H" @@ -340,17 +340,17 @@ public: // Exchange - //- Helper: exchange contiguous data. Sends sendData, receives into - // recvData. If block=true will wait for all transfers to finish. - template<class Container, class T> - static void exchange + //- Helper: exchange sizes of sendData for specified + //- set of send/receive processes. + template<class Container> + static void exchangeSizes ( - const UList<Container>& sendData, - const labelUList& recvSizes, - List<Container>& recvData, - const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, - const bool block = true + const labelUList& sendProcs, + const labelUList& recvProcs, + const Container& sendData, + labelList& sizes, + const label tag = UPstream::msgType(), + const label comm = UPstream::worldComm ); //- Helper: exchange sizes of sendData. sendData is the data per @@ -364,6 +364,20 @@ public: const label comm = UPstream::worldComm ); + + //- Helper: exchange contiguous data. Sends sendData, receives into + // recvData. If block=true will wait for all transfers to finish. + template<class Container, class T> + static void exchange + ( + const UList<Container>& sendData, + const labelUList& recvSizes, + List<Container>& recvData, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool block = true + ); + //- Exchange contiguous data. Sends sendData, receives into // recvData. Determines sizes to receive. // If block=true will wait for all transfers to finish. diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index 85dc6fad6e13132dc4db4fee623fd710f1fc4096..e2f73c5e035f94ba69f4388a043b8bae27d80857 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -57,6 +57,42 @@ void Foam::PstreamBuffers::finalExchange } +void Foam::PstreamBuffers::finalExchange +( + const labelUList& sendProcs, + const labelUList& recvProcs, + labelList& recvSizes, + const bool block +) +{ + // Could also check that it is not called twice + finishedSendsCalled_ = true; + + if (commsType_ == UPstream::commsTypes::nonBlocking) + { + Pstream::exchangeSizes + ( + sendProcs, + recvProcs, + sendBuf_, + recvSizes, + tag_, + comm_ + ); + + Pstream::exchange<DynamicList<char>, char> + ( + sendBuf_, + recvSizes, + recvBuf_, + tag_, + comm_, + block + ); + } +} + + // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // Foam::PstreamBuffers::PstreamBuffers @@ -144,4 +180,40 @@ void Foam::PstreamBuffers::finishedSends } +void Foam::PstreamBuffers::finishedSends +( + const labelUList& sendProcs, + const labelUList& recvProcs, + const bool block +) +{ + labelList recvSizes; + finalExchange(sendProcs, recvProcs, recvSizes, block); +} + + +void Foam::PstreamBuffers::finishedSends +( + const labelUList& sendProcs, + const labelUList& recvProcs, + labelList& recvSizes, + const bool block +) +{ + finalExchange(sendProcs, recvProcs, recvSizes, block); + + if (commsType_ != UPstream::commsTypes::nonBlocking) + { + FatalErrorInFunction + << "Obtaining sizes not supported in " + << UPstream::commsTypeNames[commsType_] << endl + << " since transfers already in progress. Use non-blocking instead." + << exit(FatalError); + + // Note: maybe possible only if using different tag from write started + // by ~UOPstream. Needs some work. + } +} + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index 5f178e000ea4148cbda7279a78d33b35cc637b46..193567813bc1da3706d0b9d22c1165d288f719fc 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -126,6 +126,16 @@ class PstreamBuffers // This will start receives in non-blocking mode. void finalExchange(labelList& recvSizes, const bool block); + //- Mark all sends as having been done. + // Only exchange sizes between sendProcs/recvProcs + void finalExchange + ( + const labelUList& sendProcs, + const labelUList& recvProcs, + labelList& recvSizes, + const bool block + ); + public: @@ -199,6 +209,28 @@ public: // Same as above but also returns sizes (bytes) received. // \note currently only valid for non-blocking. void finishedSends(labelList& recvSizes, const bool block = true); + + //- Mark all sends as having been done, with limited + //- send/recv processor connections. + // \note currently only valid for non-blocking. + void finishedSends + ( + const labelUList& sendProcs, + const labelUList& recvProcs, + const bool block = true + ); + + //- Mark all sends as having been done, with limited + //- send/recv processor connections. + // Same as above but also returns sizes (bytes) received. + // \note currently only valid for non-blocking. + void finishedSends + ( + const labelUList& sendProcs, + const labelUList& recvProcs, + labelList& recvSizes, + const bool block = true + ); }; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C index 8d6352375deb3fb2bbdee61844424c38dfed97a0..feeb310abd1fb8ab4331e708028faf377139238c 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - Copyright (C) 2016-2021 OpenCFD Ltd. + Copyright (C) 2016-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -209,7 +209,7 @@ void Foam::Pstream::exchange << Foam::abort(FatalError); } - recvBufs.setSize(sendBufs.size()); + recvBufs.resize_nocopy(sendBufs.size()); if (UPstream::parRun() && UPstream::nProcs(comm) > 1) { @@ -220,7 +220,7 @@ void Foam::Pstream::exchange if (proci != Pstream::myProcNo(comm) && nRecv > 0) { - recvBufs[proci].setSize(nRecv); + recvBufs[proci].resize_nocopy(nRecv); } } @@ -344,6 +344,90 @@ void Foam::Pstream::exchange } +template<class Container> +void Foam::Pstream::exchangeSizes +( + const labelUList& sendProcs, + const labelUList& recvProcs, + const Container& sendBufs, + labelList& recvSizes, + const label tag, + const label comm +) +{ + if (sendBufs.size() != UPstream::nProcs(comm)) + { + FatalErrorInFunction + << "Size of container " << sendBufs.size() + << " does not equal the number of processors " + << UPstream::nProcs(comm) + << Foam::abort(FatalError); + } + + labelList sendSizes(sendProcs.size()); + forAll(sendProcs, i) + { + sendSizes[i] = sendBufs[sendProcs[i]].size(); + } + + recvSizes.resize_nocopy(sendBufs.size()); + recvSizes = 0; // Ensure non-received entries are properly zeroed + + const label startOfRequests = UPstream::nRequests(); + + for (const label proci : recvProcs) + { + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + reinterpret_cast<char*>(&recvSizes[proci]), + sizeof(label), + tag, + comm + ); + } + + forAll(sendProcs, i) + { + UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + sendProcs[i], + reinterpret_cast<char*>(&sendSizes[i]), + sizeof(label), + tag, + comm + ); + } + + UPstream::waitRequests(startOfRequests); +} + + +/// FUTURE? +/// +/// template<class Container> +/// void Foam::Pstream::exchangeSizes +/// ( +/// const labelUList& sendRecvProcs, +/// const Container& sendBufs, +/// labelList& recvSizes, +/// const label tag, +/// const label comm +/// ) +/// { +/// exchangeSizes<Container> +/// ( +/// sendRecvProcs, +/// sendRecvProcs, +/// sendBufs, +/// tag, +/// comm +/// ); +/// } + + template<class Container> void Foam::Pstream::exchangeSizes ( @@ -366,8 +450,8 @@ void Foam::Pstream::exchangeSizes { sendSizes[proci] = sendBufs[proci].size(); } - recvSizes.setSize(sendSizes.size()); - allToAll(sendSizes, recvSizes, comm); + recvSizes.resize_nocopy(sendSizes.size()); + UPstream::allToAll(sendSizes, recvSizes, comm); }