Commit d49ef094 authored by mattijs's avatar mattijs
Browse files

ENH: exchange: use all-to-all comms

Determine sparse receive sizes instead of full matrix
parent 4388a656
......@@ -185,7 +185,7 @@ Foam::parLagrangianRedistributor::redistributeLagrangianPositions
// Start sending. Sets number of bytes transferred
labelListList allNTrans(Pstream::nProcs());
labelList allNTrans(Pstream::nProcs());
pBufs.finishedSends(allNTrans);
......@@ -208,7 +208,7 @@ Foam::parLagrangianRedistributor::redistributeLagrangianPositions
// Retrieve from receive buffers
forAll(allNTrans, procI)
{
label nRec = allNTrans[procI][Pstream::myProcNo()];
label nRec = allNTrans[procI];
//Pout<< "From processor " << procI << " receiving bytes " << nRec
// << endl;
......
......@@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2013 OpenFOAM Foundation
\\/ M anipulation |
\\/ M anipulation | Copyright (C) 2015 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -303,16 +303,25 @@ public:
// Exchange
//- Exchange data. Sends sendData, receives into recvData, sets
// sizes (not bytes). sizes[p0][p1] is what processor p0 has
// sent to p1. Continuous data only.
//- Exchange data. Sends sendData, receives into recvData. Gets
// passed sizes to receive. Continuous data only.
// If block=true will wait for all transfers to finish.
template<class Container, class T>
static void exchange
(
const List<Container >& sendBufs,
const labelUList& recvSizes,
List<Container >& recvBufs,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool block = true
);
template<class Container, class T>
static void exchange
(
const List<Container >&,
List<Container >&,
labelListList& sizes,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool block = true
......
......@@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\/ M anipulation |
\\/ M anipulation | Copyright (C) 2015 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -85,12 +85,10 @@ void Foam::PstreamBuffers::finishedSends(const bool block)
if (commsType_ == UPstream::nonBlocking)
{
labelListList sizes;
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvBuf_,
sizes,
tag_,
comm_,
block
......@@ -99,17 +97,25 @@ void Foam::PstreamBuffers::finishedSends(const bool block)
}
void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block)
void Foam::PstreamBuffers::finishedSends(labelList& recvSizes, const bool block)
{
finishedSendsCalled_ = true;
if (commsType_ == UPstream::nonBlocking)
{
labelList sendSizes(sendBuf_.size());
forAll(sendBuf_, procI)
{
sendSizes[procI] = sendBuf_[procI].size();
}
recvSizes.setSize(sendBuf_.size());
UPstream::exchange(sendSizes.begin(), recvSizes.begin(), comm_);
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvSizes,
recvBuf_,
sizes,
tag_,
comm_,
block
......
......@@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\/ M anipulation |
\\/ M anipulation | Copyright (C) 2015 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -149,9 +149,9 @@ public:
void finishedSends(const bool block = true);
//- Mark all sends as having been done. Same as above but also returns
// sizes (bytes) transferred. Note:currently only valid for
// sizes (bytes) received. Note:currently only valid for
// non-blocking.
void finishedSends(labelListList& sizes, const bool block = true);
void finishedSends(labelList& recvSizes, const bool block = true);
//- Clear storage and reset
void clear();
......
......@@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2014 OpenFOAM Foundation
\\/ M anipulation |
\\/ M anipulation | Copyright 2015 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -348,6 +348,18 @@ public:
// Spawns slave processes and initialises inter-communication
static bool init(int& argc, char**& argv);
//- Exchange single integer to/from all processors in communicator
// sendBuf, recvBuf need to be sized with number of processors in
// communicstor. Sets recvBuf to whatever sendBuf on sending processor
// is
static void exchange
(
int* sendBuf,
int* recvBuf,
const label comm = UPstream::worldComm
);
// Non-blocking comms
//- Get number of outstanding requests
......
......@@ -37,8 +37,8 @@ template<class Container, class T>
void Foam::Pstream::exchange
(
const List<Container>& sendBufs,
const labelUList& recvSizes,
List<Container>& recvBufs,
labelListList& sizes,
const int tag,
const label comm,
const bool block
......@@ -53,23 +53,20 @@ void Foam::Pstream::exchange
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of list:" << sendBufs.size()
<< "Size of send buffer:" << sendBufs.size()
<< " does not equal the number of processors:"
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
sizes.setSize(UPstream::nProcs(comm));
labelList& nsTransPs = sizes[UPstream::myProcNo(comm)];
nsTransPs.setSize(UPstream::nProcs(comm));
forAll(sendBufs, procI)
if (recvSizes.size() != UPstream::nProcs(comm))
{
nsTransPs[procI] = sendBufs[procI].size();
FatalErrorInFunction
<< "Size of receive sizes:" << recvSizes.size()
<< " does not equal the number of processors:"
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
// Send sizes across. Note: blocks.
combineReduce(sizes, UPstream::listEq(), tag, comm);
recvBufs.setSize(sendBufs.size());
......@@ -80,9 +77,9 @@ void Foam::Pstream::exchange
// Set up receives
// ~~~~~~~~~~~~~~~
forAll(sizes, procI)
forAll(recvSizes, procI)
{
label nRecv = sizes[procI][UPstream::myProcNo(comm)];
label nRecv = recvSizes[procI];
if (procI != Pstream::myProcNo(comm) && nRecv > 0)
{
......@@ -144,4 +141,34 @@ void Foam::Pstream::exchange
}
template<class Container, class T>
void Foam::Pstream::exchange
(
const List<Container>& sendBufs,
List<Container>& recvBufs,
const int tag,
const label comm,
const bool block
)
{
labelList sendSizes(sendBufs.size());
forAll(sendBufs, procI)
{
sendSizes[procI] = sendBufs[procI].size();
}
labelList recvSizes(sendBufs.size());
UPstream::exchange(sendSizes.begin(), recvSizes.begin(), comm);
exchange<Container, T>
(
sendBufs,
recvSizes,
recvBufs,
tag,
comm,
block
);
}
// ************************************************************************* //
......@@ -446,12 +446,10 @@ void Foam::mapDistributeBase::exchangeAddressing
}
subMap_.setSize(Pstream::nProcs());
labelListList sendSizes;
Pstream::exchange<labelList, label>
(
wantedRemoteElements,
subMap_,
sendSizes,
tag,
Pstream::worldComm //TBD
);
......@@ -526,12 +524,10 @@ void Foam::mapDistributeBase::exchangeAddressing
}
subMap_.setSize(Pstream::nProcs());
labelListList sendSizes;
Pstream::exchange<labelList, label>
(
wantedRemoteElements,
subMap_,
sendSizes,
tag,
Pstream::worldComm //TBD
);
......
......@@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\/ M anipulation |
\\/ M anipulation | Copyright 2016 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -81,6 +81,17 @@ void Foam::reduce(scalar&, const sumOp<scalar>&, const int, const label, label&)
{}
void Foam::UPstream::exchange
(
int* sendBuf,
int* recvBuf,
const label communicator
)
{
recvBuf[0] = sendBuf[0];
}
void Foam::UPstream::allocatePstreamCommunicator
(
const label,
......
......@@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\/ M anipulation |
\\/ M anipulation | Copyright (C) 2015 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -295,6 +295,41 @@ void Foam::reduce
}
void Foam::UPstream::exchange
(
int* sendBuf,
int* recvBuf,
const label communicator
)
{
if (!UPstream::parRun())
{
recvBuf[0] = sendBuf[0];
}
else
{
if
(
MPI_Alltoall
(
sendBuf,
1,
MPI_INT,
recvBuf,
1,
MPI_INT,
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Alltoall failure"
<< Foam::abort(FatalError);
}
}
}
void Foam::UPstream::allocatePstreamCommunicator
(
const label parentIndex,
......
......@@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
\\/ M anipulation |
\\/ M anipulation | Copyright 2015 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -321,25 +321,25 @@ void Foam::Cloud<ParticleType>::move(TrackData& td, const scalar trackTime)
}
// Start sending. Sets number of bytes transferred
labelListList allNTrans(Pstream::nProcs());
pBufs.finishedSends(allNTrans);
// Start sending. Sets number of bytes received
labelList nRecv(Pstream::nProcs());
pBufs.finishedSends(nRecv);
bool transfered = false;
forAll(allNTrans, i)
forAll(nRecv, i)
{
forAll(allNTrans[i], j)
if (nRecv[i])
{
if (allNTrans[i][j])
{
transfered = true;
break;
}
transfered = true;
break;
}
}
reduce(transfered, orOp<bool>());
if (!transfered)
{
break;
......@@ -350,7 +350,7 @@ void Foam::Cloud<ParticleType>::move(TrackData& td, const scalar trackTime)
{
label neighbProci = neighbourProcs[i];
label nRec = allNTrans[neighbProci][Pstream::myProcNo()];
label nRec = nRecv[neighbProci];
if (nRec)
{
......
......@@ -344,13 +344,7 @@ Foam::autoPtr<Foam::globalIndex> Foam::regionSplit::calcRegionSplit
// Get the wanted region labels into recvNonLocal
labelListList recvNonLocal(Pstream::nProcs());
labelListList sizes;
Pstream::exchange<labelList, label>
(
sendNonLocal,
recvNonLocal,
sizes
);
Pstream::exchange<labelList, label>(sendNonLocal, recvNonLocal);
// Now we have the wanted compact region labels that procI wants in
// recvNonLocal[procI]. Construct corresponding list of compact
......@@ -372,13 +366,7 @@ Foam::autoPtr<Foam::globalIndex> Foam::regionSplit::calcRegionSplit
// Send back (into recvNonLocal)
recvNonLocal.clear();
recvNonLocal.setSize(sendWantedLocal.size());
sizes.clear();
Pstream::exchange<labelList, label>
(
sendWantedLocal,
recvNonLocal,
sizes
);
Pstream::exchange<labelList, label>(sendWantedLocal, recvNonLocal);
sendWantedLocal.clear();
// Now recvNonLocal contains for every element in setNonLocal the
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment