From bed3deed5daef37babd03c3c87c3a0ffdf520679 Mon Sep 17 00:00:00 2001 From: Mark Olesen <Mark.Olesen@esi-group.com> Date: Sun, 20 Feb 2022 19:26:13 +0100 Subject: [PATCH] ENH: new broadcast version of Pstreams (#2371) - The idea of broadcast streams is to replace multiple master to subProcs communications with a single MPI_Bcast. if (Pstream::master()) { OPBstream toAll(Pstream::masterNo()); toAll << data; } else { IPBstream fromMaster(Pstream::masterNo()); fromMaster >> data; } // vs. if (Pstream::master()) { for (const int proci : Pstream::subProcs()) { OPstream os(Pstream::commsTypes::scheduled, proci); os << data; } } else { IPstream is(Pstream::commsTypes::scheduled, Pstream::masterNo()); is >> data; } Can simply use UPstream::broadcast() directly for contiguous data with known lengths. Based on ideas from T.Aoyagi(RIST), A.Azami(RIST) --- .../test/parallel-broadcast/Make/files | 3 + .../test/parallel-broadcast/Make/options | 2 + .../Test-parallel-broadcast.C | 147 ++++++++++++++++++ src/OpenFOAM/Make/files | 2 + .../db/IOstreams/Pstreams/IPBstreams.C | 107 +++++++++++++ src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H | 50 +++++- .../db/IOstreams/Pstreams/OPBstreams.C | 109 +++++++++++++ src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H | 45 +++++- .../db/IOstreams/Pstreams/UIPstream.H | 62 ++++++++ .../db/IOstreams/Pstreams/UOPstream.H | 63 ++++++++ src/Pstream/dummy/Make/files | 2 + src/Pstream/dummy/UIPBstreamRead.C | 55 +++++++ src/Pstream/dummy/UOPBstreamWrite.C | 56 +++++++ src/Pstream/mpi/Make/files | 2 + src/Pstream/mpi/UIPBstreamRead.C | 140 +++++++++++++++++ src/Pstream/mpi/UOPBstreamWrite.C | 119 ++++++++++++++ 16 files changed, 956 insertions(+), 8 deletions(-) create mode 100644 applications/test/parallel-broadcast/Make/files create mode 100644 applications/test/parallel-broadcast/Make/options create mode 100644 applications/test/parallel-broadcast/Test-parallel-broadcast.C create mode 100644 src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C create mode 100644 src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C create mode 100644 src/Pstream/dummy/UIPBstreamRead.C create mode 100644 src/Pstream/dummy/UOPBstreamWrite.C create mode 100644 src/Pstream/mpi/UIPBstreamRead.C create mode 100644 src/Pstream/mpi/UOPBstreamWrite.C diff --git a/applications/test/parallel-broadcast/Make/files b/applications/test/parallel-broadcast/Make/files new file mode 100644 index 00000000000..d0383911392 --- /dev/null +++ b/applications/test/parallel-broadcast/Make/files @@ -0,0 +1,3 @@ +Test-parallel-broadcast.C + +EXE = $(FOAM_USER_APPBIN)/Test-parallel-broadcast diff --git a/applications/test/parallel-broadcast/Make/options b/applications/test/parallel-broadcast/Make/options new file mode 100644 index 00000000000..18e6fe47afa --- /dev/null +++ b/applications/test/parallel-broadcast/Make/options @@ -0,0 +1,2 @@ +/* EXE_INC = */ +/* EXE_LIBS = */ diff --git a/applications/test/parallel-broadcast/Test-parallel-broadcast.C b/applications/test/parallel-broadcast/Test-parallel-broadcast.C new file mode 100644 index 00000000000..8c518bf9751 --- /dev/null +++ b/applications/test/parallel-broadcast/Test-parallel-broadcast.C @@ -0,0 +1,147 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +Application + Test-parallel-broadcast + +Description + Test for various broadcast routines. + +\*---------------------------------------------------------------------------*/ + +#include "List.H" +#include "argList.H" +#include "Time.H" +#include "vector.H" +#include "IPstream.H" +#include "OPstream.H" +#include "IOstreams.H" + +using namespace Foam; + + +// This is what our new scatter will look like inside +template<class Type> +void testBroadcastStream +( + Type& value, + const label comm = UPstream::worldComm +) +{ + Info<< nl << "is_contiguous:" << is_contiguous<Type>::value << endl; + Pout<< "pre-broadcast: " << value << endl; + + if (is_contiguous<Type>::value) + { + UPstream::broadcast + ( + reinterpret_cast<char*>(&value), + sizeof(Type), + comm, + UPstream::masterNo() + ); + } + else + { + if (UPstream::master()) + { + OPBstream toAll(UPstream::masterNo(), comm); + toAll << value; + } + else + { + IPBstream fromMaster(UPstream::masterNo(), comm); + fromMaster >> value; + } + } + + Pout<< "post-broadcast: " << value << endl; +} + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +int main(int argc, char *argv[]) +{ + argList::noCheckProcessorDirectories(); + #include "setRootCase.H" + #include "createTime.H" + + { + label value = -1; + if (Pstream::master()) + { + value = UPstream::nProcs(); + } + testBroadcastStream(value); + } + + { + labelList values; + if (Pstream::master()) + { + values = identity(UPstream::nProcs()); + } + testBroadcastStream(values); + } + + { + wordList values; + if (Pstream::master()) + { + values.resize(UPstream::nProcs()); + forAll(values, i) + { + values[i] = "value_" + Foam::name(i); + } + } + testBroadcastStream(values); + } + + { + vector values(vector::uniform(-1)); + if (Pstream::master()) + { + values = vector(1,2,3); + } + testBroadcastStream(values); + } + + { + FixedList<vector, 3> values(vector::uniform(-1)); + if (Pstream::master()) + { + values = vector(1,2,3); + } + testBroadcastStream(values); + } + + Info<< "End\n" << endl; + + return 0; +} + + +// ************************************************************************* // diff --git a/src/OpenFOAM/Make/files b/src/OpenFOAM/Make/files index c7fe4cfc03d..52a176bf11d 100644 --- a/src/OpenFOAM/Make/files +++ b/src/OpenFOAM/Make/files @@ -274,6 +274,8 @@ $(Pstreams)/UIPstreamBase.C $(Pstreams)/UOPstreamBase.C $(Pstreams)/IPstreams.C $(Pstreams)/OPstreams.C +$(Pstreams)/IPBstreams.C +$(Pstreams)/OPBstreams.C dictionary = db/dictionary $(dictionary)/dictionary.C diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C new file mode 100644 index 00000000000..3bcb9142e8b --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C @@ -0,0 +1,107 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +\*---------------------------------------------------------------------------*/ + +#include "UIPstream.H" +#include "IPstream.H" +#include "IOstreams.H" + +// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // + +Foam::UIPBstream::UIPBstream +( + const commsTypes commsType, + const int fromProcNo, + DynamicList<char>& receiveBuf, + label& receiveBufPosition, + const int tag, + const label comm, + const bool clearAtEnd, + IOstreamOption::streamFormat fmt +) +: + UIPstreamBase + ( + commsType, + fromProcNo, + receiveBuf, + receiveBufPosition, + tag, + comm, + clearAtEnd, + fmt + ) +{ + bufferIPCrecv(); +} + + +Foam::IPBstream::IPBstream +( + const commsTypes commsType, + const int fromProcNo, + const label bufSize, + const int tag, + const label comm, + IOstreamOption::streamFormat fmt +) +: + Pstream(commsType, bufSize), + UIPBstream + ( + commsType, + fromProcNo, + Pstream::transferBuf_, + transferBufPosition_, + tag, + comm, + false, // Do not clear Pstream::transferBuf_ if at end + fmt + ), + transferBufPosition_(0) +{} + + +Foam::IPBstream::IPBstream +( + const int fromProcNo, + const label comm, + IOstreamOption::streamFormat fmt +) +: + IPBstream + ( + UPstream::commsTypes::scheduled, // irrelevant + fromProcNo, + label(0), // bufSize + UPstream::msgType(), // irrelevant + comm, + fmt + ) +{} + + +// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H index 5caea64dc9f..8ad13801932 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2021-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -31,14 +31,14 @@ Description Input inter-processor communications stream. SourceFiles - IPstream.C + IPstreams.C \*---------------------------------------------------------------------------*/ #include "Pstream.H" -#ifndef IPstream_H -#define IPstream_H +#ifndef Foam_IPstream_H +#define Foam_IPstream_H #include "UIPstream.H" @@ -79,6 +79,48 @@ public: }; +/*---------------------------------------------------------------------------*\ + Class IPBstream Declaration +\*---------------------------------------------------------------------------*/ + +//- Input inter-processor communications stream +//- using MPI broadcast. +class IPBstream +: + public Pstream, + public UIPBstream +{ + // Private Data + + //- Receive index into Pstream::transferBuf_ + label transferBufPosition_; + +public: + + // Constructors + + //- Construct for broadcast root, optional buffer size, read format + IPBstream + ( + const commsTypes commsType, + const int fromProcNo, //!< UPstream::masterNo() - root procNo + const label bufSize = 0, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); + + //- Construct for broadcast root with optional communicator, + //- write format + explicit IPBstream + ( + const int fromProcNo, //!< UPstream::masterNo() - root procNo + const label comm = UPstream::worldComm, + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); +}; + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C new file mode 100644 index 00000000000..182597b193e --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C @@ -0,0 +1,109 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +\*---------------------------------------------------------------------------*/ + +#include "UOPstream.H" +#include "OPstream.H" +#include "IOstreams.H" + +// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // + +Foam::UOPBstream::UOPBstream +( + const commsTypes commsType, + const int toProcNo, + DynamicList<char>& sendBuf, + const int tag, + const label comm, + const bool sendAtDestruct, + IOstreamOption::streamFormat fmt +) +: + UOPstreamBase(commsType, toProcNo, sendBuf, tag, comm, sendAtDestruct, fmt) +{} + + +Foam::OPBstream::OPBstream +( + const commsTypes commsType, + const int toProcNo, + const label bufSize, + const int tag, + const label comm, + IOstreamOption::streamFormat fmt +) +: + Pstream(commsType, bufSize), + UOPBstream + ( + commsType, + toProcNo, + Pstream::transferBuf_, + tag, + comm, + true, // sendAtDestruct + fmt + ) +{} + + +Foam::OPBstream::OPBstream +( + const int toProcNo, + const label comm, + IOstreamOption::streamFormat fmt +) +: + OPBstream + ( + UPstream::commsTypes::scheduled, // irrelevant + toProcNo, + label(0), // bufSize + UPstream::msgType(), // irrelevant + comm, + fmt + ) +{} + + +// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // + +Foam::UOPBstream::~UOPBstream() +{ + if (sendAtDestruct_) + { + if (!bufferIPCsend()) + { + FatalErrorInFunction + << "Failed broadcast message of size " + << sendBuf_.size() << " root: " << toProcNo_ + << Foam::abort(FatalError); + } + } +} + + +// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H index c91c230377f..ecb2ed9df75 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2021-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -31,14 +31,14 @@ Description Output inter-processor communications stream. SourceFiles - OPstream.C + OPstreams.C \*---------------------------------------------------------------------------*/ #include "Pstream.H" -#ifndef OPstream_H -#define OPstream_H +#ifndef Foam_OPstream_H +#define Foam_OPstream_H #include "UOPstream.H" @@ -74,6 +74,43 @@ public: }; +/*---------------------------------------------------------------------------*\ + Class OPBstream Declaration +\*---------------------------------------------------------------------------*/ + +//- Output inter-processor communications stream +//- using MPI broadcast. +class OPBstream +: + public Pstream, + public UOPBstream +{ +public: + + // Constructors + + //- Construct for broadcast root, optional buffer size, read format + OPBstream + ( + const commsTypes commsType, + const int toProcNo, //!< UPstream::masterNo() - root procNo + const label bufSize = 0, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); + + //- Construct for broadcast root with optional communicator, + //- write format + explicit OPBstream + ( + const int toProcNo, //!< UPstream::masterNo() - root procNo + const label comm = UPstream::worldComm, + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); +}; + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H index 5e593c93eb5..580a47085e5 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H @@ -262,6 +262,68 @@ public: }; +/*---------------------------------------------------------------------------*\ + Class UIPBstream Declaration +\*---------------------------------------------------------------------------*/ + +//- Input inter-processor communications stream +//- using MPI broadcast - operating on external buffer. +class UIPBstream +: + public UIPstreamBase +{ + // Private Member Functions + + //- Initial buffer recv via broadcast, called by constructor + void bufferIPCrecv(); + + +public: + + // Constructors + + //- Construct given process index to read from using the given + //- attached receive buffer, optional communication characteristics + //- and IO format + UIPBstream + ( + const commsTypes commsType, //!< ignored + const int fromProcNo, //!< UPstream::masterNo() + DynamicList<char>& receiveBuf, + label& receiveBufPosition, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool clearAtEnd = false, // destroy receiveBuf if at end + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); + + + //- Destructor + virtual ~UIPBstream() = default; + + + // Member Functions + + //- Use all read methods from base + using UIPstreamBase::read; + + + // Static Functions + + //- Wrapped version of UPstream::broadcast + // \return the message size + static label read + ( + const commsTypes commsTypes, //!< ignored + const int rootProcNo, //!< UPstream::masterNo() + char* buf, + const std::streamsize bufSize, + const int tag = UPstream::msgType(), //!< ignored + const label comm = UPstream::worldComm + ); +}; + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H index da7365a2f97..2e0c184378d 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H @@ -331,6 +331,69 @@ public: }; +/*---------------------------------------------------------------------------*\ + Class UOPBstream Declaration +\*---------------------------------------------------------------------------*/ + +//- Output inter-processor communications stream +//- using MPI broadcast - operating on external buffer. +// +// \note does not use commsType, tag etc. +class UOPBstream +: + public UOPstreamBase +{ + // Private Member Functions + + //- Final buffer send, called by destructor + bool bufferIPCsend(); + + +public: + + // Constructors + + //- Construct given process index to write to using the given + //- attached send buffer, optional communication characteristics + //- and IO format + UOPBstream + ( + const commsTypes commsType, //!< ignored + const int toProcNo, //!< UPstream::masterNo() + DynamicList<char>& sendBuf, + const int tag = UPstream::msgType(), //!< ignored + const label comm = UPstream::worldComm, + const bool sendAtDestruct = true, + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ); + + + //- Destructor, usually sends buffer on destruct. + virtual ~UOPBstream(); + + + // Member Functions + + //- Use all write methods from base + using UOPstreamBase::write; + + + // Static Functions + + //- Wrapped version of UPstream::broadcast with const-cast + // \return True on success + static bool write + ( + const commsTypes commsType, //!< ignored + const int rootProcNo, //!< UPstream::masterNo() + const char* buf, + const std::streamsize bufSize, + const int tag = UPstream::msgType(), //!< ignored + const label comm = UPstream::worldComm + ); +}; + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/Pstream/dummy/Make/files b/src/Pstream/dummy/Make/files index 436bf6ffc40..90b44827f99 100644 --- a/src/Pstream/dummy/Make/files +++ b/src/Pstream/dummy/Make/files @@ -4,5 +4,7 @@ UPstreamReduce.C UIPstreamRead.C UOPstreamWrite.C +UIPBstreamRead.C +UOPBstreamWrite.C LIB = $(FOAM_LIBBIN)/dummy/libPstream diff --git a/src/Pstream/dummy/UIPBstreamRead.C b/src/Pstream/dummy/UIPBstreamRead.C new file mode 100644 index 00000000000..94e4dcd7c40 --- /dev/null +++ b/src/Pstream/dummy/UIPBstreamRead.C @@ -0,0 +1,55 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +\*---------------------------------------------------------------------------*/ + +#include "UIPstream.H" + +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +void Foam::UIPBstream::bufferIPCrecv() +{ + NotImplemented; +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +Foam::label Foam::UIPBstream::read +( + const commsTypes commsType, + const int rootProcNo, + char* buf, + const std::streamsize bufSize, + const int tag, + const label communicator +) +{ + NotImplemented; + return 0; +} + + +// ************************************************************************* // diff --git a/src/Pstream/dummy/UOPBstreamWrite.C b/src/Pstream/dummy/UOPBstreamWrite.C new file mode 100644 index 00000000000..3951b48a0d5 --- /dev/null +++ b/src/Pstream/dummy/UOPBstreamWrite.C @@ -0,0 +1,56 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +\*---------------------------------------------------------------------------*/ + +#include "UOPstream.H" + +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +bool Foam::UOPBstream::bufferIPCsend() +{ + NotImplemented; + return false; +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +bool Foam::UOPBstream::write +( + const commsTypes commsType, + const int rootProcNo, + const char* buf, + const std::streamsize bufSize, + const int tag, + const label communicator +) +{ + NotImplemented; + return false; +} + + +// ************************************************************************* // diff --git a/src/Pstream/mpi/Make/files b/src/Pstream/mpi/Make/files index 551d8092bd7..b117976a386 100644 --- a/src/Pstream/mpi/Make/files +++ b/src/Pstream/mpi/Make/files @@ -5,5 +5,7 @@ UPstreamReduce.C UIPstreamRead.C UOPstreamWrite.C +UIPBstreamRead.C +UOPBstreamWrite.C LIB = $(FOAM_MPI_LIBBIN)/libPstream diff --git a/src/Pstream/mpi/UIPBstreamRead.C b/src/Pstream/mpi/UIPBstreamRead.C new file mode 100644 index 00000000000..ae687a6b4b8 --- /dev/null +++ b/src/Pstream/mpi/UIPBstreamRead.C @@ -0,0 +1,140 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +\*---------------------------------------------------------------------------*/ + +#include "UIPstream.H" +#include "PstreamGlobals.H" +#include "profilingPstream.H" +#include "IOstreams.H" + +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +void Foam::UIPBstream::bufferIPCrecv() +{ + // Uses double broadcast. Symmetric with UOPBstream::bufferIPCsend() + // 1. for the data size + // 2. for the data itself + + // Expected message size, similar to MPI_Probe + // Same type must be expected in UOPBstream::bufferIPCsend() + label bufSize(0); + + // Broadcast #1 - data size + if + ( + !UPstream::broadcast + ( + reinterpret_cast<char*>(&bufSize), + sizeof(label), + comm_, + fromProcNo_ //< is actually rootProcNo + ) + ) + { + FatalErrorInFunction + << "MPI_Bcast failure receiving buffer size" << nl + << Foam::abort(FatalError); + } + + if (debug) + { + Pout<< "UOPBstream IPC read buffer :" + << " root:" << fromProcNo_ + << " comm:" << comm_ + << " probed size:" << bufSize + << " wanted size:" << recvBuf_.capacity() + << Foam::endl; + } + + // No buffer size allocated/specified + if (!recvBuf_.capacity()) + { + recvBuf_.resize(bufSize); + } + + // This is the only real information we can trust + messageSize_ = bufSize; + + // Broadcast #2 - data content + // - skip if there is no data to receive + + if (messageSize_) + { + if + ( + !UPstream::broadcast + ( + recvBuf_.data(), + messageSize_, // same as bufSize + comm_, + fromProcNo_ //< is actually rootProcNo + ) + ) + { + FatalErrorInFunction + << "MPI_Bcast failure receiving buffer data:" << bufSize << nl + << Foam::abort(FatalError); + } + } + + // Set addressed size. Leave actual allocated memory intact. + recvBuf_.resize(messageSize_); + + if (!messageSize_) + { + setEof(); + } +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +Foam::label Foam::UIPBstream::read +( + const commsTypes commsType, + const int rootProcNo, + char* buf, + const std::streamsize bufSize, + const int tag, + const label comm +) +{ + if + ( + !UPstream::broadcast(buf, bufSize, comm, rootProcNo) + ) + { + FatalErrorInFunction + << "MPI_Bcast failure receiving data:" << label(bufSize) << nl + << Foam::abort(FatalError); + return 0; + } + + return bufSize; +} + + +// ************************************************************************* // diff --git a/src/Pstream/mpi/UOPBstreamWrite.C b/src/Pstream/mpi/UOPBstreamWrite.C new file mode 100644 index 00000000000..230845d7610 --- /dev/null +++ b/src/Pstream/mpi/UOPBstreamWrite.C @@ -0,0 +1,119 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +\*---------------------------------------------------------------------------*/ + +#include "UOPstream.H" +#include "PstreamGlobals.H" +#include "profilingPstream.H" + +#include <mpi.h> + +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +bool Foam::UOPBstream::bufferIPCsend() +{ + // Uses double broadcast + // 1. for the data size + // 2. for the data itself + // With this information, can determine and resize receive buffer + + PstreamGlobals::checkCommunicator(comm_, toProcNo_); + + // Same type must be expected in UIPBstream::bufferIPCrecv() + label bufSize(sendBuf_.size()); + + // Broadcast #1 - data size + if + ( + !UPstream::broadcast + ( + reinterpret_cast<char*>(&bufSize), + sizeof(label), + comm_, + toProcNo_ //< is actually rootProcNo + ) + ) + { + FatalErrorInFunction + << "MPI_Bcast failure sending buffer size:" << bufSize << nl + << Foam::abort(FatalError); + return false; + } + + // Broadcast #2 - data content + // - skip if there is no data to send + if (bufSize) + { + if + ( + !Pstream::broadcast + ( + sendBuf_.data(), + sendBuf_.size(), // same as bufSize + comm_, + toProcNo_ //< is actually rootProcNo + ) + ) + { + FatalErrorInFunction + << "MPI_Bcast failure sending buffer data:" << bufSize << nl + << Foam::abort(FatalError); + return false; + } + } + + return true; +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +bool Foam::UOPBstream::write +( + const commsTypes commsType, /* unused */ + const int rootProcNo, + const char* buf, + const std::streamsize bufSize, + const int tag, /* unused */ + const label comm +) +{ + if + ( + !UPstream::broadcast(const_cast<char*>(buf), bufSize, comm, rootProcNo) + ) + { + FatalErrorInFunction + << "MPI_Bcast failure sending buffer data:" << label(bufSize) << nl + << Foam::abort(FatalError); + return false; + } + + return true; +} + + +// ************************************************************************* // -- GitLab