Commit 88bd9123 authored by Henry Weller's avatar Henry Weller
Browse files

Pstream: optimisation of data exchange

Contributed by Mattijs Janssens.

1. Any non-blocking data exchange needs to know in advance the sizes to
   receive so it can size the buffer.  For "halo" exchanges this is not
   a problem since the sizes are known in advance but for all other data
   exchanges these sizes need to be exchanged in advance.

   This was previously done by having all processors send the sizes of data to
   send to the master and send it back such that all processors
   - had the same information
   - all could work out who was sending what to where and hence what needed to
     be received.

   This is now changed such that we only send the size to the
   destination processor (instead of to all as previously). This means
   that
   - the list of sizes to send is now of size nProcs v.s. nProcs*nProcs before
   - we cut out the route to the master and back by using a native MPI
     call

   It causes a small change to the API of exchange and PstreamBuffers -
   they now return the sizes of the local buffers only (a labelList) and
   not the sizes of the buffers on all processors (labelListList)

2. Reversing the order of the way in which the sending is done when
   scattering information from the master processor to the other
   processors. This is done in a tree like fashion. Each processor has a
   set of processors to receive from / send to. When receiving it will
   first receive from the processors with the fewest sub-processors
   (i.e. the ones which return first). When sending it does the
   opposite: it starts by sending to the processor with the largest
   sub-tree, since that is the critical path.
parent eabb03aa
......@@ -54,7 +54,7 @@ int main(int argc, char *argv[])
// Test mapDistribute
// ~~~~~~~~~~~~~~~~~~
if (false)
if (true)
{
Random rndGen(43544*Pstream::myProcNo());
......@@ -80,11 +80,6 @@ int main(int argc, char *argv[])
nSend[procI]++;
}
// Sync how many to send
labelListList allNTrans(Pstream::nProcs());
allNTrans[Pstream::myProcNo()] = nSend;
combineReduce(allNTrans, UPstream::listEq());
// Collect items to be sent
labelListList sendMap(Pstream::nProcs());
forAll(sendMap, procI)
......@@ -98,11 +93,15 @@ int main(int argc, char *argv[])
sendMap[procI][nSend[procI]++] = i;
}
// Sync how many to send
labelList nRecv;
Pstream::exchangeSizes(sendMap, nRecv);
// Collect items to be received
labelListList recvMap(Pstream::nProcs());
forAll(recvMap, procI)
{
recvMap[procI].setSize(allNTrans[procI][Pstream::myProcNo()]);
recvMap[procI].setSize(nRecv[procI]);
}
label constructSize = 0;
......
......@@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2012-2015 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2012-2016 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
......@@ -56,14 +56,6 @@ Foam::DistributedDelaunayMesh<Triangulation>::buildMap
nSend[procI]++;
}
// Send over how many I need to receive
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
labelListList sendSizes(Pstream::nProcs());
sendSizes[Pstream::myProcNo()] = nSend;
combineReduce(sendSizes, UPstream::listEq());
// 2. Size sendMap
labelListList sendMap(Pstream::nProcs());
......@@ -83,6 +75,11 @@ Foam::DistributedDelaunayMesh<Triangulation>::buildMap
sendMap[procI][nSend[procI]++] = i;
}
// 4. Send over how many I need to receive
labelList recvSizes;
Pstream::exchangeSizes(sendMap, recvSizes);
// Determine receive map
// ~~~~~~~~~~~~~~~~~~~~~
......@@ -100,7 +97,7 @@ Foam::DistributedDelaunayMesh<Triangulation>::buildMap
{
if (procI != Pstream::myProcNo())
{
label nRecv = sendSizes[procI][Pstream::myProcNo()];
label nRecv = recvSizes[procI];
constructMap[procI].setSize(nRecv);
......
......@@ -61,14 +61,6 @@ Foam::autoPtr<Foam::mapDistribute> Foam::backgroundMeshDecomposition::buildMap
nSend[procI]++;
}
// Send over how many I need to receive
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
labelListList sendSizes(Pstream::nProcs());
sendSizes[Pstream::myProcNo()] = nSend;
combineReduce(sendSizes, UPstream::listEq());
// 2. Size sendMap
labelListList sendMap(Pstream::nProcs());
......@@ -88,6 +80,11 @@ Foam::autoPtr<Foam::mapDistribute> Foam::backgroundMeshDecomposition::buildMap
sendMap[procI][nSend[procI]++] = i;
}
// 4. Send over how many I need to receive
labelList recvSizes;
Pstream::exchangeSizes(sendMap, recvSizes);
// Determine receive map
// ~~~~~~~~~~~~~~~~~~~~~
......@@ -105,7 +102,7 @@ Foam::autoPtr<Foam::mapDistribute> Foam::backgroundMeshDecomposition::buildMap
{
if (procI != Pstream::myProcNo())
{
label nRecv = sendSizes[procI][Pstream::myProcNo()];
label nRecv = recvSizes[procI];
constructMap[procI].setSize(nRecv);
......
......@@ -109,8 +109,10 @@ void Foam::IOdictionary::readFile(const bool masterOnly)
IOdictionary::readData(fromAbove);
}
// Send to my downstairs neighbours
forAll(myComm.below(), belowI)
// Send to my downstairs neighbours. Note reverse order not
// necessary here but just for consistency with other uses
// (e.g. gatherScatter.C)
forAllReverse(myComm.below(), belowI)
{
if (debug)
{
......
......@@ -303,21 +303,42 @@ public:
// Exchange
//- Exchange data. Sends sendData, receives into recvData, sets
// sizes (not bytes). sizes[p0][p1] is what processor p0 has
// sent to p1. Continuous data only.
// If block=true will wait for all transfers to finish.
//- Helper: exchange contiguous data. Sends sendData, receives into
// recvData. If block=true will wait for all transfers to finish.
template<class Container, class T>
static void exchange
(
const List<Container >&,
List<Container >&,
labelListList& sizes,
const UList<Container>& sendData,
const labelUList& recvSizes,
List<Container>& recvData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool block = true
);
//- Helper: exchange sizes of sendData. sendData is the data per
// processor (in the communicator). Returns sizes of sendData
// on the sending processor.
template<class Container>
static void exchangeSizes
(
const Container& sendData,
labelList& sizes,
const label comm = UPstream::worldComm
);
//- Exchange contiguous data. Sends sendData, receives into
// recvData. Determines sizes to receive.
// If block=true will wait for all transfers to finish.
template<class Container, class T>
static void exchange
(
const UList<Container>& sendData,
List<Container>& recvData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool block = true
);
};
......
......@@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
......@@ -85,12 +85,10 @@ void Foam::PstreamBuffers::finishedSends(const bool block)
if (commsType_ == UPstream::nonBlocking)
{
labelListList sizes;
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvBuf_,
sizes,
tag_,
comm_,
block
......@@ -99,17 +97,19 @@ void Foam::PstreamBuffers::finishedSends(const bool block)
}
void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block)
void Foam::PstreamBuffers::finishedSends(labelList& recvSizes, const bool block)
{
finishedSendsCalled_ = true;
if (commsType_ == UPstream::nonBlocking)
{
Pstream::exchangeSizes(sendBuf_, recvSizes, comm_);
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvSizes,
recvBuf_,
sizes,
tag_,
comm_,
block
......@@ -123,22 +123,8 @@ void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block)
<< " since transfers already in progress. Use non-blocking instead."
<< exit(FatalError);
// Note: possible only if using different tag from write started
// Note: maybe possible only if using different tag from write started
// by ~UOPstream. Needs some work.
//sizes.setSize(UPstream::nProcs(comm));
//labelList& nsTransPs = sizes[UPstream::myProcNo(comm)];
//nsTransPs.setSize(UPstream::nProcs(comm));
//
//forAll(sendBuf_, procI)
//{
// nsTransPs[procI] = sendBuf_[procI].size();
//}
//
//// Send sizes across.
//int oldTag = UPstream::msgType();
//UPstream::msgType() = tag_;
//combineReduce(sizes, UPstream::listEq());
//UPstream::msgType() = oldTag;
}
}
......
......@@ -149,9 +149,9 @@ public:
void finishedSends(const bool block = true);
//- Mark all sends as having been done. Same as above but also returns
// sizes (bytes) transferred. Note:currently only valid for
// sizes (bytes) received. Note:currently only valid for
// non-blocking.
void finishedSends(labelListList& sizes, const bool block = true);
void finishedSends(labelList& recvSizes, const bool block = true);
//- Clear storage and reset
void clear();
......
......@@ -489,7 +489,15 @@ public:
//- Abort program
static void abort();
//- Exchange label with all processors (in the communicator).
// sendData[procI] is the label to send to procI.
// After return recvData contains the data from the other processors.
static void allToAll
(
const labelUList& sendData,
labelUList& recvData,
const label communicator = 0
);
};
......
......@@ -218,7 +218,7 @@ void Foam::Pstream::combineScatter
}
// Send to my downstairs neighbours
forAll(myComm.below(), belowI)
forAllReverse(myComm.below(), belowI)
{
label belowID = myComm.below()[belowI];
......@@ -453,7 +453,7 @@ void Foam::Pstream::listCombineScatter
}
// Send to my downstairs neighbours
forAll(myComm.below(), belowI)
forAllReverse(myComm.below(), belowI)
{
label belowID = myComm.below()[belowI];
......@@ -651,7 +651,7 @@ void Foam::Pstream::mapCombineScatter
}
// Send to my downstairs neighbours
forAll(myComm.below(), belowI)
forAllReverse(myComm.below(), belowI)
{
label belowID = myComm.below()[belowI];
......
......@@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
......@@ -36,9 +36,9 @@ Description
template<class Container, class T>
void Foam::Pstream::exchange
(
const List<Container>& sendBufs,
const UList<Container>& sendBufs,
const labelUList& recvSizes,
List<Container>& recvBufs,
labelListList& sizes,
const int tag,
const label comm,
const bool block
......@@ -53,23 +53,13 @@ void Foam::Pstream::exchange
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of list:" << sendBufs.size()
<< " does not equal the number of processors:"
<< "Size of list " << sendBufs.size()
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
sizes.setSize(UPstream::nProcs(comm));
labelList& nsTransPs = sizes[UPstream::myProcNo(comm)];
nsTransPs.setSize(UPstream::nProcs(comm));
forAll(sendBufs, procI)
{
nsTransPs[procI] = sendBufs[procI].size();
}
// Send sizes across. Note: blocks.
combineReduce(sizes, UPstream::listEq(), tag, comm);
recvBufs.setSize(sendBufs.size());
if (UPstream::nProcs(comm) > 1)
{
......@@ -78,10 +68,9 @@ void Foam::Pstream::exchange
// Set up receives
// ~~~~~~~~~~~~~~~
recvBufs.setSize(sendBufs.size());
forAll(sizes, procI)
forAll(recvSizes, procI)
{
label nRecv = sizes[procI][UPstream::myProcNo(comm)];
label nRecv = recvSizes[procI];
if (procI != Pstream::myProcNo(comm) && nRecv > 0)
{
......@@ -143,4 +132,48 @@ void Foam::Pstream::exchange
}
template<class Container>
void Foam::Pstream::exchangeSizes
(
const Container& sendBufs,
labelList& recvSizes,
const label comm
)
{
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of container " << sendBufs.size()
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
labelList sendSizes(sendBufs.size());
forAll(sendBufs, procI)
{
sendSizes[procI] = sendBufs[procI].size();
}
recvSizes.setSize(sendSizes.size());
allToAll(sendSizes, recvSizes, comm);
}
template<class Container, class T>
void Foam::Pstream::exchange
(
const UList<Container>& sendBufs,
List<Container>& recvBufs,
const int tag,
const label comm,
const bool block
)
{
labelList recvSizes;
exchangeSizes(sendBufs, recvSizes, comm);
exchange<Container, T>(sendBufs, recvSizes, recvBufs, tag, comm, block);
}
// ************************************************************************* //
......@@ -185,8 +185,10 @@ void Pstream::scatter
}
}
// Send to my downstairs neighbours
forAll(myComm.below(), belowI)
// Send to my downstairs neighbours. Note reverse order (compared to
// receiving). This is to make sure to send to the critical path
// (only when using a tree schedule!) first.
forAllReverse(myComm.below(), belowI)
{
if (contiguous<T>())
{
......
......@@ -274,7 +274,7 @@ void Pstream::scatterList
}
// Send to my downstairs neighbours
forAll(myComm.below(), belowI)
forAllReverse(myComm.below(), belowI)
{
label belowID = myComm.below()[belowI];
const labelList& notBelowLeaves = comms[belowID].allNotBelow();
......
......@@ -248,8 +248,9 @@ bool Foam::regIOobject::read()
ok = readData(fromAbove);
}
// Send to my downstairs neighbours
forAll(myComm.below(), belowI)
// Send to my downstairs neighbours. Note reverse order not
// necessary here - just for consistency reasons.
forAllReverse(myComm.below(), belowI)
{
OPstream toBelow
(
......
......@@ -528,12 +528,10 @@ void Foam::mapDistribute::exchangeAddressing
}
subMap_.setSize(Pstream::nProcs());
labelListList sendSizes;
Pstream::exchange<labelList, label>
(
wantedRemoteElements,
subMap_,
sendSizes,
tag,
Pstream::worldComm //TBD
);
......@@ -608,12 +606,10 @@ void Foam::mapDistribute::exchangeAddressing
}
subMap_.setSize(Pstream::nProcs());
labelListList sendSizes;
Pstream::exchange<labelList, label>
(
wantedRemoteElements,
subMap_,
sendSizes,
tag,
Pstream::worldComm //TBD
);
......
......@@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
......@@ -81,6 +81,17 @@ void Foam::reduce(scalar&, const sumOp<scalar>&, const int, const label, label&)
{}
// Serial (non-MPI) implementation of allToAll: with a single process
// the only exchange partner is the process itself, so the result is a
// plain copy of sendData. The communicator argument is unused here.
void Foam::UPstream::allToAll
(
    const labelUList& sendData,
    labelUList& recvData,
    const label communicator
)
{
    // Single process: "receive" exactly what we "send" to ourselves
    recvData.assign(sendData);
}
void Foam::UPstream::allocatePstreamCommunicator
(
const label,
......
......@@ -295,6 +295,54 @@ void Foam::reduce
}
// MPI implementation of UPstream::allToAll: each processor sends one
// label to every processor in the communicator and receives one label
// from each. Both sendData and recvData must already be sized to the
// number of processors in the communicator.
void Foam::UPstream::allToAll
(
    const labelUList& sendData,
    labelUList& recvData,
    const label communicator
)
{
    label np = nProcs(communicator);

    // Both lists need one slot per processor in the communicator
    if (sendData.size() != np || recvData.size() != np)
    {
        FatalErrorInFunction
            << "Size of sendData " << sendData.size()
            << " or size of recvData " << recvData.size()
            << " is not equal to the number of processors in the domain "
            << np
            << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        // Not actually running in parallel: exchange is a local copy
        recvData.assign(sendData);
    }
    else
    {
        // One label per processor pair, shipped as raw bytes
        // (sizeof(label) x MPI_BYTE) so the call does not depend on the
        // configured label width.
        if
        (
            MPI_Alltoall
            (
                sendData.begin(),
                sizeof(label),
                MPI_BYTE,
                recvData.begin(),
                sizeof(label),
                MPI_BYTE,
                PstreamGlobals::MPICommunicators_[communicator]
            )
        )
        {
            // Any non-zero MPI return code is treated as a fatal error
            FatalErrorInFunction
                << "MPI_Alltoall failed for " << sendData
                << " on communicator " << communicator
                << Foam::abort(FatalError);
        }
    }
}
void Foam::UPstream::allocatePstreamCommunicator
(
const label parentIndex,
......
......@@ -322,7 +322,7 @@ void Foam::Cloud<ParticleType>::move(TrackData& td, const scalar trackTime)
// Start sending. Sets number of bytes transferred
labelListList allNTrans(Pstream::nProcs());
labelList allNTrans(Pstream::nProcs());
pBufs.finishedSends(allNTrans);
......@@ -330,15 +330,13 @@ void Foam::Cloud<ParticleType>::move(TrackData& td, const scalar trackTime)
forAll(allNTrans, i)
{
forAll(allNTrans[i], j)
if (allNTrans[i])
{
if (allNTrans[i][j])
{
transfered = true;
break;
}
transfered = true;
break;
}
}
reduce(transfered, orOp<bool>());
if (!transfered)
{
......@@ -350,7 +348,7 @@ void Foam::Cloud<ParticleType>::move(TrackData& td, const scalar trackTime)
{
label neighbProci = neighbourProcs[i];
label nRec = allNTrans[neighbProci][Pstream::myProcNo()];
label nRec = allNTrans[neighbProci];
if (nRec)
{
......
......@@ -844,15 +844,6 @@ void Foam::InteractionLists<ParticleType>::buildMap
nSend[procI]++;
}
// Send over how many I need to receive
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~