diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index e2ec02e88beef62e46d97efb72cb2758a0201ae9..2e2f015403742b44b68f4e90d97e1e583e0fef18 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -55,6 +55,37 @@ class Pstream : public UPstream { + // Private Static Functions + + //- Exchange contiguous data. Sends sendData, receives into + // recvData. If block=true will wait for all transfers to finish. + // Data provided and received as container. + template<class Container, class T> + static void exchangeContainer + ( + const UList<Container>& sendBufs, + const labelUList& recvSizes, + List<Container>& recvBufs, + const int tag, + const label comm, + const bool block + ); + + //- Exchange contiguous data. Sends sendData, receives into + // recvData. If block=true will wait for all transfers to finish. + // Data provided and received as pointers. + template<class T> + static void exchangeBuf + ( + const labelUList& sendSizes, // number of T, not number of char + const UList<const char*>& sendBufs, + const labelUList& recvSizes, // number of T, not number of char + List<char*>& recvBufs, + const int tag, + const label comm, + const bool block + ); + protected: @@ -63,6 +94,7 @@ protected: //- Transfer buffer DynamicList<char> buf_; + public: // Declare name of the class and its debug switch diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H index 53c4b6f344d847bed4582e98d1ad923d319540aa..51fb00060b8d4cd2917db6fe1f6073efb4fddccd 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -32,7 +32,6 @@ Description #ifndef PstreamReduceOps_H #define PstreamReduceOps_H -#include "Pstream.H" #include "ops.H" #include "vector2D.H" diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index d8954cb1c9e8168ec79a121ac83da42508815bed..77692890b3216362ed086740da3050d3aa1d89db 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -464,4 +464,16 @@ registerOptSwitch ); +int Foam::UPstream::maxCommsSize +( + Foam::debug::optimisationSwitch("maxCommsSize", 0) +); +registerOptSwitch +( + "maxCommsSize", + int, + Foam::UPstream::maxCommsSize +); + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 609155d492618139b54d6848a4e2678d0a09bb07..e2fb8a77016c0449847bd8cd40b6029b55fdfdf9 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -268,6 +268,9 @@ public: //- Number of polling cycles in processor updates static int nPollProcInterfaces; + //- Optional maximum message size (bytes) + static int maxCommsSize; + //- Default communicator (all processors) static label worldComm; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C index 3a3436cf27c6dc6030c6a86b7290328d8c77a4ec..34114a29a4f2556bf7ebcb371c6e389f9e5c4baa 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -28,11 +28,157 @@ Description #include "Pstream.H" #include "contiguous.H" -#include "PstreamCombineReduceOps.H" -#include "UPstream.H" +#include "PstreamReduceOps.H" // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // +template<class Container, class T> +void Foam::Pstream::exchangeContainer +( + const UList<Container>& sendBufs, + const labelUList& recvSizes, + List<Container>& recvBufs, + const int tag, + const label comm, + const bool block +) +{ + label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + forAll(recvSizes, proci) + { + if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) + { + UIPstream::read + ( + UPstream::nonBlocking, + proci, + reinterpret_cast<char*>(recvBufs[proci].begin()), + recvSizes[proci]*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + forAll(sendBufs, proci) + { + if (proci != Pstream::myProcNo(comm) && sendBufs[proci].size() > 0) + { + if + ( + !UOPstream::write + ( + UPstream::nonBlocking, + proci, + reinterpret_cast<const char*>(sendBufs[proci].begin()), + sendBufs[proci].size()*sizeof(T), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. " + << "to:" << proci << " nBytes:" + << label(sendBufs[proci].size()*sizeof(T)) + << Foam::abort(FatalError); + } + } + } + + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (block) + { + Pstream::waitRequests(startOfRequests); + } +} + + +template<class T> +void Foam::Pstream::exchangeBuf +( + const labelUList& sendSizes, + const UList<const char*>& sendBufs, + const labelUList& recvSizes, + List<char*>& recvBufs, + const int tag, + const label comm, + const bool block +) +{ + label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + forAll(recvSizes, proci) + { + if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) + { + UIPstream::read + ( + UPstream::nonBlocking, + proci, + recvBufs[proci], + recvSizes[proci]*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + forAll(sendBufs, proci) + { + if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0) + { + if + ( + !UOPstream::write + ( + UPstream::nonBlocking, + proci, + sendBufs[proci], + sendSizes[proci]*sizeof(T), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. " + << "to:" << proci << " nBytes:" + << label(sendSizes[proci]*sizeof(T)) + << Foam::abort(FatalError); + } + } + } + + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (block) + { + Pstream::waitRequests(startOfRequests); + } +} + + template<class Container, class T> void Foam::Pstream::exchange ( @@ -63,11 +209,7 @@ void Foam::Pstream::exchange if (UPstream::parRun() && UPstream::nProcs(comm) > 1) { - label startOfRequests = Pstream::nRequests(); - - // Set up receives - // ~~~~~~~~~~~~~~~ - + // Presize all receive buffers forAll(recvSizes, proci) { label nRecv = recvSizes[proci]; @@ -75,55 +217,121 @@ void Foam::Pstream::exchange if (proci != Pstream::myProcNo(comm) && nRecv > 0) { recvBufs[proci].setSize(nRecv); - UIPstream::read - ( - UPstream::nonBlocking, - proci, - reinterpret_cast<char*>(recvBufs[proci].begin()), - nRecv*sizeof(T), - tag, - comm - ); } } - - // Set up sends - // ~~~~~~~~~~~~ - - forAll(sendBufs, proci) + if (Pstream::maxCommsSize <= 0) { - if (proci != Pstream::myProcNo(comm) && sendBufs[proci].size() > 0) + // Do the exchanging in one go + exchangeContainer<Container, T> + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + block + ); + } + else + { + // Determine the number of chunks to send. Note that we + // only have to look at the sending data since we are + // guaranteed that some processor's sending size is some other + // processor's receive size. Also we can ignore any local comms. + + label maxNSend = 0; + forAll(sendBufs, proci) { - if - ( - !UOPstream::write - ( - UPstream::nonBlocking, - proci, - reinterpret_cast<const char*>(sendBufs[proci].begin()), - sendBufs[proci].size()*sizeof(T), - tag, - comm - ) - ) + if (proci != Pstream::myProcNo(comm)) { - FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << proci << " nBytes:" - << label(sendBufs[proci].size()*sizeof(T)) - << Foam::abort(FatalError); + maxNSend = max(maxNSend, sendBufs[proci].size()); } } - } + const label maxNBytes = sizeof(T)*maxNSend; - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ + // We need to send maxNBytes bytes so the number of iterations: + // maxNBytes iterations + // --------- ---------- + // 0 0 + // 1..maxCommsSize 1 + // maxCommsSize+1..2*maxCommsSize 2 + // etc. - if (block) - { - Pstream::waitRequests(startOfRequests); + label nIter; + if (maxNBytes == 0) + { + nIter = 0; + } + else + { + nIter = (maxNBytes-1)/Pstream::maxCommsSize+1; + } + reduce(nIter, maxOp<label>(), tag, comm); + + + List<const char*> charSendBufs(sendBufs.size()); + List<char*> charRecvBufs(sendBufs.size()); + + labelList nRecv(sendBufs.size()); + labelList startRecv(sendBufs.size(), 0); + labelList nSend(sendBufs.size()); + labelList startSend(sendBufs.size(), 0); + + for (label iter = 0; iter < nIter; iter++) + { + forAll(sendBufs, proci) + { + nSend[proci] = min + ( + Pstream::maxCommsSize, + sendBufs[proci].size()-startSend[proci] + ); + charSendBufs[proci] = + ( + nSend[proci] > 0 + ? reinterpret_cast<const char*> + ( + &(sendBufs[proci][startSend[proci]]) + ) + : nullptr + ); + + nRecv[proci] = min + ( + Pstream::maxCommsSize, + recvBufs[proci].size()-startRecv[proci] + ); + + charRecvBufs[proci] = + ( + nRecv[proci] > 0 + ? reinterpret_cast<char*> + ( + &(recvBufs[proci][startRecv[proci]]) + ) + : nullptr + ); + } + + exchangeBuf<T> + ( + nSend, + charSendBufs, + nRecv, + charRecvBufs, + tag, + comm, + block + ); + + forAll(nSend, proci) + { + startSend[proci] += nSend[proci]; + startRecv[proci] += nRecv[proci]; + } + } } } diff --git a/src/OpenFOAM/db/dynamicLibrary/codedBase/codedBase.C b/src/OpenFOAM/db/dynamicLibrary/codedBase/codedBase.C index 5fcaf92d352db8d947a8ebb07df877c8d7c4c1bc..57caf7f9427fb1019240e3b1dd40993cb5e1436f 100644 --- a/src/OpenFOAM/db/dynamicLibrary/codedBase/codedBase.C +++ b/src/OpenFOAM/db/dynamicLibrary/codedBase/codedBase.C @@ -28,6 +28,7 @@ License #include "dynamicCode.H" #include "dynamicCodeContext.H" #include "dlLibraryTable.H" +#include "Pstream.H" #include "PstreamReduceOps.H" #include "OSspecific.H" #include "Ostream.H" diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index 124e22a0c5decb5e9ba3eea68630c0ef1b0d56ff..4d14c2727a73fb43f129f8b456fa34134f6a022b 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -23,7 +23,7 @@ License \*---------------------------------------------------------------------------*/ -#include "UPstream.H" +#include "Pstream.H" #include "PstreamReduceOps.H" // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index 4e105aa7677916c13554691106eb7f4fdd41a3d2..1f6ebca09ae3b9c6d10078187c2a819ec801b099 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -23,7 +23,7 @@ License \*---------------------------------------------------------------------------*/ -#include "UPstream.H" +#include "Pstream.H" #include "PstreamReduceOps.H" #include "OSspecific.H" #include "PstreamGlobals.H"