Skip to content
Snippets Groups Projects
collatedFileOperation.C 15.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • /*---------------------------------------------------------------------------*\
      =========                 |
      \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
       \\    /   O peration     |
    
    OpenFOAM bot's avatar
    OpenFOAM bot committed
        \\  /    A nd           | www.openfoam.com
    
    -------------------------------------------------------------------------------
    
    OpenFOAM bot's avatar
    OpenFOAM bot committed
        Copyright (C) 2017-2018 OpenFOAM Foundation
    
        Copyright (C) 2020-2023 OpenCFD Ltd.
    
    -------------------------------------------------------------------------------
    License
        This file is part of OpenFOAM.
    
        OpenFOAM is free software: you can redistribute it and/or modify it
        under the terms of the GNU General Public License as published by
        the Free Software Foundation, either version 3 of the License, or
        (at your option) any later version.
    
        OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
        ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
        FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
        for more details.
    
        You should have received a copy of the GNU General Public License
        along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.
    
    \*---------------------------------------------------------------------------*/
    
    #include "collatedFileOperation.H"
    #include "addToRunTimeSelectionTable.H"
    #include "Pstream.H"
    #include "Time.H"
    #include "threadedCollatedOFstream.H"
    #include "decomposedBlockData.H"
    #include "registerSwitch.H"
    #include "masterOFstream.H"
    #include "OFstream.H"
    
    #include "foamVersion.H"
    
    
    /* * * * * * * * * * * * * * * Static Member Data  * * * * * * * * * * * * * */
    
    namespace Foam
    {
    namespace fileOperations
    {
        defineTypeNameAndDebug(collatedFileOperation, 0);
        addToRunTimeSelectionTable
        (
            fileOperation,
            collatedFileOperation,
            word
        );
    
        addToRunTimeSelectionTable
        (
            fileOperation,
            collatedFileOperation,
            comm
        );
    
    
        float collatedFileOperation::maxThreadFileBufferSize
        (
            debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
        );
        registerOptSwitch
        (
            "maxThreadFileBufferSize",
            float,
            collatedFileOperation::maxThreadFileBufferSize
        );
    
        // Threaded MPI: depending on buffering
    
        addNamedToRunTimeSelectionTable
        (
            fileOperationInitialise,
    
            fileOperationInitialise_collated,
    
    // * * * * * * * * * * * * Protected Member Functions  * * * * * * * * * * * //
    
    void Foam::fileOperations::collatedFileOperation::printBanner
    (
    
        const bool withRanks
    
    ) const
    {
        DetailInfo
            << "I/O    : " << this->type();
    
        if (maxThreadFileBufferSize == 0)
        {
            DetailInfo
                << " [unthreaded] (maxThreadFileBufferSize = 0)." << nl
                << "         Writing may be slow for large file sizes."
                << endl;
        }
        else
        {
            DetailInfo
                << " [threaded] (maxThreadFileBufferSize = "
                << maxThreadFileBufferSize << ")." << nl
                << "         Requires buffer large enough to collect all data"
                   " or thread support" << nl
                << "         enabled in MPI. If MPI thread support cannot be"
                   " enabled, deactivate" << nl
                << "         threading by setting maxThreadFileBufferSize"
                   " to 0 in" << nl
                << "         OpenFOAM etc/controlDict" << endl;
        }
    
    
            fileOperation::printRanks();
    
        //- fileModificationChecking already set by base class (masterUncollated)
    
        // if (IOobject::fileModificationChecking == IOobject::timeStampMaster)
        // {
        //     WarningInFunction
        //         << "Resetting fileModificationChecking to timeStamp" << endl;
        // }
        // else if (IOobject::fileModificationChecking == IOobject::inotifyMaster)
        // {
        //     WarningInFunction
        //         << "Resetting fileModificationChecking to inotify" << endl;
        // }
    
    bool Foam::fileOperations::collatedFileOperation::appendObject
    (
        const regIOobject& io,
        const fileName& pathName,
    
        IOstreamOption streamOpt
    
        const label proci = detectProcessorPath(io.objectPath());
    
            Pout<< "collatedFileOperation::writeObject :"
                << " For local object : " << io.name()
    
                << " appending processor " << proci
                << " data to " << pathName << endl;
        }
        if (proci == -1)
        {
            FatalErrorInFunction
    
                << "Invalid processor path: " << pathName
    
        const bool isIOmaster = fileOperation::isIOrank(proci);
    
        // Update meta-data for current state
    
        {
            const_cast<regIOobject&>(io).updateMetaData();
        }
    
    
        // Note: cannot do append + compression. This is a limitation
        // of ogzstream (or rather most compressed formats)
    
        //
        // File should always be created as non-atomic
        // (consistency between append/non-append)
    
            // UNCOMPRESSED (binary only)
    
            IOstreamOption(IOstreamOption::BINARY, streamOpt.version()),
    
            (isIOmaster ? IOstreamOption::NON_APPEND : IOstreamOption::APPEND)
    
        );
    
        if (!os.good())
        {
            FatalIOErrorInFunction(os)
                << "Cannot open for appending"
                << exit(FatalIOError);
        }
    
    
            decomposedBlockData::writeHeader(os, streamOpt, io);
    
        std::streamoff blockOffset = decomposedBlockData::writeBlockEntry
    
            isIOmaster  // With FoamFile header on master
    
        return (blockOffset >= 0) && os.good();
    
    // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
    
    namespace Foam
    {
    
    // Construction helper: self/world/local communicator and IO ranks
    static Tuple2<label, labelList> getCommPattern()
    {
        // Default is COMM_WORLD (single master)
        Tuple2<label, labelList> commAndIORanks
        (
            UPstream::worldComm,
            fileOperation::getGlobalIORanks()
        );
    
        if (UPstream::parRun() && commAndIORanks.second().size() > 1)
        {
            // Multiple masters: ranks for my IO range
            commAndIORanks.first() = UPstream::allocateCommunicator
            (
                UPstream::worldComm,
                fileOperation::subRanks(commAndIORanks.second())
            );
        }
    
        return commAndIORanks;
    }
    
    } // End namespace Foam
    
    
    
    // * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //
    
    
    mattijs's avatar
    mattijs committed
    void Foam::fileOperations::collatedFileOperation::init(bool verbose)
    {
        verbose = (verbose && Foam::infoDetailLevel > 0);
    
        if (verbose)
        {
            this->printBanner(ioRanks_.size());
        }
    }
    
    
    
    Foam::fileOperations::collatedFileOperation::collatedFileOperation
    (
    
        masterUncollatedFileOperation
        (
    
            getCommPattern(),
            false,  // distributedRoots
            false   // verbose
    
        managedComm_(getManagedComm(comm_)),  // Possibly locally allocated
    
        writer_(mag(maxThreadFileBufferSize), comm_)
    
    mattijs's avatar
    mattijs committed
        init(verbose);
    
    Foam::fileOperations::collatedFileOperation::collatedFileOperation
    (
    
        const Tuple2<label, labelList>& commAndIORanks,
        const bool distributedRoots,
    
        masterUncollatedFileOperation
        (
            commAndIORanks,
            distributedRoots,
            false   // verbose
        ),
    
        managedComm_(-1),  // Externally managed
    
        writer_(mag(maxThreadFileBufferSize), comm_)
    
    mattijs's avatar
    mattijs committed
        init(verbose);
    
    void Foam::fileOperations::collatedFileOperation::storeComm() const
    {
        // From externally -> locally managed
        managedComm_ = getManagedComm(comm_);
    }
    
    
    
    // * * * * * * * * * * * * * * * * Destructor  * * * * * * * * * * * * * * * //
    
    Foam::fileOperations::collatedFileOperation::~collatedFileOperation()
    {
    
        // Wait for any outstanding file operations
        flush();
    
    
        UPstream::freeCommunicator(managedComm_);
    
    // * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //
    
    Foam::fileName Foam::fileOperations::collatedFileOperation::objectPath
    (
        const IOobject& io,
        const word& typeName
    ) const
    {
        // Replacement for objectPath
        if (io.time().processorCase())
        {
    
            return masterUncollatedFileOperation::localObjectPath
    
                fileOperation::PROCOBJECT,
                "dummy",        // not used for processorsobject
    
            return masterUncollatedFileOperation::localObjectPath
    
                io.instance()
            );
        }
    }
    
    
    bool Foam::fileOperations::collatedFileOperation::writeObject
    (
        const regIOobject& io,
    
        IOstreamOption streamOpt,
    
    ) const
    {
        const Time& tm = io.time();
        const fileName& inst = io.instance();
    
    
        // Update meta-data for current state
        const_cast<regIOobject&>(io).updateMetaData();
    
    
        if (inst.isAbsolute() || !tm.processorCase())
        {
    
            // Note: delay mkdir to masterOFstream so it does not get created
            //       if not needed (e.g. when running distributed)
    
            const fileName pathName(io.objectPath());
    
                Pout<< "collatedFileOperation::writeObject :"
                    << " For object : " << io.name()
    
                    << " falling back to master-only output to " << io.path()
                    << endl;
            }
    
    
            // Note: currently still NON_ATOMIC (Dec-2022)
    
                IOstreamOption::NON_APPEND,
    
            // If any of these fail, return
            // (leave error handling to Ostream class)
    
            const bool ok =
            (
                os.good()
             && io.writeHeader(os)
             && io.writeData(os)
            );
    
            if (ok)
    
        }
        else
        {
            // Construct the equivalent processors/ directory
    
            const fileName path(processorsPath(io, inst, processorsDir(io)));
    
            // Note: delay mkdir to masterOFstream so it does not get created
            //       if not needed (e.g. when running distributed)
    
            const fileName pathName(path/io.name());
    
            if (io.global() || io.globalObject())
    
                    Pout<< "collatedFileOperation::writeObject :"
                        << " For global object : " << io.name()
    
                        << " falling back to master-only output to " << pathName
                        << endl;
                }
    
    
                // Note: currently still NON_ATOMIC (Dec-2022)
    
                    IOstreamOption::NON_APPEND,
    
                // If any of these fail, return
                // (leave error handling to Ostream class)
    
                const bool ok =
                (
                    os.good()
                 && io.writeHeader(os)
                 && io.writeData(os)
                );
    
                if (ok)
    
            else if (!UPstream::parRun())
    
            {
                // Special path for e.g. decomposePar. Append to
    
                // processorsDDD/ file
    
                    Pout<< "collatedFileOperation::writeObject :"
                        << " For object : " << io.name()
    
                        << " appending to " << pathName << endl;
                }
    
    
                return appendObject(io, pathName, streamOpt);
    
                // Re-check static maxThreadFileBufferSize variable to see
                // if needs to use threading
    
                const bool useThread = (maxThreadFileBufferSize != 0);
    
                    Pout<< "collatedFileOperation::writeObject :"
                        << " For object : " << io.name()
    
                        << " starting collating output to " << pathName
                        << " useThread:" << useThread << endl;
    
                // Note: currently still NON_ATOMIC (Dec-2022)
    
                threadedCollatedOFstream os
                (
                    writer_,
                    pathName,
    
                if (UPstream::master(comm_))
    
                    // Suppress comment banner
                    const bool old = IOobject::bannerEnabled(false);
    
                    ok = ok && io.writeHeader(os);
    
                    IOobject::bannerEnabled(old);
    
    
                    // Additional header content
                    dictionary dict;
                    decomposedBlockData::writeExtraHeaderContent
                    (
                        dict,
                        streamOpt,
                        io
                    );
                    os.setHeaderEntries(dict);
    
                ok = ok && io.writeData(os);
                // No end divider for collated output
    
                return ok;
    
    void Foam::fileOperations::collatedFileOperation::flush() const
    {
        if (debug)
        {
            Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
                << endl;
        }
        masterUncollatedFileOperation::flush();
        // Wait for thread to finish (note: also removes thread)
        writer_.waitAll();
    }
    
    
    Foam::word Foam::fileOperations::collatedFileOperation::processorsDir
    (
        const fileName& fName
    ) const
    {
    
        {
            const List<int>& procs(UPstream::procID(comm_));
    
    
            word procDir(processorsBaseDir+Foam::name(nProcs_));
    
            if (procs.size() != nProcs_)
    
                  + "-"
                  + Foam::name(procs.last());
            }
            return procDir;
        }
        else
        {
            word procDir(processorsBaseDir+Foam::name(nProcs_));
    
            if (ioRanks_.size())
            {
                // Detect current processor number
                label proci = detectProcessorPath(fName);
    
                if (proci != -1)
                {
                    // Find lowest io rank
                    label minProc = 0;
                    label maxProc = nProcs_-1;
    
                    for (const label ranki : ioRanks_)
    
    
                    // Add range if not all processors
                    if (maxProc-minProc+1 != nProcs_)
                    {
                        procDir +=
                          + "_"
                          + Foam::name(minProc)
                          + "-"
                          + Foam::name(maxProc);
                    }
    
                }
            }
    
            return procDir;
        }
    }
    
    
    Foam::word Foam::fileOperations::collatedFileOperation::processorsDir
    (
        const IOobject& io
    ) const
    {
        return processorsDir(io.objectPath());
    }
    
    
    
    // ************************************************************************* //