Commit 7e200aec authored by mattijs's avatar mattijs
Browse files

optional no wait on Pstream::exchange

parent 2352682a
......@@ -33,7 +33,7 @@ Foam::OPstream::OPstream
const commsTypes commsType,
const int toProcNo,
const label bufSize,
const label tag,
const int tag,
streamFormat format,
versionNumber version
)
......
......@@ -66,7 +66,7 @@ public:
const commsTypes commsType,
const int toProcNo,
const label bufSize = 0,
const label tag = UPstream::msgType(),
const int tag = UPstream::msgType(),
streamFormat format=BINARY,
versionNumber version=currentVersion
);
......
......@@ -234,14 +234,15 @@ public:
//- Exchange data. Sends sendData, receives into recvData, sets
// sizes (not bytes). sizes[p0][p1] is what processor p0 has
// sent to p1. Continuous data only.
//template <template<class> class ListType, class T>
// If block=true will wait for all transfers to finish.
template <class Container, class T>
static void exchange
(
const List<Container >&,
List<Container >&,
labelListList& sizes,
const label tag = UPstream::msgType()
const int tag = UPstream::msgType(),
const bool block = true
);
};
......
......@@ -40,7 +40,7 @@ namespace Foam
Foam::PstreamBuffers::PstreamBuffers
(
const UPstream::commsTypes commsType,
const label tag,
const int tag,
IOstream::streamFormat format,
IOstream::versionNumber version
)
......@@ -57,7 +57,7 @@ Foam::PstreamBuffers::PstreamBuffers
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
void Foam::PstreamBuffers::finishedSends()
void Foam::PstreamBuffers::finishedSends(const bool block)
{
finishedSendsCalled_ = true;
......@@ -69,13 +69,14 @@ void Foam::PstreamBuffers::finishedSends()
sendBuf_,
recvBuf_,
sizes,
tag_
tag_,
block
);
}
}
void Foam::PstreamBuffers::finishedSends(labelListList& sizes)
void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block)
{
finishedSendsCalled_ = true;
......@@ -89,7 +90,8 @@ void Foam::PstreamBuffers::finishedSends(labelListList& sizes)
sendBuf_,
recvBuf_,
sizes,
tag_
tag_,
block
);
}
else
......@@ -104,7 +106,7 @@ void Foam::PstreamBuffers::finishedSends(labelListList& sizes)
}
// Send sizes across.
label oldTag = UPstream::msgType();
int oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
combineReduce(sizes, UPstream::listEq());
UPstream::msgType() = oldTag;
......
......@@ -50,7 +50,6 @@ Description
}
pBuffers.finishedSends(); // no-op for blocking
Pstream::waitRequests(); // no-op for blocking
for (label procI = 0; procI < Pstream::nProcs(); procI++)
{
......@@ -95,7 +94,7 @@ class PstreamBuffers
//- Communications type of this stream
const UPstream::commsTypes commsType_;
const label tag_;
const int tag_;
const IOstream::streamFormat format_;
......@@ -125,7 +124,7 @@ public:
PstreamBuffers
(
const UPstream::commsTypes commsType,
const label tag = UPstream::msgType(),
const int tag = UPstream::msgType(),
IOstream::streamFormat format=IOstream::BINARY,
IOstream::versionNumber version=IOstream::currentVersion
);
......@@ -134,12 +133,13 @@ public:
// Member functions
//- Mark all sends as having been done. This will start receives
// in non-blocking mode.
void finishedSends();
// in non-blocking mode. If block will wait for all transfers to
// finish (only relevant for nonBlocking mode)
void finishedSends(const bool block = true);
//- Mark all sends as having been done. Same as above but also returns
// sizes (bytes) transferred.
void finishedSends(labelListList& sizes);
void finishedSends(labelListList& sizes, const bool block = true);
};
......
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Read token and binary block from UIPstream
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "UIPstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::UIPstream::UIPstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
const label tag,
streamFormat format,
versionNumber version
)
:
UPstream(commsType),
Istream(format, version),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(0),
tag_(tag),
messageSize_(0)
{
setOpened();
setGood();
if (commsType == UPstream::nonBlocking)
{
// Message is already received into externalBuf
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
}
messageSize_ = UIPstream::read
(
commsType,
fromProcNo_,
externalBuf_.begin(),
wantedSize
);
UPstream::msgType() = oldTag;
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
if (!messageSize_)
{
FatalErrorIn
(
"UIPstream::UIPstream(const commsTypes, const int, "
"DynamicList<char>&, streamFormat, versionNumber)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
}
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Istream(buffers.format_, buffers.version_),
fromProcNo_(fromProcNo),
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(0),
tag_(buffers.tag_),
messageSize_(0)
{
if (commsType() != UPstream::scheduled && !buffers.finishedSendsCalled_)
{
FatalErrorIn("UIPstream::UIPstream(const int, PstreamBuffers&)")
<< "PstreamBuffers::finishedSends() never called." << endl
<< "Please call PstreamBuffers::finishedSends() after doing"
<< " all your sends (using UOPstream) and before doing any"
<< " receives (using UIPstream)" << Foam::exit(FatalError);
}
setOpened();
setGood();
if (commsType() == UPstream::nonBlocking)
{
// Message is already received into externalBuf
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
}
messageSize_ = UIPstream::read
(
commsType(),
fromProcNo_,
externalBuf_.begin(),
wantedSize
);
UPstream::msgType() = oldTag;
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
if (!messageSize_)
{
FatalErrorIn
(
"UIPstream::UIPstream(const int, PstreamBuffers&)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
}
// ************************************************************************* //
......@@ -65,7 +65,7 @@ class UIPstream
label externalBufPosition_;
const label tag_;
const int tag_;
label messageSize_;
......@@ -94,7 +94,7 @@ public:
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
const label tag = UPstream::msgType(),
const int tag = UPstream::msgType(),
streamFormat format=BINARY,
versionNumber version=currentVersion
);
......@@ -128,7 +128,8 @@ public:
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag = UPstream::msgType()
);
//- Return next token from stream
......
......@@ -91,7 +91,7 @@ Foam::UOPstream::UOPstream
const commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const label tag,
const int tag,
const bool sendAtDestruct,
streamFormat format,
versionNumber version
......@@ -129,9 +129,6 @@ Foam::UOPstream::~UOPstream()
{
if (sendAtDestruct_)
{
label oldTag = Pstream::msgType();
Pstream::msgType() = tag_;
if
(
!UOPstream::write
......@@ -139,7 +136,8 @@ Foam::UOPstream::~UOPstream()
commsType_,
toProcNo_,
sendBuf_.begin(),
sendBuf_.size()
sendBuf_.size(),
tag_
)
)
{
......@@ -148,8 +146,6 @@ Foam::UOPstream::~UOPstream()
<< " to processor " << toProcNo_
<< Foam::abort(FatalError);
}
UPstream::msgType() = oldTag;
}
}
......
......@@ -64,7 +64,7 @@ class UOPstream
DynamicList<char>& sendBuf_;
const label tag_;
const int tag_;
const bool sendAtDestruct_;
......@@ -93,7 +93,7 @@ public:
const commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const label tag = UPstream::msgType(),
const int tag = UPstream::msgType(),
const bool sendAtDestruct = true,
streamFormat format=BINARY,
versionNumber version=currentVersion
......@@ -127,7 +127,8 @@ public:
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag = UPstream::msgType()
);
//- Write next token to stream
......
......@@ -46,7 +46,8 @@ void Pstream::exchange
const List<Container >& sendBufs,
List<Container >& recvBufs,
labelListList& sizes,
const label tag
const int tag,
const bool block
)
{
if (UPstream::parRun())
......@@ -80,7 +81,7 @@ void Pstream::exchange
}
// Send sizes across.
label oldTag = UPstream::msgType();
int oldTag = UPstream::msgType();
UPstream::msgType() = tag;
combineReduce(sizes, UPstream::listEq());
UPstream::msgType() = oldTag;
......@@ -97,16 +98,14 @@ void Pstream::exchange
if (procI != Pstream::myProcNo() && nRecv > 0)
{
recvBufs[procI].setSize(nRecv);
label oldTag = UPstream::msgType();
UPstream::msgType() = tag;
UIPstream::read
(
UPstream::nonBlocking,
procI,
reinterpret_cast<char*>(recvBufs[procI].begin()),
nRecv*sizeof(T)
nRecv*sizeof(T),
tag
);
UPstream::msgType() = oldTag;
}
}
......@@ -118,9 +117,6 @@ void Pstream::exchange
{
if (procI != Pstream::myProcNo() && sendBufs[procI].size() > 0)
{
label oldTag = UPstream::msgType();
UPstream::msgType() = tag;
if
(
!UOPstream::write
......@@ -128,7 +124,8 @@ void Pstream::exchange
UPstream::nonBlocking,
procI,
reinterpret_cast<const char*>(sendBufs[procI].begin()),
sendBufs[procI].size()*sizeof(T)
sendBufs[procI].size()*sizeof(T),
tag
)
)
{
......@@ -138,7 +135,6 @@ void Pstream::exchange
<< label(sendBufs[procI].size()*sizeof(T))
<< Foam::abort(FatalError);
}
UPstream::msgType() = oldTag;
}
}
......@@ -146,7 +142,10 @@ void Pstream::exchange
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
Pstream::waitRequests();
if (block)
{
Pstream::waitRequests();
}
}
// Do myself
......
......@@ -23,14 +23,12 @@ License
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Read token and binary block from IPstream
Read from UIPstream
\*---------------------------------------------------------------------------*/
#include "UIPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::UIPstream::UIPstream
......@@ -38,7 +36,7 @@ Foam::UIPstream::UIPstream
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
const label tag,
const int tag,
streamFormat format,
versionNumber version
)
......@@ -51,28 +49,29 @@ Foam::UIPstream::UIPstream
tag_(tag),
messageSize_(0)
{
notImplemented
(
"UIPstream::UIPstream"
"("
"const commsTypes,"
"const int fromProcNo,"
"DynamicList<char>&,"
"const label tag,"
"streamFormat, versionNumber"
")"
);
notImplemented
(
"UIPstream::UIPstream"
"("
"const commsTypes,"
"const int fromProcNo,"
"DynamicList<char>&,"
"const int tag,"
"streamFormat, versionNumber"
")"
);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
int Foam::UIPstream::read
Foam::label Foam::UIPstream::read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag
)
{
notImplemented
......@@ -82,7 +81,8 @@ int Foam::UIPstream::read
"const commsTypes,"
"const int fromProcNo,"
"char* buf,"
"const label bufSize"
"const label bufSize,"
"const int tag"
")"
);
......
......@@ -29,8 +29,6 @@ Description
#include "UOPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UOPstream::write
......@@ -38,21 +36,23 @@ bool Foam::UOPstream::write
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag
)
{
notImplemented
(
"UOPstream::write"
"("
"const commsTypes commsType,"
"const int fromProcNo,"
"char* buf,"
"const label bufSize"
")"
);
return false;
notImplemented
(
"UOPstream::write"
"("
"const commsTypes commsType,"
"const int fromProcNo,"
"char* buf,"
"const label bufSize,"
"const int tag"
")"
);
return false;
}
......
......@@ -40,7 +40,7 @@ Foam::UIPstream::UIPstream
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
const label tag,
const int tag,
streamFormat format,
versionNumber version
)
......@@ -66,18 +66,11 @@ Foam::UIPstream::UIPstream
label wantedSize = externalBuf_.capacity();
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
Pout<< "UIPstream::UIPstream() starting receive from " << fromProcNo_
<< " with tag:" << msgType()
<< Foam::endl;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!wantedSize)