Newer
Older
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2014 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include <cstring>
#include <cstdlib>
#include <csignal>
# define MPI_SCALAR MPI_FLOAT
# define MPI_SCALAR MPI_DOUBLE
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
}
{
MPI_Init(&argc, &argv);
int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
int myRank;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
if (debug)
{
Pout<< "UPstream::init : initialised with numProcs:" << numprocs
<< " myRank:" << myRank << endl;
FatalErrorIn("UPstream::init(int& argc, char**& argv)")
<< "bool IPstream::init(int& argc, char**& argv) : "
"attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
// Initialise parallel structure
setParRun(numprocs);
# ifndef SGIMPI
string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
if (bufferSizeName.size())
{
int bufferSize = atoi(bufferSizeName.c_str());
if (bufferSize)
{
MPI_Buffer_attach(new char[bufferSize], bufferSize);
}
}
else
{
FatalErrorIn("UPstream::init(int& argc, char**& argv)")
<< "UPstream::init(int& argc, char**& argv) : "
<< "environment variable MPI_BUFFER_SIZE not defined"
<< Foam::abort(FatalError);
}
# endif
//int processorNameLen;
//char processorName[MPI_MAX_PROCESSOR_NAME];
//
//MPI_Get_processor_name(processorName, &processorNameLen);
//processorName[processorNameLen] = '\0';
//Pout<< "Processor name:" << processorName << endl;
return true;
}
if (debug)
{
Pout<< "UPstream::exit." << endl;
}
# ifndef SGIMPI
int size;
char* buff;
MPI_Buffer_detach(&buff, &size);
delete[] buff;
# endif
if (PstreamGlobals::outstandingRequests_.size())
{
label n = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.clear();
<< "There are still " << n << " outstanding MPI_Requests." << endl
<< "This means that your code exited before doing a"
<< "This should not happen for a normal code exit."
<< endl;
}
// Clean mpi communicators
forAll(myProcNo_, communicator)
{
if (myProcNo_[communicator] != -1)
{
freePstreamCommunicator(communicator);
}
}
if (errnum == 0)
{
MPI_Finalize();
::exit(errnum);
}
else
{
MPI_Abort(MPI_COMM_WORLD, errnum);
}
}
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator
)
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
void Foam::reduce
(
scalar& Value,
const minOp<scalar>& bop,
const int tag,
const label communicator
)
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
void Foam::reduce
(
vector2D& Value,
const sumOp<vector2D>& bop,
const int tag,
const label communicator
)
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
void Foam::sumReduce
(
scalar& Value,
label& Count,
const int tag,
const label communicator
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
vector2D twoScalars(Value, scalar(Count));
reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
Value = twoScalars.x();
Count = twoScalars.y();
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator,
label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
// Assume mpich2 with non-blocking collectives extensions. Once mpi3
// is available this will change.
MPI_Request request;
scalar v = Value;
MPIX_Ireduce
(
&v,
&Value,
1,
MPI_SCALAR,
MPI_SUM,
0, //root
PstreamGlobals::MPICommunicators_[communicator],
&request
);
requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
if (debug)
{
Pout<< "UPstream::allocateRequest for non-blocking reduce"
<< " : request:" << requestID
<< endl;
#else
// Non-blocking not yet implemented in mpi
reduce(Value, bop, tag, communicator);
requestID = -1;
#endif
}
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
void Foam::UPstream::allocatePstreamCommunicator
(
const label parentIndex,
const label index
)
{
if (index == PstreamGlobals::MPIGroups_.size())
{
// Extend storage with dummy values
MPI_Group newGroup;
PstreamGlobals::MPIGroups_.append(newGroup);
MPI_Comm newComm;
PstreamGlobals::MPICommunicators_.append(newComm);
}
else if (index > PstreamGlobals::MPIGroups_.size())
{
FatalErrorIn
(
"UPstream::allocatePstreamCommunicator\n"
"(\n"
" const label parentIndex,\n"
" const labelList& subRanks\n"
")\n"
) << "PstreamGlobals out of sync with UPstream data. Problem."
<< Foam::exit(FatalError);
}
if (parentIndex == -1)
{
// Allocate world communicator
if (index != UPstream::worldComm)
{
FatalErrorIn
(
"UPstream::allocateCommunicator\n"
"(\n"
" const label parentIndex,\n"
" const labelList& subRanks\n"
")\n"
) << "world communicator should always be index "
<< UPstream::worldComm << Foam::exit(FatalError);
}
PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
);
// Set the number of processes to the actual number
int numProcs;
MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
procIDs_[index] = identity(numProcs);
}
else
{
// Create new group
MPI_Group_incl
(
PstreamGlobals::MPIGroups_[parentIndex],
procIDs_[index].size(),
procIDs_[index].begin(),
&PstreamGlobals::MPIGroups_[index]
);
// Create new communicator
MPI_Comm_create
(
PstreamGlobals::MPICommunicators_[parentIndex],
PstreamGlobals::MPIGroups_[index],
&PstreamGlobals::MPICommunicators_[index]
);
if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
{
myProcNo_[index] = -1;
}
else
{
mattijs
committed
if
mattijs
committed
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
)
)
{
FatalErrorIn
(
"UPstream::allocatePstreamCommunicator\n"
"(\n"
" const label,\n"
" const labelList&\n"
")\n"
) << "Problem :"
<< " when allocating communicator at " << index
<< " from ranks " << procIDs_[index]
<< " of parent " << parentIndex
<< " cannot find my own rank"
<< Foam::exit(FatalError);
}
}
}
}
void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
if (communicator != UPstream::worldComm)
{
if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
{
// Free communicator. Sets communicator to MPI_COMM_NULL
MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
}
if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
{
// Free greoup. Sets group to MPI_GROUP_NULL
MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
}
Foam::label Foam::UPstream::nRequests()
{
return PstreamGlobals::outstandingRequests_.size();
}
void Foam::UPstream::resetRequests(const label i)
{
if (i < PstreamGlobals::outstandingRequests_.size())
{
PstreamGlobals::outstandingRequests_.setSize(i);
}
}
void Foam::UPstream::waitRequests(const label start)
if (debug)
{
Pout<< "UPstream::waitRequests : starting wait for "
<< PstreamGlobals::outstandingRequests_.size()-start
<< " outstanding requests starting at " << start << endl;
if (PstreamGlobals::outstandingRequests_.size())
{
SubList<MPI_Request> waitRequests
(
PstreamGlobals::outstandingRequests_,
PstreamGlobals::outstandingRequests_.size() - start,
start
);
MPI_STATUSES_IGNORE
)
)
{
FatalErrorIn
(
) << "MPI_Waitall returned with error" << Foam::endl;
}
if (debug)
{
Pout<< "UPstream::waitRequests : finished wait." << endl;
}
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
void Foam::UPstream::waitRequest(const label i)
{
if (debug)
{
Pout<< "UPstream::waitRequest : starting wait for request:" << i
<< endl;
}
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorIn
(
"UPstream::waitRequest(const label)"
) << "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
if
(
MPI_Wait
(
&PstreamGlobals::outstandingRequests_[i],
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"UPstream::waitRequest()"
) << "MPI_Wait returned with error" << Foam::endl;
}
if (debug)
{
Pout<< "UPstream::waitRequest : finished wait for request:" << i
<< endl;
}
}
Pout<< "UPstream::finishedRequest : checking request:" << i
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorIn
(
) << "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
int flag;
MPI_Test
(
&PstreamGlobals::outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
Pout<< "UPstream::finishedRequest : finished request:" << i
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
int Foam::UPstream::allocateTag(const char* s)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
}
else
{
tag = PstreamGlobals::nTags_++;
}
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::allocateTag " << s
<< " : tag:" << tag
<< endl;
}
return tag;
}
int Foam::UPstream::allocateTag(const word& s)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
}
else
{
tag = PstreamGlobals::nTags_++;
}
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::allocateTag " << s
<< " : tag:" << tag
<< endl;
}
return tag;
}
void Foam::UPstream::freeTag(const char* s, const int tag)
{
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
}
void Foam::UPstream::freeTag(const word& s, const int tag)
{
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
}
// ************************************************************************* //