Commit 33301fa2 authored by Henry Weller, committed by Andrew Heather

collatedFileOperation: preferentially collect all data in the simulation thread

so that the write thread does not have to do any parallel communication.  This
avoids the bugs in OpenMPI's threading support.

Patch contributed by Mattijs Janssens
Resolves bug-report https://bugs.openfoam.org/view.php?id=2669
parent ec761da0
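
The core of the patch is a slice-wise gather: the master repeatedly takes the
longest run of processors whose blocks together fit under
maxMasterFileBufferSize, gathers that slice in the simulation thread, and
writes it out. Below is a minimal, self-contained model of that slicing loop
(plain C++ with hypothetical names; a sketch of the idea, not the OpenFOAM
code itself):

    #include <iostream>
    #include <utility>
    #include <vector>

    // Model of the slice computation in the new writeBlocks: starting at
    // startProc, take as many consecutive processors as fit under the cap.
    std::pair<int, int> nextSlice
    (
        const std::vector<int>& recvSizes,
        const int startProc,
        const long cap
    )
    {
        long totalSize = 0;
        int proci = startProc;
        while
        (
            proci < int(recvSizes.size())
         && totalSize + recvSizes[proci] < cap
        )
        {
            totalSize += recvSizes[proci];
            proci++;
        }
        return {startProc, proci - startProc};  // (startProc, slice length)
    }

    int main()
    {
        // Hypothetical per-processor block sizes; processor 0 (the master)
        // writes its own data directly and is not sliced.
        const std::vector<int> recvSizes{0, 400, 500, 300, 700, 200};
        const long cap = 1000;  // stands in for maxMasterFileBufferSize

        int startProc = 1;
        while (startProc < int(recvSizes.size()))
        {
            const auto [start, n] = nextSlice(recvSizes, startProc, cap);
            if (n == 0) break;  // a block larger than the cap cannot be sliced
            std::cout << "gather and write processors [" << start << ", "
                      << start + n << ")\n";
            startProc += n;
        }
    }

With the sizes above this prints the slices [1, 3), [3, 4) and [4, 6): each
gather stays under the buffer cap, so the master never holds more than
cap bytes of slave data at a time.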
......@@ -31,8 +31,9 @@ License
#include "StringStream.H"
#include "dictionary.H"
#include "objectRegistry.H"
#include "foamVersion.H"
#include <sys/time.h>
#include "SubList.H"
#include "labelPair.H"
#include "masterUncollatedFileOperation.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
......@@ -648,12 +649,114 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
}
void Foam::decomposedBlockData::gather
(
const label comm,
const label data,
labelList& datas
)
{
const label nProcs = UPstream::nProcs(comm);
datas.setSize(nProcs);
char* data0Ptr = reinterpret_cast<char*>(datas.begin());
labelList recvOffsets;
labelList recvSizes;
if (UPstream::master(comm))
{
recvOffsets.setSize(nProcs);
forAll(recvOffsets, proci)
{
recvOffsets[proci] =
reinterpret_cast<char*>(&datas[proci])
- data0Ptr;
}
recvSizes.setSize(nProcs, sizeof(label));
}
UPstream::gather
(
reinterpret_cast<const char*>(&data),
sizeof(label),
data0Ptr,
recvSizes,
recvOffsets,
comm
);
}
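// Illustration (not part of the patch): on a 4-proc communicator each rank
// contributes one label, e.g. its block size. Assuming sizeof(label) == 8,
// the master computes recvOffsets = {0, 8, 16, 24} (the byte offset of
// &datas[proci] from datas.begin()) and recvSizes = {8, 8, 8, 8}, so the
// gather below packs rank i's label directly into datas[i].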
void Foam::decomposedBlockData::gatherSlaveData
(
const label comm,
const UList<char>& data,
const labelUList& recvSizes,
const label startProc,
const label nProcs,
List<int>& sliceOffsets,
List<char>& recvData
)
{
// Calculate master data
List<int> sliceSizes;
if (UPstream::master(comm))
{
const label numProcs = UPstream::nProcs(comm);
sliceSizes.setSize(numProcs, 0);
sliceOffsets.setSize(numProcs+1, 0);
int totalSize = 0;
label proci = startProc;
for (label i = 0; i < nProcs; i++)
{
sliceSizes[proci] = int(recvSizes[proci]);
sliceOffsets[proci] = totalSize;
totalSize += sliceSizes[proci];
proci++;
}
sliceOffsets[proci] = totalSize;
recvData.setSize(totalSize);
}
int nSend = 0;
if
(
!UPstream::master(comm)
&& (UPstream::myProcNo(comm) >= startProc)
&& (UPstream::myProcNo(comm) < startProc+nProcs)
)
{
nSend = data.byteSize();
}
UPstream::gather
(
data.begin(),
nSend,
recvData.begin(),
sliceSizes,
sliceOffsets,
comm
);
}
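// Worked example (illustrative only): comm has 4 procs, recvSizes =
// {0, 100, 200, 300}, startProc = 1, nProcs = 2. On the master this yields
//     sliceSizes   = {0, 100, 200, 0}
//     sliceOffsets = {0, 0, 100, 300, 0}
// and recvData is sized 300: proc1's 100 bytes followed by proc2's 200 bytes.
// Procs outside [startProc, startProc+nProcs) contribute nothing (nSend == 0).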
bool Foam::decomposedBlockData::writeBlocks
(
const label comm,
autoPtr<OSstream>& osPtr,
List<std::streamoff>& start,
const UList<char>& data,
const labelUList& recvSizes,
const bool haveSlaveData,
const List<char>& slaveData,
const UPstream::commsTypes commsType,
const bool syncReturnState
)
......@@ -663,20 +766,56 @@ bool Foam::decomposedBlockData::writeBlocks
Pout<< "decomposedBlockData::writeBlocks:"
<< " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
<< " data:" << data.size()
<< " haveSlaveData:" << haveSlaveData
<< " (master only) slaveData:" << slaveData.size()
<< " commsType:" << Pstream::commsTypeNames[commsType] << endl;
}
const label nProcs = UPstream::nProcs(comm);
bool ok = true;
if (haveSlaveData)
{
// Already have gathered the slave data; the communicator is only
// used to check who is the master
if (UPstream::master(comm))
{
OSstream& os = osPtr();
start.setSize(nProcs);
// Write master data
{
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
// Write slaves
label slaveOffset = 0;
for (label proci = 1; proci < nProcs; proci++)
{
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os << SubList<char>(slaveData, recvSizes[proci], slaveOffset);
slaveOffset += recvSizes[proci];
}
ok = os.good();
}
}
else if (commsType == UPstream::commsTypes::scheduled)
{
if (UPstream::master(comm))
{
start.setSize(nProcs);
OSstream& os = osPtr();
......@@ -688,7 +827,7 @@ bool Foam::decomposedBlockData::writeBlocks
}
// Write slaves
List<char> elems;
for (label proci = 1; proci < nProcs; proci++)
{
elems.setSize(recvSizes[proci]);
IPstream::read
......@@ -723,102 +862,116 @@ bool Foam::decomposedBlockData::writeBlocks
}
else
{
// Write master data
if (UPstream::master(comm))
{
start.setSize(nProcs);
OSstream& os = osPtr();
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
// Find out how many processors can be received into
// maxMasterFileBufferSize
// Starting slave processor and number of processors
labelPair startAndSize(1, nProcs-1);
while (startAndSize[1] > 0)
{
labelPair masterData(startAndSize);
if (UPstream::master(comm))
{
label totalSize = 0;
label proci = masterData[0];
while
(
proci < nProcs
&& (
totalSize+recvSizes[proci]
< fileOperations::masterUncollatedFileOperation::
maxMasterFileBufferSize
)
)
{
totalSize += recvSizes[proci];
proci++;
}
masterData[1] = proci-masterData[0];
}
// Scatter masterData
UPstream::scatter
(
reinterpret_cast<const char*>(masterData.cdata()),
List<int>(nProcs, sizeof(masterData)),
List<int>(nProcs, 0),
reinterpret_cast<char*>(startAndSize.data()),
sizeof(startAndSize),
comm
);
if (startAndSize[1] == 0)
{
break;
}
// Gather data from (a slice of) the slaves
List<int> sliceOffsets;
List<char> recvData;
gatherSlaveData
(
comm,
data,
recvSizes,
startAndSize[0], // startProc,
startAndSize[1], // nProcs,
sliceOffsets,
recvData
);
if (UPstream::master(comm))
{
OSstream& os = osPtr();
// Write slaves
for
(
label proci = startAndSize[0];
proci < startAndSize[0]+startAndSize[1];
proci++
)
{
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os <<
SubList<char>
(
recvData,
sliceOffsets[proci+1]-sliceOffsets[proci],
sliceOffsets[proci]
);
}
}
startAndSize[0] += startAndSize[1];
}
if (UPstream::master(comm))
{
ok = osPtr().good();
}
}
if (syncReturnState)
{
......@@ -936,8 +1089,23 @@ bool Foam::decomposedBlockData::writeObject
osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
IOobject::writeHeader(osPtr());
}
labelList recvSizes;
gather(comm_, this->byteSize(), recvSizes);
List<std::streamoff> start;
List<char> slaveData; // dummy for already-received slave data
return writeBlocks
(
comm_,
osPtr,
start,
*this,
recvSizes,
false, // don't have slave data
slaveData,
commsType_
);
}
......
......@@ -169,6 +169,32 @@ public:
const UPstream::commsTypes commsType
);
//- Helper: gather a single label. Note: uses native Pstream.
// datas is sized to the number of procs; its contents are
// undefined on the slaves
static void gather
(
const label comm,
const label data,
labelList& datas
);
//- Helper: gather data from a (subset of the) slaves. Returns
// recvData : the received data
// recvOffsets : offsets into recvData; sized nProcs+1
static void gatherSlaveData
(
const label comm,
const UList<char>& data,
const labelUList& recvSizes,
const label startProc,
const label nProcs,
List<int>& recvOffsets,
List<char>& recvData
);
//- Write *this. Ostream only valid on master. Returns starts of
// processor blocks
static bool writeBlocks
......@@ -177,6 +203,12 @@ public:
autoPtr<OSstream>& osPtr,
List<std::streamoff>& start,
const UList<char>&,
const labelUList& recvSizes,
const bool haveSlaveData, // does master have slaveData
const List<char>& slaveData, // optional slave data (on master)
const UPstream::commsTypes,
const bool syncReturnState = true
);
......
......@@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2017 OpenFOAM Foundation
\\/ M anipulation | Copyright (C) 2015-2016 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
......@@ -50,11 +50,13 @@ Foam::UPstream::commsTypeNames
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
{
if (nProcs == 0)
{
parRun_ = false;
haveThreads_ = haveThreads;
freeCommunicator(UPstream::worldComm);
label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
if (comm != UPstream::worldComm)
......@@ -71,6 +73,7 @@ void Foam::UPstream::setParRun(const label nProcs)
else
{
parRun_ = true;
haveThreads_ = haveThreads;
// Redo worldComm communicator (this has been created at static
// initialisation time)
......@@ -354,6 +357,8 @@ Foam::UList<Foam::UPstream::commsStruct>::operator[](const label procID) const
bool Foam::UPstream::parRun_(false);
bool Foam::UPstream::haveThreads_(false);
Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
......
......@@ -185,6 +185,9 @@ private:
//- By default this is not a parallel run
static bool parRun_;
//- Have support for threads?
static bool haveThreads_;
//- Standard transfer message type
static int msgType_;
......@@ -211,6 +214,9 @@ private:
// Private Member Functions
//- Set data for parallel running
static void setParRun(const label nProcs, const bool haveThreads);
//- Calculate linear communication schedule
static List<commsStruct> calcLinearComm(const label nProcs);
......@@ -411,9 +417,11 @@ public:
return parRun_;
}
//- Have support for threads
static bool haveThreads()
{
return haveThreads_;
}
//- Number of processes in parallel run
static label nProcs(const label communicator = 0)
......@@ -517,6 +525,47 @@ public:
labelUList& recvData,
const label communicator = 0
);
//- Exchange data with all processors (in the communicator).
// sendSizes, sendOffsets give (per processor) the slice of
// sendData to send; similarly recvSizes, recvOffsets give the
// slice of recvData to receive
static void allToAll
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator = 0
);
//- Receive data from all processors on the master
static void gather
(
const char* sendData,
int sendSize,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator = 0
);
//- Send data to all processors from the root of the communicator
static void scatter
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
int recvSize,
const label communicator = 0
);
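// In the MPI build these three wrappers presumably map onto MPI_Alltoallv,
// MPI_Gatherv and MPI_Scatterv with MPI_BYTE as the datatype (an
// assumption; a sketch, not the actual UPstream source), e.g. for gather:
//
//     MPI_Gatherv
//     (
//         const_cast<char*>(sendData), sendSize, MPI_BYTE,
//         recvData, recvSizes.begin(), recvOffsets.begin(), MPI_BYTE,
//         0,       // root = master
//         mpiComm  // hypothetical lookup from 'communicator'
//     );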
};
......
......@@ -25,11 +25,7 @@ License
#include "OFstreamCollator.H"
#include "OFstream.H"
#include "OSspecific.H"
#include "IOstreams.H"
#include "Pstream.H"
#include "decomposedBlockData.H"
#include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
......@@ -46,7 +42,10 @@ bool Foam::OFstreamCollator::writeFile
const label comm,
const word& typeName,
const fileName& fName,
const string& masterData,
const labelUList& recvSizes,
const bool haveSlaveData, // does master have slaveData
const UList<char>& slaveData, // on master: slave data
IOstream::streamFormat fmt,