diff --git a/src/OpenFOAM/Make/files b/src/OpenFOAM/Make/files index ebb4506194fb3a27a0cf45601747f53cca682e86..c7fe4cfc03d8d4dec884182bf586edeb32a41dcc 100644 --- a/src/OpenFOAM/Make/files +++ b/src/OpenFOAM/Make/files @@ -266,14 +266,14 @@ StringStreams = $(Streams)/StringStreams $(StringStreams)/StringStream.C Pstreams = $(Streams)/Pstreams -$(Pstreams)/UIPstream.C -$(Pstreams)/IPstream.C -/* $(Pstreams)/UPstream.C in global.Cver */ +/* $(Pstreams)/UPstream.C in global.C */ $(Pstreams)/UPstreamCommsStruct.C $(Pstreams)/Pstream.C -$(Pstreams)/UOPstream.C -$(Pstreams)/OPstream.C $(Pstreams)/PstreamBuffers.C +$(Pstreams)/UIPstreamBase.C +$(Pstreams)/UOPstreamBase.C +$(Pstreams)/IPstreams.C +$(Pstreams)/OPstreams.C dictionary = db/dictionary $(dictionary)/dictionary.C diff --git a/src/Pstream/dummy/UIPread.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C similarity index 57% rename from src/Pstream/dummy/UIPread.C rename to src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C index f6cc67da047841a340169d5c0586307c4bb82fe2..232a8d075429eb7e3b6c16a55e167a88d6882d33 100644 --- a/src/Pstream/dummy/UIPread.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C @@ -5,8 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2011-2015 OpenFOAM Foundation - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -24,12 +23,11 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. -Description - Read from UIPstream - \*---------------------------------------------------------------------------*/ #include "UIPstream.H" +#include "IPstream.H" +#include "IOstreams.H" // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // @@ -45,51 +43,78 @@ Foam::UIPstream::UIPstream IOstreamOption::streamFormat fmt ) : - UPstream(commsType), - Istream(fmt, IOstreamOption::currentVersion), - fromProcNo_(fromProcNo), - recvBuf_(receiveBuf), - recvBufPos_(receiveBufPosition), - tag_(tag), - comm_(comm), - clearAtEnd_(clearAtEnd), - messageSize_(0) + UIPstreamBase + ( + commsType, + fromProcNo, + receiveBuf, + receiveBufPosition, + tag, + comm, + clearAtEnd, + fmt + ) { - NotImplemented; + if (commsType == commsTypes::nonBlocking) + { + // Message is already received into buffer + } + else + { + bufferIPCrecv(); + } } Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) : - UPstream(buffers.commsType_), - Istream(buffers.format_, IOstreamOption::currentVersion), - fromProcNo_(fromProcNo), - recvBuf_(buffers.recvBuf_[fromProcNo]), - recvBufPos_(buffers.recvBufPos_[fromProcNo]), - tag_(buffers.tag_), - comm_(buffers.comm_), - clearAtEnd_(true), - messageSize_(0) + UIPstreamBase(fromProcNo, buffers) { - NotImplemented; -} + if (commsType() == commsTypes::nonBlocking) + { + // Message is already received into buffer + messageSize_ = recvBuf_.size(); + if (debug) + { + Pout<< "UIPstream::UIPstream PstreamBuffers :" + << " fromProcNo:" << fromProcNo_ + << " tag:" << tag_ << " comm:" << comm_ + << " receive buffer size:" << messageSize_ + << Foam::endl; + } + } + else + { + bufferIPCrecv(); + } +} -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // -Foam::label Foam::UIPstream::read +Foam::IPstream::IPstream ( const commsTypes commsType, const int fromProcNo, - char* buf, - const std::streamsize bufSize, + const label bufSize, const int tag, - const label communicator + const label comm, + IOstreamOption::streamFormat fmt ) -{ - NotImplemented; - return 0; -} +: + Pstream(commsType, bufSize), + UIPstream + ( + commsType, + fromProcNo, + Pstream::transferBuf_, + transferBufPosition_, + tag, + comm, + false, // Do not clear Pstream::transferBuf_ if at end + fmt + ), + transferBufPosition_(0) +{} // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C similarity index 60% rename from src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C rename to src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C index af8035eea7626d870b7b237744f9eefea9ed43df..70e557ab3fdd5b8cf393e468b277ad840923e763 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C @@ -5,8 +5,8 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2011 OpenFOAM Foundation + Copyright (C) 2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -26,14 +26,36 @@ License \*---------------------------------------------------------------------------*/ -#include "IPstream.H" +#include "UOPstream.H" +#include "OPstream.H" // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // -Foam::IPstream::IPstream +Foam::UOPstream::UOPstream ( const commsTypes commsType, - const int fromProcNo, + const int toProcNo, + DynamicList<char>& sendBuf, + const int tag, + const label comm, + const bool sendAtDestruct, + IOstreamOption::streamFormat fmt +) +: + UOPstreamBase(commsType, toProcNo, sendBuf, tag, comm, sendAtDestruct, fmt) +{} + + +Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers) +: + UOPstreamBase(toProcNo, buffers) +{} + + +Foam::OPstream::OPstream +( + const commsTypes commsType, + const int toProcNo, const label bufSize, const int tag, const label comm, @@ -41,19 +63,34 @@ Foam::IPstream::IPstream ) : Pstream(commsType, bufSize), - UIPstream + UOPstream ( commsType, - fromProcNo, + toProcNo, Pstream::transferBuf_, - transferBufPosition_, tag, comm, - false, // Do not clear Pstream::transferBuf_ if at end + true, // sendAtDestruct fmt - ), - transferBufPosition_(0) + ) {} +// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // + +Foam::UOPstream::~UOPstream() +{ + if (sendAtDestruct_) + { + if (!bufferIPCsend()) + { + FatalErrorInFunction + << "Failed sending outgoing message of size " + << sendBuf_.size() << " to processor " << toProcNo_ + << Foam::abort(FatalError); + } + } +} + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index 2607ce97abcc9aedc83a12b4e47eb44e87c0fc1e..85dc6fad6e13132dc4db4fee623fd710f1fc4096 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2021-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -28,6 +28,35 @@ License #include "PstreamBuffers.H" +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +void Foam::PstreamBuffers::finalExchange +( + labelList& recvSizes, + const bool block +) +{ + // Could also check that it is not called twice + finishedSendsCalled_ = true; + + if (commsType_ == UPstream::commsTypes::nonBlocking) + { + // all-to-all + Pstream::exchangeSizes(sendBuf_, recvSizes, comm_); + + Pstream::exchange<DynamicList<char>, char> + ( + sendBuf_, + recvSizes, + recvBuf_, + tag_, + comm_, + block + ); + } +} + + // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // Foam::PstreamBuffers::PstreamBuffers @@ -38,14 +67,14 @@ Foam::PstreamBuffers::PstreamBuffers IOstreamOption::streamFormat fmt ) : + finishedSendsCalled_(false), + format_(fmt), commsType_(commsType), tag_(tag), comm_(comm), - format_(fmt), sendBuf_(UPstream::nProcs(comm)), recvBuf_(UPstream::nProcs(comm)), - recvBufPos_(UPstream::nProcs(comm), Zero), - finishedSendsCalled_(false) + recvBufPos_(UPstream::nProcs(comm), Zero) {} @@ -70,45 +99,38 @@ Foam::PstreamBuffers::~PstreamBuffers() // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // -void Foam::PstreamBuffers::finishedSends(const bool block) +void Foam::PstreamBuffers::clear() { - // Could also check that it is not called twice - finishedSendsCalled_ = true; - - if (commsType_ == UPstream::commsTypes::nonBlocking) + for (DynamicList<char>& buf : sendBuf_) { - Pstream::exchange<DynamicList<char>, char> - ( - sendBuf_, - recvBuf_, - tag_, - comm_, - block - ); + buf.clear(); } + for (DynamicList<char>& buf : recvBuf_) + { + buf.clear(); + } + recvBufPos_ = 0; + + finishedSendsCalled_ = false; } -void Foam::PstreamBuffers::finishedSends(labelList& recvSizes, const bool block) +void Foam::PstreamBuffers::finishedSends(const bool block) { - // Could also check that it is not called twice - finishedSendsCalled_ = true; + labelList recvSizes; + finalExchange(recvSizes, block); +} - if (commsType_ == UPstream::commsTypes::nonBlocking) - { - Pstream::exchangeSizes(sendBuf_, recvSizes, comm_); - Pstream::exchange<DynamicList<char>, char> - ( - sendBuf_, - recvSizes, - recvBuf_, - tag_, - comm_, - block - ); - } - else +void Foam::PstreamBuffers::finishedSends +( + labelList& recvSizes, + const bool block +) +{ + finalExchange(recvSizes, block); + + if (commsType_ != UPstream::commsTypes::nonBlocking) { FatalErrorInFunction << "Obtaining sizes not supported in " @@ -122,19 +144,4 @@ void Foam::PstreamBuffers::finishedSends(labelList& recvSizes, const bool block) } -void Foam::PstreamBuffers::clear() -{ - for (DynamicList<char>& buf : sendBuf_) - { - buf.clear(); - } - for (DynamicList<char>& buf : recvBuf_) - { - buf.clear(); - } - recvBufPos_ = 0; - finishedSendsCalled_ = false; -} - - // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index de7414f469e99926d5eca63c0bd97001c802640a..5f178e000ea4148cbda7279a78d33b35cc637b46 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2021-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -70,8 +70,8 @@ SourceFiles #include "Pstream.H" -#ifndef PstreamBuffers_H -#define PstreamBuffers_H +#ifndef Foam_PstreamBuffers_H +#define Foam_PstreamBuffers_H #include "DynamicList.H" #include "UPstream.H" @@ -83,16 +83,24 @@ namespace Foam { /*---------------------------------------------------------------------------*\ - Class PstreamBuffers Declaration + Class PstreamBuffers Declaration \*---------------------------------------------------------------------------*/ class PstreamBuffers { - friend class UOPstream; - friend class UIPstream; + // Friendship + friend class UOPstreamBase; // Access to sendBuf_ + friend class UIPstreamBase; // Access to recvBuf_ recvBufPos_; + // Private Data + //- Track if sends are complete + bool finishedSendsCalled_; + + //- Buffer format (ascii | binary) + const IOstreamOption::streamFormat format_; + //- Communications type of this stream const UPstream::commsTypes commsType_; @@ -102,9 +110,6 @@ class PstreamBuffers //- Communicator const label comm_; - //- Buffer format (ascii | binary) - const IOstreamOption::streamFormat format_; - //- Send buffer List<DynamicList<char>> sendBuf_; @@ -114,8 +119,12 @@ class PstreamBuffers //- Current read positions within recvBuf_ labelList recvBufPos_; - //- Track if sends are complete - bool finishedSendsCalled_; + + // Private Member Functions + + //- Mark all sends as having been done. + // This will start receives in non-blocking mode. + void finalExchange(labelList& recvSizes, const bool block); public: @@ -138,6 +147,20 @@ public: // Member Functions + // Access + + //- The associated buffer format (ascii | binary) + IOstreamOption::streamFormat format() const noexcept + { + return format_; + } + + //- The communications type of the stream + UPstream::commsTypes commsType() const noexcept + { + return commsType_; + } + //- The transfer message type int tag() const noexcept { @@ -150,6 +173,22 @@ public: return comm_; } + //- True if finishedSends has been called + bool finished() const noexcept + { + return finishedSendsCalled_; + } + + + // Edit + + //- Reset (clear) individual buffers and reset state. + // Does not clear buffer storage + void clear(); + + + // Functions + //- Mark all sends as having been done. // This will start receives in non-blocking mode. // If block will wait for all transfers to finish @@ -160,10 +199,6 @@ public: // Same as above but also returns sizes (bytes) received. // \note currently only valid for non-blocking. void finishedSends(labelList& recvSizes, const bool block = true); - - //- Reset (clear) individual buffers and reset state. - // Does not clear buffer storage - void clear(); }; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H index 7b44640fc84e3449cb8b2c6fa0a57efe5f733d08..5e593c93eb5ad81667c69ffa3718dcd8c3f6b29c 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2017-2021 OpenCFD Ltd. + Copyright (C) 2017-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -25,24 +25,26 @@ License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. Class - Foam::UIPstream + Foam::UIPstreamBase Description - Input inter-processor communications stream operating on external - buffer. + Base class for input inter-processor communications stream + (ie, parallel streams). + Not to be used directly, thus contructors are protected. SourceFiles - UIPstream.C + UIPstreamBase.C \*---------------------------------------------------------------------------*/ #include "Pstream.H" -#ifndef UIPstream_H -#define UIPstream_H +#ifndef Foam_UIPstream_H +#define Foam_UIPstream_H #include "UPstream.H" #include "Istream.H" +#include "DynamicList.H" #include "PstreamBuffers.H" // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // @@ -51,31 +53,14 @@ namespace Foam { /*---------------------------------------------------------------------------*\ - Class UIPstream Declaration + Class UIPstreamBase Declaration \*---------------------------------------------------------------------------*/ -class UIPstream +class UIPstreamBase : public UPstream, public Istream { - // Private Data - - int fromProcNo_; - - DynamicList<char>& recvBuf_; - - label& recvBufPos_; - - const int tag_; - - const label comm_; - - const bool clearAtEnd_; - - int messageSize_; - - // Private Member Functions //- Check buffer position against messageSize_ for EOF @@ -96,14 +81,31 @@ class UIPstream inline Istream& readString(std::string& str); -public: +protected: - // Constructors + // Protected Data + + int fromProcNo_; + + DynamicList<char>& recvBuf_; + + label& recvBufPos_; + + const int tag_; + + const label comm_; + + const bool clearAtEnd_; + + int messageSize_; + + + // Protected Constructors //- Construct given process index to read from using the given //- attached receive buffer, optional communication characteristics //- and IO format - UIPstream + UIPstreamBase ( const commsTypes commsType, const int fromProcNo, @@ -116,11 +118,14 @@ public: ); //- Construct given buffers - UIPstream(const int fromProcNo, PstreamBuffers& buffers); + UIPstreamBase(const int fromProcNo, PstreamBuffers& buffers); - //- Destructor - ~UIPstream(); +public: + + + //- Destructor. Optionally clears external receive buffer. + virtual ~UIPstreamBase(); // Member Functions @@ -128,7 +133,7 @@ public: // Inquiry //- Return flags of output stream - ios_base::fmtflags flags() const + virtual ios_base::fmtflags flags() const { return ios_base::fmtflags(0); } @@ -136,18 +141,6 @@ public: // Read Functions - //- Read into given buffer from given processor - // \return the message size - static label read - ( - const commsTypes commsType, - const int fromProcNo, - char* buf, - const std::streamsize bufSize, - const int tag = UPstream::msgType(), - const label communicator = UPstream::worldComm - ); - //- Return next token from stream Istream& read(token& t); @@ -191,7 +184,7 @@ public: // Edit //- Set flags of stream - ios_base::fmtflags flags(const ios_base::fmtflags) + virtual ios_base::fmtflags flags(const ios_base::fmtflags) { return ios_base::fmtflags(0); } @@ -204,6 +197,71 @@ public: }; +/*---------------------------------------------------------------------------*\ + Class UIPstream Declaration +\*---------------------------------------------------------------------------*/ + +//- Input inter-processor communications stream +//- using MPI send/recv etc. - operating on external buffer. +class UIPstream +: + public UIPstreamBase +{ + // Private Member Functions + + //- Initial buffer recv, called by constructor (blocking | scheduled) + void bufferIPCrecv(); + + +public: + + // Constructors + + //- Construct given process index to read from using the given + //- attached receive buffer, optional communication characteristics + //- and IO format + UIPstream + ( + const commsTypes commsType, + const int fromProcNo, + DynamicList<char>& receiveBuf, + label& receiveBufPosition, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool clearAtEnd = false, // destroy receiveBuf if at end + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); + + //- Construct given buffers + UIPstream(const int fromProcNo, PstreamBuffers& buffers); + + + //- Destructor + virtual ~UIPstream() = default; + + + // Member Functions + + //- Use all read methods from base + using UIPstreamBase::read; + + + // Static Functions + + //- Read buffer contents from given processor + // \return the message size + static label read + ( + const commsTypes commsType, + const int fromProcNo, + char* buf, + const std::streamsize bufSize, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); +}; + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C similarity index 77% rename from src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.C rename to src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C index 5d8187da08c784fbb13620495b77f1163ff8e222..4a6108788b38442589426d9ee9b687db4b0df024 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C @@ -74,7 +74,7 @@ inline static label byteAlign(const label pos, const size_t align) // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // -inline void Foam::UIPstream::checkEof() +inline void Foam::UIPstreamBase::checkEof() { if (recvBufPos_ == messageSize_) { @@ -83,14 +83,14 @@ inline void Foam::UIPstream::checkEof() } -inline void Foam::UIPstream::prepareBuffer(const size_t align) +inline void Foam::UIPstreamBase::prepareBuffer(const size_t align) { recvBufPos_ = byteAlign(recvBufPos_, align); } template<class T> -inline void Foam::UIPstream::readFromBuffer(T& val) +inline void Foam::UIPstreamBase::readFromBuffer(T& val) { prepareBuffer(sizeof(T)); @@ -100,7 +100,7 @@ inline void Foam::UIPstream::readFromBuffer(T& val) } -inline void Foam::UIPstream::readFromBuffer +inline void Foam::UIPstreamBase::readFromBuffer ( void* data, const size_t count @@ -119,7 +119,7 @@ inline void Foam::UIPstream::readFromBuffer } -inline Foam::Istream& Foam::UIPstream::readString(std::string& str) +inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str) { // Use std::string::assign() to copy content, including '\0'. // Stripping (when desired) is the responsibility of the sending side. @@ -142,15 +142,78 @@ inline Foam::Istream& Foam::UIPstream::readString(std::string& str) } +// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // + +Foam::UIPstreamBase::UIPstreamBase +( + const commsTypes commsType, + const int fromProcNo, + DynamicList<char>& receiveBuf, + label& receiveBufPosition, + const int tag, + const label comm, + const bool clearAtEnd, + IOstreamOption::streamFormat fmt +) +: + UPstream(commsType), + Istream(fmt, IOstreamOption::currentVersion), + fromProcNo_(fromProcNo), + recvBuf_(receiveBuf), + recvBufPos_(receiveBufPosition), + tag_(tag), + comm_(comm), + clearAtEnd_(clearAtEnd), + messageSize_(0) +{ + setOpened(); + setGood(); +} + + +Foam::UIPstreamBase::UIPstreamBase +( + const int fromProcNo, + PstreamBuffers& buffers +) +: + UPstream(buffers.commsType()), + Istream(buffers.format(), IOstreamOption::currentVersion), + fromProcNo_(fromProcNo), + recvBuf_(buffers.recvBuf_[fromProcNo]), + recvBufPos_(buffers.recvBufPos_[fromProcNo]), + tag_(buffers.tag()), + comm_(buffers.comm()), + clearAtEnd_(true), + messageSize_(0) +{ + if + ( + commsType() != UPstream::commsTypes::scheduled + && !buffers.finished() + ) + { + FatalErrorInFunction + << "PstreamBuffers::finishedSends() never called." << endl + << "Please call PstreamBuffers::finishedSends() after doing" + << " all your sends (using UOPstream) and before doing any" + << " receives (using UIPstream)" << Foam::exit(FatalError); + } + + setOpened(); + setGood(); +} + + // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // -Foam::UIPstream::~UIPstream() +Foam::UIPstreamBase::~UIPstreamBase() { if (clearAtEnd_ && eof()) { if (debug) { - Pout<< "UIPstream::~UIPstream() : tag:" << tag_ + Pout<< "UIPstreamBase Destructor : tag:" << tag_ << " fromProcNo:" << fromProcNo_ << " clearing receive buffer of size " << recvBuf_.size() @@ -163,7 +226,7 @@ Foam::UIPstream::~UIPstream() // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // -Foam::Istream& Foam::UIPstream::read(token& t) +Foam::Istream& Foam::UIPstreamBase::read(token& t) { // Return the put back token if it exists // - with additional handling for special stream flags @@ -341,7 +404,7 @@ Foam::Istream& Foam::UIPstream::read(token& t) } -Foam::Istream& Foam::UIPstream::read(char& c) +Foam::Istream& Foam::UIPstreamBase::read(char& c) { c = recvBuf_[recvBufPos_]; ++recvBufPos_; @@ -350,40 +413,40 @@ Foam::Istream& Foam::UIPstream::read(char& c) } -Foam::Istream& Foam::UIPstream::read(word& str) +Foam::Istream& Foam::UIPstreamBase::read(word& str) { return readString(str); } -Foam::Istream& Foam::UIPstream::read(string& str) +Foam::Istream& Foam::UIPstreamBase::read(string& str) { return readString(str); } -Foam::Istream& Foam::UIPstream::read(label& val) +Foam::Istream& Foam::UIPstreamBase::read(label& val) { readFromBuffer(val); return *this; } -Foam::Istream& Foam::UIPstream::read(floatScalar& val) +Foam::Istream& Foam::UIPstreamBase::read(floatScalar& val) { readFromBuffer(val); return *this; } -Foam::Istream& Foam::UIPstream::read(doubleScalar& val) +Foam::Istream& Foam::UIPstreamBase::read(doubleScalar& val) { readFromBuffer(val); return *this; } -Foam::Istream& Foam::UIPstream::read(char* data, std::streamsize count) +Foam::Istream& Foam::UIPstreamBase::read(char* data, std::streamsize count) { if (count) { @@ -398,7 +461,7 @@ Foam::Istream& Foam::UIPstream::read(char* data, std::streamsize count) } -Foam::Istream& Foam::UIPstream::readRaw(char* data, std::streamsize count) +Foam::Istream& Foam::UIPstreamBase::readRaw(char* data, std::streamsize count) { // No check for format() == BINARY since this is either done in the // beginRawRead() method, or the caller knows what they are doing. @@ -409,7 +472,7 @@ Foam::Istream& Foam::UIPstream::readRaw(char* data, std::streamsize count) } -bool Foam::UIPstream::beginRawRead() +bool Foam::UIPstreamBase::beginRawRead() { if (format() != BINARY) { @@ -427,13 +490,13 @@ bool Foam::UIPstream::beginRawRead() } -void Foam::UIPstream::rewind() +void Foam::UIPstreamBase::rewind() { recvBufPos_ = 0; } -void Foam::UIPstream::print(Ostream& os) const +void Foam::UIPstreamBase::print(Ostream& os) const { os << "Reading from processor " << fromProcNo_ << " using communicator " << comm_ diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H index d4eefe305ed8f108bd3fc603bea9bc4e5a87bbfe..da7365a2f97b352ec0cdefd268790c002eec3001 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2014 OpenFOAM Foundation - Copyright (C) 2017-2021 OpenCFD Ltd. + Copyright (C) 2017-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -25,21 +25,22 @@ License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. Class - Foam::UOPstream + Foam::UOPstreamBase Description - Output inter-processor communications stream operating on external - buffer. + Base class for output inter-processor communications stream + (ie, parallel streams). + Not to be used directly, thus contructors are protected. SourceFiles - UOPstream.C + UOPstreamBase.C \*---------------------------------------------------------------------------*/ #include "Pstream.H" -#ifndef UOPstream_H -#define UOPstream_H +#ifndef Foam_UOPstream_H +#define Foam_UOPstream_H #include "UPstream.H" #include "Ostream.H" @@ -52,27 +53,14 @@ namespace Foam { /*---------------------------------------------------------------------------*\ - Class UOPstream Declaration + Class UOPstreamBase Declaration \*---------------------------------------------------------------------------*/ -class UOPstream +class UOPstreamBase : public UPstream, public Ostream { - // Private Data - - int toProcNo_; - - DynamicList<char>& sendBuf_; - - const int tag_; - - const label comm_; - - const bool sendAtDestruct_; - - // Private Member Functions //- Prepare send buffer for count bytes of output, @@ -100,14 +88,27 @@ class UOPstream inline void putString(const std::string& str); -public: +protected: + + // Protected Data + + int toProcNo_; + + DynamicList<char>& sendBuf_; + + const int tag_; + + const label comm_; + + const bool sendAtDestruct_; - // Constructors + + // Protected Constructors //- Construct given process index to write to using the given //- attached send buffer, optional communication characteristics //- and IO format - UOPstream + UOPstreamBase ( const commsTypes commsType, const int toProcNo, @@ -119,11 +120,13 @@ public: ); //- Construct given buffers - UOPstream(const int toProcNo, PstreamBuffers& buffers); + UOPstreamBase(const int toProcNo, PstreamBuffers& buffers); +public: + //- Destructor. - ~UOPstream(); + virtual ~UOPstreamBase(); // Member Functions @@ -139,17 +142,6 @@ public: // Write Functions - //- Write given buffer to given processor - static bool write - ( - const commsTypes commsType, - const int toProcNo, - const char* buf, - const std::streamsize bufSize, - const int tag = UPstream::msgType(), - const label communicator = UPstream::worldComm - ); - //- Write token to stream or otherwise handle it. // \return false if the token type was not handled by this method virtual bool write(const token& tok); @@ -262,7 +254,7 @@ public: // Edit //- Set flags of stream - ios_base::fmtflags flags(const ios_base::fmtflags) + virtual ios_base::fmtflags flags(const ios_base::fmtflags) { return ios_base::fmtflags(0); } @@ -275,6 +267,70 @@ public: }; +/*---------------------------------------------------------------------------*\ + Class UOPstream Declaration +\*---------------------------------------------------------------------------*/ + +//- Output inter-processor communications stream +//- using MPI send/recv etc. - operating on external buffer. +class UOPstream +: + public UOPstreamBase +{ + // Private Member Functions + + //- Final buffer send, called by destructor + bool bufferIPCsend(); + + +public: + + // Constructors + + //- Construct given process index to write to using the given + //- attached send buffer, optional communication characteristics + //- and IO format + UOPstream + ( + const commsTypes commsType, + const int toProcNo, + DynamicList<char>& sendBuf, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool sendAtDestruct = true, + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); + + //- Construct given buffers + UOPstream(const int toProcNo, PstreamBuffers& buffers); + + + //- Destructor, usually sends buffer on destruct. + virtual ~UOPstream(); + + + // Member Functions + + //- Use all write methods from base class + using UOPstreamBase::write; + + + // Static Functions + + //- Write buffer contents to given processor + // \return True on success + static bool write + ( + const commsTypes commsType, + const int toProcNo, + const char* buf, + const std::streamsize bufSize, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); +}; + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C similarity index 77% rename from src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.C rename to src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C index d65a2a5011ebb964424be56283169fc1a994ae8c..25992abec80df972b7b51c58f76a7aaf3763da07 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2016-2021 OpenCFD Ltd. + Copyright (C) 2016-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -52,7 +52,7 @@ inline static label byteAlign(const label pos, const size_t align) // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // -inline void Foam::UOPstream::prepareBuffer +inline void Foam::UOPstreamBase::prepareBuffer ( const size_t count, const size_t align @@ -75,13 +75,13 @@ inline void Foam::UOPstream::prepareBuffer template<class T> -inline void Foam::UOPstream::writeToBuffer(const T& val) +inline void Foam::UOPstreamBase::writeToBuffer(const T& val) { writeToBuffer(&val, sizeof(T), sizeof(T)); } -inline void Foam::UOPstream::writeToBuffer +inline void Foam::UOPstreamBase::writeToBuffer ( const void* data, const size_t count, @@ -111,7 +111,7 @@ inline void Foam::UOPstream::writeToBuffer } -inline void Foam::UOPstream::putChar(const char c) +inline void Foam::UOPstreamBase::putChar(const char c) { if (!sendBuf_.capacity()) { @@ -121,7 +121,7 @@ inline void Foam::UOPstream::putChar(const char c) } -inline void Foam::UOPstream::putString(const std::string& str) +inline void Foam::UOPstreamBase::putString(const std::string& str) { const size_t len = str.size(); writeToBuffer(len); @@ -131,7 +131,7 @@ inline void Foam::UOPstream::putString(const std::string& str) // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // -Foam::UOPstream::UOPstream +Foam::UOPstreamBase::UOPstreamBase ( const commsTypes commsType, const int toProcNo, @@ -155,15 +155,15 @@ Foam::UOPstream::UOPstream } -Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers) +Foam::UOPstreamBase::UOPstreamBase(const int toProcNo, PstreamBuffers& buffers) : - UPstream(buffers.commsType_), - Ostream(buffers.format_, IOstreamOption::currentVersion), + UPstream(buffers.commsType()), + Ostream(buffers.format(), IOstreamOption::currentVersion), toProcNo_(toProcNo), sendBuf_(buffers.sendBuf_[toProcNo]), - tag_(buffers.tag_), - comm_(buffers.comm_), - sendAtDestruct_(buffers.commsType_ != UPstream::commsTypes::nonBlocking) + tag_(buffers.tag()), + comm_(buffers.comm()), + sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking) { setOpened(); setGood(); @@ -172,35 +172,13 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers) // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // -Foam::UOPstream::~UOPstream() -{ - if (sendAtDestruct_) - { - if - ( - !UOPstream::write - ( - commsType_, - toProcNo_, - sendBuf_.cdata(), - sendBuf_.size(), - tag_, - comm_ - ) - ) - { - FatalErrorInFunction - << "Failed sending outgoing message of size " << sendBuf_.size() - << " to processor " << toProcNo_ - << Foam::abort(FatalError); - } - } -} +Foam::UOPstreamBase::~UOPstreamBase() +{} // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // -bool Foam::UOPstream::write(const token& tok) +bool Foam::UOPstreamBase::write(const token& tok) { // Direct token handling only for some types @@ -244,7 +222,7 @@ bool Foam::UOPstream::write(const token& tok) } -Foam::Ostream& Foam::UOPstream::write(const char c) +Foam::Ostream& Foam::UOPstreamBase::write(const char c) { if (!isspace(c)) { @@ -255,7 +233,7 @@ Foam::Ostream& Foam::UOPstream::write(const char c) } -Foam::Ostream& Foam::UOPstream::write(const char* str) +Foam::Ostream& Foam::UOPstreamBase::write(const char* str) { const word nonWhiteChars(string::validate<word>(str)); @@ -272,7 +250,7 @@ Foam::Ostream& Foam::UOPstream::write(const char* str) } -Foam::Ostream& Foam::UOPstream::write(const word& str) +Foam::Ostream& Foam::UOPstreamBase::write(const word& str) { putChar(token::tokenType::WORD); putString(str); @@ -281,7 +259,7 @@ Foam::Ostream& Foam::UOPstream::write(const word& str) } -Foam::Ostream& Foam::UOPstream::write(const string& str) +Foam::Ostream& Foam::UOPstreamBase::write(const string& str) { putChar(token::tokenType::STRING); putString(str); @@ -290,7 +268,7 @@ Foam::Ostream& Foam::UOPstream::write(const string& str) } -Foam::Ostream& Foam::UOPstream::writeQuoted +Foam::Ostream& Foam::UOPstreamBase::writeQuoted ( const std::string& str, const bool quoted @@ -310,7 +288,7 @@ Foam::Ostream& Foam::UOPstream::writeQuoted } -Foam::Ostream& Foam::UOPstream::write(const int32_t val) +Foam::Ostream& Foam::UOPstreamBase::write(const int32_t val) { putChar(token::tokenType::LABEL); writeToBuffer(val); @@ -318,7 +296,7 @@ Foam::Ostream& Foam::UOPstream::write(const int32_t val) } -Foam::Ostream& Foam::UOPstream::write(const int64_t val) +Foam::Ostream& Foam::UOPstreamBase::write(const int64_t val) { putChar(token::tokenType::LABEL); writeToBuffer(val); @@ -326,7 +304,7 @@ Foam::Ostream& Foam::UOPstream::write(const int64_t val) } -Foam::Ostream& Foam::UOPstream::write(const floatScalar val) +Foam::Ostream& Foam::UOPstreamBase::write(const floatScalar val) { putChar(token::tokenType::FLOAT); writeToBuffer(val); @@ -334,7 +312,7 @@ Foam::Ostream& Foam::UOPstream::write(const floatScalar val) } -Foam::Ostream& Foam::UOPstream::write(const doubleScalar val) +Foam::Ostream& Foam::UOPstreamBase::write(const doubleScalar val) { putChar(token::tokenType::DOUBLE); writeToBuffer(val); @@ -342,7 +320,7 @@ Foam::Ostream& Foam::UOPstream::write(const doubleScalar val) } -Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count) +Foam::Ostream& Foam::UOPstreamBase::write(const char* data, std::streamsize count) { if (format() != BINARY) { @@ -358,7 +336,7 @@ Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count) } -Foam::Ostream& Foam::UOPstream::writeRaw +Foam::Ostream& Foam::UOPstreamBase::writeRaw ( const char* data, std::streamsize count @@ -374,7 +352,7 @@ Foam::Ostream& Foam::UOPstream::writeRaw } -bool Foam::UOPstream::beginRawWrite(std::streamsize count) +bool Foam::UOPstreamBase::beginRawWrite(std::streamsize count) { if (format() != BINARY) { @@ -391,7 +369,7 @@ bool Foam::UOPstream::beginRawWrite(std::streamsize count) } -void Foam::UOPstream::print(Ostream& os) const +void Foam::UOPstreamBase::print(Ostream& os) const { os << "Writing from processor " << toProcNo_ << " to processor " << myProcNo() << " in communicator " << comm_ diff --git a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.C b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.C index 00883ca01133733d3d55778c975495ea08aeb727..8c6df7a9713725dadf4b86c6ef6c673a2b13ee33 100644 --- a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.C +++ b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.C @@ -38,11 +38,12 @@ namespace Foam // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // -void Foam::processorLduInterface::resizeBuf(List<char>& buf, const label size) +void Foam::processorLduInterface::resizeBuf(List<char>& buf, const label len) { - if (buf.size() < size) + if (buf.size() < len) { - buf.resize(size); + // Use nocopy variant since it will be overwritten + buf.resize_nocopy(len); } } diff --git a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H index d3a6a7801c724049599f29b76c5918836d4d9961..28c13b7e252041a0b2ea9e965e962123825a8f7b 100644 --- a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H +++ b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H @@ -36,8 +36,8 @@ SourceFiles \*---------------------------------------------------------------------------*/ -#ifndef processorLduInterface_H -#define processorLduInterface_H +#ifndef Foam_processorLduInterface_H +#define Foam_processorLduInterface_H #include "lduInterface.H" #include "primitiveFieldsFwd.H" @@ -67,7 +67,7 @@ class processorLduInterface // Private Member Functions //- Increase buffer size if required - static void resizeBuf(List<char>& buf, const label size); + static void resizeBuf(List<char>& buf, const label len); public: @@ -78,7 +78,7 @@ public: // Constructors - //- Construct null + //- Default construct processorLduInterface() = default; diff --git a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C index 70a5639ef60799eee75c1df3503abffdf0f94f2e..fa2292c6e5902adc4716e33be8bf4d0c482a759d 100644 --- a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C +++ b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C @@ -39,7 +39,7 @@ void Foam::processorLduInterface::send const UList<Type>& f ) const { - label nBytes = f.byteSize(); + const label nBytes = f.byteSize(); if ( diff --git a/src/Pstream/dummy/Make/files b/src/Pstream/dummy/Make/files index c025891574bf3e80d0099f7b382864188a6e8a62..d1d6ee320798a972361ca4acb2b8a290ae3e0e0e 100644 --- a/src/Pstream/dummy/Make/files +++ b/src/Pstream/dummy/Make/files @@ -1,5 +1,6 @@ UPstream.C -UIPread.C -UOPwrite.C + +UIPstreamRead.C +UOPstreamWrite.C LIB = $(FOAM_LIBBIN)/dummy/libPstream diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C b/src/Pstream/dummy/UIPstreamRead.C similarity index 72% rename from src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C rename to src/Pstream/dummy/UIPstreamRead.C index 75805c718d3d769303b213fec92311c0166ceccf..3fef73615b4ae36e392574caade5b50cf57489c3 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C +++ b/src/Pstream/dummy/UIPstreamRead.C @@ -5,8 +5,8 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2011-2015 OpenFOAM Foundation + Copyright (C) 2021-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -26,32 +26,31 @@ License \*---------------------------------------------------------------------------*/ -#include "OPstream.H" +#include "UIPstream.H" -// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // -Foam::OPstream::OPstream +void Foam::UIPstream::bufferIPCrecv() +{ + NotImplemented; +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +Foam::label Foam::UIPstream::read ( const commsTypes commsType, - const int toProcNo, - const label bufSize, + const int fromProcNo, + char* buf, + const std::streamsize bufSize, const int tag, - const label comm, - IOstreamOption::streamFormat fmt + const label communicator ) -: - Pstream(commsType, bufSize), - UOPstream - ( - commsType, - toProcNo, - Pstream::transferBuf_, - tag, - comm, - true, // sendAtDestruct - fmt - ) -{} +{ + NotImplemented; + return 0; +} // ************************************************************************* // diff --git a/src/Pstream/dummy/UOPwrite.C b/src/Pstream/dummy/UOPstreamWrite.C similarity index 89% rename from src/Pstream/dummy/UOPwrite.C rename to src/Pstream/dummy/UOPstreamWrite.C index fa1c2991f5ddd77398460df5e3f3b353069f9b53..9acde63b742574e7e57dc4f9c87fc9342f91907f 100644 --- a/src/Pstream/dummy/UOPwrite.C +++ b/src/Pstream/dummy/UOPstreamWrite.C @@ -6,6 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2015 OpenFOAM Foundation + Copyright (C) 2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -23,13 +24,19 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. -Description - Write primitive and binary block from OPstream - \*---------------------------------------------------------------------------*/ #include "UOPstream.H" +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +bool Foam::UOPstream::bufferIPCsend() +{ + NotImplemented; + return false; +} + + // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // bool Foam::UOPstream::write @@ -43,7 +50,6 @@ bool Foam::UOPstream::write ) { NotImplemented; - return false; } diff --git a/src/Pstream/mpi/Make/files b/src/Pstream/mpi/Make/files index eccdf776646facdf15c41782c80a2eccf10f7c5f..e16e96a25f360ff0b026465457975634e15d7941 100644 --- a/src/Pstream/mpi/Make/files +++ b/src/Pstream/mpi/Make/files @@ -1,6 +1,7 @@ -UOPwrite.C -UIPread.C -UPstream.C PstreamGlobals.C +UPstream.C + +UIPstreamRead.C +UOPstreamWrite.C LIB = $(FOAM_MPI_LIBBIN)/libPstream diff --git a/src/Pstream/mpi/UIPread.C b/src/Pstream/mpi/UIPstreamRead.C similarity index 52% rename from src/Pstream/mpi/UIPread.C rename to src/Pstream/mpi/UIPstreamRead.C index bd8b3188991260390ad1a672a8095e56924b9130..c1bf6fd1d4977ef2b15ca4cdec04b25c7e4d0f67 100644 --- a/src/Pstream/mpi/UIPread.C +++ b/src/Pstream/mpi/UIPstreamRead.C @@ -24,9 +24,6 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. -Description - Read from UIPstream - \*---------------------------------------------------------------------------*/ #include "UIPstream.H" @@ -36,195 +33,64 @@ Description #include <mpi.h> -// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // -Foam::UIPstream::UIPstream -( - const commsTypes commsType, - const int fromProcNo, - DynamicList<char>& receiveBuf, - label& receiveBufPosition, - const int tag, - const label comm, - const bool clearAtEnd, - IOstreamOption::streamFormat fmt -) -: - UPstream(commsType), - Istream(fmt, IOstreamOption::currentVersion), - fromProcNo_(fromProcNo), - recvBuf_(receiveBuf), - recvBufPos_(receiveBufPosition), - tag_(tag), - comm_(comm), - clearAtEnd_(clearAtEnd), - messageSize_(0) +void Foam::UIPstream::bufferIPCrecv() { - setOpened(); - setGood(); - - if (commsType == commsTypes::nonBlocking) + // Called by constructor + if (debug) { - // Message is already received into buffer + Pout<< "UIPstream IPC read buffer :" + << " from:" << fromProcNo_ + << " tag:" << tag_ << " comm:" << comm_ + << " wanted size:" << recvBuf_.capacity() + << Foam::endl; } - else - { - if (debug) - { - Pout<< "UIPstream::UIPstream :" - << " read from:" << fromProcNo - << " tag:" << tag_ << " comm:" << comm_ - << " wanted size:" << recvBuf_.capacity() - << Foam::endl; - } - - // No buffer size allocated/specified - probe size of incoming message - if (!recvBuf_.capacity()) - { - profilingPstream::beginTiming(); - - MPI_Status status; - MPI_Probe - ( - fromProcNo_, - tag_, - PstreamGlobals::MPICommunicators_[comm_], - &status - ); - MPI_Get_count(&status, MPI_BYTE, &messageSize_); - - // Assume these are from gathers ... - profilingPstream::addGatherTime(); - - recvBuf_.resize(messageSize_); + // No buffer size allocated/specified - probe size of incoming message + if (!recvBuf_.capacity()) + { + profilingPstream::beginTiming(); - if (debug) - { - Pout<< "UIPstream::UIPstream : probed size:" - << messageSize_ << Foam::endl; - } - } + MPI_Status status; - messageSize_ = UIPstream::read + MPI_Probe ( - commsType, fromProcNo_, - recvBuf_.data(), - recvBuf_.capacity(), tag_, - comm_ + PstreamGlobals::MPICommunicators_[comm_], + &status ); + MPI_Get_count(&status, MPI_BYTE, &messageSize_); + + // Assume these are from gathers ... + profilingPstream::addGatherTime(); - // Set addressed size. Leave actual allocated memory intact. recvBuf_.resize(messageSize_); - if (!messageSize_) + if (debug) { - setEof(); + Pout<< "UIPstream::UIPstream : probed size:" + << messageSize_ << Foam::endl; } } -} - -Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) -: - UPstream(buffers.commsType_), - Istream(buffers.format_, IOstreamOption::currentVersion), - fromProcNo_(fromProcNo), - recvBuf_(buffers.recvBuf_[fromProcNo]), - recvBufPos_(buffers.recvBufPos_[fromProcNo]), - tag_(buffers.tag_), - comm_(buffers.comm_), - clearAtEnd_(true), - messageSize_(0) -{ - if + messageSize_ = UIPstream::read ( - commsType() != UPstream::commsTypes::scheduled - && !buffers.finishedSendsCalled_ - ) - { - FatalErrorInFunction - << "PstreamBuffers::finishedSends() never called." << endl - << "Please call PstreamBuffers::finishedSends() after doing" - << " all your sends (using UOPstream) and before doing any" - << " receives (using UIPstream)" << Foam::exit(FatalError); - } - - setOpened(); - setGood(); - - if (commsType() == commsTypes::nonBlocking) - { - // Message is already received into buffer - messageSize_ = recvBuf_.size(); - - if (debug) - { - Pout<< "UIPstream::UIPstream PstreamBuffers :" - << " fromProcNo:" << fromProcNo - << " tag:" << tag_ << " comm:" << comm_ - << " receive buffer size:" << messageSize_ - << Foam::endl; - } - } - else + commsType_, + fromProcNo_, + recvBuf_.data(), + recvBuf_.capacity(), + tag_, + comm_ + ); + + // Set addressed size. Leave actual allocated memory intact. + recvBuf_.resize(messageSize_); + + if (!messageSize_) { - if (debug) - { - Pout<< "UIPstream::UIPstream PstreamBuffers :" - << " read from:" << fromProcNo - << " tag:" << tag_ << " comm:" << comm_ - << " wanted size:" << recvBuf_.capacity() - << Foam::endl; - } - - // No buffer size allocated/specified - probe size of incoming message - if (!recvBuf_.capacity()) - { - profilingPstream::beginTiming(); - - MPI_Status status; - - MPI_Probe - ( - fromProcNo_, - tag_, - PstreamGlobals::MPICommunicators_[comm_], - &status - ); - MPI_Get_count(&status, MPI_BYTE, &messageSize_); - - // Assume these are from gathers ... - profilingPstream::addGatherTime(); - - recvBuf_.resize(messageSize_); - - if (debug) - { - Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:" - << messageSize_ << Foam::endl; - } - } - - messageSize_ = UIPstream::read - ( - commsType(), - fromProcNo_, - recvBuf_.data(), - recvBuf_.capacity(), - tag_, - comm_ - ); - - // Set addressed size. Leave actual allocated memory intact. - recvBuf_.resize(messageSize_); - - if (!messageSize_) - { - setEof(); - } + setEof(); } } @@ -287,7 +153,6 @@ Foam::label Foam::UIPstream::read FatalErrorInFunction << "MPI_Recv cannot receive incoming message" << Foam::abort(FatalError); - return 0; } @@ -336,7 +201,7 @@ Foam::label Foam::UIPstream::read ) { FatalErrorInFunction - << "MPI_Recv cannot start non-blocking receive" + << "MPI_Irecv cannot start non-blocking receive" << Foam::abort(FatalError); return 0; diff --git a/src/Pstream/mpi/UOPwrite.C b/src/Pstream/mpi/UOPstreamWrite.C similarity index 90% rename from src/Pstream/mpi/UOPwrite.C rename to src/Pstream/mpi/UOPstreamWrite.C index f59fba924df5c5374b10970007175242fedca4d3..0ff30ff98dd81123ba6c64832e75aba8b2314165 100644 --- a/src/Pstream/mpi/UOPwrite.C +++ b/src/Pstream/mpi/UOPstreamWrite.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2019-2021 OpenCFD Ltd. + Copyright (C) 2019-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -24,9 +24,6 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. -Description - Write primitive and binary block from OPstream - \*---------------------------------------------------------------------------*/ #include "UOPstream.H" @@ -35,6 +32,22 @@ Description #include <mpi.h> +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +bool Foam::UOPstream::bufferIPCsend() +{ + return UOPstream::write + ( + commsType_, + toProcNo_, + sendBuf_.cdata(), + sendBuf_.size(), + tag_, + comm_ + ); +} + + // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // bool Foam::UOPstream::write @@ -70,13 +83,13 @@ bool Foam::UOPstream::write PstreamGlobals::checkCommunicator(communicator, toProcNo); - bool transferFailed = true; + bool failed = true; profilingPstream::beginTiming(); if (commsType == commsTypes::blocking) { - transferFailed = MPI_Bsend + failed = MPI_Bsend ( const_cast<char*>(buf), bufSize, @@ -99,7 +112,7 @@ bool Foam::UOPstream::write } else if (commsType == commsTypes::scheduled) { - transferFailed = MPI_Send + failed = MPI_Send ( const_cast<char*>(buf), bufSize, @@ -124,7 +137,7 @@ bool Foam::UOPstream::write { MPI_Request request; - transferFailed = MPI_Isend + failed = MPI_Isend ( const_cast<char*>(buf), bufSize, @@ -151,12 +164,11 @@ bool Foam::UOPstream::write else { FatalErrorInFunction - << "Unsupported communications type " - << UPstream::commsTypeNames[commsType] + << "Unsupported communications type " << int(commsType) << Foam::abort(FatalError); } - return !transferFailed; + return !failed; } diff --git a/src/finiteArea/faMesh/faPatches/constraint/processor/processorFaPatch.C b/src/finiteArea/faMesh/faPatches/constraint/processor/processorFaPatch.C index 4ac1c79f227531cb601340a3aca243d00fa77bb9..fab3a58e1b2eab593edc8acd7e607c671218430a 100644 --- a/src/finiteArea/faMesh/faPatches/constraint/processor/processorFaPatch.C +++ b/src/finiteArea/faMesh/faPatches/constraint/processor/processorFaPatch.C @@ -28,8 +28,6 @@ License #include "processorFaPatch.H" #include "addToRunTimeSelectionTable.H" -#include "IPstream.H" -#include "OPstream.H" #include "transformField.H" #include "faBoundaryMesh.H" #include "faMesh.H" diff --git a/src/finiteArea/fields/faPatchFields/constraint/processor/processorFaPatchField.C b/src/finiteArea/fields/faPatchFields/constraint/processor/processorFaPatchField.C index 5c24cc8a1fc56c472eb4f0b7851295bc18ee3645..613fd7ce57e1514fbc99bcf023e4f04d8fb2635b 100644 --- a/src/finiteArea/fields/faPatchFields/constraint/processor/processorFaPatchField.C +++ b/src/finiteArea/fields/faPatchFields/constraint/processor/processorFaPatchField.C @@ -28,8 +28,6 @@ License #include "processorFaPatchField.H" #include "processorFaPatch.H" -#include "IPstream.H" -#include "OPstream.H" #include "transformField.H" // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // diff --git a/src/meshTools/algorithms/MeshWave/FaceCellWave.C b/src/meshTools/algorithms/MeshWave/FaceCellWave.C index 8de0d33767fc3ff975a72d477062e32b07734ba6..89c15a2fa4a5769eeb08b2151084424bbc771ed5 100644 --- a/src/meshTools/algorithms/MeshWave/FaceCellWave.C +++ b/src/meshTools/algorithms/MeshWave/FaceCellWave.C @@ -31,8 +31,8 @@ License #include "processorPolyPatch.H" #include "cyclicPolyPatch.H" #include "cyclicAMIPolyPatch.H" -#include "OPstream.H" -#include "IPstream.H" +#include "UIPstream.H" +#include "UOPstream.H" #include "PstreamReduceOps.H" #include "debug.H" #include "typeInfo.H" diff --git a/src/meshTools/algorithms/PointEdgeWave/PointEdgeWave.C b/src/meshTools/algorithms/PointEdgeWave/PointEdgeWave.C index 52604c021afe0988209bdfc682a9c3a6c9099a98..2a6357835b2c95ea5b6f8aa43d82d2b5c1186495 100644 --- a/src/meshTools/algorithms/PointEdgeWave/PointEdgeWave.C +++ b/src/meshTools/algorithms/PointEdgeWave/PointEdgeWave.C @@ -30,8 +30,8 @@ License #include "polyMesh.H" #include "processorPolyPatch.H" #include "cyclicPolyPatch.H" -#include "OPstream.H" -#include "IPstream.H" +#include "UIPstream.H" +#include "UOPstream.H" #include "PstreamCombineReduceOps.H" #include "debug.H" #include "typeInfo.H"