diff --git a/applications/test/parallel-broadcast/Test-parallel-broadcast.C b/applications/test/parallel-broadcast/Test-parallel-broadcast.C index 44948fc1f28a62e99efc07e60ba69c4fe19309a5..38a68382e35192b05f7683902aefa9755e5f798f 100644 --- a/applications/test/parallel-broadcast/Test-parallel-broadcast.C +++ b/applications/test/parallel-broadcast/Test-parallel-broadcast.C @@ -118,7 +118,7 @@ int main(int argc, char *argv[]) value = args.executable(); } printPre(value); - UPstream::broadcast(value); // Low-level UPstream broadcast + Pstream::broadcast(value); // Streamed broadcast printPost(value); } diff --git a/applications/utilities/mesh/manipulation/orientFaceZone/orientFaceZone.C b/applications/utilities/mesh/manipulation/orientFaceZone/orientFaceZone.C index 4b150e8f4718d5a8583fd07e0ddfc5657d9ef80b..d98c4f82e29bcef62c312b2c54cde326490a2518 100644 --- a/applications/utilities/mesh/manipulation/orientFaceZone/orientFaceZone.C +++ b/applications/utilities/mesh/manipulation/orientFaceZone/orientFaceZone.C @@ -211,8 +211,8 @@ int main(int argc, char *argv[]) label proci = globalFaces.whichProcID(unsetFacei); label seedFacei = globalFaces.toLocal(proci, unsetFacei); - Info<< "Seeding from processor " << proci << " face " << seedFacei - << endl; + Info<< "Seeding from processor " << proci + << " face " << seedFacei << endl; if (proci == Pstream::myProcNo()) { diff --git a/applications/utilities/mesh/manipulation/splitMeshRegions/splitMeshRegions.C b/applications/utilities/mesh/manipulation/splitMeshRegions/splitMeshRegions.C index e81301ab510e9e93924b85e2af17a2bc2e565df0..fd68ceb34ada05569736dc0498869af570ad126d 100644 --- a/applications/utilities/mesh/manipulation/splitMeshRegions/splitMeshRegions.C +++ b/applications/utilities/mesh/manipulation/splitMeshRegions/splitMeshRegions.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2015-2021 OpenCFD Ltd. + Copyright (C) 2015-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -516,12 +516,15 @@ void getInterfaceSizes } - // Now all processor have consistent interface information - - Pstream::scatter(interfaces); - Pstream::scatter(interfaceNames); - Pstream::scatter(interfaceSizes); - Pstream::scatter(regionsToInterface); + // Consistent interface information for all processors + Pstream::broadcasts + ( + UPstream::worldComm, + interfaces, + interfaceNames, + interfaceSizes, + regionsToInterface + ); // Mark all inter-region faces. faceToInterface.setSize(mesh.nFaces(), -1); diff --git a/applications/utilities/miscellaneous/foamRestoreFields/foamRestoreFields.C b/applications/utilities/miscellaneous/foamRestoreFields/foamRestoreFields.C index 48254f28b6d62ac84c6aeace1b15ff21932d6f5f..9458b3c833766da22e3ed73f88e07d3d2d6f4e7c 100644 --- a/applications/utilities/miscellaneous/foamRestoreFields/foamRestoreFields.C +++ b/applications/utilities/miscellaneous/foamRestoreFields/foamRestoreFields.C @@ -441,7 +441,7 @@ int main(int argc, char *argv[]) { files = getFiles(args.path(), timeName); } - Pstream::scatter(files); + Pstream::broadcast(files); count += restoreFields ( diff --git a/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C b/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C index b5dbf5521576d6c81005dcac97b41ba1ad3be8d6..d0be2c50bb67ed35434d0b5a08228c2984fbdc03 100644 --- a/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C +++ b/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C @@ -208,9 +208,9 @@ Foam::parLagrangianRedistributor::redistributeLagrangianPositions for (const int proci : pBufs.allProcs()) { //Pout<< "Receive from processor" << proci << " : " - // << pBufs.hasRecvData(proci) << endl; + // << pBufs.recvDataCount(proci) << endl; - if (pBufs.hasRecvData(proci)) + if (pBufs.recvDataCount(proci)) { UIPstream particleStream(proci, pBufs); diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C index be7b9dbee8b93485908fad8c93ccb235ffc8f60f..31003c82934b616b1f2f3e61f7f23c25090d189b 100644 --- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C +++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C @@ -370,12 +370,7 @@ bool Foam::decomposedBlockData::readBlocks } else { - PstreamBuffers pBufs - ( - UPstream::commsTypes::nonBlocking, - UPstream::msgType(), - comm - ); + PstreamBuffers pBufs(comm, UPstream::commsTypes::nonBlocking); if (UPstream::master(comm)) { @@ -497,12 +492,7 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks } else { - PstreamBuffers pBufs - ( - UPstream::commsTypes::nonBlocking, - UPstream::msgType(), - comm - ); + PstreamBuffers pBufs(comm, UPstream::commsTypes::nonBlocking); if (UPstream::master(comm)) { @@ -537,13 +527,15 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks Pstream::broadcast(ok, comm); - //- Set stream properties from realIsPtr on master + // Broadcast master header info, + // set stream properties from realIsPtr on master - // Scatter master header info int verValue; int fmtValue; unsigned labelWidth; unsigned scalarWidth; + word headerName(headerIO.name()); + if (UPstream::master(comm)) { verValue = realIsPtr().version().canonical(); @@ -551,23 +543,27 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks labelWidth = realIsPtr().labelByteSize(); scalarWidth = realIsPtr().scalarByteSize(); } - Pstream::scatter(verValue); //, Pstream::msgType(), comm); - Pstream::scatter(fmtValue); //, Pstream::msgType(), comm); - Pstream::scatter(labelWidth); //, Pstream::msgType(), comm); - Pstream::scatter(scalarWidth); //, Pstream::msgType(), comm); + + Pstream::broadcasts + ( + UPstream::worldComm, // Future? comm, + verValue, + fmtValue, + labelWidth, + scalarWidth, + headerName, + headerIO.headerClassName(), + headerIO.note() + // Unneeded: headerIO.instance() + // Unneeded: headerIO.local() + ); realIsPtr().version(IOstreamOption::versionNumber::canonical(verValue)); realIsPtr().format(IOstreamOption::streamFormat(fmtValue)); realIsPtr().setLabelByteSize(labelWidth); realIsPtr().setScalarByteSize(scalarWidth); - word name(headerIO.name()); - Pstream::scatter(name, Pstream::msgType(), comm); - headerIO.rename(name); - Pstream::scatter(headerIO.headerClassName(), Pstream::msgType(), comm); - Pstream::scatter(headerIO.note(), Pstream::msgType(), comm); - //Pstream::scatter(headerIO.instance(), Pstream::msgType(), comm); - //Pstream::scatter(headerIO.local(), Pstream::msgType(), comm); + headerIO.rename(headerName); return realIsPtr; } @@ -944,6 +940,8 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const int verValue; int fmtValue; + // Unneeded: word masterName(name()); + fileName masterLocation(instance()/db().dbDir()/local()); // Re-read my own data to find out the header information if (Pstream::master(comm_)) @@ -955,24 +953,23 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const fmtValue = static_cast<int>(headerStream.format()); } - // Scatter header information - Pstream::scatter(verValue, Pstream::msgType(), comm_); - Pstream::scatter(fmtValue, Pstream::msgType(), comm_); + // Broadcast header information + Pstream::broadcasts + ( + comm_, + verValue, + fmtValue, + // Unneeded: masterName + io.headerClassName(), + io.note(), + // Unneeded: io.instance() + // Unneeded: io.local() + masterLocation + ); streamOpt.version(IOstreamOption::versionNumber::canonical(verValue)); streamOpt.format(IOstreamOption::streamFormat(fmtValue)); - //word masterName(name()); - //Pstream::scatter(masterName, Pstream::msgType(), comm_); - - Pstream::scatter(io.headerClassName(), Pstream::msgType(), comm_); - Pstream::scatter(io.note(), Pstream::msgType(), comm_); - //Pstream::scatter(io.instance(), Pstream::msgType(), comm); - //Pstream::scatter(io.local(), Pstream::msgType(), comm); - - fileName masterLocation(instance()/db().dbDir()/local()); - Pstream::scatter(masterLocation, Pstream::msgType(), comm_); - if (!Pstream::master(comm_)) { decomposedBlockData::writeHeader diff --git a/src/OpenFOAM/db/IOstreams/IOstreams/IOstream.H b/src/OpenFOAM/db/IOstreams/IOstreams/IOstream.H index 415fd7d8625b202c7db9a5f1bfd63d40d22872eb..7226f7bc6007d409bd0e46b0564578991c059b55 100644 --- a/src/OpenFOAM/db/IOstreams/IOstreams/IOstream.H +++ b/src/OpenFOAM/db/IOstreams/IOstreams/IOstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2015 OpenFOAM Foundation - Copyright (C) 2018-2021 OpenCFD Ltd. + Copyright (C) 2018-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -42,8 +42,8 @@ SourceFiles \*---------------------------------------------------------------------------*/ -#ifndef IOstream_H -#define IOstream_H +#ifndef Foam_IOstream_H +#define Foam_IOstream_H #include "char.H" #include "bool.H" @@ -469,6 +469,36 @@ inline IOstream& scientific(IOstream& io) } +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +namespace Detail +{ + +//- Termination for input looping (no-op) +template<class IS> inline void inputLoop(IS&) {} + +//- Termination for output looping (no-op) +template<class OS> inline void outputLoop(OS&) {} + +//- Input looping. Read into first parameter and recurse. +template<class IS, class Type, class... Args> +inline void inputLoop(IS& is, Type& arg1, Args&&... args) +{ + is >> arg1; + Detail::inputLoop(is, std::forward<Args>(args)...); +} + +//- Output looping. Write first parameter and recurse. +template<class OS, class Type, class... Args> +inline void outputLoop(OS& os, const Type& arg1, Args&&... args) +{ + os << arg1; + Detail::outputLoop(os, std::forward<Args>(args)...); +} + +} // End namespace Detail + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/OpenFOAM/db/IOstreams/IOstreams/Istream.H b/src/OpenFOAM/db/IOstreams/IOstreams/Istream.H index 1d68414d7cc0cef70ae915b5e4bd45ce812718d6..d14ab64fb37453efc6be2398cfff9c2125302bdc 100644 --- a/src/OpenFOAM/db/IOstreams/IOstreams/Istream.H +++ b/src/OpenFOAM/db/IOstreams/IOstreams/Istream.H @@ -43,8 +43,8 @@ SourceFiles \*---------------------------------------------------------------------------*/ -#ifndef Istream_H -#define Istream_H +#ifndef Foam_Istream_H +#define Foam_Istream_H #include "IOstream.H" #include "token.H" diff --git a/src/OpenFOAM/db/IOstreams/IOstreams/Ostream.H b/src/OpenFOAM/db/IOstreams/IOstreams/Ostream.H index 6f0b87e6271e3e855672c2e34258d2b90f119730..7f76df67b311b1a2270adf3d6da7999d5313b58b 100644 --- a/src/OpenFOAM/db/IOstreams/IOstreams/Ostream.H +++ b/src/OpenFOAM/db/IOstreams/IOstreams/Ostream.H @@ -36,8 +36,8 @@ SourceFiles \*---------------------------------------------------------------------------*/ -#ifndef Ostream_H -#define Ostream_H +#ifndef Foam_Ostream_H +#define Foam_Ostream_H #include "IOstream.H" #include "keyType.H" @@ -50,6 +50,9 @@ namespace Foam // Forward Declarations class token; +constexpr char tab = '\t'; //!< The tab \c '\\t' character(0x09) +constexpr char nl = '\n'; //!< The newline \c '\\n' character (0x0a) + /*---------------------------------------------------------------------------*\ Class Ostream Declaration \*---------------------------------------------------------------------------*/ @@ -399,12 +402,6 @@ inline Ostream& endEntry(Ostream& os) return os; } - -// Useful aliases for tab and newline characters -constexpr char tab = '\t'; -constexpr char nl = '\n'; - - // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.C index 0042dee05a5a3c9c4a2497e60a6b35805a28197f..001766d8eb8f8e762c720c4d70e21a0029c0d1d8 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.C @@ -6,7 +6,6 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011 OpenFOAM Foundation - Copyright (C) 2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -27,7 +26,6 @@ License \*---------------------------------------------------------------------------*/ #include "Pstream.H" -#include "bitSet.H" // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // @@ -37,40 +35,4 @@ namespace Foam } -// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * // - -void Foam::Pstream::broadcast -( - bitSet& values, - const label comm -) -{ - if (UPstream::parRun() && UPstream::nProcs(comm) > 1) - { - // Broadcast the size - label len(values.size()); - UPstream::broadcast - ( - reinterpret_cast<char*>(&len), - sizeof(label), - comm, - UPstream::masterNo() - ); - - values.resize_nocopy(len); // A no-op on master - - if (len) - { - UPstream::broadcast - ( - values.data_bytes(), - values.size_bytes(), - comm, - UPstream::masterNo() - ); - } - } -} - - // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index d7da7aec0e5d789b05f175e815429134f3e4c019..b0ab673da512e7e15c3ba90a36fc487ff2766b19 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -54,9 +54,6 @@ SourceFiles namespace Foam { -// Forward Declarations -class bitSet; - /*---------------------------------------------------------------------------*\ Class Pstream Declaration \*---------------------------------------------------------------------------*/ @@ -131,61 +128,21 @@ public: // Broadcast - //- Broadcast buffer or string content to all processes in communicator + //- Broadcast buffer content to all processes in communicator. using UPstream::broadcast; - //- Generic broadcast using streams to serialize/de-serialize. - // Not normally used directly. - template<class T> - static void genericBroadcast - ( - T& value, - const label comm = UPstream::worldComm - ); - - //- Generic broadcast multiple values (contiguous or non-contiguous) + //- Broadcast content (contiguous or non-contiguous) //- to all processes in communicator. - template<class ListType> - static void genericListBroadcast - ( - ListType& values, - const label comm = UPstream::worldComm - ); - - //- Broadcast value (contiguous or non-contiguous) - //- to all processes in communicator. - template<class T> + template<class Type> static void broadcast ( - T& value, + Type& value, const label comm = UPstream::worldComm ); - //- Broadcast multiple values (contiguous or non-contiguous) - //- to all processes in communicator. - template<class T> - static void broadcast - ( - List<T>& values, - const label comm = UPstream::worldComm - ); - - //- Broadcast multiple values (contiguous or non-contiguous) - //- to all processes in communicator. - template<class T, int SizeMin> - static void broadcast - ( - DynamicList<T, SizeMin>& values, - const label comm = UPstream::worldComm - ); - - //- Broadcast bitSet values - //- to all processes in communicator. - static void broadcast - ( - bitSet& values, - const label comm = UPstream::worldComm - ); + //- Broadcast multiple items to all processes in communicator. + template<class Type, class... Args> + static void broadcasts(const label comm, Type& arg1, Args&&... args); // Gather @@ -542,9 +499,9 @@ public: const label comm = UPstream::worldComm ); - //- Helper: exchange sizes of sendData. sendData is the data per - // processor (in the communicator). Returns sizes of sendData - // on the sending processor. + //- Helper: exchange sizes of sendData. + //- The sendData is the data per processor (in the communicator). + // Returns sizes of sendData on the sending processor. template<class Container> static void exchangeSizes ( @@ -554,8 +511,9 @@ public: ); - //- Helper: exchange contiguous data. Sends sendData, receives into - // recvData. If block=true will wait for all transfers to finish. + //- Helper: exchange contiguous data. + //- Sends sendData, receives into recvData. + // If wait=true will wait for all transfers to finish. template<class Container, class T> static void exchange ( @@ -567,9 +525,10 @@ public: const bool wait = true //!< Wait for requests to complete ); - //- Exchange contiguous data. Sends sendData, receives into - // recvData. Determines sizes to receive. - // If block=true will wait for all transfers to finish. + //- Exchange contiguous data. + //- Sends sendData, receives into recvData. + //- Determines sizes to receive. + // If wait=true will wait for all transfers to finish. template<class Container, class T> static void exchange ( diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C index f4a92b88bc00242dd3ba77d34df0afab4edf5a7d..5532260412bad3a0d77e415bf4a1cf82bcd7551a 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C @@ -31,92 +31,56 @@ License // * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * // -template<class T> -void Foam::Pstream::genericBroadcast(T& value, const label comm) +template<class Type> +void Foam::Pstream::broadcast(Type& value, const label comm) { - // Generic: use stream interface if (UPstream::parRun() && UPstream::nProcs(comm) > 1) { - if (UPstream::master(comm)) - { - OPBstream toAll(UPstream::masterNo(), comm); - toAll << value; - } - else - { - IPBstream fromMaster(UPstream::masterNo(), comm); - fromMaster >> value; - } - } -} - - -template<class ListType> -void Foam::Pstream::genericListBroadcast(ListType& values, const label comm) -{ - if (!is_contiguous<typename ListType::value_type>::value) - { - Pstream::genericBroadcast(values, comm); - } - else if (UPstream::parRun() && UPstream::nProcs(comm) > 1) - { - // Broadcast the size - label len(values.size()); - UPstream::broadcast - ( - reinterpret_cast<char*>(&len), - sizeof(label), - comm, - UPstream::masterNo() - ); - values.resize_nocopy(len); // A no-op on master - - if (len) + if (is_contiguous<Type>::value) { + // Note: contains parallel guard internally as well UPstream::broadcast ( - values.data_bytes(), - values.size_bytes(), + reinterpret_cast<char*>(&value), + sizeof(Type), comm, UPstream::masterNo() ); } + else + { + if (UPstream::master(comm)) + { + OPBstream os(UPstream::masterNo(), comm); + os << value; + } + else + { + IPBstream is(UPstream::masterNo(), comm); + is >> value; + } + } } } -template<class T> -void Foam::Pstream::broadcast(T& value, const label comm) +template<class Type, class... Args> +void Foam::Pstream::broadcasts(const label comm, Type& arg1, Args&&... args) { - if (!is_contiguous<T>::value) - { - Pstream::genericBroadcast(value, comm); - } - else if (UPstream::parRun() && UPstream::nProcs(comm) > 1) + if (UPstream::parRun() && UPstream::nProcs(comm) > 1) { - UPstream::broadcast - ( - reinterpret_cast<char*>(&value), - sizeof(T), - comm, - UPstream::masterNo() - ); + if (UPstream::master(comm)) + { + OPBstream os(UPstream::masterNo(), comm); + Detail::outputLoop(os, arg1, std::forward<Args>(args)...); + } + else + { + IPBstream is(UPstream::masterNo(), comm); + Detail::inputLoop(is, arg1, std::forward<Args>(args)...); + } } } -template<class T> -void Foam::Pstream::broadcast(List<T>& values, const label comm) -{ - Pstream::genericListBroadcast(values, comm); -} - - -template<class T, int SizeMin> -void Foam::Pstream::broadcast(DynamicList<T, SizeMin>& values, const label comm) -{ - Pstream::genericListBroadcast(values, comm); -} - - // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index 55d892b9c4f4e2e2018554e3def539c0e9b8f6ba..db03e95838424f6d634de121b8cd555a09773b80 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -172,9 +172,29 @@ Foam::PstreamBuffers::PstreamBuffers commsType_(commsType), tag_(tag), comm_(comm), - sendBuf_(UPstream::nProcs(comm)), - recvBuf_(UPstream::nProcs(comm)), - recvBufPos_(UPstream::nProcs(comm), Zero) + sendBuf_(UPstream::nProcs(comm_)), + recvBuf_(UPstream::nProcs(comm_)), + recvBufPos_(UPstream::nProcs(comm_), Zero) +{} + + +Foam::PstreamBuffers::PstreamBuffers +( + const label comm, + const UPstream::commsTypes commsType, + const int tag, + IOstreamOption::streamFormat fmt +) +: + finishedSendsCalled_(false), + allowClearRecv_(true), + format_(fmt), + commsType_(commsType), + tag_(tag), + comm_(comm), + sendBuf_(UPstream::nProcs(comm_)), + recvBuf_(UPstream::nProcs(comm_)), + recvBufPos_(UPstream::nProcs(comm_), Zero) {} @@ -215,6 +235,13 @@ void Foam::PstreamBuffers::clear() } +void Foam::PstreamBuffers::clearRecv(const label proci) +{ + recvBuf_[proci].clear(); + recvBufPos_[proci] = 0; +} + + void Foam::PstreamBuffers::clearStorage() { // Could also clear out entire sendBuf_, recvBuf_ and reallocate. @@ -246,21 +273,72 @@ bool Foam::PstreamBuffers::hasSendData() const } -bool Foam::PstreamBuffers::hasSendData(const label proci) const +bool Foam::PstreamBuffers::hasRecvData() const +{ + if (finishedSendsCalled_) + { + forAll(recvBufPos_, proci) + { + if (recvBuf_[proci].size() > recvBufPos_[proci]) + { + return true; + } + } + } + #ifdef FULLDEBUG + else + { + FatalErrorInFunction + << "Call finishedSends first" << exit(FatalError); + } + #endif + + return false; +} + + +Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const { - return !sendBuf_[proci].empty(); + return sendBuf_[proci].size(); } -bool Foam::PstreamBuffers::hasRecvData() const +Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const +{ + if (finishedSendsCalled_) + { + const label len(recvBuf_[proci].size() > recvBufPos_[proci]); + + if (len > 0) + { + return len; + } + } + #ifdef FULLDEBUG + else + { + FatalErrorInFunction + << "Call finishedSends first" << exit(FatalError); + } + #endif + + return 0; +} + + +Foam::labelList Foam::PstreamBuffers::recvDataCounts() const { + labelList counts(recvBuf_.size(), Zero); + if (finishedSendsCalled_) { - for (const DynamicList<char>& buf : recvBuf_) + forAll(recvBufPos_, proci) { - if (!buf.empty()) + const label len(recvBuf_[proci].size() - recvBufPos_[proci]); + + if (len > 0) { - return true; + counts[proci] = len; } } } @@ -272,15 +350,25 @@ bool Foam::PstreamBuffers::hasRecvData() const } #endif - return false; + return counts; } -bool Foam::PstreamBuffers::hasRecvData(const label proci) const +const Foam::UList<char> +Foam::PstreamBuffers::peekRecvData(const label proci) const { if (finishedSendsCalled_) { - return !recvBuf_[proci].empty(); + const label len(recvBuf_[proci].size() - recvBufPos_[proci]); + + if (len > 0) + { + return UList<char> + ( + const_cast<char*>(&recvBuf_[proci][recvBufPos_[proci]]), + len + ); + } } #ifdef FULLDEBUG else @@ -290,7 +378,7 @@ bool Foam::PstreamBuffers::hasRecvData(const label proci) const } #endif - return false; + return UList<char>(); } @@ -386,7 +474,7 @@ bool Foam::PstreamBuffers::finishedSends // - reasonable to assume there are no self-sends on UPstream::myProcNo forAll(sendBuf_, proci) { - // ie, hasSendData(proci) + // ie, sendDataCount(proci) != 0 if (sendConnections.set(proci, !sendBuf_[proci].empty())) { // The state changed @@ -404,7 +492,7 @@ bool Foam::PstreamBuffers::finishedSends sendProcs.clear(); forAll(sendBuf_, proci) { - // ie, hasSendData(proci) + // ie, sendDataCount(proci) != 0 if (!sendBuf_[proci].empty()) { sendProcs.append(proci); @@ -417,7 +505,7 @@ bool Foam::PstreamBuffers::finishedSends recvProcs.clear(); forAll(recvBuf_, proci) { - // ie, hasRecvData(proci) + // ie, recvDataCount(proci) if (!recvBuf_[proci].empty()) { recvProcs.append(proci); @@ -470,12 +558,34 @@ void Foam::PstreamBuffers::finishedGathers // For nonBlocking mode, simply recover received sizes // from the buffers themselves. - recvSizes.resize_nocopy(recvBuf_.size()); + recvSizes = recvDataCounts(); +} + + +void Foam::PstreamBuffers::finishedScatters +( + labelList& recvSizes, + const bool wait +) +{ + finalExchangeGatherScatter(false, wait); - forAll(recvBuf_, proci) + if (commsType_ != UPstream::commsTypes::nonBlocking) { - recvSizes[proci] = recvBuf_[proci].size(); + FatalErrorInFunction + << "Obtaining sizes not supported in " + << UPstream::commsTypeNames[commsType_] << endl + << " since transfers already in progress. Use non-blocking instead." + << exit(FatalError); + + // Note: maybe possible only if using different tag from write started + // by ~UOPstream. Needs some work. } + + // For nonBlocking mode, simply recover received sizes + // from the buffers themselves. + + recvSizes = recvDataCounts(); } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index 75209b559dcf44811867421260abb105f2d0ddf6..44bfea19c64298ef8742f4a744dcb29826edadc5 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -177,7 +177,7 @@ public: // Constructors - //- Construct given comms type, communication options, IO format + //- Construct given comms type, message tag, communicator, IO format explicit PstreamBuffers ( const UPstream::commsTypes commsType, @@ -186,6 +186,15 @@ public: IOstreamOption::streamFormat fmt = IOstreamOption::BINARY ); + //- Construct given communicator, comms type, message tag, IO format + explicit PstreamBuffers + ( + const label comm, + const UPstream::commsTypes commsType, + const int tag = UPstream::msgType(), + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); + //- Destructor - checks that all data have been consumed ~PstreamBuffers(); @@ -251,20 +260,6 @@ public: return finishedSendsCalled_; } - //- True if any (local) send buffers have data - bool hasSendData() const; - - //- True if (local) send buffer has data on specified processor. - bool hasSendData(const label proci) const; - - //- True if any (local) recv buffers have data. - //- Must call finishedSends() or finishedNeighbourSends() first! - bool hasRecvData() const; - - //- True if (local) recv buffer has data on specified processor. - //- Must call finishedSends() or finishedNeighbourSends() first! - bool hasRecvData(const label proci) const; - //- Is clearStorage of individual receive buffer by external hooks //- allowed? (default: true) bool allowClearRecv() const noexcept @@ -272,13 +267,42 @@ public: return allowClearRecv_; } + //- True if any (local) send buffers have data + bool hasSendData() const; + + //- True if any (local) recv buffers have unconsumed data. + //- Must call finishedSends() or other finished.. method first! + bool hasRecvData() const; + + //- Number of send bytes for the specified processor. + label sendDataCount(const label proci) const; + + //- Number of unconsumed receive bytes for the specified processor. + //- Must call finishedSends() or other finished.. method first! + label recvDataCount(const label proci) const; + + //- Number of unconsumed receive bytes for all processors. + //- Must call finishedSends() or other finished.. method first! + labelList recvDataCounts() const; + + //- Number of unconsumed receive bytes for the specified processor. + //- Must call finishedSends() or other finished.. method first! + // The method is only useful in limited situations, such as when + // PstreamBuffers has been used to fill contiguous data + // (eg, using OPstream::write). + const UList<char> peekRecvData(const label proci) const; + // Edit //- Clear individual buffers and reset states. - // Does not clear individual buffer storage. + // Does not remove the buffer storage. void clear(); + //- Clear an individual receive buffer (eg, data not required) + // Does not remove the buffer storage. + void clearRecv(const label proci); + //- Clear individual buffer storage and reset states. void clearStorage(); @@ -406,6 +430,7 @@ public: //- Mark all sends to master as done. // // Non-blocking mode: populates receive buffers. + // Can use recvDataCounts() method to recover sizes received. // // \param wait wait for requests to complete (in nonBlocking mode) // @@ -425,11 +450,22 @@ public: //- Mark all sends to sub-procs as done. // // Non-blocking mode: populates receive buffers. + // Can use recvDataCounts() method to recover sizes received. // // \param wait wait for requests to complete (in nonBlocking mode) // // \warning currently only valid for nonBlocking comms. void finishedScatters(const bool wait = true); + + //- Mark all sends to sub-procs as done. + //- Recovers the sizes (bytes) received. + // + // Non-blocking mode: populates receive buffers (all-to-one). + // \param[out] recvSizes the sizes (bytes) received + // \param wait wait for requests to complete (in nonBlocking mode) + // + // \warning currently only valid for nonBlocking comms. + void finishedScatters(labelList& recvSizes, const bool wait = true); }; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C index 7ffb09e3d147df9fadef01c7b2ec6f8677f2e757..26c592c6f55e1453c0aae88fd237cbb6f3079637 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C @@ -46,7 +46,7 @@ void Foam::Pstream::exchangeContainer const bool wait ) { - const label startOfRequests = Pstream::nRequests(); + const label startOfRequests = UPstream::nRequests(); // Set up receives // ~~~~~~~~~~~~~~~ @@ -120,7 +120,7 @@ void Foam::Pstream::exchangeBuf const bool wait ) { - const label startOfRequests = Pstream::nRequests(); + const label startOfRequests = UPstream::nRequests(); // Set up receives // ~~~~~~~~~~~~~~~ diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C index 9cafc25cbf77b4bb4992a271dc5fe270f0e0cd2f..7dee1199c9488cbeded20c9b50c2d08dcc61f09f 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C @@ -320,7 +320,11 @@ Foam::Ostream& Foam::UOPstreamBase::write(const doubleScalar val) } -Foam::Ostream& Foam::UOPstreamBase::write(const char* data, std::streamsize count) +Foam::Ostream& Foam::UOPstreamBase::write +( + const char* data, + std::streamsize count +) { if (format() != BINARY) { diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index c58d61992f5455f783b42c07c9e55cb7908dbdbb..37ec9d57401598962f3c467460100f377276020d 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -51,43 +51,6 @@ Foam::UPstream::commsTypeNames }); -// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * // - -void Foam::UPstream::broadcast -( - std::string& str, - const label comm, - const int rootProcNo -) -{ - if (UPstream::parRun() && UPstream::nProcs(comm) > 1) - { - // Broadcast the string length - std::size_t len(str.length()); - - UPstream::broadcast - ( - reinterpret_cast<char*>(&len), - sizeof(std::size_t), - comm, - rootProcNo - ); - - if (!UPstream::master(comm)) - { - // Do not touch string on the master even although it would - // be a no-op. We are truly paranoid. - str.resize(len); - } - - if (len) - { - UPstream::broadcast(&str[0], len, comm, rootProcNo); - } - } -} - - // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads) diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 895ba8e1570b6600159ab2710c85a91dfb8c8cf7..2364716736e42ae300c5dce85cf8c4d7664961ba 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -708,14 +708,6 @@ public: const int rootProcNo = masterNo() ); - //- Broadcast string content to all processes in communicator. - static void broadcast - ( - std::string& str, - const label communicator = worldComm, - const int rootProcNo = masterNo() - ); - // Housekeeping diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C index c802283c5bbc3a9b90f18cacfed31457b7389add..d325e127a14074da440b48d0275f3c852a2a1b48 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C @@ -366,8 +366,7 @@ bool Foam::OFstreamCollator::write totalSize += recvSize; maxLocalSize = max(maxLocalSize, recvSize); } - Pstream::broadcast(totalSize, localComm_); - Pstream::broadcast(maxLocalSize, localComm_); + Pstream::broadcasts(localComm_, totalSize, maxLocalSize); } if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) diff --git a/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C b/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C index 2f3f7cfd6e9665ab6ae592367cd7aad526d6330d..e985abd4a077acdc03131ce2ad119fd5e8d673bf 100644 --- a/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C +++ b/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C @@ -578,12 +578,7 @@ Foam::fileOperations::masterUncollatedFileOperation::read // const bool uniform = uniformFile(filePaths); - PstreamBuffers pBufs - ( - Pstream::commsTypes::nonBlocking, - Pstream::msgType(), - comm - ); + PstreamBuffers pBufs(comm, UPstream::commsTypes::nonBlocking); if (Pstream::master(comm)) { @@ -1175,10 +1170,9 @@ Foam::fileName Foam::fileOperations::masterUncollatedFileOperation::filePath // and instance have to be same { int masterType(searchType); - Pstream::broadcast(masterType); + Pstream::broadcasts(UPstream::worldComm, masterType, newInstancePath); searchType = pathType(masterType); } - UPstream::broadcast(newInstancePath); if ( @@ -1191,12 +1185,11 @@ Foam::fileName Foam::fileOperations::masterUncollatedFileOperation::filePath { // Distribute master path. This makes sure it is seen as uniform // and only gets read from the master. - UPstream::broadcast(objPath); - UPstream::broadcast(procsDir); + Pstream::broadcasts(UPstream::worldComm, objPath, procsDir); } else { - UPstream::broadcast(procsDir, comm_); + Pstream::broadcast(procsDir, comm_); // Use the master type to determine if additional information is // needed to construct the local equivalent @@ -1276,7 +1269,7 @@ Foam::fileName Foam::fileOperations::masterUncollatedFileOperation::dirPath // processor directory naming (void)lookupProcessorsPath(io.objectPath()); - // Determine master dirPath and scatter + // Determine master dirPath and broadcast fileName objPath; pathType searchType = NOTFOUND; @@ -1303,10 +1296,10 @@ Foam::fileName Foam::fileOperations::masterUncollatedFileOperation::dirPath { int masterType(searchType); - Pstream::broadcast(masterType); //, comm_); + // Future?: comm_, + Pstream::broadcasts(UPstream::worldComm, masterType, newInstancePath); searchType = pathType(masterType); } - UPstream::broadcast(newInstancePath); //, comm_); if ( @@ -1319,12 +1312,11 @@ Foam::fileName Foam::fileOperations::masterUncollatedFileOperation::dirPath { // Distribute master path. This makes sure it is seen as uniform // and only gets read from the master. - UPstream::broadcast(objPath); - UPstream::broadcast(procsDir); + Pstream::broadcasts(UPstream::worldComm, objPath, procsDir); } else { - UPstream::broadcast(procsDir, comm_); + Pstream::broadcast(procsDir, comm_); // Use the master type to determine if additional information is // needed to construct the local equivalent @@ -1479,8 +1471,8 @@ Foam::fileOperations::masterUncollatedFileOperation::findInstance } // Do parallel early exit to avoid calling time.times() - // UPstream::broadcast(foundInstance, comm_); - UPstream::broadcast(foundInstance, UPstream::worldComm); + // Pstream::broadcast(foundInstance, comm_); + Pstream::broadcast(foundInstance, UPstream::worldComm); if (!foundInstance.empty()) { io.instance() = foundInstance; @@ -1626,8 +1618,8 @@ Foam::fileOperations::masterUncollatedFileOperation::findInstance Pstream::parRun(oldParRun); } - // UPstream::broadcast(foundInstance, comm_); - UPstream::broadcast(foundInstance, UPstream::worldComm); + // Pstream::broadcast(foundInstance, comm_); + Pstream::broadcast(foundInstance, UPstream::worldComm); io.instance() = foundInstance; if (debug) { @@ -1712,8 +1704,8 @@ Foam::fileOperations::masterUncollatedFileOperation::readObjects UPstream::parRun(oldParRun); // Restore parallel state } - Pstream::broadcast(newInstance); //, comm_); - Pstream::broadcast(objectNames); //, comm_); + // Future? comm_ + Pstream::broadcasts(UPstream::worldComm, newInstance, objectNames); if (debug) { @@ -1747,7 +1739,7 @@ bool Foam::fileOperations::masterUncollatedFileOperation::readHeader filePaths[Pstream::myProcNo(Pstream::worldComm)] = fName; Pstream::gatherList(filePaths, Pstream::msgType(), Pstream::worldComm); bool uniform = uniformFile(filePaths); - Pstream::broadcast(uniform, Pstream::worldComm); + Pstream::broadcast(uniform, UPstream::worldComm); if (uniform) { @@ -1764,25 +1756,38 @@ bool Foam::fileOperations::masterUncollatedFileOperation::readHeader } } } - Pstream::broadcast(ok, Pstream::worldComm); - UPstream::broadcast(io.headerClassName(), Pstream::worldComm); - UPstream::broadcast(io.note(), Pstream::worldComm); + + Pstream::broadcasts + ( + UPstream::worldComm, + ok, + io.headerClassName(), + io.note() + ); } else { if (Pstream::nProcs(comm_) != Pstream::nProcs(Pstream::worldComm)) { // Re-gather file paths on local master - filePaths.setSize(Pstream::nProcs(comm_)); + filePaths.resize(Pstream::nProcs(comm_)); filePaths[Pstream::myProcNo(comm_)] = fName; Pstream::gatherList(filePaths, Pstream::msgType(), comm_); } - boolList result(Pstream::nProcs(comm_), false); - wordList headerClassName(Pstream::nProcs(comm_)); - stringList note(Pstream::nProcs(comm_)); + // Intermediate storage arrays (master only) + boolList result; + wordList headerClassName; + stringList note; + if (Pstream::master(comm_)) { + const label np = Pstream::nProcs(comm_); + + result.resize(np, false); + headerClassName.resize(np); + note.resize(np); + forAll(filePaths, proci) { if (!filePaths[proci].empty()) @@ -1808,14 +1813,31 @@ bool Foam::fileOperations::masterUncollatedFileOperation::readHeader } } } - ok = scatterList(result, Pstream::msgType(), comm_); - io.headerClassName() = scatterList - ( - headerClassName, - Pstream::msgType(), - comm_ - ); - io.note() = scatterList(note, Pstream::msgType(), comm_); + + // Is a more efficient scatter possible? + PstreamBuffers pBufs(comm_, UPstream::commsTypes::nonBlocking); + + if (Pstream::master(comm_)) + { + ok = result[0]; + io.headerClassName() = headerClassName[0]; + io.note() = note[0]; + + // Scatter to each proc + for (const int proci : pBufs.subProcs()) + { + UOPstream os(proci, pBufs); + os << result[proci] << headerClassName[proci] << note[proci]; + } + } + + pBufs.finishedScatters(); + + if (!Pstream::master(comm_)) + { + UIPstream is(Pstream::masterNo(), pBufs); + is >> ok >> io.headerClassName() >> io.note(); + } } if (debug) @@ -2086,9 +2108,13 @@ bool Foam::fileOperations::masterUncollatedFileOperation::read // Broadcast regIOobjects content if (Pstream::parRun()) { - Pstream::broadcast(ok, UPstream::worldComm); - UPstream::broadcast(io.headerClassName(), UPstream::worldComm); - UPstream::broadcast(io.note(), UPstream::worldComm); + Pstream::broadcasts + ( + UPstream::worldComm, + ok, + io.headerClassName(), + io.note() + ); if (Pstream::master(UPstream::worldComm)) { @@ -2513,7 +2539,7 @@ Foam::fileName Foam::fileOperations::masterUncollatedFileOperation::getFile { fName = monitor().getFile(watchIndex); } - UPstream::broadcast(fName); //, comm_); + Pstream::broadcast(fName); //, comm_); return fName; } diff --git a/src/OpenFOAM/global/fileOperations/uncollatedFileOperation/uncollatedFileOperation.C b/src/OpenFOAM/global/fileOperations/uncollatedFileOperation/uncollatedFileOperation.C index d014c67e2858063e10b460f115ec6c84c0a1546e..1f3e4eb9d6dcb5ba3effa519041f245d9ceb3221 100644 --- a/src/OpenFOAM/global/fileOperations/uncollatedFileOperation/uncollatedFileOperation.C +++ b/src/OpenFOAM/global/fileOperations/uncollatedFileOperation/uncollatedFileOperation.C @@ -648,8 +648,12 @@ bool Foam::fileOperations::uncollatedFileOperation::read if (masterOnly && Pstream::parRun()) { - UPstream::broadcast(io.headerClassName(), UPstream::worldComm); - UPstream::broadcast(io.note(), UPstream::worldComm); + Pstream::broadcasts + ( + UPstream::worldComm, + io.headerClassName(), + io.note() + ); if (UPstream::master(UPstream::worldComm)) { diff --git a/src/OpenFOAM/parallel/globalIndex/globalIndex.H b/src/OpenFOAM/parallel/globalIndex/globalIndex.H index 2d4496c46d3cbc0cc6768bd377f6e1789e3f1eaf..213482ee99bc175f8bb5f5c6163847a7f2836d7b 100644 --- a/src/OpenFOAM/parallel/globalIndex/globalIndex.H +++ b/src/OpenFOAM/parallel/globalIndex/globalIndex.H @@ -312,7 +312,10 @@ public: //- From global to local on proci inline label toLocal(const label proci, const label i) const; - //- Which processor does global come from? Binary search. + //- Which processor does global id come from? + // Does an initial check for isLocal first (assumed to occur + // reasonably frequently) followed by a binary search. + //- Fatal for out of range ids (eg, negative or >= totalSize() inline label whichProcID(const label i) const; diff --git a/src/OpenFOAM/parallel/globalIndex/globalIndexI.H b/src/OpenFOAM/parallel/globalIndex/globalIndexI.H index 4adb07cc0ed16bda0033bc1a10c58826eec8035d..a586a3c07af6b50ff47e93373eda623f0c9e4c4b 100644 --- a/src/OpenFOAM/parallel/globalIndex/globalIndexI.H +++ b/src/OpenFOAM/parallel/globalIndex/globalIndexI.H @@ -338,7 +338,9 @@ inline Foam::label Foam::globalIndex::whichProcID(const label i) const << abort(FatalError); } - return findLower(offsets_, i+1); + const label proci(Pstream::myProcNo()); + + return isLocal(proci, i) ? proci : findLower(offsets_, i+1); } diff --git a/src/OpenFOAM/parallel/globalIndex/globalIndexTemplates.C b/src/OpenFOAM/parallel/globalIndex/globalIndexTemplates.C index 1a4b59b5bc88ff42a6dd70e44f9c75ad67716b81..e26a77265a21dc7c2989b3697b05532513f1ce5f 100644 --- a/src/OpenFOAM/parallel/globalIndex/globalIndexTemplates.C +++ b/src/OpenFOAM/parallel/globalIndex/globalIndexTemplates.C @@ -1042,7 +1042,7 @@ void Foam::globalIndex::get for (const int proci : sendBufs.allProcs()) { - if (sendBufs.hasRecvData(proci)) + if (sendBufs.recvDataCount(proci)) { UIPstream is(proci, sendBufs); labelList localIDs(is); diff --git a/src/OpenFOAM/primitives/chars/char/char.H b/src/OpenFOAM/primitives/chars/char/char.H index 5bd77a75bf0b8da58e8ffb6bbbd520824a36abcf..7708e2be9e6e322e50a599ae363e11029f7143e1 100644 --- a/src/OpenFOAM/primitives/chars/char/char.H +++ b/src/OpenFOAM/primitives/chars/char/char.H @@ -87,7 +87,7 @@ inline bool isspace(char c) noexcept namespace Foam { -// Template specialisation for pTraits<char> +//- Template specialisation for pTraits\<char\> template<> class pTraits<char> { diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index 2e971a0201949c4ce80200c9dc2bc00d0780c3d6..87fb171a521e83550892a2789c7887d065861afc 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -315,8 +315,7 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread) worldIDs_[proci] = allWorlds_.find(world); } } - Pstream::broadcast(allWorlds_); - Pstream::broadcast(worldIDs_); + Pstream::broadcasts(UPstream::worldComm, allWorlds_, worldIDs_); DynamicList<label> subRanks; forAll(worlds, proci) diff --git a/src/dynamicMesh/fvMeshTools/fvMeshTools.C b/src/dynamicMesh/fvMeshTools/fvMeshTools.C index 54645d49341ecd9b86d9029214abf90cc6972164..490b9e1b332d263ab1b26581c85760500693f9cf 100644 --- a/src/dynamicMesh/fvMeshTools/fvMeshTools.C +++ b/src/dynamicMesh/fvMeshTools/fvMeshTools.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2012-2016 OpenFOAM Foundation - Copyright (C) 2015-2021 OpenCFD Ltd. + Copyright (C) 2015-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -481,9 +481,13 @@ Foam::autoPtr<Foam::fvMesh> Foam::fvMeshTools::newMesh } // Broadcast information to all - Pstream::scatter(patchEntries); - Pstream::scatter(facesInstance); - Pstream::scatter(pointsInstance); + Pstream::broadcasts + ( + UPstream::worldComm, + patchEntries, + facesInstance, + pointsInstance + ); // Dummy meshes @@ -687,11 +691,16 @@ Foam::autoPtr<Foam::fvMesh> Foam::fvMeshTools::newMesh // ~~~~~~~~~~~~~~~ wordList pointZoneNames(mesh.pointZones().names()); - Pstream::scatter(pointZoneNames); wordList faceZoneNames(mesh.faceZones().names()); - Pstream::scatter(faceZoneNames); wordList cellZoneNames(mesh.cellZones().names()); - Pstream::scatter(cellZoneNames); + + Pstream::broadcasts + ( + UPstream::worldComm, + pointZoneNames, + faceZoneNames, + cellZoneNames + ); if (!haveMesh) { diff --git a/src/dynamicMesh/polyTopoChange/polyTopoChange/hexRef8/hexRef8Data.C b/src/dynamicMesh/polyTopoChange/polyTopoChange/hexRef8/hexRef8Data.C index 32fea05e5675b01db8e9352122fd95d99cdb3a67..f74ff1be9af5a07afa528af5287d4c6b245af269 100644 --- a/src/dynamicMesh/polyTopoChange/polyTopoChange/hexRef8/hexRef8Data.C +++ b/src/dynamicMesh/polyTopoChange/polyTopoChange/hexRef8/hexRef8Data.C @@ -264,7 +264,7 @@ void Foam::hexRef8Data::sync(const IOobject& io) { // Get master length scalar masterLen = (Pstream::master() ? level0EdgePtr_().value() : 0); - Pstream::scatter(masterLen); + Pstream::broadcast(masterLen); if (!level0EdgePtr_) { IOobject rio(io, "level0Edge"); diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C index 2720b0204847fd7ffc52e5253d32169b6c81a4c8..89a0e372a42122849a7ec7ec1f42398b55df29a0 100644 --- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C +++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C @@ -95,11 +95,11 @@ void Foam::zoneDistribute::setUpCommforZone { for (const label gblIdx : stencil_[celli]) { - if (!globalNumbering_.isLocal(gblIdx)) + const label proci = globalNumbering_.whichProcID(gblIdx); + + if (proci != Pstream::myProcNo()) { - const label procID = - globalNumbering_.whichProcID(gblIdx); - needed[procID].insert(gblIdx); + needed[proci].insert(gblIdx); } } } @@ -126,7 +126,7 @@ void Foam::zoneDistribute::setUpCommforZone { send_[proci].clear(); - if (proci != UPstream::myProcNo() && pBufs.hasRecvData(proci)) + if (proci != UPstream::myProcNo() && pBufs.recvDataCount(proci)) { UIPstream fromProc(proci, pBufs); fromProc >> send_[proci]; diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H index 91c70f31cf74895c525813495ebcc5a3ed76a730..e82f33b4e5777ccdd4168a72b88bd34d9977a2a6 100644 --- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H +++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H @@ -200,7 +200,7 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc for (const int proci : pBufs.allProcs()) { - if (proci != UPstream::myProcNo() && pBufs.hasRecvData(proci)) + if (proci != UPstream::myProcNo() && pBufs.recvDataCount(proci)) { UIPstream fromProc(proci, pBufs); Map<Type> tmpValues(fromProc); diff --git a/src/lagrangian/basic/Cloud/Cloud.C b/src/lagrangian/basic/Cloud/Cloud.C index c5f062124b7f8933796ed67fd724b2ff15d2b0ad..5b7aa9a9f71b66bf93f38d8fbca59993f90680ef 100644 --- a/src/lagrangian/basic/Cloud/Cloud.C +++ b/src/lagrangian/basic/Cloud/Cloud.C @@ -281,7 +281,7 @@ void Foam::Cloud<ParticleType>::move // Retrieve from receive buffers for (const label proci : neighbourProcs) { - if (pBufs.hasRecvData(proci)) + if (pBufs.recvDataCount(proci)) { UIPstream is(proci, pBufs); diff --git a/src/lagrangian/intermediate/submodels/Kinematic/PatchInteractionModel/RecycleInteraction/RecycleInteraction.C b/src/lagrangian/intermediate/submodels/Kinematic/PatchInteractionModel/RecycleInteraction/RecycleInteraction.C index 79e55a188ffb36aed08f02622dc8dcec0f3a85fa..1b42f0c21c19b854f152661e9cfb0fbf0a5e43d0 100644 --- a/src/lagrangian/intermediate/submodels/Kinematic/PatchInteractionModel/RecycleInteraction/RecycleInteraction.C +++ b/src/lagrangian/intermediate/submodels/Kinematic/PatchInteractionModel/RecycleInteraction/RecycleInteraction.C @@ -255,7 +255,7 @@ void Foam::RecycleInteraction<CloudType>::postEvolve() // Retrieve from receive buffers for (const int proci : pBufs.allProcs()) { - if (pBufs.hasRecvData(proci)) + if (pBufs.recvDataCount(proci)) { UIPstream is(proci, pBufs); diff --git a/src/lumpedPointMotion/state/lumpedPointState.C b/src/lumpedPointMotion/state/lumpedPointState.C index c5033400a0bdb8313b2b451472190fc5f0285d2b..da1d13de31a2825dd288d8144deb500138eaac19 100644 --- a/src/lumpedPointMotion/state/lumpedPointState.C +++ b/src/lumpedPointMotion/state/lumpedPointState.C @@ -405,10 +405,14 @@ bool Foam::lumpedPointState::readData { // Broadcast master data to everyone - Pstream::scatter(points_); - Pstream::scatter(angles_); - Pstream::scatter(degrees_); - Pstream::scatter(ok); + Pstream::broadcasts + ( + UPstream::worldComm, + ok, + degrees_, + points_, + angles_ + ); } rotationPtr_.reset(nullptr); diff --git a/src/mesh/snappyHexMesh/meshRefinement/meshRefinement.C b/src/mesh/snappyHexMesh/meshRefinement/meshRefinement.C index 799c2045a0e17a284235338b707604294d6e9c3d..7ae55ea74e60573de2a086cea7f5a5ca565a8cf1 100644 --- a/src/mesh/snappyHexMesh/meshRefinement/meshRefinement.C +++ b/src/mesh/snappyHexMesh/meshRefinement/meshRefinement.C @@ -2947,7 +2947,7 @@ Foam::fileName Foam::meshRefinement::writeLeakPath } // Probably do not need to broadcast name (only written on master anyhow) - UPstream::broadcast(fName); + Pstream::broadcast(fName); return fName; } diff --git a/src/mesh/snappyHexMesh/meshRefinement/meshRefinementBaffles.C b/src/mesh/snappyHexMesh/meshRefinement/meshRefinementBaffles.C index a37213e7b864d388be9fd10c8f6a7054de180cbe..c4b4f6e3d4273c1ada89b314af8eee87455cdf9b 100644 --- a/src/mesh/snappyHexMesh/meshRefinement/meshRefinementBaffles.C +++ b/src/mesh/snappyHexMesh/meshRefinement/meshRefinementBaffles.C @@ -3952,20 +3952,12 @@ Foam::label Foam::meshRefinement::markPatchZones break; } - label procI = globalFaces.whichProcID(globalSeed); - label seedFaceI = globalFaces.toLocal(procI, globalSeed); - - //Info<< "Seeding zone " << currentZoneI - // << " from processor " << procI << " face " << seedFaceI - // << endl; - - if (procI == Pstream::myProcNo()) + if (globalFaces.isLocal(globalSeed)) { - edgeTopoDistanceData<label>& faceInfo = allFaceInfo[seedFaceI]; - + const label seedFaceI = globalFaces.toLocal(globalSeed); // Set face - faceInfo = edgeTopoDistanceData<label>(0, currentZoneI); + edgeTopoDistanceData<label>& faceInfo = allFaceInfo[seedFaceI]; // .. and seed its edges const labelList& fEdges = patch.faceEdges()[seedFaceI]; @@ -4134,14 +4126,10 @@ void Foam::meshRefinement::consistentOrientation break; } - label procI = globalFaces.whichProcID(globalSeed); - label seedFaceI = globalFaces.toLocal(procI, globalSeed); - - //Info<< "Seeding from processor " << procI << " face " << seedFaceI - // << endl; - - if (procI == Pstream::myProcNo()) + if (globalFaces.isLocal(globalSeed)) { + const label seedFaceI = globalFaces.toLocal(globalSeed); + // Determine orientation of seedFace patchFaceOrientation& faceInfo = allFaceInfo[seedFaceI]; diff --git a/src/mesh/snappyHexMesh/snappyHexMeshDriver/snappyRefineDriver.C b/src/mesh/snappyHexMesh/snappyHexMeshDriver/snappyRefineDriver.C index fe5f8772b909dafbc7b601f79fda8dfaefd4d08e..a33f6f8965e27158a1a661923b0539e50606f72d 100644 --- a/src/mesh/snappyHexMesh/snappyHexMeshDriver/snappyRefineDriver.C +++ b/src/mesh/snappyHexMesh/snappyHexMeshDriver/snappyRefineDriver.C @@ -2499,8 +2499,7 @@ Foam::label Foam::snappyRefineDriver::directionalSmooth { scalar minSeed = min(allSeedPointDist); scalar maxSeed = max(allSeedPointDist); - Pstream::broadcast(minSeed); - Pstream::broadcast(maxSeed); + Pstream::broadcasts(UPstream::worldComm, minSeed, maxSeed); forAll(normalizedPosition, posI) { diff --git a/src/meshTools/mappedPatches/mappedPolyPatch/mappedPatchBase.C b/src/meshTools/mappedPatches/mappedPolyPatch/mappedPatchBase.C index c3dab6608cf6c646e312c184c5315e5ec9ab8345..89ec09d90772e2b4be798e93b27b0c38741ed6d2 100644 --- a/src/meshTools/mappedPatches/mappedPolyPatch/mappedPatchBase.C +++ b/src/meshTools/mappedPatches/mappedPolyPatch/mappedPatchBase.C @@ -248,8 +248,7 @@ void Foam::mappedPatchBase::collectSamples UPstream::listGatherValues<label>(patch_.size(), myComm) ); - Pstream::broadcast(procToWorldIndex, myComm); - Pstream::broadcast(nPerProc, myComm); + Pstream::broadcasts(myComm, procToWorldIndex, nPerProc); patchFaceWorlds.setSize(patchFaces.size()); patchFaceProcs.setSize(patchFaces.size()); diff --git a/src/overset/cellCellStencil/inverseDistance/inverseDistanceCellCellStencil.C b/src/overset/cellCellStencil/inverseDistance/inverseDistanceCellCellStencil.C index 5ab6e8673adff317b7baddff2b93252dc8dadf00..15bb8f719a3c084d764298fffb094643d745387e 100644 --- a/src/overset/cellCellStencil/inverseDistance/inverseDistanceCellCellStencil.C +++ b/src/overset/cellCellStencil/inverseDistance/inverseDistanceCellCellStencil.C @@ -794,23 +794,10 @@ void Foam::cellCellStencils::inverseDistance::markDonors // forAll(cellRegion, celli) // { // label region = cellRegion[celli]; -// -// // Count originating processor. Use isLocal as efficiency since -// // most cells are locally originating. -// if (globalRegions.isLocal(region)) +// label proci = globalRegions.whichProcID(region); +// if (haveRegion.insert(region)) // { -// if (haveRegion.insert(region)) -// { -// nOriginating[Pstream::myProcNo()]++; -// } -// } -// else -// { -// label proci = globalRegions.whichProcID(region); -// if (haveRegion.insert(region)) -// { -// nOriginating[proci]++; -// } +// nOriginating[proci]++; // } // } // } diff --git a/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C b/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C index e91439971819f6023652231537d4e6b1f2b6fed7..0e906ad1c73fbfd89563173aec3361835f38366e 100644 --- a/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C +++ b/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C @@ -82,9 +82,13 @@ void surfaceNoise::initialise(const fileName& fName) nAvailableTimes = allTimes.size() - startTimeIndex_; } - Pstream::scatter(pIndex_); - Pstream::scatter(startTimeIndex_); - Pstream::scatter(nAvailableTimes); + Pstream::broadcasts + ( + UPstream::worldComm, + pIndex_, + startTimeIndex_, + nAvailableTimes + ); // Note: all processors should call the windowing validate function @@ -108,9 +112,13 @@ void surfaceNoise::initialise(const fileName& fName) nFace_ = surf.size(); } - Pstream::scatter(times_); - Pstream::scatter(deltaT_); - Pstream::scatter(nFace_); + Pstream::broadcasts + ( + UPstream::worldComm, + times_, + deltaT_, + nFace_ + ); } @@ -319,7 +327,7 @@ scalar surfaceNoise::writeSurfaceData areaAverage = sum(allData)/(allData.size() + ROOTVSMALL); } } - Pstream::scatter(areaAverage); + Pstream::broadcast(areaAverage); return areaAverage; } @@ -409,7 +417,7 @@ scalar surfaceNoise::surfaceAverage // areaAverage = sum(allData*surf.magSf())/sum(surf.magSf()); areaAverage = sum(allData)/allData.size(); } - Pstream::scatter(areaAverage); + Pstream::broadcast(areaAverage); return areaAverage; } diff --git a/src/sampling/sampledSet/sampledSets/sampledSetsImpl.C b/src/sampling/sampledSet/sampledSets/sampledSetsImpl.C index 18520dbf982abd413e66e86fbb20d50773e1e6b5..f444300ad33f761a99b15cb8ffb75bac74e557c3 100644 --- a/src/sampling/sampledSet/sampledSets/sampledSetsImpl.C +++ b/src/sampling/sampledSet/sampledSets/sampledSetsImpl.C @@ -82,7 +82,7 @@ void Foam::sampledSets::writeCoordSet { outputName = writer.write(fieldName, values); } - UPstream::broadcast(outputName); + Pstream::broadcast(outputName); if (outputName.size()) { @@ -207,9 +207,7 @@ void Foam::sampledSets::performAction // Use sorted order values = UIndirectList<Type>(values, globOrder)(); } - Pstream::broadcast(avgValue); - Pstream::broadcast(sizeValue); - Pstream::broadcast(limits); + Pstream::broadcasts(UPstream::worldComm, avgValue, sizeValue, limits); // Store results: min/max/average/size with the name of the set // for scoping. @@ -261,9 +259,13 @@ void Foam::sampledSets::performAction if (size()) { - Pstream::broadcast(avgEnsemble); - Pstream::broadcast(sizeEnsemble); - Pstream::broadcast(limitsEnsemble); + Pstream::broadcasts + ( + UPstream::worldComm, + avgEnsemble, + sizeEnsemble, + limitsEnsemble + ); // Store results: min/max/average/size for the ensemble // Eg, average(T) ...