Commit 3682f2a6 authored by Mark Olesen

ENH: support external MPI initialize / finalize (#1266)

parent 400e3b43
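
In outline: UPstream now tracks whether it initialized MPI itself (ourMpi) and whether it attached its own transfer buffers (ourBuffers), and only finalizes or detaches what it owns, so an MPI session started by an external host program is left untouched. The new Test-parallel-external-init application below exercises this by calling MPI_Init/MPI_Finalize directly around the usual argList lifecycle.
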
Make/files:
Test-parallel-external-init.C

EXE = $(FOAM_USER_APPBIN)/Test-parallel-external-init

Make/options:
sinclude $(GENERAL_RULES)/mplib$(WM_MPLIB)
sinclude $(DEFAULT_RULES)/mplib$(WM_MPLIB)

EXE_INC = $(PFLAGS) $(PINC) $(c++LESSWARN)
EXE_LIBS = $(PLIBS)
Test-parallel-external-init.C:
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2019 OpenCFD Ltd.
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.

Application
    Test-parallel-external-init

Description
    Simulate starting MPI outside of OpenFOAM

\*---------------------------------------------------------------------------*/
#include "argList.H"
#include "Time.H"
#include "IPstream.H"
#include "OPstream.H"
#include "vector.H"
#include "IOstreams.H"
#include "Pstream.H"
#include <mpi.h>
#include <iostream>
using namespace Foam;
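
// Initialize MPI directly, standing in for an external (non-OpenFOAM)
// host code that owns the MPI lifecycle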
bool startMPI()
{
    int nprocs = 0, rank = 0;

    MPI_Init(nullptr, nullptr);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (nprocs && rank == 0)
    {
        std::cout<< nl << "Using MPI with " << nprocs << " procs" << nl << nl;
    }

    return true;
}
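
// Finalize MPI directly, again as an external host code would, and before
// OpenFOAM itself shuts down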
bool stopMPI()
{
    Info<< nl << "Stopping MPI" << nl << nl;

    MPI_Finalize();

    return true;
}
string message()
{
    return
    (
        "rank " + name(Pstream::myProcNo())
      + " / " + name(Pstream::nProcs()) + "\n"
    );
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[])
{
    argList::noCheckProcessorDirectories();

    UPstream::debug = 1;

    startMPI();

    #include "setRootCase.H"

    Pout<< message().c_str();

    stopMPI();

    Info<< "\nEnd\n" << endl;

    return 0;
}
// ************************************************************************* //
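
Presumably run through the usual MPI launcher, e.g. "mpirun -np 2 Test-parallel-external-init" (illustrative invocation, not shown in the commit), so that more than one rank reports via Pout.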
src/Pstream/mpi/UPstream.C
@@ -48,8 +48,81 @@ License
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

-// file-scope: min value and default for mpiBufferSize
-static const int minBufferSize = 20000000;
+// The min value and default for MPI buffers length
+constexpr int minBufLen = 20000000;
+
+// Track if we have attached MPI buffers
+static bool ourBuffers = false;
+
+// Track if we initialized MPI
+static bool ourMpi = false;
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //

static void attachOurBuffers()
{
    if (ourBuffers)
    {
        return;  // Already attached
    }
    ourBuffers = true;

    // Use UPstream::mpiBufferSize (optimisationSwitch),
    // but allow override with MPI_BUFFER_SIZE env variable (int value)
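    // (for example, MPI_BUFFER_SIZE=30000000 set in the environment)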
#ifndef SGIMPI
    int len = 0;

    const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
    if (str.empty() || !Foam::read(str, len) || len <= 0)
    {
        len = Foam::UPstream::mpiBufferSize;
    }

    if (len < minBufLen)
    {
        len = minBufLen;
    }

    if (Foam::UPstream::debug)
    {
        Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
    }

    char* buf = new char[len];

    if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
    {
        delete[] buf;
        Foam::Pout<< "UPstream::init : could not attach buffer\n";
    }
#endif
}
static void detachOurBuffers()
{
    if (!ourBuffers)
    {
        return;  // Nothing to detach
    }
    ourBuffers = false;

    // Some MPI notes suggest that the return code is MPI_SUCCESS when
    // no buffer is attached.
    // Be extra careful and require a non-zero size as well.

#ifndef SGIMPI
    int len = 0;
    char* buf = nullptr;

    if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
    {
        delete[] buf;
    }
#endif
}
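
// Note: only a buffer attached by attachOurBuffers() is ever detached here
// (guarded by ourBuffers), so buffers attached by external code are left alone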
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@@ -78,7 +151,7 @@ bool Foam::UPstream::initNull()
    {
        // Already finalized - this is an error
        FatalErrorInFunction
-            << "MPI was already finalized - cannot perform MPI_Init" << endl
+            << "MPI was already finalized - cannot perform MPI_Init\n"
            << Foam::abort(FatalError);

        return false;
@@ -87,17 +160,27 @@ bool Foam::UPstream::initNull()
    MPI_Initialized(&flag);
    if (flag)
    {
-        // Already initialized - nothing to do
-        return true;
-    }
-
-    MPI_Init_thread
-    (
-        nullptr,    // argc
-        nullptr,    // argv
-        MPI_THREAD_SINGLE,
-        &flag       // provided_thread_support
-    );
+        if (debug)
+        {
+            Pout<< "UPstream::initNull : was already initialized\n";
+        }
+    }
+    else
+    {
+        // Not already initialized
+        MPI_Init_thread
+        (
+            nullptr,    // argc
+            nullptr,    // argv
+            MPI_THREAD_SINGLE,
+            &flag       // provided_thread_support
+        );
+
+        ourMpi = true;
+    }
+
+    // Could also attach buffers etc.

    return true;
}
@@ -105,6 +188,8 @@ bool Foam::UPstream::initNull()
bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
{
+    int numprocs = 0, myRank = 0;
+    int provided_thread_support = 0;
+
    int flag = 0;
    MPI_Finalized(&flag);
@@ -121,38 +206,47 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
    MPI_Initialized(&flag);
    if (flag)
    {
-        // Already initialized - issue warning and skip the rest
-        WarningInFunction
-            << "MPI was already initialized - cannot perform MPI_Init" << nl
-            << "This could indicate an application programming error!" << endl;
-        return true;
-    }
-
-    //MPI_Init(&argc, &argv);
-    int provided_thread_support;
-    MPI_Init_thread
-    (
-        &argc,
-        &argv,
-        (
-            needsThread
-          ? MPI_THREAD_MULTIPLE
-          : MPI_THREAD_SINGLE
-        ),
-        &provided_thread_support
-    );
+        // Already initialized.
+        // Warn if we've called twice, but skip if initialized externally
+        if (ourMpi)
+        {
+            WarningInFunction
+                << "MPI was already initialized - cannot perform MPI_Init" << nl
+                << "This could indicate an application programming error!"
+                << endl;
+
+            return true;
+        }
+        else if (debug)
+        {
+            Pout<< "UPstream::init : was already initialized\n";
+        }
+    }
+    else
+    {
+        MPI_Init_thread
+        (
+            &argc,
+            &argv,
+            (
+                needsThread
+              ? MPI_THREAD_MULTIPLE
+              : MPI_THREAD_SINGLE
+            ),
+            &provided_thread_support
+        );
+
+        ourMpi = true;
+    }

-    int numprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
-    int myRank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    if (debug)
    {
-        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
-            << " myRank:" << myRank << endl;
+        Pout<< "UPstream::init : procs=" << numprocs
+            << " rank:" << myRank << endl;
    }
    if (numprocs <= 1)
@@ -162,39 +256,10 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
            << Foam::abort(FatalError);
    }

    // Initialise parallel structure
    setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
-#ifndef SGIMPI
-    {
-        // Normally use UPstream::mpiBufferSize (optimisationSwitch),
-        // but allow override with the MPI_BUFFER_SIZE env variable
-        // which has an int value
-        int bufSize = 0;
-
-        const std::string str = Foam::getEnv("MPI_BUFFER_SIZE");
-        if (str.empty() || !Foam::read(str, bufSize) || bufSize <= 0)
-        {
-            bufSize = mpiBufferSize;
-        }
-
-        if (bufSize < minBufferSize)
-        {
-            bufSize = minBufferSize;
-        }
-
-        if (debug)
-        {
-            Pout<< "UPstream::init : mpi-buffer-size " << bufSize << endl;
-        }
-
-        // TBD: could add error handling here.
-        // Delete allocated and leave if we fail to attach the buffer?
-        MPI_Buffer_attach(new char[bufSize], bufSize);
-    }
-#endif
+    attachOurBuffers();

    return true;
}
@@ -204,7 +269,7 @@ void Foam::UPstream::exit(int errnum)
{
    if (debug)
    {
-        Pout<< "UPstream::exit." << endl;
+        Pout<< "UPstream::exit\n";
    }

    int flag = 0;
@@ -220,43 +285,35 @@ void Foam::UPstream::exit(int errnum)
    MPI_Finalized(&flag);
    if (flag)
    {
-        // Already finalized - warn and exit
-        WarningInFunction
-            << "MPI was already finalized (perhaps by a connected program)"
-            << endl;
-        std::exit(1);
-        return;
-    }
-
-#ifndef SGIMPI
-    {
-        // Some MPI notes suggest that the return code is MPI_SUCCESS when
-        // no buffer is attached.
-        // Be extra careful and require a non-zero size as well.
-        int bufSize = 0;
-        char* buf = nullptr;
-        flag = MPI_Buffer_detach(&buf, &bufSize);
-        if (MPI_SUCCESS == flag && bufSize)
-        {
-            delete[] buf;
-        }
-    }
-#endif
+        // Already finalized elsewhere?
+        if (ourMpi)
+        {
+            WarningInFunction
+                << "MPI was already finalized (by a connected program?)\n";
+        }
+        else if (debug)
+        {
+            Pout<< "UPstream::exit : was already finalized\n";
+        }
+    }
+    else
+    {
+        detachOurBuffers();
+    }
-    if (PstreamGlobals::outstandingRequests_.size())
+    const label nOutstanding = PstreamGlobals::outstandingRequests_.size();
+
+    if (nOutstanding)
    {
-        label n = PstreamGlobals::outstandingRequests_.size();
        PstreamGlobals::outstandingRequests_.clear();

        WarningInFunction
-            << "There are still " << n << " outstanding MPI_Requests." << endl
-            << "This means that your code exited before doing a"
-            << " UPstream::waitRequests()." << endl
+            << "There were still " << nOutstanding
+            << " outstanding MPI_Requests." << nl
+            << "Which means your code exited before doing a "
+            << "UPstream::waitRequests()." << nl
            << "This should not happen for a normal code exit."
-            << endl;
+            << nl;
    }
    // Clean mpi communicators
@@ -268,15 +325,27 @@ void Foam::UPstream::exit(int errnum)
        }
    }
-    if (errnum == 0)
-    {
-        MPI_Finalize();
-        std::exit(errnum);
-    }
-    else
+    if (!flag)
    {
-        MPI_Abort(MPI_COMM_WORLD, errnum);
+        // MPI not already finalized
+        if (!ourMpi)
+        {
+            WarningInFunction
+                << "Finalizing MPI, but was initialized elsewhere\n";
+        }
+
+        if (errnum == 0)
+        {
+            MPI_Finalize();
+        }
+        else
+        {
+            MPI_Abort(MPI_COMM_WORLD, errnum);
+        }
    }
+
+    std::exit(errnum);
}
@@ -786,7 +855,7 @@ void Foam::UPstream::resetRequests(const label i)
void Foam::UPstream::waitRequests(const label start)
{
-    if (debug)
+    if (UPstream::debug)
    {
        Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size()-start
......