From 11964a47310fe5e9b9a27c1f8de4bfc495b0e2ee Mon Sep 17 00:00:00 2001 From: Mark Olesen <Mark.Olesen@esi-group.com> Date: Wed, 6 Sep 2023 14:45:16 +0200 Subject: [PATCH] ENH: reduce overhead and clearer data ownership for OFstreamCollator - local data to be written is now transferable into the OFstreamCollator. This avoids making a full copy when threading is active. - use plain lists for managing proc data * storage: List<List<char>> instead of PtrList<List<char>> * views: List<stdFoam::span<char>> instead of PtrList<SubList<char>> - use gather/write (unthreaded) as backstop if the output is too big to fit in the buffer size. Emit warning instead of FatalError --- .../collatedFileOperation/OFstreamCollator.C | 364 ++++++++++-------- .../collatedFileOperation/OFstreamCollator.H | 101 +++-- .../threadedCollatedOFstream.C | 17 +- .../threadedCollatedOFstream.H | 15 +- 4 files changed, 292 insertions(+), 205 deletions(-) diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C index 6ee4bb36a87..d11b3b87f1c 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C @@ -47,9 +47,9 @@ bool Foam::OFstreamCollator::writeFile const label comm, const word& objectType, const fileName& fName, - const string& masterData, + const UList<char>& localData, const labelUList& recvSizes, - const UPtrList<SubList<char>>& slaveData, // optional slave data + const UList<stdFoam::span<char>>& procData, // optional proc data IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -58,18 +58,14 @@ bool Foam::OFstreamCollator::writeFile { if (debug) { - Pout<< "OFstreamCollator : Writing master " << label(masterData.size()) + Pout<< "OFstreamCollator : Writing local " << localData.size() << " bytes to " << fName << " using comm " << comm - << " and " << slaveData.size() << " sub-ranks" << endl; + << " and " << procData.size() << " sub-ranks" << endl; - forAll(slaveData, proci) + forAll(procData, proci) { - if (slaveData.set(proci)) - { - Pout<< " " << proci - << " size:" << slaveData[proci].size() - << endl; - } + Pout<< " " << proci << " size:" + << label(procData[proci].size()) << nl; } } @@ -104,32 +100,27 @@ bool Foam::OFstreamCollator::writeFile // for some mpi so default is non-blocking. const UPstream::commsTypes myCommsType ( + mag ( fileOperations::masterUncollatedFileOperation:: - maxMasterFileBufferSize == 0 - ) + maxMasterFileBufferSize + ) < 1 ? UPstream::commsTypes::scheduled : UPstream::commsTypes::nonBlocking ); - UList<char> slice - ( - const_cast<char*>(masterData.data()), - label(masterData.size()) - ); - - List<std::streamoff> blockOffset; + List<std::streamoff> blockOffsets; // Optional decomposedBlockData::writeBlocks ( comm, osPtr, - blockOffset, - slice, + blockOffsets, // or List<std::streamoff>::null() + localData, recvSizes, - slaveData, + procData, myCommsType, - false // do not reduce return state + false // do not sync return state ); if (osPtr && !osPtr->good()) @@ -140,17 +131,18 @@ bool Foam::OFstreamCollator::writeFile if (debug) { - Pout<< "OFstreamCollator : Finished writing " << masterData.size() - << " bytes"; + Pout<< "OFstreamCollator : Finished writing " + << localData.size() << " bytes"; + if (UPstream::master(comm)) { - off_t sum = 0; + off_t total = 0; for (const label recv : recvSizes) { - sum += recv; + total += recv; } // Use std::to_string to display long int - Pout<< " (overall " << std::to_string(sum) << ')'; + Pout<< " (overall " << std::to_string(total) << ')'; } Pout<< " to " << fName << " using comm " << comm << endl; @@ -167,13 +159,16 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg) // Consume stack while (true) { - writeData* ptr = nullptr; + std::unique_ptr<writeData> ptr; { std::lock_guard<std::mutex> guard(handler.mutex_); + if (handler.objects_.size()) { - ptr = handler.objects_.pop(); + // FIFO + ptr.reset(handler.objects_.front()); + handler.objects_.pop_front(); } } @@ -181,51 +176,39 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg) { break; } - else - { - // Convert storage to pointers - PtrList<SubList<char>> slaveData; - if (ptr->slaveData_.size()) - { - slaveData.resize(ptr->slaveData_.size()); - forAll(slaveData, proci) - { - if (ptr->slaveData_.set(proci)) - { - slaveData.set - ( - proci, - new SubList<char> - ( - ptr->slaveData_[proci], - ptr->sizes_[proci] - ) - ); - } - } - } - bool ok = writeFile + writeData& obj = *ptr; + + // Obtain spans from storage + List<stdFoam::span<char>> procData(obj.procData_.size()); + forAll(procData, proci) + { + procData[proci] = stdFoam::span<char> ( - ptr->comm_, - ptr->objectType_, - ptr->pathName_, - ptr->data_, - ptr->sizes_, - slaveData, - ptr->streamOpt_, - ptr->atomic_, - ptr->append_, - ptr->headerEntries_ + const_cast<char*>(obj.procData_[proci].cdata()), + obj.procData_[proci].size() ); - if (!ok) - { - FatalIOErrorInFunction(ptr->pathName_) - << "Failed writing " << ptr->pathName_ - << exit(FatalIOError); - } + } - delete ptr; + bool ok = writeFile + ( + obj.comm_, + obj.objectType_, + obj.pathName_, + obj.localData_, + obj.sizes_, + procData, + obj.streamOpt_, + obj.atomic_, + obj.append_, + obj.headerEntries_ + ); + + if (!ok) + { + FatalIOErrorInFunction(obj.pathName_) + << "Failed writing " << obj.pathName_ + << exit(FatalIOError); } //sleep(1); } @@ -248,14 +231,14 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const { while (true) { - // Count files to be written + // The pending output size(s) off_t totalSize = 0; { std::lock_guard<std::mutex> guard(mutex_); - forAllConstIters(objects_, iter) + for (const writeData* ptr : objects_) { - totalSize += iter()->size(); + if (ptr) totalSize += ptr->size(); } } @@ -287,17 +270,7 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize) : - maxBufferSize_(maxBufferSize), - threadRunning_(false), - localComm_(UPstream::worldComm), - threadComm_ - ( - UPstream::allocateCommunicator - ( - localComm_, - labelRange(UPstream::nProcs(localComm_)) - ) - ) + OFstreamCollator(maxBufferSize, UPstream::worldComm) {} @@ -312,6 +285,7 @@ Foam::OFstreamCollator::OFstreamCollator localComm_(comm), threadComm_ ( + // dupComm UPstream::allocateCommunicator ( localComm_, @@ -345,7 +319,7 @@ bool Foam::OFstreamCollator::write ( const word& objectType, const fileName& fName, - const string& data, + DynamicList<char>&& localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -355,78 +329,109 @@ bool Foam::OFstreamCollator::write { // Determine (on master) sizes to receive. Note: do NOT use thread // communicator - labelList recvSizes; - decomposedBlockData::gather(localComm_, label(data.size()), recvSizes); + const labelList recvSizes + ( + UPstream::listGatherValues<label>(localData.size(), localComm_) + ); off_t totalSize = 0; label maxLocalSize = 0; + + if (UPstream::master(localComm_)) { - if (UPstream::master(localComm_)) + for (const label recvSize : recvSizes) { - for (const label recvSize : recvSizes) - { - totalSize += recvSize; - maxLocalSize = max(maxLocalSize, recvSize); - } + totalSize += recvSize; + maxLocalSize = max(maxLocalSize, recvSize); } - Pstream::broadcasts(localComm_, totalSize, maxLocalSize); } + Pstream::broadcasts(localComm_, totalSize, maxLocalSize); + + + // Determine how things will be gathered and written... + + enum class dispatchModes { GATHER_WRITE, PREFETCH_THREADED, FULL_THREADED }; + + dispatchModes dispatch(dispatchModes::GATHER_WRITE); if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) + { + dispatch = dispatchModes::GATHER_WRITE; + } + else if (totalSize <= maxBufferSize_) + { + // Total size can be stored locally + // - gather all data now and only do the writing in the thread + + dispatch = dispatchModes::PREFETCH_THREADED; + } + else + { + // Gather data and write in the thread + + dispatch = dispatchModes::FULL_THREADED; + + if (!UPstream::haveThreads()) + { + WarningInFunction + << "MPI not initialized with thread support." << nl + << " maxThreadFileBufferSize = 0 to disable threading" << nl + << " or maxThreadFileBufferSize > " << totalSize + << " to collate before threaded writing." << nl << nl; + + dispatch = dispatchModes::GATHER_WRITE; + } + } + + + // ----------- + // Dispatching + // ----------- + + if (dispatch == dispatchModes::GATHER_WRITE) { if (debug) { - Pout<< "OFstreamCollator : non-thread gather and write of " << fName - << " using local comm " << localComm_ << endl; + Pout<< "OFstreamCollator : non-thread gather/write " + << "(local comm: " << localComm_ << ") of " + << fName << endl; } + // Direct collating and writing (so master blocks until all written!) - const PtrList<SubList<char>> dummySlaveData; return writeFile ( localComm_, objectType, fName, - data, + localData, recvSizes, - dummySlaveData, + UList<stdFoam::span<char>>(), // dummy proc data streamOpt, atomic, append, headerEntries ); } - else if (totalSize <= maxBufferSize_) + else if (dispatch == dispatchModes::PREFETCH_THREADED) { - // Total size can be stored locally so receive all data now and only - // do the writing in the thread - if (debug) { Pout<< "OFstreamCollator : non-thread gather; thread write of " << fName << endl; } - if (Pstream::master(localComm_)) + if (UPstream::master(localComm_)) { waitForBufferSpace(totalSize); } - - // Receive in chunks of labelMax (2^31-1) since this is the maximum - // size that a List can be - - autoPtr<writeData> fileAndDataPtr + std::unique_ptr<writeData> fileAndDataPtr ( new writeData ( threadComm_, // Note: comm not actually used anymore objectType, fName, - ( - Pstream::master(localComm_) - ? data // Only used on master - : string::null - ), recvSizes, streamOpt, atomic, @@ -434,63 +439,81 @@ bool Foam::OFstreamCollator::write headerEntries ) ); - writeData& fileAndData = fileAndDataPtr(); + auto& fileAndData = *fileAndDataPtr; - PtrList<List<char>>& slaveData = fileAndData.slaveData_; + List<List<char>>& procData = fileAndData.procData_; + if (UPstream::master(localComm_)) + { + // Move in local data (master only!) + fileAndData.transfer(localData); - UList<char> slice(const_cast<char*>(data.data()), label(data.size())); + // Storage for receive data + procData.resize(UPstream::nProcs(localComm_)); + + for (const int proci : UPstream::subProcs(localComm_)) + { + procData[proci].resize(recvSizes[proci]); + } + } + else if (UPstream::is_subrank(localComm_)) + { + // Requires a size for decomposedBlockData::writeBlocks() logic + procData.resize(UPstream::nProcs(localComm_)); + } - slaveData.setSize(recvSizes.size()); // Gather all data onto master. Is done in local communicator since - // not in write thread. Note that we do not store in contiguous - // buffer since that would limit to 2G chars. + // not in write thread. const label startOfRequests = UPstream::nRequests(); - if (Pstream::master(localComm_)) + if (UPstream::master(localComm_)) { - for (label proci = 1; proci < slaveData.size(); proci++) + for (const int proci : UPstream::subProcs(localComm_)) { - slaveData.set(proci, new List<char>(recvSizes[proci])); + List<char>& procSlice = procData[proci]; + if (procSlice.empty()) continue; + UIPstream::read ( UPstream::commsTypes::nonBlocking, proci, - slaveData[proci].data(), - slaveData[proci].size_bytes(), - Pstream::msgType(), + procSlice.data_bytes(), + procSlice.size_bytes(), + UPstream::msgType(), localComm_ ); } } - else + else if (UPstream::is_subrank(localComm_) && !localData.empty()) { if ( !UOPstream::write ( UPstream::commsTypes::nonBlocking, - 0, - slice.cdata(), - slice.size_bytes(), - Pstream::msgType(), + UPstream::masterNo(), + localData.cdata_bytes(), + localData.size_bytes(), + UPstream::msgType(), localComm_ ) ) { FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << 0 << " nBytes:" - << label(slice.size_bytes()) + << "Cannot send outgoing message (size: " + << localData.size() << ") to master" << nl << Foam::abort(FatalError); } } UPstream::waitRequests(startOfRequests); + // The localData has been moved (master) or communicated + localData.clearStorage(); + { std::lock_guard<std::mutex> guard(mutex_); - // Append to thread buffer - objects_.push(fileAndDataPtr.ptr()); + // Append to thread buffer (as FIFO), take ownership + objects_.push_back(fileAndDataPtr.release()); // Start thread if not running if (!threadRunning_) @@ -517,49 +540,46 @@ bool Foam::OFstreamCollator::write return true; } - else + else if (dispatch == dispatchModes::FULL_THREADED) { if (debug) { - Pout<< "OFstreamCollator : thread gather and write of " << fName - << " using communicator " << threadComm_ << endl; + Pout<< "OFstreamCollator : thread gather and write " + << "(thread comm: " << threadComm_ + << ") of " << fName << endl; } - if (!UPstream::haveThreads()) + if (UPstream::master(localComm_)) { - FatalErrorInFunction - << "mpi does not seem to have thread support." - << " Make sure to set buffer size 'maxThreadFileBufferSize'" - << " to at least " << totalSize - << " to be able to do the collating before threading." - << exit(FatalError); + waitForBufferSpace(localData.size()); } - if (Pstream::master(localComm_)) - { - waitForBufferSpace(data.size()); - } + std::unique_ptr<writeData> fileAndDataPtr + ( + new writeData + ( + threadComm_, + objectType, + fName, + recvSizes, + streamOpt, + atomic, + append, + headerEntries + ) + ); + + // Move in local data (all procs) + fileAndDataPtr->transfer(localData); { std::lock_guard<std::mutex> guard(mutex_); - // Push all file info on buffer. Note that no slave data provided + // Append to thread buffer (as FIFO), take ownership + objects_.push_back(fileAndDataPtr.release()); + + // Note: no proc data provided // so it will trigger communication inside the thread - objects_.push - ( - new writeData - ( - threadComm_, - objectType, - fName, - data, - recvSizes, - streamOpt, - atomic, - append, - headerEntries - ) - ); if (!threadRunning_) { @@ -584,6 +604,12 @@ bool Foam::OFstreamCollator::write return true; } + + FatalErrorInFunction + << "Unknown dispatch mode: " << int(dispatch) + << " - programming error?" << abort(FatalError); + + return false; } @@ -591,7 +617,7 @@ void Foam::OFstreamCollator::waitAll() { // Wait for all buffer space to be available i.e. wait for all jobs // to finish - if (Pstream::master(localComm_)) + if (UPstream::master(localComm_)) { if (debug) { diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H index 006090c69ab..c06a8f1d506 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2019-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -51,14 +51,14 @@ SourceFiles #ifndef Foam_OFstreamCollator_H #define Foam_OFstreamCollator_H -#include <thread> -#include <mutex> #include "IOstream.H" -#include "labelList.H" -#include "FIFOStack.H" -#include "SubList.H" +#include "List.H" +#include "CircularBuffer.H" // As FIFO #include "dictionary.H" +#include <mutex> +#include <thread> + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // namespace Foam @@ -72,26 +72,33 @@ class OFstreamCollator { // Private Class + //- Holds data to be written struct writeData { const label comm_; const word objectType_; const fileName pathName_; - const string data_; + DynamicList<char> localData_; const labelList sizes_; - PtrList<List<char>> slaveData_; + List<List<char>> procData_; const IOstreamOption streamOpt_; IOstreamOption::atomicType atomic_; IOstreamOption::appendType append_; const dictionary headerEntries_; + writeData() = delete; // No default construct + writeData(const writeData&) = delete; // No copy construct + writeData(writeData&&) = delete; // No move construct + void operator=(const writeData&) = delete; // No copy assign + void operator=(writeData&&) = delete; // No move assign + + //- Construct without local data writeData ( const label comm, const word& objectType, const fileName& pathName, - const string& data, - const labelList& sizes, + const labelUList& sizes, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -101,27 +108,30 @@ class OFstreamCollator comm_(comm), objectType_(objectType), pathName_(pathName), - data_(data), + localData_(), sizes_(sizes), - slaveData_(), + procData_(), streamOpt_(streamOpt), atomic_(atomic), append_(append), headerEntries_(headerEntries) {} - //- The (approximate) size of master + any optional slave data + //- Move reset local data + void transfer(DynamicList<char>& localData) + { + localData_.transfer(localData); + } + + //- The (approximate) size of local + any optional proc data off_t size() const { - off_t totalSize = data_.size(); - forAll(slaveData_, i) + off_t total = localData_.size(); + for (const auto& data : procData_) { - if (slaveData_.set(i)) - { - totalSize += slaveData_[i].size(); - } + total += data.size(); } - return totalSize; + return total; } }; @@ -135,8 +145,8 @@ class OFstreamCollator std::unique_ptr<std::thread> thread_; - //- Stack of files to write + contents - FIFOStack<writeData*> objects_; + //- FIFO of files to write and their contents + CircularBuffer<writeData*> objects_; //- Whether thread is running (and not exited) bool threadRunning_; @@ -156,9 +166,9 @@ class OFstreamCollator const label comm, const word& objectType, const fileName& fName, - const string& masterData, + const UList<char>& localData, const labelUList& recvSizes, - const UPtrList<SubList<char>>& slaveData, + const UList<stdFoam::span<char>>& procData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -181,7 +191,8 @@ public: // Constructors - //- Construct from buffer size. 0 = do not use thread + //- Construct from buffer size (0 = do not use thread) + //- and with worldComm explicit OFstreamCollator(const off_t maxBufferSize); //- Construct from buffer size (0 = do not use thread) @@ -195,14 +206,15 @@ public: // Member Functions - //- Write file with contents. - // Blocks until writethread has space available + //- Write file with contents, possibly taking ownership of the + //- content. + // Blocks until write-thread has space available // (total file sizes < maxBufferSize) bool write ( const word& objectType, - const fileName&, - const string& data, + const fileName& fName, + DynamicList<char>&& localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -210,6 +222,37 @@ public: const dictionary& headerEntries = dictionary::null ); + //- Write file with contents. + FOAM_DEPRECATED_FOR(2023-09, "use write with movable content") + bool write + ( + const word& objectType, + const fileName& fName, + const std::string& s, + IOstreamOption streamOpt, + IOstreamOption::atomicType atomic, + IOstreamOption::appendType append, + const bool useThread = true, + const dictionary& headerEntries = dictionary::null + ) + { + DynamicList<char> charData; + charData.setCapacity(s.size()); + std::copy(s.begin(), s.end(), charData.begin()); + + return write + ( + objectType, + fName, + std::move(charData), + streamOpt, + atomic, + append, + useThread, + headerEntries + ); + } + //- Wait for all thread actions to have finished void waitAll(); }; diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C index 57af56c2682..ab64489f369 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2020-2022 OpenCFD Ltd. + Copyright (C) 2020-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -41,7 +41,7 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream const bool useThread ) : - OStringStream(streamOpt), + OCharStream(streamOpt), writer_(writer), pathName_(pathName), atomic_(atomic), @@ -74,11 +74,22 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream Foam::threadedCollatedOFstream::~threadedCollatedOFstream() { + commit(); +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +void Foam::threadedCollatedOFstream::commit() +{ + // Take ownership of serialized content, without copying or reallocation + DynamicList<char> charData(OCharStream::release()); + writer_.write ( decomposedBlockData::typeName, pathName_, - str(), + std::move(charData), IOstreamOption(IOstreamOption::BINARY, version(), compression_), atomic_, IOstreamOption::NON_APPEND, diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H index ad91139b832..6a47e48b85b 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2021-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -39,7 +39,7 @@ SourceFiles #define Foam_threadedCollatedOFstream_H #include "dictionary.H" -#include "StringStream.H" +#include "SpanStream.H" // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // @@ -55,7 +55,7 @@ class OFstreamCollator; class threadedCollatedOFstream : - public OStringStream + public OCharStream { // Private Data @@ -78,6 +78,11 @@ class threadedCollatedOFstream dictionary headerEntries_; + // Private Member Functions + + //- Commit buffered information + void commit(); + public: // Constructors @@ -102,12 +107,14 @@ public: ); - //- Destructor + //- Destructor - commits buffered information to file ~threadedCollatedOFstream(); // Member Functions + // -> using OCharStream::rewind + //- Define the header entries for the data block(s) void setHeaderEntries(const dictionary& dict); }; -- GitLab