/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2012 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include <cstring>
#include <cstdlib>
#include <csignal>
# define MPI_SCALAR MPI_FLOAT
# define MPI_SCALAR MPI_DOUBLE
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("GAMMANP", "number of instances");
validParOptions.insert("machinefile", "machine file");
}
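// Initialise MPI from the arguments passed through by the MPI launcher and
// set up the OpenFOAM parallel-run state (process IDs, communication
// schedule).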
bool Foam::UPstream::init(int& argc, char**& argv)
{
MPI_Init(&argc, &argv);
int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_);
if (debug)
{
Pout<< "UPstream::init : initialised with numProcs:" << numprocs
<< " myProcNo:" << myProcNo_ << endl;
}
FatalErrorIn("UPstream::init(int& argc, char**& argv)")
<< "bool IPstream::init(int& argc, char**& argv) : "
"attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
procIDs_.setSize(numprocs);
forAll(procIDs_, procNo)
{
procIDs_[procNo] = procNo;
}
setParRun();
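// Attach a buffer for buffered (MPI_Bsend) communication, sized by the
// MPI_BUFFER_SIZE environment variable; not attached when built for SGI MPI.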
# ifndef SGIMPI
string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
if (bufferSizeName.size())
{
int bufferSize = atoi(bufferSizeName.c_str());
if (bufferSize)
{
MPI_Buffer_attach(new char[bufferSize], bufferSize);
}
}
else
{
FatalErrorIn("UPstream::init(int& argc, char**& argv)")
<< "UPstream::init(int& argc, char**& argv) : "
<< "environment variable MPI_BUFFER_SIZE not defined"
<< Foam::abort(FatalError);
}
# endif
int processorNameLen;
char processorName[MPI_MAX_PROCESSOR_NAME];
MPI_Get_processor_name(processorName, &processorNameLen);
//signal(SIGABRT, stop);
// Now that nprocs is known construct communication tables.
initCommunicationSchedule();
return true;
}
void Foam::UPstream::exit(int errnum)
{
if (debug)
{
Pout<< "UPstream::exit." << endl;
}
# ifndef SGIMPI
int size;
char* buff;
MPI_Buffer_detach(&buff, &size);
delete[] buff;
# endif
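// Warn about non-blocking requests that were started but never completed
// with waitRequests()/waitRequest() before exit.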
if (PstreamGlobals::outstandingRequests_.size())
{
label n = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.clear();

WarningIn("UPstream::exit(int)")
<< "There are still " << n << " outstanding MPI_Requests." << endl
<< "This means that your code exited before doing a"
<< " UPstream::waitRequests()." << endl
<< "This should not happen for a normal code exit."
<< endl;
}
if (errnum == 0)
{
MPI_Finalize();
::exit(errnum);
}
else
{
MPI_Abort(MPI_COMM_WORLD, errnum);
}
}
void Foam::UPstream::abort()
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
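// Sum-reduce a scalar across all processors: for process counts up to
// nProcsSimpleSum the master gathers values with explicit MPI_Recv/MPI_Send
// and scatters the result back; otherwise a single MPI_Allreduce is used.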
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop, const int tag)
{
if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
{
if (UPstream::master())
{
for
(
int slave=UPstream::firstSlave();
slave<=UPstream::lastSlave();
slave++
)
{
scalar value;
if
(
MPI_Recv
(
&value,
1,
MPI_SCALAR,
UPstream::procID(slave),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
}
else
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
UPstream::procID(UPstream::masterNo()),
tag,
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
if (UPstream::master())
{
for
(
int slave=UPstream::firstSlave();
slave<=UPstream::lastSlave();
slave++
)
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
UPstream::procID(slave),
tag,
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
}
else
{
if
(
MPI_Recv
(
&Value,
1,
MPI_SCALAR,
UPstream::procID(UPstream::masterNo()),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
}
}
else
{
scalar sum;
MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
Value = sum;
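// Alternative tree-based reduction, kept below for reference but disabled: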
/*
int myProcNo = UPstream::myProcNo();
int nProcs = UPstream::nProcs();
//
// receive from children
//
int level = 1;
int thisLevelOffset = 2;
int childLevelOffset = thisLevelOffset/2;
int childProcId = 0;
while
(
(childLevelOffset < nProcs)
&& (myProcNo % thisLevelOffset) == 0
)
{
childProcId = myProcNo + childLevelOffset;
scalar value;
if (childProcId < nProcs)
{
if
(
MPI_Recv
(
&value,
1,
MPI_SCALAR,
UPstream::procID(childProcId),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
level++;
thisLevelOffset <<= 1;
childLevelOffset = thisLevelOffset/2;
}
//
// send and receive from parent
//
{
int parentId = myProcNo - (myProcNo % thisLevelOffset);
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
UPstream::procID(parentId),
tag,
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
if
(
MPI_Recv
(
&Value,
1,
MPI_SCALAR,
UPstream::procID(parentId),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
}
//
// distribute to my children
//
level--;
thisLevelOffset >>= 1;
childLevelOffset = thisLevelOffset/2;
while (level > 0)
{
childProcId = myProcNo + childLevelOffset;
if (childProcId < nProcs)
{
if
(
MPI_Send
(
&Value,
1,
MPI_SCALAR,
UPstream::procID(childProcId),
tag,
MPI_COMM_WORLD
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
level--;
thisLevelOffset >>= 1;
childLevelOffset = thisLevelOffset/2;
}
*/
}
Pout<< "Foam::reduce : reduced value:" << Value << endl;
Foam::label Foam::UPstream::nRequests()
{
return PstreamGlobals::outstandingRequests_.size();
}
void Foam::UPstream::resetRequests(const label i)
{
if (i < PstreamGlobals::outstandingRequests_.size())
{
PstreamGlobals::outstandingRequests_.setSize(i);
}
}
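// Wait (blocking) for completion of all outstanding non-blocking requests
// from index 'start' onwards, then remove them from the outstanding list.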
void Foam::UPstream::waitRequests(const label start)
{
if (debug)
{
Pout<< "UPstream::waitRequests : starting wait for "
<< PstreamGlobals::outstandingRequests_.size()-start
<< " outstanding requests starting at " << start << endl;
if (PstreamGlobals::outstandingRequests_.size())
{
SubList<MPI_Request> waitRequests
(
PstreamGlobals::outstandingRequests_,
PstreamGlobals::outstandingRequests_.size() - start,
start
);
if
(
MPI_Waitall
(
waitRequests.size(),
waitRequests.begin(),
MPI_STATUSES_IGNORE
)
)
{
FatalErrorIn
(
) << "MPI_Waitall returned with error" << Foam::endl;
}

resetRequests(start);
}
if (debug)
{
Pout<< "UPstream::waitRequests : finished wait." << endl;
}
}
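// Wait (blocking) for completion of the single outstanding request i.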
void Foam::UPstream::waitRequest(const label i)
{
if (debug)
{
Pout<< "UPstream::waitRequest : starting wait for request:" << i
<< endl;
}
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorIn
(
"UPstream::waitRequest(const label)"
) << "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
if
(
MPI_Wait
(
&PstreamGlobals::outstandingRequests_[i],
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"UPstream::waitRequest()"
) << "MPI_Wait returned with error" << Foam::endl;
}
if (debug)
{
Pout<< "UPstream::waitRequest : finished wait for request:" << i
<< endl;
}
}
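// Non-blocking test of whether outstanding request i has completed.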
Pout<< "UPstream::waitRequests : checking finishedRequest:" << i
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorIn
(
) << "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
int flag;
MPI_Test
(
&PstreamGlobals::outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
Pout<< "UPstream::waitRequests : finished finishedRequest:" << i
// ************************************************************************* //