Commit 8498e3f5 authored by Mark Olesen's avatar Mark Olesen Committed by Mark Olesen
Browse files

ENH: reduce some overhead in fileOperations

- more consistent handling of file format (#1587)
parent 42299dca
......@@ -6,6 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2020 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -30,7 +31,6 @@ License
#include "IPstream.H"
#include "PstreamBuffers.H"
#include "Fstream.H"
#include "StringStream.H"
#include "dictionary.H"
#include "objectRegistry.H"
#include "SubList.H"
......@@ -160,12 +160,6 @@ Foam::decomposedBlockData::decomposedBlockData
}
// * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * * //
Foam::decomposedBlockData::~decomposedBlockData()
{}
// * * * * * * * * * * * * * * * Members Functions * * * * * * * * * * * * * //
bool Foam::decomposedBlockData::readMasterHeader(IOobject& io, Istream& is)
......@@ -181,15 +175,11 @@ bool Foam::decomposedBlockData::readMasterHeader(IOobject& io, Istream& is)
List<char> data(is);
is.fatalCheck("read(Istream&) : reading entry");
IListStream str
(
std::move(data),
IOstream::ASCII,
IOstream::currentVersion,
is.name()
);
return io.readHeader(str);
UIListStream headerStream(data);
headerStream.name() = is.name();
return io.readHeader(headerStream);
}
......@@ -254,24 +244,15 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
is.fatalCheck("read(Istream&)");
List<char> data;
autoPtr<ISstream> realIsPtr;
if (blocki == 0)
{
is >> data;
List<char> data(is);
is.fatalCheck("read(Istream&) : reading entry");
realIsPtr.reset
(
new IListStream
(
std::move(data),
IOstream::ASCII,
IOstream::currentVersion,
is.name()
)
);
realIsPtr.reset(new IListStream(std::move(data)));
realIsPtr->name() = is.name();
// Read header
if (!headerIO.readHeader(realIsPtr()))
......@@ -284,7 +265,7 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
else
{
// Read master for header
is >> data;
List<char> data(is);
is.fatalCheck("read(Istream&) : reading entry");
IOstream::versionNumber ver(IOstream::currentVersion);
......@@ -292,13 +273,7 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
unsigned labelByteSize;
unsigned scalarByteSize;
{
UIListStream headerStream
(
data,
IOstream::ASCII,
IOstream::currentVersion,
is.name()
);
UIListStream headerStream(data);
// Read header
if (!headerIO.readHeader(headerStream))
......@@ -315,20 +290,12 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
for (label i = 1; i < blocki+1; i++)
{
// Read data, override old data
// Read and discard data, only retain the last one
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
}
realIsPtr.reset
(
new IListStream
(
std::move(data),
IOstream::ASCII,
IOstream::currentVersion,
is.name()
)
);
realIsPtr.reset(new IListStream(std::move(data)));
realIsPtr->name() = is.name();
// Apply master stream settings to realIsPtr
realIsPtr().format(fmt);
......@@ -336,6 +303,7 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
realIsPtr().setLabelByteSize(labelByteSize);
realIsPtr().setScalarByteSize(scalarByteSize);
}
return realIsPtr;
}
......@@ -358,22 +326,6 @@ bool Foam::decomposedBlockData::readBlocks
bool ok = false;
//// Scatter master header info
//string ver;
//unsigned labelByteSize;
//unsigned scalarByteSize;
//if (UPstream::master(comm))
//{
// ver = isPtr().version().str();
// labelByteSize = isPtr().labelByteSize();
// scalarByteSize = isPtr().scalarByteSize();
//}
//Pstream::scatter(ver); //, Pstream::msgType(), comm);
//Pstream::scatter(labelByteSize); //, Pstream::msgType(), comm);
//Pstream::scatter(scalarByteSize); //, Pstream::msgType(), comm);
if (commsType == UPstream::commsTypes::scheduled)
{
if (UPstream::master(comm))
......@@ -509,17 +461,8 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
realIsPtr.reset
(
new IListStream
(
std::move(data),
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
realIsPtr.reset(new IListStream(std::move(data)));
realIsPtr->name() = fName;
// Read header
if (!headerIO.readHeader(realIsPtr()))
......@@ -566,16 +509,8 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
);
is >> data;
realIsPtr.reset
(
new IListStream
(
std::move(data),
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
realIsPtr.reset(new IListStream(std::move(data)));
realIsPtr->name() = fName;
}
}
else
......@@ -597,16 +532,8 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
is >> data;
is.fatalCheck("read(Istream&) : reading entry");
realIsPtr.reset
(
new IListStream
(
std::move(data),
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
realIsPtr.reset(new IListStream(std::move(data)));
realIsPtr->name() = fName;
// Read header
if (!headerIO.readHeader(realIsPtr()))
......@@ -643,16 +570,8 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
UIPstream is(UPstream::masterNo(), pBufs);
is >> data;
realIsPtr.reset
(
new IListStream
(
std::move(data),
IOstream::ASCII,
IOstream::currentVersion,
fName
)
);
realIsPtr.reset(new IListStream(std::move(data)));
realIsPtr->name() = fName;
}
}
......@@ -661,26 +580,24 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
//- Set stream properties from realIsPtr on master
// Scatter master header info
string ver;
string format;
string versionString;
label formatValue;
unsigned labelByteSize;
unsigned scalarByteSize;
if (UPstream::master(comm))
{
ver = realIsPtr().version().str();
OStringStream os;
os << realIsPtr().format();
format = os.str();
versionString = realIsPtr().version().str();
formatValue = static_cast<label>(realIsPtr().format());
labelByteSize = realIsPtr().labelByteSize();
scalarByteSize = realIsPtr().scalarByteSize();
}
Pstream::scatter(ver); //, Pstream::msgType(), comm);
Pstream::scatter(format); //, Pstream::msgType(), comm);
Pstream::scatter(versionString); //, Pstream::msgType(), comm);
Pstream::scatter(formatValue); //, Pstream::msgType(), comm);
Pstream::scatter(labelByteSize); //, Pstream::msgType(), comm);
Pstream::scatter(scalarByteSize); //, Pstream::msgType(), comm);
realIsPtr().version(IOstream::versionNumber(ver));
realIsPtr().format(format);
realIsPtr().version(IOstream::versionNumber(versionString));
realIsPtr().format(IOstream::streamFormat(formatValue));
realIsPtr().setLabelByteSize(labelByteSize);
realIsPtr().setScalarByteSize(scalarByteSize);
......@@ -1072,30 +989,17 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const
// Re-read my own data to find out the header information
if (Pstream::master(comm_))
{
UIListStream is
(
data,
IOstream::ASCII,
IOstream::currentVersion,
name()
);
io.readHeader(is);
UIListStream headerStream(data);
io.readHeader(headerStream);
}
// Scatter header information
// version
string versionString(os.version().str());
Pstream::scatter(versionString, Pstream::msgType(), comm_);
// stream
string formatString;
{
OStringStream os;
os << os.format();
formatString = os.str();
Pstream::scatter(formatString, Pstream::msgType(), comm_);
}
label formatValue(os.format());
Pstream::scatter(formatValue, Pstream::msgType(), comm_);
//word masterName(name());
//Pstream::scatter(masterName, Pstream::msgType(), comm_);
......@@ -1114,7 +1018,7 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const
(
os,
IOstream::versionNumber(versionString),
IOstream::formatEnum(formatString),
IOstream::streamFormat(formatValue),
io.headerClassName(),
io.note(),
masterLocation,
......@@ -1122,12 +1026,18 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const
);
}
string str
(
reinterpret_cast<const char*>(data.cbegin()),
data.byteSize()
);
os.writeQuoted(str, false);
// Write the character data
if (isA<OFstream>(os))
{
// Serial file output - can use writeRaw()
os.writeRaw(data.cdata(), data.byteSize());
}
else
{
// Other cases are less fortunate, and no std::string_view
std::string str(data.cdata(), data.byteSize());
os.writeQuoted(str, false);
}
if (!Pstream::master(comm_))
{
......
......@@ -6,6 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2020 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
......@@ -57,7 +58,7 @@ class decomposedBlockData
{
protected:
// Protected data
// Protected Data
//- Type to use for gather
const UPstream::commsTypes commsType_;
......@@ -90,6 +91,7 @@ protected:
public:
//- Declare type-name, virtual type (with debug switch)
TypeName("decomposedBlockData");
......@@ -123,7 +125,7 @@ public:
//- Destructor
virtual ~decomposedBlockData();
virtual ~decomposedBlockData() = default;
// Member functions
......@@ -133,7 +135,7 @@ public:
//- Write separated content. Assumes content is the serialised data
// and that the master data contains a header
virtual bool writeData(Ostream&) const;
virtual bool writeData(Ostream& os) const;
//- Write using stream options
virtual bool writeObject
......@@ -224,7 +226,7 @@ public:
);
//- Detect number of blocks in a file
static label numBlocks(const fileName&);
static label numBlocks(const fileName& fName);
};
......
......@@ -494,93 +494,71 @@ bool Foam::fileOperations::masterUncollatedFileOperation::uniformFile
void Foam::fileOperations::masterUncollatedFileOperation::readAndSend
(
const fileName& filePath,
const IOstream::compressionType cmp,
const labelUList& procs,
PstreamBuffers& pBufs
)
{
if (cmp == IOstream::compressionType::COMPRESSED)
IFstream ifs(filePath, IOstream::streamFormat::BINARY);
if (!ifs.good())
{
if (debug)
{
Pout<< "masterUncollatedFileOperation::readAndSend :"
<< " Opening compressed " << filePath << endl;
}
FatalIOErrorInFunction(filePath)
<< "Cannot open file " << filePath
<< exit(FatalIOError);
}
if (debug)
{
Pout<< "masterUncollatedFileOperation::readAndSend :"
<< " compressed:" << bool(ifs.compression()) << " "
<< filePath << endl;
}
IFstream is(filePath, IOstream::streamFormat::BINARY);
if (ifs.compression() == IOstream::compressionType::COMPRESSED)
{
// Could use Foam::fileSize, estimate uncompressed size (eg, 2x)
// and then string reserve followed by string assign...
if (!is.good())
// Uncompress and read file contents into a character buffer
const std::string buf
(
std::istreambuf_iterator<char>(ifs.stdStream()),
std::istreambuf_iterator<char>()
);
for (const label proci : procs)
{
FatalIOErrorInFunction(filePath) << "Cannot open file " << filePath
<< exit(FatalIOError);
UOPstream os(proci, pBufs);
os.write(buf.data(), buf.length());
}
std::ostringstream stringStr;
stringStr << is.stdStream().rdbuf();
string buf(stringStr.str());
forAll(procs, i)
if (debug)
{
UOPstream os(procs[i], pBufs);
os.write(&buf[0], buf.size());
Pout<< "masterUncollatedFileOperation::readStream :"
<< " From " << filePath << " sent " << buf.size()
<< " bytes" << endl;
}
}
else
{
off_t count(Foam::fileSize(filePath));
IFstream is(filePath, IOstream::streamFormat::BINARY);
const off_t count(Foam::fileSize(filePath));
if (!is.good())
// Read file contents into a character buffer
List<char> buf(static_cast<label>(count));
ifs.stdStream().read(buf.data(), count);
for (const label proci : procs)
{
FatalIOErrorInFunction(filePath) << "Cannot open file " << filePath
<< exit(FatalIOError);
UOPstream os(proci, pBufs);
os.write(buf.cdata(), count);
}
if (debug)
{
Pout<< "masterUncollatedFileOperation::readStream :"
<< " From " << filePath << " reading " << label(count)
<< " From " << filePath << " sent " << buf.size()
<< " bytes" << endl;
}
List<char> buf(static_cast<label>(count));
is.stdStream().read(buf.begin(), count);
forAll(procs, i)
{
UOPstream os(procs[i], pBufs);
os.write(buf.begin(), count);
}
}
}
void Foam::fileOperations::masterUncollatedFileOperation::readAndSend
(
const fileName& fName,
const labelUList& procs,
PstreamBuffers& pBufs
)
{
if (Foam::exists(fName+".gz", false))
{
readAndSend
(
fName,
IOstream::compressionType::COMPRESSED,
procs,
pBufs
);
}
else
{
readAndSend
(
fName,
IOstream::compressionType::UNCOMPRESSED,
procs,
pBufs
);
}
}
......@@ -656,18 +634,16 @@ Foam::fileOperations::masterUncollatedFileOperation::read
<< exit(FatalIOError);
}
autoPtr<IFstream> ifsPtr(new IFstream(filePaths[0]));
// Open master
isPtr.reset(new IFstream(filePaths[0]));
// Read header
if (!io.readHeader(ifsPtr()))
if (!io.readHeader(isPtr()))
{
FatalIOErrorInFunction(ifsPtr())
FatalIOErrorInFunction(isPtr())
<< "problem while reading header for object "
<< io.name() << exit(FatalIOError);
}
// Open master (steal from ifsPtr)
isPtr.reset(ifsPtr.ptr());
}
// Read slave files
......@@ -715,7 +691,7 @@ Foam::fileOperations::masterUncollatedFileOperation::read
List<char> buf(recvSizes[Pstream::masterNo()]);
if (recvSizes[Pstream::masterNo()] > 0)
{
is.read(buf.begin(), recvSizes[Pstream::masterNo()]);
is.read(buf.data(), recvSizes[Pstream::masterNo()]);
}
if (debug)
......@@ -723,17 +699,15 @@ Foam::fileOperations::masterUncollatedFileOperation::read
Pout<< "masterUncollatedFileOperation::readStream :"
<< " Done reading " << buf.size() << " bytes" << endl;
}
const fileName& fName = filePaths[Pstream::myProcNo(comm)];
isPtr.reset
(
new IListStream
(
std::move(buf),
IOstream::BINARY,
IOstream::currentVersion,
fName
)
);
// A local character buffer copy of the Pstream contents.
// Construct with same parameters (ASCII, current version)
// as the IFstream so that it has the same characteristics.
isPtr.reset(new IListStream(std::move(buf)));
// With the proper file name
isPtr->name() = filePaths[Pstream::myProcNo(comm)];
if (!io.readHeader(isPtr()))
{
......@@ -1424,6 +1398,7 @@ Foam::fileName Foam::fileOperations::masterUncollatedFileOperation::dirPath
{
// Retest all processors separately since some processors might
// have the file and some not (e.g. lagrangian data)
objPath = masterOp<fileName, fileOrNullOp>
(
io.objectPath(),
......@@ -1721,7 +1696,7 @@ Foam::fileOperations::masterUncollatedFileOperation::readObjects
{
// Avoid fileOperation::readObjects from triggering parallel ops
// (through call to filePath which triggers parallel )
bool oldParRun = UPstream::parRun();
const bool oldParRun = UPstream::parRun();
UPstream::parRun() = false;
//- Use non-time searching version
......@@ -1872,8 +1847,8 @@ bool Foam::fileOperations::masterUncollatedFileOperation::readHeader
== decomposedBlockData::typeName
)
{
// Read the header inside the container (master
// data)
// Read the header inside the container