From d75c60d8aeee445d69db2c425d3a09fcd4837d3d Mon Sep 17 00:00:00 2001 From: Mark Olesen <Mark.Olesen@esi-group.com> Date: Tue, 16 Apr 2024 13:22:22 +0200 Subject: [PATCH] ENH: reduce communication argList at startup (#3142) - replace point-to-point transmission of the argList args/options with a broadcast. This is sufficient for most cases (without distributed roots). For "normal" cases (non-distributed roots) this will replace the nProcs-1 messages with a single broadcast. - for cases with distributed roots, distinguish between a single, identical root and different roots. An identical root can also be subsequently handled with a broadcast. Different roots will still require individual point-to-point communication. For cases with distributed roots, it will add the overhead of an additional broadcast. --- src/OpenFOAM/global/argList/argList.C | 294 ++++++++++++++++---------- 1 file changed, 181 insertions(+), 113 deletions(-) diff --git a/src/OpenFOAM/global/argList/argList.C b/src/OpenFOAM/global/argList/argList.C index 7f6966855a7..05934efede7 100644 --- a/src/OpenFOAM/global/argList/argList.C +++ b/src/OpenFOAM/global/argList/argList.C @@ -271,8 +271,8 @@ static bool printRootsSubscription if (index == -1) { - sortedProcs.append(host); - sortedRoots.append(i); + sortedProcs.push_back(host); + sortedRoots.push_back(i); } else if (roots[sortedRoots[index]] != root) { @@ -360,9 +360,9 @@ void Foam::argList::addArgument const string& usage ) { - validArgs.append(argName); + validArgs.push_back(argName); - // The first program argument starts at 1 - obtain index after the append + // The first program argument starts at 1 - obtain index after push_back() const label index = validArgs.size(); @@ -470,7 +470,7 @@ void Foam::argList::addNote(const string& note) { if (!note.empty()) { - notes.append(note); + notes.push_back(note); } } @@ -1096,8 +1096,8 @@ Foam::argList::argList if (strcmp(optName, "lib") == 0) { // The '-lib' option: - // Append name(s) to libs for later opening - libs().append(this->getList<fileName>(argi)); + // Add name(s) to libs for later opening + libs().push_back(this->getList<fileName>(argi)); } else if (strcmp(optName, "debug-switch") == 0) { @@ -1262,7 +1262,7 @@ void Foam::argList::parse const string timeString = clock::clockTime(); // Print the banner once only for parallel runs - if (Pstream::master() && bannerEnabled()) + if (UPstream::master() && bannerEnabled()) { IOobject::writeBanner(Info, true) << "Build : "; @@ -1353,16 +1353,16 @@ void Foam::argList::parse // Collect machine/pid, and check that the build is identical if (runControl_.parRun()) { - if (Pstream::master()) + if (UPstream::master()) { - hostMachine.resize(Pstream::nProcs()-1); - hostProcs.resize(Pstream::nProcs()-1); + hostMachine.resize(UPstream::nProcs()-1); + hostProcs.resize(UPstream::nProcs()-1); string procBuild; label procPid; int proci = 0; - for (const int subproci : Pstream::subProcs()) + for (const int subproci : UPstream::subProcs()) { - IPstream fromSubproc(Pstream::commsTypes::scheduled, subproci); + IPstream fromSubproc(UPstream::commsTypes::scheduled, subproci); fromSubproc >> procBuild >> hostMachine[proci] >> procPid; @@ -1384,8 +1384,8 @@ void Foam::argList::parse { OPstream toMaster ( - Pstream::commsTypes::scheduled, - Pstream::masterNo() + UPstream::commsTypes::scheduled, + UPstream::masterNo() ); toMaster << foamVersion::build << Foam::hostName() << Foam::pid(); } @@ -1395,14 +1395,34 @@ void Foam::argList::parse // Case is a single processor run unless it is running parallel int nProcs = 1; - // Roots if running distributed + // Roots if running distributed. Only sized on the master fileNameList roots; + enum distributedCodes + { + NON_DISTRIBUTED = 0, + DISTRIBUTED = 1, + DISTRIBUTED_SINGLE_ROOT = 2, + DISTRIBUTED_MULTIPLE_ROOTS = 3 + }; + + // Track which type of distributed roots etc are being used + label distributedType + ( + runControl_.distributed() + ? distributedCodes::DISTRIBUTED + : distributedCodes::NON_DISTRIBUTED + ); + + // Some cases where knowing the writeFormat can be useful... + // label writeFormat(-1); + + // If this actually is a parallel run if (runControl_.parRun()) { // For the master - if (Pstream::master()) + if (UPstream::master()) { // Establish rootPath_/globalCase_/case_ for master setCasePaths(); @@ -1414,7 +1434,7 @@ void Foam::argList::parse { bool adjustOpt = false; - if (isDir(source)) + if (Foam::isDir(source)) { source /= "decomposeParDict"; adjustOpt = true; @@ -1441,6 +1461,7 @@ void Foam::argList::parse { source = "-roots"; runControl_.distributed(true); + distributedType = distributedCodes::DISTRIBUTED; if (roots.empty()) { @@ -1457,6 +1478,7 @@ void Foam::argList::parse { source = "-hostRoots"; runControl_.distributed(true); + distributedType = distributedCodes::DISTRIBUTED; ITstream is(this->lookup("hostRoots")); @@ -1471,7 +1493,7 @@ void Foam::argList::parse } // Match machine names to roots - roots.resize(Pstream::nProcs()-1, fileName::null); + roots.resize(UPstream::nProcs()-1, fileName::null); for (const auto& hostRoot : hostRoots) { labelList matched @@ -1511,7 +1533,7 @@ void Foam::argList::parse dictNProcs = roots.size()+1; } } - else if (checkProcessorDirectories_ && Pstream::nProcs() > 1) + else if (checkProcessorDirectories_ && UPstream::nProcs() > 1) { // Check values from decomposeParDict @@ -1526,7 +1548,7 @@ void Foam::argList::parse // the masterUncollated/collated handler. Note that we // also have to protect the actual dictionary parsing since // it might trigger file access (e.g. #include, #codeStream) - const bool oldParRun = Pstream::parRun(false); + const bool oldParRun = UPstream::parRun(false); // Note: non-parallel running might update // fileOperation::nProcs() so store & restore below const label nOldProcs = fileHandler().nProcs(); @@ -1550,6 +1572,8 @@ void Foam::argList::parse { nDomainsReadOpt = IOobjectOption::MUST_READ; runControl_.distributed(true); + distributedType = distributedCodes::DISTRIBUTED; + decompDict.readEntry("roots", roots); if (roots.empty()) @@ -1587,10 +1611,10 @@ void Foam::argList::parse } } - Pstream::parRun(oldParRun); // Restore parallel state + UPstream::parRun(oldParRun); // Restore parallel state const_cast<fileOperation&>(fileHandler()).nProcs(nOldProcs); - if (Pstream::nProcs() == 1) + if (UPstream::nProcs() == 1) { Warning << "Running parallel on single processor. This only" @@ -1599,22 +1623,55 @@ void Foam::argList::parse } } - // Convenience: - // when a single root is specified, use it for all processes - if (roots.size() == 1) + + // Distributed roots + if (!roots.empty()) { - const fileName rootName(roots[0]); - roots.resize(Pstream::nProcs()-1, rootName); + for (fileName& dir : roots) + { + dir.expand(); + } - // Adjust dictNProcs for command-line '-roots' option - if (dictNProcs <= 0) + // Identical root specified everywhere? + // - use optimized single-root variant + if (roots.size() > 1 && roots.uniform()) { - dictNProcs = roots.size()+1; + roots.resize(1); + } + + if (roots.size() == 1) + { + // Single root specified, use it for all processes + distributedType = + distributedCodes::DISTRIBUTED_SINGLE_ROOT; + + // Adjust dictNProcs for command-line '-roots' option + if (dictNProcs <= 0) + { + dictNProcs = UPstream::nProcs(); + } + } + else if (roots.size() > 1) + { + distributedType = + distributedCodes::DISTRIBUTED_MULTIPLE_ROOTS; + + if (roots.size() != UPstream::nProcs()-1) + { + FatalError + << "Number of roots " << roots.size() + << " != number of sub-ranks " + << UPstream::nProcs()-1 + << exit(FatalError); + } } } + // // Check number of processors. + // + // nProcs => number of actual procs // dictNProcs => number of procs specified in decompositionDict // nProcDirs => number of processor directories @@ -1626,73 +1683,33 @@ void Foam::argList::parse if ( checkProcessorDirectories_ - && Pstream::nProcs() > 1 - && dictNProcs > Pstream::nProcs() + && UPstream::nProcs() > 1 ) { - FatalError - << this->relativePath(source) - << " specifies " << dictNProcs - << " processors but job was started with " - << Pstream::nProcs() << " processors." - << exit(FatalError); - } - - // Distributed data - if (roots.size()) - { - if (roots.size() != Pstream::nProcs()-1) + if (dictNProcs > UPstream::nProcs()) { FatalError - << "number of entries in roots " - << roots.size() - << " is not equal to the number of sub-processes " - << Pstream::nProcs()-1 + << this->relativePath(source) + << " specifies " << dictNProcs + << " processors but job was started with " + << UPstream::nProcs() << " ranks." << exit(FatalError); } - for (fileName& dir : roots) - { - dir.expand(); - } - - // Distribute the master's argument list (with new root) - const bool hadCaseOpt = options_.contains("case"); - for (const int subproci : Pstream::subProcs()) - { - options_.set("case", roots[subproci-1]/globalCase_); - - OPstream toProc(Pstream::commsTypes::scheduled, subproci); - - toProc - << args_ << options_ - << runControl_.distributed() - << label(runControl_.dryRun()) - << label(runControl_.verbose()); - } - options_.erase("case"); - - // Restore [-case dir] - if (hadCaseOpt) - { - options_.set("case", rootPath_/globalCase_); - } - } - else - { // Possibly going to fewer processors. // Check if all procDirs are there. + // NOTE: Only works when not using distributed roots! if ( - checkProcessorDirectories_ - && Pstream::nProcs() > 1 + // Can only rely on directory scanning *without* distributed roots! + roots.empty() && dictNProcs >= 1 - && dictNProcs < Pstream::nProcs() + && dictNProcs < UPstream::nProcs() ) { label nProcDirs = 0; { - const bool oldParRun(UPstream::parRun(false)); + const bool oldParRun = UPstream::parRun(false); // Don't cache processor directories (probably not // needed since master-only const int oldCacheLevel(fileOperation::cacheLevel(0)); @@ -1711,60 +1728,110 @@ void Foam::argList::parse UPstream::parRun(oldParRun); } - if (nProcDirs < UPstream::nProcs()) { FatalError - << "number of processor directories = " - << nProcDirs - << " is not equal to the number of processors = " + << "Number of processor directories = " << nProcDirs + << " is not equal to the number of ranks = " << UPstream::nProcs() << exit(FatalError); } } + } - // Distribute the master's argument list (unaltered) - for (const int proci : UPstream::subProcs()) - { - OPstream toProc(UPstream::commsTypes::scheduled, proci); - toProc - << args_ << options_ - << runControl_.distributed() - << label(runControl_.dryRun()) - << label(runControl_.verbose()); - } + // Broadcast the master's argument list (unaltered) + { + OPBstream toProcs(UPstream::worldComm); + + toProcs + << args_ << options_ + << distributedType + << label(runControl_.dryRun()) + << label(runControl_.verbose()); } } else { - // Collect the master's argument list - bool isDistributed; + // Receive the broadcasted master's argument list label numDryRun, numVerbose; - IPstream fromMaster - ( - Pstream::commsTypes::scheduled, - Pstream::masterNo() - ); + IPBstream fromMaster(UPstream::worldComm); fromMaster >> args_ >> options_ - >> isDistributed + >> distributedType >> numDryRun >> numVerbose; - runControl_.distributed(isDistributed); + runControl_.distributed(distributedType); runControl_.dryRun(numDryRun); runControl_.verbose(numVerbose); + } + + + // Final handling of distributed roots (if any) + if + ( + distributedType == distributedCodes::DISTRIBUTED_SINGLE_ROOT + ) + { + // The same root for all sub-ranks + // - use broadcast to transmit value + + fileName newCasePath; + + if (UPstream::master()) + { + newCasePath = roots[0]/globalCase_; + OPBstream::send(newCasePath); // worldComm + } + else + { + IPBstream::recv(newCasePath); // worldComm + options_.set("case", newCasePath); + } + } + else if + ( + distributedType == distributedCodes::DISTRIBUTED_MULTIPLE_ROOTS + ) + { + // Different roots for each sub-rank + // - use point-to-point communication to transmit values - // Establish rootPath_/globalCase_/case_ for sub-process + fileName newCasePath; + + if (UPstream::master()) + { + for (const int subproci : UPstream::subProcs()) + { + newCasePath = roots[subproci-1]/globalCase_; + OPstream::send(newCasePath, subproci); // worldComm + } + } + else + { + IPstream::recv(newCasePath, UPstream::masterNo()); // worldComm + options_.set("case", newCasePath); + } + } + + + // Establish rootPath_/globalCase_/case_ for sub-process + if (!UPstream::master()) + { setCasePaths(); } - nProcs = Pstream::nProcs(); - if (Pstream::nProcs() > 1) + + nProcs = UPstream::nProcs(); + if (UPstream::nProcs() > 1) { - case_ = globalCase_/("processor" + Foam::name(Pstream::myProcNo())); + case_ = + ( + globalCase_ + / ("processor" + Foam::name(UPstream::myProcNo())) + ); } else { @@ -1778,6 +1845,7 @@ void Foam::argList::parse case_ = globalCase_; // Redundant, but extra safety? } + // If needed, adjust fileHandler for distributed roots if (runControl_.distributed() && fileOperation::fileHandlerPtr_) { @@ -1834,7 +1902,7 @@ void Foam::argList::parse List<fileNameList> rankToDirs(UPstream::nProcs()); if (UPstream::master()) { - const bool oldParRun = Pstream::parRun(false); + const bool oldParRun = UPstream::parRun(false); // Note: non-parallel running might update // fileOperation::nProcs() so store & restore below const label nOldProcs = fileHandler().nProcs(); @@ -2025,7 +2093,7 @@ void Foam::argList::parse sigQuit::set(bannerEnabled()); sigSegv::set(bannerEnabled()); - if (Pstream::master() && bannerEnabled()) + if (UPstream::master() && bannerEnabled()) { Info<< "fileModificationChecking : " << "Monitoring run-time modified files using " @@ -2274,7 +2342,7 @@ bool Foam::argList::check(bool checkArgs, bool checkOpts) const { bool ok = true; - if (Pstream::master()) + if (UPstream::master()) { const label nargs = args_.size()-1; if (checkArgs && nargs != validArgs.size()) @@ -2332,7 +2400,7 @@ bool Foam::argList::checkRootCase() const const fileName pathDir(fileHandler().filePath(path(), false)); - if (checkProcessorDirectories_ && pathDir.empty() && Pstream::master()) + if (checkProcessorDirectories_ && pathDir.empty() && UPstream::master()) { // Allow non-existent processor directories on sub-processes, // to be created later (e.g. redistributePar) -- GitLab