diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C index 5aa41da120035e07c6a4a852f5dd0e11201209e6..88101b4a890dd2ff1ebbb305ec0474befe898ac3 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C @@ -47,9 +47,9 @@ bool Foam::OFstreamCollator::writeFile const label comm, const word& objectType, const fileName& fName, - const string& masterData, + const UList<char>& localData, const labelUList& recvSizes, - const UPtrList<SubList<char>>& slaveData, // optional slave data + const UList<stdFoam::span<char>>& procData, // optional proc data IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -58,18 +58,14 @@ bool Foam::OFstreamCollator::writeFile { if (debug) { - Pout<< "OFstreamCollator : Writing master " << label(masterData.size()) + Pout<< "OFstreamCollator : Writing local " << localData.size() << " bytes to " << fName << " using comm " << comm - << " and " << slaveData.size() << " sub-ranks" << endl; + << " and " << procData.size() << " sub-ranks" << endl; - forAll(slaveData, proci) + forAll(procData, proci) { - if (slaveData.set(proci)) - { - Pout<< " " << proci - << " size:" << slaveData[proci].size() - << endl; - } + Pout<< " " << proci << " size:" + << label(procData[proci].size()) << nl; } } @@ -91,7 +87,7 @@ bool Foam::OFstreamCollator::writeFile streamOpt, // streamOpt for container objectType, "", // note - "", // location (leave empty instead inaccurate) + "", // location (leave empty, otherwise inaccurate) fName.name(), // object name headerEntries ); @@ -104,32 +100,27 @@ bool Foam::OFstreamCollator::writeFile // for some mpi so default is non-blocking. const UPstream::commsTypes myCommsType ( + mag ( fileOperations::masterUncollatedFileOperation:: - maxMasterFileBufferSize == 0 - ) + maxMasterFileBufferSize + ) < 1 ? UPstream::commsTypes::scheduled : UPstream::commsTypes::nonBlocking ); - UList<char> slice - ( - const_cast<char*>(masterData.data()), - label(masterData.size()) - ); - - List<std::streamoff> blockOffset; + List<std::streamoff> blockOffsets; // Optional decomposedBlockData::writeBlocks ( comm, osPtr, - blockOffset, - slice, + blockOffsets, // or List<std::streamoff>::null() + localData, recvSizes, - slaveData, + procData, myCommsType, - false // do not reduce return state + false // do not sync return state ); if (osPtr && !osPtr->good()) @@ -140,17 +131,18 @@ bool Foam::OFstreamCollator::writeFile if (debug) { - Pout<< "OFstreamCollator : Finished writing " << masterData.size() - << " bytes"; + Pout<< "OFstreamCollator : Finished writing " + << localData.size() << " bytes"; + if (UPstream::master(comm)) { - off_t sum = 0; + off_t total = 0; for (const label recv : recvSizes) { - sum += recv; + total += recv; } // Use std::to_string to display long int - Pout<< " (overall " << std::to_string(sum) << ')'; + Pout<< " (overall " << std::to_string(total) << ')'; } Pout<< " to " << fName << " using comm " << comm << endl; @@ -167,13 +159,16 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg) // Consume stack while (true) { - writeData* ptr = nullptr; + std::unique_ptr<writeData> ptr; { std::lock_guard<std::mutex> guard(handler.mutex_); + if (handler.objects_.size()) { - ptr = handler.objects_.pop(); + // FIFO + ptr.reset(handler.objects_.front()); + handler.objects_.pop_front(); } } @@ -181,51 +176,39 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg) { break; } - else - { - // Convert storage to pointers - PtrList<SubList<char>> slaveData; - if (ptr->slaveData_.size()) - { - slaveData.resize(ptr->slaveData_.size()); - forAll(slaveData, proci) - { - if (ptr->slaveData_.set(proci)) - { - slaveData.set - ( - proci, - new SubList<char> - ( - ptr->slaveData_[proci], - ptr->sizes_[proci] - ) - ); - } - } - } - bool ok = writeFile + writeData& obj = *ptr; + + // Obtain spans from storage + List<stdFoam::span<char>> procData(obj.procData_.size()); + forAll(procData, proci) + { + procData[proci] = stdFoam::span<char> ( - ptr->comm_, - ptr->objectType_, - ptr->pathName_, - ptr->data_, - ptr->sizes_, - slaveData, - ptr->streamOpt_, - ptr->atomic_, - ptr->append_, - ptr->headerEntries_ + const_cast<char*>(obj.procData_[proci].cdata()), + obj.procData_[proci].size() ); - if (!ok) - { - FatalIOErrorInFunction(ptr->pathName_) - << "Failed writing " << ptr->pathName_ - << exit(FatalIOError); - } + } + + bool ok = writeFile + ( + obj.comm_, + obj.objectType_, + obj.pathName_, + obj.localData_, + obj.sizes_, + procData, + obj.streamOpt_, + obj.atomic_, + obj.append_, + obj.headerEntries_ + ); - delete ptr; + if (!ok) + { + FatalIOErrorInFunction(obj.pathName_) + << "Failed writing " << obj.pathName_ + << exit(FatalIOError); } //sleep(1); } @@ -248,14 +231,14 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const { while (true) { - // Count files to be written + // The pending output size(s) off_t totalSize = 0; { std::lock_guard<std::mutex> guard(mutex_); - forAllConstIters(objects_, iter) + for (const writeData* ptr : objects_) { - totalSize += iter()->size(); + if (ptr) totalSize += ptr->size(); } } @@ -287,17 +270,7 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize) : - maxBufferSize_(maxBufferSize), - threadRunning_(false), - localComm_(UPstream::worldComm), - threadComm_ - ( - UPstream::allocateCommunicator - ( - localComm_, - labelRange(UPstream::nProcs(localComm_)) - ) - ) + OFstreamCollator(maxBufferSize, UPstream::worldComm) {} @@ -312,6 +285,7 @@ Foam::OFstreamCollator::OFstreamCollator localComm_(comm), threadComm_ ( + // dupComm UPstream::allocateCommunicator ( localComm_, @@ -345,7 +319,7 @@ bool Foam::OFstreamCollator::write ( const word& objectType, const fileName& fName, - const string& data, + DynamicList<char>&& localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -355,78 +329,112 @@ bool Foam::OFstreamCollator::write { // Determine (on master) sizes to receive. Note: do NOT use thread // communicator - labelList recvSizes; - decomposedBlockData::gather(localComm_, label(data.size()), recvSizes); + const labelList recvSizes + ( + UPstream::listGatherValues<label>(localData.size(), localComm_) + ); off_t totalSize = 0; label maxLocalSize = 0; + + if (UPstream::master(localComm_)) { - if (UPstream::master(localComm_)) + for (const label recvSize : recvSizes) { - for (const label recvSize : recvSizes) - { - totalSize += recvSize; - maxLocalSize = max(maxLocalSize, recvSize); - } + totalSize += recvSize; + maxLocalSize = max(maxLocalSize, recvSize); } - Pstream::broadcasts(localComm_, totalSize, maxLocalSize); } + Pstream::broadcasts(localComm_, totalSize, maxLocalSize); + + + // Determine how things will be gathered and written... + + enum class dispatchModes { DIRECT_WRITE, THREADED_WRITE, FULL_THREADED }; + + dispatchModes dispatch(dispatchModes::DIRECT_WRITE); if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) + { + dispatch = dispatchModes::DIRECT_WRITE; + } + else if (totalSize <= maxBufferSize_) + { + // Total size can be stored locally + // - gather all data now and only do the writing in the thread + + dispatch = dispatchModes::THREADED_WRITE; + } + else + { + // Gather data and write in the thread + + dispatch = dispatchModes::FULL_THREADED; + + if (!UPstream::haveThreads()) + { + WarningInFunction + << "MPI not initialized with thread support." << nl + << " maxThreadFileBufferSize = 0 to disable threading" << nl + << " or maxThreadFileBufferSize > " << totalSize + << " to collate before threaded writing." << nl << nl; + + dispatch = dispatchModes::DIRECT_WRITE; + } + } + + + // ----------- + // Dispatching + // ----------- + + if (dispatch == dispatchModes::DIRECT_WRITE) { if (debug) { - Pout<< "OFstreamCollator : non-thread gather and write of " << fName - << " using local comm " << localComm_ << endl; + Pout<< "OFstreamCollator : non-thread gather " + << "(local comm: " << localComm_ + << "); non-thread write of " + << fName << endl; } + // Direct collating and writing (so master blocks until all written!) - const PtrList<SubList<char>> dummySlaveData; return writeFile ( localComm_, objectType, fName, - data, + localData, recvSizes, - dummySlaveData, + UList<stdFoam::span<char>>::null(), // dummy proc data streamOpt, atomic, append, headerEntries ); } - else if (totalSize <= maxBufferSize_) + else if (dispatch == dispatchModes::THREADED_WRITE) { - // Total size can be stored locally so receive all data now and only - // do the writing in the thread - if (debug) { - Pout<< "OFstreamCollator : non-thread gather; thread write of " + Pout<< "OFstreamCollator : non-thread gather " + << "(local comm: " << localComm_ + << "); thread write of " << fName << endl; } - if (Pstream::master(localComm_)) + if (UPstream::master(localComm_)) { waitForBufferSpace(totalSize); } - - // Receive in chunks of labelMax (2^31-1) since this is the maximum - // size that a List can be - - autoPtr<writeData> fileAndDataPtr + std::unique_ptr<writeData> fileAndDataPtr ( new writeData ( - threadComm_, // Note: comm not actually used anymore + threadComm_, objectType, fName, - ( - Pstream::master(localComm_) - ? data // Only used on master - : string::null - ), recvSizes, streamOpt, atomic, @@ -434,63 +442,86 @@ bool Foam::OFstreamCollator::write headerEntries ) ); - writeData& fileAndData = fileAndDataPtr(); + auto& fileAndData = *fileAndDataPtr; - PtrList<List<char>>& slaveData = fileAndData.slaveData_; + List<List<char>>& procData = fileAndData.procData_; + if (UPstream::master(localComm_)) + { + // Move in local data (master only!) + fileAndData.transfer(localData); - UList<char> slice(const_cast<char*>(data.data()), label(data.size())); + // Storage for receive data + procData.resize(UPstream::nProcs(localComm_)); + + for (const int proci : UPstream::subProcs(localComm_)) + { + procData[proci].resize(recvSizes[proci]); + } + } + else if (UPstream::is_subrank(localComm_)) + { + // Requires a size for decomposedBlockData::writeBlocks() logic + procData.resize(UPstream::nProcs(localComm_)); + } - slaveData.setSize(recvSizes.size()); // Gather all data onto master. Is done in local communicator since - // not in write thread. Note that we do not store in contiguous - // buffer since that would limit to 2G chars. + // not in write thread. const label startOfRequests = UPstream::nRequests(); - if (Pstream::master(localComm_)) + + const int messageTag = UPstream::msgType(); + + if (UPstream::master(localComm_)) { - for (label proci = 1; proci < slaveData.size(); proci++) + for (const int proci : UPstream::subProcs(localComm_)) { - slaveData.set(proci, new List<char>(recvSizes[proci])); + List<char>& procSlice = procData[proci]; + if (procSlice.empty()) continue; + UIPstream::read ( UPstream::commsTypes::nonBlocking, proci, - slaveData[proci].data(), - slaveData[proci].size_bytes(), - Pstream::msgType(), + procSlice.data_bytes(), + procSlice.size_bytes(), + messageTag, localComm_ ); } } - else + else if (UPstream::is_subrank(localComm_) && !localData.empty()) { if ( !UOPstream::write ( UPstream::commsTypes::nonBlocking, - 0, - slice.cdata(), - slice.size_bytes(), - Pstream::msgType(), + UPstream::masterNo(), + localData.cdata_bytes(), + localData.size_bytes(), + messageTag, localComm_ ) ) { FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << 0 << " nBytes:" - << label(slice.size_bytes()) + << "Failure to send message (size: " + << localData.size() << ") to master" << nl << Foam::abort(FatalError); } } UPstream::waitRequests(startOfRequests); + // The localData has been moved (master) or communicated + localData.clearStorage(); + + + // Queue up for threading { std::lock_guard<std::mutex> guard(mutex_); - // Append to thread buffer - objects_.push(fileAndDataPtr.ptr()); + // Add to thread buffer (as FIFO), take ownership + objects_.push_back(fileAndDataPtr.release()); // Start thread if not running if (!threadRunning_) @@ -517,49 +548,48 @@ bool Foam::OFstreamCollator::write return true; } - else + else if (dispatch == dispatchModes::FULL_THREADED) { if (debug) { - Pout<< "OFstreamCollator : thread gather and write of " << fName - << " using communicator " << threadComm_ << endl; + Pout<< "OFstreamCollator : thread gather; thread write " + << "(thread comm: " << threadComm_ + << ") of " << fName << endl; } - if (!UPstream::haveThreads()) + if (UPstream::master(localComm_)) { - FatalErrorInFunction - << "mpi does not seem to have thread support." - << " Make sure to set buffer size 'maxThreadFileBufferSize'" - << " to at least " << totalSize - << " to be able to do the collating before threading." - << exit(FatalError); + waitForBufferSpace(localData.size()); } - if (Pstream::master(localComm_)) - { - waitForBufferSpace(data.size()); - } + std::unique_ptr<writeData> fileAndDataPtr + ( + new writeData + ( + threadComm_, + objectType, + fName, + recvSizes, + streamOpt, + atomic, + append, + headerEntries + ) + ); + + // Move in local data (all procs) + fileAndDataPtr->transfer(localData); + + // Queue up for threading { std::lock_guard<std::mutex> guard(mutex_); - // Push all file info on buffer. Note that no slave data provided - // so it will trigger communication inside the thread - objects_.push - ( - new writeData - ( - threadComm_, - objectType, - fName, - data, - recvSizes, - streamOpt, - atomic, - append, - headerEntries - ) - ); + // Add to thread buffer (as FIFO), take ownership + objects_.push_back(fileAndDataPtr.release()); + + // Note: no proc data provided + // so it will trigger communication inside the thread!!! if (!threadRunning_) { @@ -584,14 +614,21 @@ bool Foam::OFstreamCollator::write return true; } + + FatalErrorInFunction + << "Unknown dispatch mode: " << int(dispatch) + << " - programming error?" << abort(FatalError); + + return false; } void Foam::OFstreamCollator::waitAll() { - // Wait for all buffer space to be available i.e. wait for all jobs - // to finish - if (Pstream::master(localComm_)) + // Wait for all buffer space to be available + // - ie, wait for all jobs to finish + + if (UPstream::master(localComm_)) { if (debug) { diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H index 006090c69ab3f309c34e220145dbe645fadd2f90..c06a8f1d506e5307743b266ee71ed5b05915a0e3 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2019-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -51,14 +51,14 @@ SourceFiles #ifndef Foam_OFstreamCollator_H #define Foam_OFstreamCollator_H -#include <thread> -#include <mutex> #include "IOstream.H" -#include "labelList.H" -#include "FIFOStack.H" -#include "SubList.H" +#include "List.H" +#include "CircularBuffer.H" // As FIFO #include "dictionary.H" +#include <mutex> +#include <thread> + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // namespace Foam @@ -72,26 +72,33 @@ class OFstreamCollator { // Private Class + //- Holds data to be written struct writeData { const label comm_; const word objectType_; const fileName pathName_; - const string data_; + DynamicList<char> localData_; const labelList sizes_; - PtrList<List<char>> slaveData_; + List<List<char>> procData_; const IOstreamOption streamOpt_; IOstreamOption::atomicType atomic_; IOstreamOption::appendType append_; const dictionary headerEntries_; + writeData() = delete; // No default construct + writeData(const writeData&) = delete; // No copy construct + writeData(writeData&&) = delete; // No move construct + void operator=(const writeData&) = delete; // No copy assign + void operator=(writeData&&) = delete; // No move assign + + //- Construct without local data writeData ( const label comm, const word& objectType, const fileName& pathName, - const string& data, - const labelList& sizes, + const labelUList& sizes, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -101,27 +108,30 @@ class OFstreamCollator comm_(comm), objectType_(objectType), pathName_(pathName), - data_(data), + localData_(), sizes_(sizes), - slaveData_(), + procData_(), streamOpt_(streamOpt), atomic_(atomic), append_(append), headerEntries_(headerEntries) {} - //- The (approximate) size of master + any optional slave data + //- Move reset local data + void transfer(DynamicList<char>& localData) + { + localData_.transfer(localData); + } + + //- The (approximate) size of local + any optional proc data off_t size() const { - off_t totalSize = data_.size(); - forAll(slaveData_, i) + off_t total = localData_.size(); + for (const auto& data : procData_) { - if (slaveData_.set(i)) - { - totalSize += slaveData_[i].size(); - } + total += data.size(); } - return totalSize; + return total; } }; @@ -135,8 +145,8 @@ class OFstreamCollator std::unique_ptr<std::thread> thread_; - //- Stack of files to write + contents - FIFOStack<writeData*> objects_; + //- FIFO of files to write and their contents + CircularBuffer<writeData*> objects_; //- Whether thread is running (and not exited) bool threadRunning_; @@ -156,9 +166,9 @@ class OFstreamCollator const label comm, const word& objectType, const fileName& fName, - const string& masterData, + const UList<char>& localData, const labelUList& recvSizes, - const UPtrList<SubList<char>>& slaveData, + const UList<stdFoam::span<char>>& procData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -181,7 +191,8 @@ public: // Constructors - //- Construct from buffer size. 0 = do not use thread + //- Construct from buffer size (0 = do not use thread) + //- and with worldComm explicit OFstreamCollator(const off_t maxBufferSize); //- Construct from buffer size (0 = do not use thread) @@ -195,14 +206,15 @@ public: // Member Functions - //- Write file with contents. - // Blocks until writethread has space available + //- Write file with contents, possibly taking ownership of the + //- content. + // Blocks until write-thread has space available // (total file sizes < maxBufferSize) bool write ( const word& objectType, - const fileName&, - const string& data, + const fileName& fName, + DynamicList<char>&& localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, @@ -210,6 +222,37 @@ public: const dictionary& headerEntries = dictionary::null ); + //- Write file with contents. + FOAM_DEPRECATED_FOR(2023-09, "use write with movable content") + bool write + ( + const word& objectType, + const fileName& fName, + const std::string& s, + IOstreamOption streamOpt, + IOstreamOption::atomicType atomic, + IOstreamOption::appendType append, + const bool useThread = true, + const dictionary& headerEntries = dictionary::null + ) + { + DynamicList<char> charData; + charData.setCapacity(s.size()); + std::copy(s.begin(), s.end(), charData.begin()); + + return write + ( + objectType, + fName, + std::move(charData), + streamOpt, + atomic, + append, + useThread, + headerEntries + ); + } + //- Wait for all thread actions to have finished void waitAll(); }; diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C index d9b0ddc0bb8d5ecc760ba52b5b750abe6a5691d0..579d70541aaeb005313cb77c6ec2a6641f38f9de 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2020-2022 OpenCFD Ltd. + Copyright (C) 2020-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -41,7 +41,7 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream const bool useThread ) : - OStringStream(streamOpt), + OCharStream(streamOpt), writer_(writer), pathName_(pathName), atomic_(atomic), @@ -74,11 +74,22 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream Foam::threadedCollatedOFstream::~threadedCollatedOFstream() { + commit(); +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +void Foam::threadedCollatedOFstream::commit() +{ + // Take ownership of serialized content, without copying or reallocation + DynamicList<char> charData(OCharStream::release()); + writer_.write ( decomposedBlockData::typeName, pathName_, - str(), + std::move(charData), IOstreamOption(IOstreamOption::BINARY, version(), compression_), atomic_, IOstreamOption::NO_APPEND, diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H index ad91139b832791f9b5faa55fbcdc7298631115e4..6a47e48b85ba31575df7d534e1aec3236a13f4e0 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2021-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -39,7 +39,7 @@ SourceFiles #define Foam_threadedCollatedOFstream_H #include "dictionary.H" -#include "StringStream.H" +#include "SpanStream.H" // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // @@ -55,7 +55,7 @@ class OFstreamCollator; class threadedCollatedOFstream : - public OStringStream + public OCharStream { // Private Data @@ -78,6 +78,11 @@ class threadedCollatedOFstream dictionary headerEntries_; + // Private Member Functions + + //- Commit buffered information + void commit(); + public: // Constructors @@ -102,12 +107,14 @@ public: ); - //- Destructor + //- Destructor - commits buffered information to file ~threadedCollatedOFstream(); // Member Functions + // -> using OCharStream::rewind + //- Define the header entries for the data block(s) void setHeaderEntries(const dictionary& dict); };