Henry Weller authored

Improvements to existing functionality
--------------------------------------
  - MPI is initialised without thread support if it is not needed, e.g. uncollated
  - Use native C++11 threading; avoids problems with static destruction order.
  - etc/cellModels is now only read if needed.
  - etc/controlDict can now be read from the environment variable FOAM_CONTROLDICT
  - Uniform files (e.g. '0/uniform/time') are now read once, on the master only
    (with the masterUncollated or collated file handlers)
  - The collated format writes to 'processorsNNN' instead of 'processors'.
    The file format is unchanged.
  - The thread buffer and file buffer size are no longer limited to 2GB.

The global controlDict file contains parameters for file handling (see the
OptimisationSwitches sketch after this note). Under some circumstances, e.g.
running in parallel on a system without NFS, the user may need to set some
parameters, e.g. fileHandler, before the global controlDict file is read from
file. To support this, OpenFOAM now allows the global controlDict to be read
as a string set in the FOAM_CONTROLDICT environment variable.

The FOAM_CONTROLDICT environment variable can be set to the content of the
global controlDict file, e.g. from a sh/bash shell:

    export FOAM_CONTROLDICT=$(foamDictionary $FOAM_ETC/controlDict)

FOAM_CONTROLDICT can then be passed to mpirun using the -x option, e.g.:

    mpirun -np 2 -x FOAM_CONTROLDICT simpleFoam -parallel

Note that while this avoids the need for NFS to read the OpenFOAM
configuration, the executable still needs to load shared libraries, which
must either be copied locally or be available via NFS or equivalent.

New: Multiple IO ranks
----------------------
The masterUncollated and collated fileHandlers can now use multiple ranks for
writing, e.g.:

    mpirun -np 6 simpleFoam -parallel -ioRanks '(0 3)'

In this example ranks 0 ('processor0') and 3 ('processor3') handle all the
I/O: rank 0 handles ranks 0,1,2 and rank 3 handles ranks 3,4,5. The set of IO
ranks should always include 0 as its first element and be sorted in
increasing order.

The collated fileHandler uses the directory naming processorsNNN_XXX-YYY,
where NNN is the total number of processors and XXX and YYY are the first and
last processor in the group, e.g. in the above example the directories would
be

    processors6_0-2
    processors6_3-5

and each of the collated files in these contains the data of the local ranks
only. The same naming also applies when e.g. running decomposePar:

    decomposePar -fileHandler collated -ioRanks '(0 3)'

New: Distributed data
---------------------
The individual root directories can be placed on different hosts with
different paths if necessary. In the current framework it is necessary to
specify the root per slave process, but this has been simplified with the
option of specifying the root per host with the -hostRoots command line
option:

    mpirun -np 6 simpleFoam -parallel -ioRanks '(0 3)' \
        -hostRoots '("machineA" "/tmp" "machineB" "/tmp")'

The hostRoots option is followed by a list of machine-name/root-directory
pairs; the machine name can contain regular expressions.

New: hostCollated
-----------------
The new hostCollated fileHandler automatically sets the 'ioRanks' according
to the host names, using the lowest rank on each host, e.g. to run simpleFoam
on 6 processors with ranks 0-2 on machineA and ranks 3-5 on machineB, with
the machines specified in the hostfile:

    mpirun -np 6 --hostfile hostfile simpleFoam -parallel -fileHandler hostCollated

This is equivalent to:

    mpirun -np 6 --hostfile hostfile simpleFoam -parallel -fileHandler collated -ioRanks '(0 3)'

This example will write the directories:

    processors6_0-2/
    processors6_3-5/

A typical example would use distributed data, e.g. on two nodes, machineA and
machineB, each with three processes:

    decomposePar -fileHandler collated -case cavity

    # Copy case (constant/*, system/*, processors6/) to master:
    rsync -a cavity machineA:/tmp/

    # Create root on slave:
    ssh machineB mkdir -p /tmp/cavity

    # Run
    mpirun --hostfile hostfile icoFoam \
        -case /tmp/cavity -parallel -fileHandler hostCollated \
        -hostRoots '("machineA" "/tmp" "machineB" "/tmp")'

Contributed by Mattijs Janssens
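
For reference, a minimal sketch of the file-handling entries in the
OptimisationSwitches sub-dictionary of the global controlDict (entry names as
used by the file handlers; the values shown are illustrative, not
recommendations):

    OptimisationSwitches
    {
        fileHandler             uncollated;
        maxThreadFileBufferSize 0;
        maxMasterFileBufferSize 1e9;
    }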
decomposedBlockData.C
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2017-2018 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "decomposedBlockData.H"
#include "OPstream.H"
#include "IPstream.H"
#include "PstreamBuffers.H"
#include "Fstream.H"
#include "StringStream.H"
#include "dictionary.H"
#include "objectRegistry.H"
#include "SubList.H"
#include "labelPair.H"
#include "masterUncollatedFileOperation.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
namespace Foam
{
defineTypeNameAndDebug(decomposedBlockData, 0);
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::decomposedBlockData::decomposedBlockData
(
const label comm,
const IOobject& io,
const UPstream::commsTypes commsType
)
:
regIOobject(io),
commsType_(commsType),
comm_(comm)
{
// Temporary warning
if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
{
WarningInFunction
<< "decomposedBlockData " << name()
<< " constructed with IOobject::MUST_READ_IF_MODIFIED"
" but decomposedBlockData does not support automatic rereading."
<< endl;
}
if
(
(
io.readOpt() == IOobject::MUST_READ
|| io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
)
|| (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
)
{
read();
}
}
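
// A minimal usage sketch (hypothetical, for illustration only): construct a
// decomposedBlockData on the world communicator so that the master reads the
// collated file and each rank receives its own block:
//
//     decomposedBlockData blocks
//     (
//         UPstream::worldComm,
//         IOobject
//         (
//             "p",                   // object name (assumed)
//             runTime.timeName(),    // instance
//             runTime,               // registry
//             IOobject::MUST_READ
//         ),
//         UPstream::commsTypes::scheduled
//     );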
Foam::decomposedBlockData::decomposedBlockData
(
const label comm,
const IOobject& io,
const UList<char>& list,
const UPstream::commsTypes commsType
)
:
regIOobject(io),
commsType_(commsType),
comm_(comm)
{
// Temporary warning
if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
{
WarningInFunction
<< "decomposedBlockData " << name()
<< " constructed with IOobject::MUST_READ_IF_MODIFIED"
" but decomposedBlockData does not support automatic rereading."
<< endl;
}
if
(
(
io.readOpt() == IOobject::MUST_READ
|| io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
)
|| (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
)
{
read();
}
else
{
List<char>::operator=(list);
}
}
Foam::decomposedBlockData::decomposedBlockData
(
const label comm,
const IOobject& io,
List<char>&& list,
const UPstream::commsTypes commsType
)
:
regIOobject(io),
commsType_(commsType),
comm_(comm)
{
// Temporary warning
if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
{
WarningInFunction
<< "decomposedBlockData " << name()
<< " constructed with IOobject::MUST_READ_IF_MODIFIED"
" but decomposedBlockData does not support automatic rereading."
<< endl;
}
List<char>::transfer(list);
if
(
(
io.readOpt() == IOobject::MUST_READ
|| io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
)
|| (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
)
{
read();
}
}
// * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * * //
Foam::decomposedBlockData::~decomposedBlockData()
{}
// * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //
bool Foam::decomposedBlockData::readMasterHeader(IOobject& io, Istream& is)
{
if (debug)
{
Pout<< "decomposedBlockData::readMasterHeader:"
<< " stream:" << is.name() << endl;
}
// Master-only reading of header
is.fatalCheck("read(Istream&)");
List<char> data(is);
is.fatalCheck("read(Istream&) : reading entry");
string buf(data.begin(), data.size());
IStringStream str
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
is.name()
);
return io.readHeader(str);
}
void Foam::decomposedBlockData::writeHeader
(
Ostream& os,
const IOstream::versionNumber version,
const IOstream::streamFormat format,
const word& type,
const string& note,
const fileName& location,
const word& name
)
{
IOobject::writeBanner(os)
<< "FoamFile\n{\n"
<< " version " << version << ";\n"
<< " format " << format << ";\n"
<< " class " << type << ";\n";
// This may be useful to have as well
/*
if (os.format() == IOstream::BINARY)
{
os << " arch " << Foam::FOAMbuildArch << ";\n";
}
*/
if (Pstream::parRun())
{
os << " blocks " << Pstream::nProcs() << ";\n";
}
if (note.size())
{
os << " note " << note << ";\n";
}
if (location.size())
{
os << " location " << location << ";\n";
}
os << " object " << name << ";\n"
<< "}" << nl;
IOobject::writeDivider(os) << nl;
}
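
// For reference, the header written above reads as follows for a
// hypothetical collated field file on 6 processors (banner and divider
// omitted; values illustrative):
//
//     FoamFile
//     {
//         version     2.0;
//         format      binary;
//         class       decomposedBlockData;
//         blocks      6;
//         location    "0";
//         object      p;
//     }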
Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
(
const label blocki,
Istream& is,
IOobject& headerIO
)
{
if (debug)
{
Pout<< "decomposedBlockData::readBlock:"
<< " stream:" << is.name() << " attempt to read block " << blocki
<< endl;
}
is.fatalCheck("read(Istream&)");
List<char> data;
autoPtr<ISstream> realIsPtr;
if (blocki == 0)
{
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
string buf(data.begin(), data.size());
realIsPtr.reset
(
new IStringStream
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
is.name()
)
);
// Read header
if (!headerIO.readHeader(realIsPtr()))
{
FatalIOErrorInFunction(realIsPtr())
<< "problem while reading header for object "
<< is.name() << exit(FatalIOError);
}
}
else
{
// Read master for header
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
IOstream::versionNumber ver(IOstream::currentVersion);
IOstream::streamFormat fmt;
{
string buf(data.begin(), data.size());
IStringStream headerStream
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
is.name()
);
// Read header
if (!headerIO.readHeader(headerStream))
{
FatalIOErrorInFunction(headerStream)
<< "problem while reading header for object "
<< is.name() << exit(FatalIOError);
}
ver = headerStream.version();
fmt = headerStream.format();
}
for (label i = 1; i < blocki+1; i++)
{
// Read data, override old data
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
}
string buf(data.begin(), data.size());
realIsPtr.reset
(
new IStringStream
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
is.name()
)
);
// Apply master stream settings to realIsPtr
realIsPtr().format(fmt);
realIsPtr().version(ver);
}
return realIsPtr;
}
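
// Layout assumed by readBlock above: the file is a sequence of List<char>
// blocks, one per processor; block 0 also carries the FoamFile header, whose
// format/version settings are applied to the stream returned for any later
// block. A hypothetical call to re-parse block 2 of a collated file:
//
//     IFstream is("processors6/0/p");    // path is illustrative
//     IOobject io(...);                  // header object to fill in
//     autoPtr<ISstream> blockIs(decomposedBlockData::readBlock(2, is, io));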
bool Foam::decomposedBlockData::readBlocks
(
const label comm,
autoPtr<ISstream>& isPtr,
List<char>& data,
const UPstream::commsTypes commsType
)
{
if (debug)
{
Pout<< "decomposedBlockData::readBlocks:"
<< " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< " comm:" << comm << endl;
}
bool ok = false;
if (commsType == UPstream::commsTypes::scheduled)
{
if (UPstream::master(comm))
{
Istream& is = isPtr();
is.fatalCheck("read(Istream&)");
// Read master data
{
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
}
// Read slave data
for
(
label proci = 1;
proci < UPstream::nProcs(comm);
++proci
)
{
List<char> elems(is);
is.fatalCheck("read(Istream&) : reading entry");
OPstream os
(
UPstream::commsTypes::scheduled,
proci,
0,
UPstream::msgType(),
comm
);
os << elems;
}
ok = is.good();
}
else
{
IPstream is
(
UPstream::commsTypes::scheduled,
UPstream::masterNo(),
0,
UPstream::msgType(),
comm
);
is >> data;
}
}
else
{
PstreamBuffers pBufs
(
UPstream::commsTypes::nonBlocking,
UPstream::msgType(),
comm
);
if (UPstream::master(comm))
{
Istream& is = isPtr();
is.fatalCheck("read(Istream&)");
// Read master data
{
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
}
// Read slave data
for
(
label proci = 1;
proci < UPstream::nProcs(comm);
++proci
)
{
List<char> elems(is);
is.fatalCheck("read(Istream&) : reading entry");
UOPstream os(proci, pBufs);
os << elems;
}
}
labelList recvSizes;
pBufs.finishedSends(recvSizes);
if (!UPstream::master(comm))
{
UIPstream is(UPstream::masterNo(), pBufs);
is >> data;
}
}
Pstream::scatter(ok, Pstream::msgType(), comm);
return ok;
}
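
// Note on the communication pattern above: the master reads one List<char>
// block per rank from the file and forwards each block to its owner, either
// as individual scheduled OPstream messages or through a single non-blocking
// PstreamBuffers exchange. Only the master sets 'ok' from the file stream;
// the final Pstream::scatter broadcasts it so that every rank returns the
// same read status.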
Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
(
const label comm,
const fileName& fName,
autoPtr<ISstream>& isPtr,
IOobject& headerIO,
const UPstream::commsTypes commsType
)
{
if (debug)
{
Pout<< "decomposedBlockData::readBlocks:"
<< " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
<< " commsType:" << Pstream::commsTypeNames[commsType] << endl;
}
bool ok = false;
List<char> data;
autoPtr<ISstream> realIsPtr;
if (commsType == UPstream::commsTypes::scheduled)
{
if (UPstream::master(comm))
{
Istream& is = isPtr();
is.fatalCheck("read(Istream&)");
// Read master data
{
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
string buf(data.begin(), data.size());
realIsPtr.reset
(
new IStringStream
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
// Read header
if (!headerIO.readHeader(realIsPtr()))
{
FatalIOErrorInFunction(realIsPtr())
<< "problem while reading header for object "
<< is.name() << exit(FatalIOError);
}
}
// Read slave data
for
(
label proci = 1;
proci < UPstream::nProcs(comm);
++proci
)
{
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
OPstream os
(
UPstream::commsTypes::scheduled,
proci,
0,
UPstream::msgType(),
comm
);
os << data;
}
ok = is.good();
}
else
{
IPstream is
(
UPstream::commsTypes::scheduled,
UPstream::masterNo(),
0,
UPstream::msgType(),
comm
);
is >> data;
string buf(data.begin(), data.size());
realIsPtr.reset
(
new IStringStream
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
}
}
else
{
PstreamBuffers pBufs
(
UPstream::commsTypes::nonBlocking,
UPstream::msgType(),
comm
);
if (UPstream::master(comm))
{
Istream& is = isPtr();
is.fatalCheck("read(Istream&)");
// Read master data
{
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
string buf(data.begin(), data.size());
realIsPtr.reset
(
new IStringStream
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
// Read header
if (!headerIO.readHeader(realIsPtr()))
{
FatalIOErrorInFunction(realIsPtr())
<< "problem while reading header for object "
<< is.name() << exit(FatalIOError);
}
}
// Read slave data
for
(
label proci = 1;
proci < UPstream::nProcs(comm);
++proci
)
{
List<char> elems(is);
is.fatalCheck("read(Istream&) : reading entry");
UOPstream os(proci, pBufs);
os << elems;
}
ok = is.good();
}
labelList recvSizes;
pBufs.finishedSends(recvSizes);
if (!UPstream::master(comm))
{
UIPstream is(UPstream::masterNo(), pBufs);
is >> data;
string buf(data.begin(), data.size());
realIsPtr.reset
(
new IStringStream
(
buf,
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
}
}
Pstream::scatter(ok, Pstream::msgType(), comm);
// version
string versionString(realIsPtr().version().str());
Pstream::scatter(versionString, Pstream::msgType(), comm);
realIsPtr().version(IOstream::versionNumber(versionString));
// stream
{
OStringStream os;
os << realIsPtr().format();
string formatString(os.str());
Pstream::scatter(formatString, Pstream::msgType(), comm);
realIsPtr().format(formatString);
}
word name(headerIO.name());
Pstream::scatter(name, Pstream::msgType(), comm);
headerIO.rename(name);
Pstream::scatter(headerIO.headerClassName(), Pstream::msgType(), comm);
Pstream::scatter(headerIO.note(), Pstream::msgType(), comm);
//Pstream::scatter(headerIO.instance(), Pstream::msgType(), comm);
//Pstream::scatter(headerIO.local(), Pstream::msgType(), comm);
return realIsPtr;
}
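
// After the blocks have been distributed, the master's stream settings
// (version, format) and header fields (name, class, note) are scattered so
// that every rank's returned stream and IOobject are consistent with the
// master's copy.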
void Foam::decomposedBlockData::gather
(
const label comm,
const label data,
labelList& datas
)
{
const label nProcs = UPstream::nProcs(comm);
datas.setSize(nProcs);
char* data0Ptr = reinterpret_cast<char*>(datas.begin());
List<int> recvOffsets;
List<int> recvSizes;
if (UPstream::master(comm))
{
recvOffsets.setSize(nProcs);
forAll(recvOffsets, proci)
{
// Note: truncating long int to int since UPstream::gather limited
// to ints
recvOffsets[proci] =
int(reinterpret_cast<char*>(&datas[proci]) - data0Ptr);
}
recvSizes.setSize(nProcs, sizeof(label));
}
UPstream::gather
(
reinterpret_cast<const char*>(&data),
sizeof(label),
data0Ptr,
recvSizes,
recvOffsets,
comm
);
}
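
// Example (hypothetical sizes): with 4 ranks whose local byte counts are
// 100, 80, 120 and 90, the master's 'datas' becomes (100 80 120 90); these
// counts later serve as 'recvSizes' for gatherSlaveData and writeBlocks.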
void Foam::decomposedBlockData::gatherSlaveData
(
const label comm,
const UList<char>& data,
const labelUList& recvSizes,
const label startProc,
const label nProcs,
List<int>& sliceOffsets,
List<char>& recvData
)
{
// Calculate master data
List<int> sliceSizes;
if (UPstream::master(comm))
{
const label numProcs = UPstream::nProcs(comm);
sliceSizes.setSize(numProcs, 0);
sliceOffsets.setSize(numProcs+1, 0);
int totalSize = 0;
label proci = startProc;
for (label i = 0; i < nProcs; i++)
{
sliceSizes[proci] = int(recvSizes[proci]);
sliceOffsets[proci] = totalSize;
totalSize += sliceSizes[proci];
++proci;
}
sliceOffsets[proci] = totalSize;
recvData.setSize(totalSize);
}
int nSend = 0;
if
(
!UPstream::master(comm)
&& (UPstream::myProcNo(comm) >= startProc)
&& (UPstream::myProcNo(comm) < startProc+nProcs)
)
{
// Note: UPstream::gather limited to int
nSend = int(data.byteSize());
}
UPstream::gather
(
data.begin(),
nSend,
recvData.begin(),
sliceSizes,
sliceOffsets,
comm
);
}
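
// Example (hypothetical sizes): gathering the slice startProc=1, nProcs=2 of
// recvSizes = (100 80 120 90) produces on the master
//
//     sliceSizes   = (0 80 120 0)
//     sliceOffsets = (0 0 80 200 0)
//     recvData     = 200 bytes: rank 1's block followed by rank 2's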
Foam::label Foam::decomposedBlockData::calcNumProcs
(
const label comm,
const off_t maxBufferSize,
const labelUList& recvSizes,
const label startProci
)
{
const label nProcs = UPstream::nProcs(comm);
label nSendProcs = -1;
if (UPstream::master(comm))
{
off_t totalSize = recvSizes[startProci];
label proci = startProci+1;
while (proci < nProcs && (totalSize+recvSizes[proci] < maxBufferSize))
{
totalSize += recvSizes[proci];
proci++;
}
nSendProcs = proci-startProci;
}
// Scatter nSendProcs
label n;
UPstream::scatter
(
reinterpret_cast<const char*>(&nSendProcs),
List<int>(nProcs, sizeof(nSendProcs)),
List<int>(nProcs, 0),
reinterpret_cast<char*>(&n),
sizeof(n),
comm
);
return n;
}
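
// Worked example (hypothetical sizes): with recvSizes = (100 80 120 90),
// startProci = 1 and maxBufferSize = 250, the master accumulates
// 80 + 120 = 200 < 250 but cannot add rank 3 (200 + 90 >= 250), so
// calcNumProcs returns 2: ranks 1 and 2 form one gather slice and rank 3
// starts the next one.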
bool Foam::decomposedBlockData::writeBlocks
(
const label comm,
autoPtr<OSstream>& osPtr,
List<std::streamoff>& start,
const UList<char>& data,
const labelUList& recvSizes,
const PtrList<SubList<char>>& slaveData,
const UPstream::commsTypes commsType,
const bool syncReturnState
)
{
if (debug)
{
Pout<< "decomposedBlockData::writeBlocks:"
<< " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
<< " data:" << data.size()
<< " (master only) slaveData:" << slaveData.size()
<< " commsType:" << Pstream::commsTypeNames[commsType] << endl;
}
const label nProcs = UPstream::nProcs(comm);
bool ok = true;
if (slaveData.size())
{
        // Already have gathered the slave data; the communicator is only
        // used to check who is the master
if (UPstream::master(comm))
{
OSstream& os = osPtr();
start.setSize(nProcs);
// Write master data
{
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
// Write slaves
label slaveOffset = 0;
for (label proci = 1; proci < nProcs; ++proci)
{
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os << slaveData[proci];
slaveOffset += recvSizes[proci];
}
ok = os.good();
}
}
else if (commsType == UPstream::commsTypes::scheduled)
{
if (UPstream::master(comm))
{
start.setSize(nProcs);
OSstream& os = osPtr();
// Write master data
{
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
// Write slaves
List<char> elems;
for (label proci = 1; proci < nProcs; ++proci)
{
elems.setSize(recvSizes[proci]);
IPstream::read
(
UPstream::commsTypes::scheduled,
proci,
elems.begin(),
elems.size(),
Pstream::msgType(),
comm
);
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os << elems;
}
ok = os.good();
}
else
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
UPstream::masterNo(),
data.begin(),
data.byteSize(),
Pstream::msgType(),
comm
);
}
}
else
{
// Write master data
if (UPstream::master(comm))
{
start.setSize(nProcs);
OSstream& os = osPtr();
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
        // Find out how many processors can be received into
        // maxMasterFileBufferSize
// Starting slave processor and number of processors
label startProc = 1;
label nSendProcs = nProcs-1;
while (nSendProcs > 0)
{
nSendProcs = calcNumProcs
(
comm,
off_t
(
fileOperations::masterUncollatedFileOperation::
maxMasterFileBufferSize
),
recvSizes,
startProc
);
if (startProc == nProcs || nSendProcs == 0)
{
break;
}
// Gather data from (a slice of) the slaves
List<int> sliceOffsets;
List<char> recvData;
gatherSlaveData
(
comm,
data,
recvSizes,
startProc, // startProc,
nSendProcs, // nProcs,
sliceOffsets,
recvData
);
if (UPstream::master(comm))
{
OSstream& os = osPtr();
// Write slaves
for
(
label proci = startProc;
proci < startProc+nSendProcs;
proci++
)
{
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os <<
SubList<char>
(
recvData,
sliceOffsets[proci+1]-sliceOffsets[proci],
sliceOffsets[proci]
);
}
}
startProc += nSendProcs;
}
if (UPstream::master(comm))
{
ok = osPtr().good();
}
}
if (syncReturnState)
{
        //- Enable to get synchronised error checking. This is what keeps the
        //  slaves as slow as the master (which does all the writing)
Pstream::scatter(ok, Pstream::msgType(), comm);
}
return ok;
}
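
// Resulting on-disk layout written by writeBlocks (illustrative, 3 ranks):
//
//     // Processor0
//     <List<char> block of rank 0>
//
//     // Processor1
//     <List<char> block of rank 1>
//
//     // Processor2
//     <List<char> block of rank 2>
//
// 'start' records the stream offset of each block for later indexing.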
bool Foam::decomposedBlockData::read()
{
autoPtr<ISstream> isPtr;
fileName objPath(fileHandler().filePath(false, *this, word::null));
if (UPstream::master(comm_))
{
isPtr.reset(new IFstream(objPath));
IOobject::readHeader(isPtr());
}
List<char>& data = *this;
return readBlocks(comm_, isPtr, data, commsType_);
}
bool Foam::decomposedBlockData::writeData(Ostream& os) const
{
const List<char>& data = *this;
string str
(
reinterpret_cast<const char*>(data.cbegin()),
data.byteSize()
);
IOobject io(*this);
if (Pstream::master(comm_))
{
IStringStream is
(
str,
IOstream::ASCII,
IOstream::currentVersion,
name()
);
io.readHeader(is);
}
// Scatter header information
// version
string versionString(os.version().str());
Pstream::scatter(versionString, Pstream::msgType(), comm_);
// stream
string formatString;
{
        // Use a distinct name to avoid shadowing the output stream 'os';
        // capture the format of the stream being written to
        OStringStream oss;
        oss << os.format();
        formatString = oss.str();
Pstream::scatter(formatString, Pstream::msgType(), comm_);
}
//word masterName(name());
//Pstream::scatter(masterName, Pstream::msgType(), comm_);
Pstream::scatter(io.headerClassName(), Pstream::msgType(), comm_);
Pstream::scatter(io.note(), Pstream::msgType(), comm_);
//Pstream::scatter(io.instance(), Pstream::msgType(), comm);
//Pstream::scatter(io.local(), Pstream::msgType(), comm);
fileName masterLocation(instance()/db().dbDir()/local());
Pstream::scatter(masterLocation, Pstream::msgType(), comm_);
if (!Pstream::master(comm_))
{
writeHeader
(
os,
IOstream::versionNumber(versionString),
IOstream::formatEnum(formatString),
io.headerClassName(),
io.note(),
masterLocation,
name()
);
}
os.writeQuoted(str, false);
if (!Pstream::master(comm_))
{
IOobject::writeEndDivider(os);
}
return os.good();
}
bool Foam::decomposedBlockData::writeObject
(
IOstream::streamFormat fmt,
IOstream::versionNumber ver,
IOstream::compressionType cmp,
const bool valid
) const
{
autoPtr<OSstream> osPtr;
if (UPstream::master(comm_))
{
        // Note: always write binary. These are strings so they are readable
        // anyway. They have already been tokenised on the sending side.
osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
IOobject::writeHeader(osPtr());
}
labelList recvSizes;
gather(comm_, label(this->byteSize()), recvSizes);
List<std::streamoff> start;
PtrList<SubList<char>> slaveData; // dummy slave data
return writeBlocks
(
comm_,
osPtr,
start,
*this,
recvSizes,
slaveData,
commsType_
);
}
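
// The write path end-to-end: writeObject gathers the per-rank byte counts,
// opens the collated file and writes the FoamFile header on the master only,
// then streams every rank's buffer into the file via writeBlocks. A
// hypothetical driver (buffer and IOobject assumed to exist):
//
//     decomposedBlockData blocks
//     (
//         comm,
//         io,
//         std::move(charBuffer),
//         UPstream::commsTypes::scheduled
//     );
//     blocks.writeObject
//     (
//         IOstream::BINARY,
//         IOstream::currentVersion,
//         IOstream::UNCOMPRESSED,
//         true
//     );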
Foam::label Foam::decomposedBlockData::numBlocks(const fileName& fName)
{
label nBlocks = 0;
IFstream is(fName);
is.fatalCheck("decomposedBlockData::numBlocks(const fileName&)");
if (!is.good())
{
return nBlocks;
}
// FoamFile header
token firstToken(is);
if
(
is.good()
&& firstToken.isWord()
&& firstToken.wordToken() == "FoamFile"
)
{
dictionary headerDict(is);
is.version(headerDict.lookup("version"));
is.format(headerDict.lookup("format"));
// Obtain number of blocks directly
if (headerDict.readIfPresent("blocks", nBlocks))
{
return nBlocks;
}
}
// Fallback to brute force read of each data block
List<char> data;
while (is.good())
{
token sizeToken(is);
if (!sizeToken.isLabel())
{
return nBlocks;
}
is.putBack(sizeToken);
is >> data;
nBlocks++;
}
return nBlocks;
}
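
// Usage sketch (hypothetical path): query how many processor blocks a
// collated file holds, e.g. to validate it against the decomposition:
//
//     label n = decomposedBlockData::numBlocks("processors6/0/p");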
// ************************************************************************* //