    ENH: Improvements to the fileHandler and collated IO · 8959b8e0
    Henry Weller authored
    Improvements to existing functionality
    --------------------------------------
      - MPI is initialised without thread support if it is not needed, e.g. with the
        uncollated file handler.
      - Native C++11 threading is used; this avoids problems with the static
        destruction order.
      - etc/cellModels is now only read if needed.
      - etc/controlDict can now be read from the environment variable FOAM_CONTROLDICT.
      - Uniform files (e.g. '0/uniform/time') are now read only once, on the master
        (with the masterUncollated or collated file handlers).
      - The collated format writes to 'processorsNNN' instead of 'processors'.  The
        file format is unchanged.
      - The thread buffer and file buffer sizes are no longer limited to 2 GB.
    
    The global controlDict file contains parameters for file handling.  Under some
    circumstances, e.g. running in parallel on a system without NFS, the user may
    need to set some parameters, e.g. fileHandler, before the global controlDict
    file can be read.  To support this, OpenFOAM now allows the global controlDict
    to be read from a string stored in the FOAM_CONTROLDICT environment variable.
    
    The FOAM_CONTROLDICT environment variable can be set to the contents of the
    global controlDict file, e.g. from a sh/bash shell:
    
        export FOAM_CONTROLDICT=$(foamDictionary $FOAM_ETC/controlDict)
    
    FOAM_CONTROLDICT can then be passed to mpirun using the -x option, e.g.:
    
        mpirun -np 2 -x FOAM_CONTROLDICT simpleFoam -parallel
    
    Note that while this avoids the need for NFS to read the OpenFOAM configuration,
    the executable still needs to load shared libraries, which must either be copied
    locally or be available via NFS or equivalent.
    
    New: Multiple IO ranks
    ----------------------
    The masterUncollated and collated fileHandlers can now use multiple ranks for
    writing, e.g.:
    
        mpirun -np 6 simpleFoam -parallel -ioRanks '(0 3)'
    
    In this example ranks 0 ('processor0') and 3 ('processor3') now handle all the
    I/O: rank 0 handles ranks 0, 1 and 2, and rank 3 handles ranks 3, 4 and 5.  The
    set of IO ranks should always include 0 as its first element and be sorted in
    increasing order.
    
    The collated fileHandler uses the directory naming processorsNNN_XXX-YYY, where
    NNN is the total number of processors and XXX and YYY are the first and last
    processors handled by the IO rank, e.g. in the above example the directories
    would be
    
        processors6_0-2
        processors6_3-5
    
    and each of the collated files in these contains data of the local ranks
    only.  The same naming also applies when running e.g. decomposePar:

        decomposePar -fileHandler collated -ioRanks '(0 3)'
    
    New: Distributed data
    ---------------------
    
    The individual root directories can be placed on different hosts with different
    paths if necessary.  In the current framework it is necessary to specify the
    root per slave process, but this has been simplified with the option of
    specifying the root per host with the -hostRoots command line option:
    
        mpirun -np 6 simpleFoam -parallel -ioRanks '(0 3)' \
            -hostRoots '("machineA" "/tmp/" "machineB" "/tmp")'
    
    The -hostRoots option is followed by a list of machine name and root directory
    pairs; the machine name can contain regular expressions.
    
    New: hostCollated
    -----------------
    
    The new hostCollated fileHandler automatically sets the 'ioRanks' according to
    the host names, using the lowest rank on each host as its IO rank.  For example,
    to run simpleFoam on 6 processors with ranks 0-2 on machineA and ranks 3-5 on
    machineB, with the machines specified in the hostfile:
    
        mpirun -np 6 --hostfile hostfile simpleFoam -parallel -fileHandler hostCollated
    
    This is equivalent to
    
        mpirun -np 6 --hostfile hostfile simpleFoam -parallel -fileHandler collated -ioRanks '(0 3)'
    
    This example will write directories:
    
        processors6_0-2/
        processors6_3-5/
    
    A typical example would use distributed data, e.g. on two nodes, machineA and
    machineB, each with three processes:
    
        decomposePar -fileHandler collated -case cavity
    
        # Copy case (constant/*, system/*, processors6/) to master:
        rsync -a cavity machineA:/tmp/
    
        # Create root on slave:
        ssh machineB mkdir -p /tmp/cavity
    
        # Run
        mpirun --hostfile hostfile icoFoam \
            -case /tmp/cavity -parallel -fileHandler hostCollated \
            -hostRoots '("machineA" "/tmp" "machineB" "/tmp")'
    
    Contributed by Mattijs Janssens
decomposedBlockData.C
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2017-2018 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "decomposedBlockData.H"
#include "OPstream.H"
#include "IPstream.H"
#include "PstreamBuffers.H"
#include "Fstream.H"
#include "StringStream.H"
#include "dictionary.H"
#include "objectRegistry.H"
#include "SubList.H"
#include "labelPair.H"
#include "masterUncollatedFileOperation.H"

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

namespace Foam
{
    defineTypeNameAndDebug(decomposedBlockData, 0);
}

// * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //

Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }
    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
}


Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const UList<char>& list,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }

    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
    else
    {
        List<char>::operator=(list);
    }
}


Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    List<char>&& list,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }

    List<char>::transfer(list);

    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
}


// * * * * * * * * * * * * * * * Destructor  * * * * * * * * * * * * * * * * //

Foam::decomposedBlockData::~decomposedBlockData()
{}


// * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //

bool Foam::decomposedBlockData::readMasterHeader(IOobject& io, Istream& is)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readMasterHeader:"
            << " stream:" << is.name() << endl;
    }

    // Master-only reading of header
    is.fatalCheck("read(Istream&)");

    List<char> data(is);
    is.fatalCheck("read(Istream&) : reading entry");
    string buf(data.begin(), data.size());
    IStringStream str
    (
        buf,
        IOstream::ASCII,
        IOstream::currentVersion,
        is.name()
    );

    return io.readHeader(str);
}


void Foam::decomposedBlockData::writeHeader
(
    Ostream& os,
    const IOstream::versionNumber version,
    const IOstream::streamFormat format,
    const word& type,
    const string& note,
    const fileName& location,
    const word& name
)
{
    IOobject::writeBanner(os)
        << "FoamFile\n{\n"
        << "    version     " << version << ";\n"
        << "    format      " << format << ";\n"
        << "    class       " << type << ";\n";

    // This may be useful to have as well
    /*
    if (os.format() == IOstream::BINARY)
    {
        os  << "    arch        " << Foam::FOAMbuildArch << ";\n";
    }
    */

    if (Pstream::parRun())
    {
        os  << "    blocks      " << Pstream::nProcs() << ";\n";
    }

    if (note.size())
    {
        os  << "    note        " << note << ";\n";
    }

    if (location.size())
    {
        os  << "    location    " << location << ";\n";
    }

    os  << "    object      " << name << ";\n"
        << "}" << nl;

    IOobject::writeDivider(os) << nl;
}


Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
(
    const label blocki,
    Istream& is,
    IOobject& headerIO
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlock:"
            << " stream:" << is.name() << " attempt to read block " << blocki
            << endl;
    }

    is.fatalCheck("read(Istream&)");

    List<char> data;
    autoPtr<ISstream> realIsPtr;

    if (blocki == 0)
    {
        is >> data;
        is.fatalCheck("read(Istream&) : reading entry");

        string buf(data.begin(), data.size());
        realIsPtr.reset
        (
            new IStringStream
            (
                buf,
                IOstream::ASCII,
                IOstream::currentVersion,
                is.name()
            )
        );

        // Read header
        if (!headerIO.readHeader(realIsPtr()))
        {
            FatalIOErrorInFunction(realIsPtr())
                << "problem while reading header for object "
                << is.name() << exit(FatalIOError);
        }
    }
    else
    {
        // Read master for header
        is >> data;
        is.fatalCheck("read(Istream&) : reading entry");

        IOstream::versionNumber ver(IOstream::currentVersion);
        IOstream::streamFormat fmt;
        {
            string buf(data.begin(), data.size());
            IStringStream headerStream
            (
                buf,
                IOstream::ASCII,
                IOstream::currentVersion,
                is.name()
            );

            // Read header
            if (!headerIO.readHeader(headerStream))
            {
                FatalIOErrorInFunction(headerStream)
                    << "problem while reading header for object "
                    << is.name() << exit(FatalIOError);
            }
            ver = headerStream.version();
            fmt = headerStream.format();
        }

        for (label i = 1; i < blocki+1; i++)
        {
            // Read data, override old data
            is >> data;
            is.fatalCheck("read(Istream&) : reading entry");
        }
        string buf(data.begin(), data.size());
        realIsPtr.reset
        (
            new IStringStream
            (
                buf,
                IOstream::ASCII,
                IOstream::currentVersion,
                is.name()
            )
        );

        // Apply master stream settings to realIsPtr
        realIsPtr().format(fmt);
        realIsPtr().version(ver);
    }
    return realIsPtr;
}


bool Foam::decomposedBlockData::readBlocks
(
    const label comm,
    autoPtr<ISstream>& isPtr,
    List<char>& data,
    const UPstream::commsTypes commsType
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlocks:"
            << " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
            << " commsType:" << Pstream::commsTypeNames[commsType]
            << " comm:" << comm << endl;
    }

    bool ok = false;

    if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                ++proci
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                OPstream os
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    0,
                    UPstream::msgType(),
                    comm
                );
                os << elems;
            }

            ok = is.good();
        }
        else
        {
            IPstream is
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                0,
                UPstream::msgType(),
                comm
            );
            is >> data;
        }
    }
    else
    {
        PstreamBuffers pBufs
        (
            UPstream::commsTypes::nonBlocking,
            UPstream::msgType(),
            comm
        );

        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");
            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                ++proci
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                UOPstream os(proci, pBufs);
                os << elems;
            }
        }

        labelList recvSizes;
        pBufs.finishedSends(recvSizes);

        if (!UPstream::master(comm))
        {
            UIPstream is(UPstream::masterNo(), pBufs);
            is >> data;
        }
    }

    Pstream::scatter(ok, Pstream::msgType(), comm);

    return ok;
}


Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
(
    const label comm,
    const fileName& fName,
    autoPtr<ISstream>& isPtr,
    IOobject& headerIO,
    const UPstream::commsTypes commsType
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlocks:"
            << " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
            << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
    }

    bool ok = false;

    List<char> data;
    autoPtr<ISstream> realIsPtr;

    if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                string buf(data.begin(), data.size());
                realIsPtr.reset
                (
                    new IStringStream
                    (
                        buf,
                        IOstream::ASCII,
                        IOstream::currentVersion,
                        fName
                    )
                );

                // Read header
                if (!headerIO.readHeader(realIsPtr()))
                {
                    FatalIOErrorInFunction(realIsPtr())
                        << "problem while reading header for object "
                        << is.name() << exit(FatalIOError);
                }
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                ++proci
            )
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                OPstream os
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    0,
                    UPstream::msgType(),
                    comm
                );
                os << data;
            }

            ok = is.good();
        }
        else
        {
            IPstream is
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                0,
                UPstream::msgType(),
                comm
            );
            is >> data;

            string buf(data.begin(), data.size());
            realIsPtr.reset
            (
                new IStringStream
                (
                    buf,
                    IOstream::ASCII,
                    IOstream::currentVersion,
                    fName
                )
            );
        }
    }
    else
    {
        PstreamBuffers pBufs
        (
            UPstream::commsTypes::nonBlocking,
            UPstream::msgType(),
            comm
        );

        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                string buf(data.begin(), data.size());
                realIsPtr.reset
                (
                    new IStringStream
                    (
                        buf,
                        IOstream::ASCII,
                        IOstream::currentVersion,
                        fName
                    )
                );

                // Read header
                if (!headerIO.readHeader(realIsPtr()))
                {
                    FatalIOErrorInFunction(realIsPtr())
                        << "problem while reading header for object "
                        << is.name() << exit(FatalIOError);
                }
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                ++proci
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                UOPstream os(proci, pBufs);
                os << elems;
            }

            ok = is.good();
        }

        labelList recvSizes;
        pBufs.finishedSends(recvSizes);

        if (!UPstream::master(comm))
        {
            UIPstream is(UPstream::masterNo(), pBufs);
            is >> data;

            string buf(data.begin(), data.size());
            realIsPtr.reset
            (
                new IStringStream
                (
                    buf,
                    IOstream::ASCII,
                    IOstream::currentVersion,
                    fName
                )
            );
        }
    }

    Pstream::scatter(ok, Pstream::msgType(), comm);

    // version
    string versionString(realIsPtr().version().str());
    Pstream::scatter(versionString,  Pstream::msgType(), comm);
    realIsPtr().version(IOstream::versionNumber(versionString));

    // stream
    {
        OStringStream os;
        os << realIsPtr().format();
        string formatString(os.str());
        Pstream::scatter(formatString,  Pstream::msgType(), comm);
        realIsPtr().format(formatString);
    }

    word name(headerIO.name());
    Pstream::scatter(name, Pstream::msgType(), comm);
    headerIO.rename(name);
    Pstream::scatter(headerIO.headerClassName(), Pstream::msgType(), comm);
    Pstream::scatter(headerIO.note(), Pstream::msgType(), comm);
    //Pstream::scatter(headerIO.instance(), Pstream::msgType(), comm);
    //Pstream::scatter(headerIO.local(), Pstream::msgType(), comm);

    return realIsPtr;
}


void Foam::decomposedBlockData::gather
(
    const label comm,
    const label data,
    labelList& datas
)
{
    const label nProcs = UPstream::nProcs(comm);
    datas.setSize(nProcs);

    char* data0Ptr = reinterpret_cast<char*>(datas.begin());

    List<int> recvOffsets;
    List<int> recvSizes;
    if (UPstream::master(comm))
    {
        recvOffsets.setSize(nProcs);
        forAll(recvOffsets, proci)
        {
            // Note: truncating long int to int since UPstream::gather limited
            // to ints
            recvOffsets[proci] =
                int(reinterpret_cast<char*>(&datas[proci]) - data0Ptr);
        }
        recvSizes.setSize(nProcs, sizeof(label));
    }

    UPstream::gather
    (
        reinterpret_cast<const char*>(&data),
        sizeof(label),
        data0Ptr,
        recvSizes,
        recvOffsets,
        comm
    );
}


void Foam::decomposedBlockData::gatherSlaveData
(
    const label comm,
    const UList<char>& data,
    const labelUList& recvSizes,

    const label startProc,
    const label nProcs,

    List<int>& sliceOffsets,
    List<char>& recvData
)
{
    // Calculate master data
    List<int> sliceSizes;
    if (UPstream::master(comm))
    {
        const label numProcs = UPstream::nProcs(comm);

        sliceSizes.setSize(numProcs, 0);
        sliceOffsets.setSize(numProcs+1, 0);

        int totalSize = 0;
        label proci = startProc;
        for (label i = 0; i < nProcs; i++)
        {
            sliceSizes[proci] = int(recvSizes[proci]);
            sliceOffsets[proci] = totalSize;
            totalSize += sliceSizes[proci];
            ++proci;
        }
        sliceOffsets[proci] = totalSize;
        recvData.setSize(totalSize);
    }

    int nSend = 0;
    if
    (
       !UPstream::master(comm)
     && (UPstream::myProcNo(comm) >= startProc)
     && (UPstream::myProcNo(comm) < startProc+nProcs)
    )
    {
        // Note: UPstream::gather limited to int
        nSend = int(data.byteSize());
    }

    UPstream::gather
    (
        data.begin(),
        nSend,

        recvData.begin(),
        sliceSizes,
        sliceOffsets,
        comm
    );
}


Foam::label Foam::decomposedBlockData::calcNumProcs
(
    const label comm,
    const off_t maxBufferSize,
    const labelUList& recvSizes,
    const label startProci
)
{
    const label nProcs = UPstream::nProcs(comm);

    label nSendProcs = -1;
    if (UPstream::master(comm))
    {
        off_t totalSize = recvSizes[startProci];
        label proci = startProci+1;
        while (proci < nProcs && (totalSize+recvSizes[proci] < maxBufferSize))
        {
            totalSize += recvSizes[proci];
            proci++;
        }

        nSendProcs = proci-startProci;
    }

    // Scatter nSendProcs
    label n;
    UPstream::scatter
    (
        reinterpret_cast<const char*>(&nSendProcs),
        List<int>(nProcs, sizeof(nSendProcs)),
        List<int>(nProcs, 0),
        reinterpret_cast<char*>(&n),
        sizeof(n),
        comm
    );

    return n;
}


bool Foam::decomposedBlockData::writeBlocks
(
    const label comm,
    autoPtr<OSstream>& osPtr,
    List<std::streamoff>& start,
    const UList<char>& data,

    const labelUList& recvSizes,
    const PtrList<SubList<char>>& slaveData,

    const UPstream::commsTypes commsType,
    const bool syncReturnState
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::writeBlocks:"
            << " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
            << " data:" << data.size()
            << " (master only) slaveData:" << slaveData.size()
            << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
    }

    const label nProcs = UPstream::nProcs(comm);

    bool ok = true;

    if (slaveData.size())
    {
        // The slave data have already been gathered; the communicator is only
        // used to check which rank is the master
        if (UPstream::master(comm))
        {
            OSstream& os = osPtr();

            start.setSize(nProcs);

            // Write master data
            {
                os << nl << "// Processor" << UPstream::masterNo() << nl;
                start[UPstream::masterNo()] = os.stdStream().tellp();
                os << data;
            }

            // Write slaves

            label slaveOffset = 0;

            for (label proci = 1; proci < nProcs; ++proci)
            {
                os << nl << nl << "// Processor" << proci << nl;
                start[proci] = os.stdStream().tellp();

                os << slaveData[proci];
                slaveOffset += recvSizes[proci];
            }

            ok = os.good();
        }
    }
    else if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            start.setSize(nProcs);

            OSstream& os = osPtr();

            // Write master data
            {
                os << nl << "// Processor" << UPstream::masterNo() << nl;
                start[UPstream::masterNo()] = os.stdStream().tellp();
                os << data;
            }
            // Write slaves
            List<char> elems;
            for (label proci = 1; proci < nProcs; ++proci)
            {
                elems.setSize(recvSizes[proci]);
                IPstream::read
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    elems.begin(),
                    elems.size(),
                    Pstream::msgType(),
                    comm
                );

                os << nl << nl << "// Processor" << proci << nl;
                start[proci] = os.stdStream().tellp();
                os << elems;
            }

            ok = os.good();
        }
        else
        {
            UOPstream::write
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                data.begin(),
                data.byteSize(),
                Pstream::msgType(),
                comm
            );
        }
    }
    else
    {
        // Write master data
        if (UPstream::master(comm))
        {
            start.setSize(nProcs);

            OSstream& os = osPtr();

            os << nl << "// Processor" << UPstream::masterNo() << nl;
            start[UPstream::masterNo()] = os.stdStream().tellp();
            os << data;
        }


        // Find out how many processors' data can be received into
        // maxMasterFileBufferSize

        // Starting slave processor and number of processors
        label startProc = 1;
        label nSendProcs = nProcs-1;

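        // Test startProc before calling calcNumProcs so that it is never
        // asked for a batch starting beyond the last processor
        while (nSendProcs > 0 && startProc < nProcs)
        {
            nSendProcs = calcNumProcs
            (
                comm,
                off_t
                (
                    fileOperations::masterUncollatedFileOperation::
                    maxMasterFileBufferSize
                ),
                recvSizes,
                startProc
            );

            if (nSendProcs == 0)
            {
                break;
            }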


            // Gather data from (a slice of) the slaves
            List<int> sliceOffsets;
            List<char> recvData;
            gatherSlaveData
            (
                comm,
                data,
                recvSizes,

                startProc,    // startProc,
                nSendProcs,    // nProcs,

                sliceOffsets,
                recvData
            );

            if (UPstream::master(comm))
            {
                OSstream& os = osPtr();
                // Write slaves
                for
                (
                    label proci = startProc;
                    proci < startProc+nSendProcs;
                    proci++
                )
                {
                    os << nl << nl << "// Processor" << proci << nl;
                    start[proci] = os.stdStream().tellp();

                    os <<
                        SubList<char>
                        (
                            recvData,
                            sliceOffsets[proci+1]-sliceOffsets[proci],
                            sliceOffsets[proci]
                        );
                }
            }

            startProc += nSendProcs;
        }

        if (UPstream::master(comm))
        {
            ok = osPtr().good();
        }
    }

    if (syncReturnState)
    {
        //- Broadcast the master's status so that all processors return the
        //  same result. Note that this keeps the slaves waiting for the
        //  master, which does all the writing.
        Pstream::scatter(ok, Pstream::msgType(), comm);
    }

    return ok;
}


bool Foam::decomposedBlockData::read()
{
    autoPtr<ISstream> isPtr;
    fileName objPath(fileHandler().filePath(false, *this, word::null));
    if (UPstream::master(comm_))
    {
        isPtr.reset(new IFstream(objPath));
        IOobject::readHeader(isPtr());
    }

    List<char>& data = *this;
    return readBlocks(comm_, isPtr, data, commsType_);
}


bool Foam::decomposedBlockData::writeData(Ostream& os) const
{
    const List<char>& data = *this;

    string str
    (
        reinterpret_cast<const char*>(data.cbegin()),
        data.byteSize()
    );

    IOobject io(*this);
    if (Pstream::master(comm_))
    {
        IStringStream is
        (
            str,
            IOstream::ASCII,
            IOstream::currentVersion,
            name()
        );
        io.readHeader(is);
    }

    // Scatter header information

    // version
    string versionString(os.version().str());
    Pstream::scatter(versionString, Pstream::msgType(), comm_);

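    // stream format: use the format of the output stream os, not that of
    // the temporary string stream
    string formatString;
    {
        OStringStream fmtStream;
        fmtStream << os.format();
        formatString = fmtStream.str();
        Pstream::scatter(formatString, Pstream::msgType(), comm_);
    }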

    //word masterName(name());
    //Pstream::scatter(masterName, Pstream::msgType(), comm_);

    Pstream::scatter(io.headerClassName(), Pstream::msgType(), comm_);
    Pstream::scatter(io.note(), Pstream::msgType(), comm_);
    //Pstream::scatter(io.instance(), Pstream::msgType(), comm);
    //Pstream::scatter(io.local(), Pstream::msgType(), comm);

    fileName masterLocation(instance()/db().dbDir()/local());
    Pstream::scatter(masterLocation, Pstream::msgType(), comm_);

    if (!Pstream::master(comm_))
    {
        writeHeader
        (
            os,
            IOstream::versionNumber(versionString),
            IOstream::formatEnum(formatString),
            io.headerClassName(),
            io.note(),
            masterLocation,
            name()
        );
    }

    os.writeQuoted(str, false);

    if (!Pstream::master(comm_))
    {
        IOobject::writeEndDivider(os);
    }

    return os.good();
}


bool Foam::decomposedBlockData::writeObject
(
    IOstream::streamFormat fmt,
    IOstream::versionNumber ver,
    IOstream::compressionType cmp,
    const bool valid
) const
{
    autoPtr<OSstream> osPtr;
    if (UPstream::master(comm_))
    {
        // Note: always write binary. The blocks are strings, so the file is
        //       readable anyway; they have already been tokenised on the
        //       sending side.
        osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
        IOobject::writeHeader(osPtr());
    }

    labelList recvSizes;
    gather(comm_, label(this->byteSize()), recvSizes);

    List<std::streamoff> start;
    PtrList<SubList<char>> slaveData;  // empty: writeBlocks gathers the data
    return writeBlocks
    (
        comm_,
        osPtr,
        start,
        *this,
        recvSizes,
        slaveData,
        commsType_
    );
}


Foam::label Foam::decomposedBlockData::numBlocks(const fileName& fName)
{
    label nBlocks = 0;

    IFstream is(fName);
    is.fatalCheck("decomposedBlockData::numBlocks(const fileName&)");

    if (!is.good())
    {
        return nBlocks;
    }

    // FoamFile header
    token firstToken(is);

    if
    (
        is.good()
     && firstToken.isWord()
     && firstToken.wordToken() == "FoamFile"
    )
    {
        dictionary headerDict(is);
        is.version(headerDict.lookup("version"));
        is.format(headerDict.lookup("format"));

        // Obtain number of blocks directly
        if (headerDict.readIfPresent("blocks", nBlocks))
        {
            return nBlocks;
        }
    }

    // Fall back to a brute-force read of each data block
    List<char> data;
    while (is.good())
    {
        token sizeToken(is);
        if (!sizeToken.isLabel())
        {
            return nBlocks;
        }
        is.putBack(sizeToken);

        is >> data;
        nBlocks++;
    }

    return nBlocks;
}


// ************************************************************************* //