Commit e794f35c authored by mattijs's avatar mattijs
Browse files

ENH: moved read index out of UIPstream into PstreamBuffers

So now we can read multiple times (using UIPstream) from PstreamBuffers
    PstreamBuffers pBufs
    UIPstream str1(procI, pBufs);
    str1>> ..
    UIPstream str2(procI, pBufs);
    str1>> ..
parent 91532a8b
......@@ -38,7 +38,8 @@ Foam::IPstream::IPstream
)
:
Pstream(commsType, bufSize),
UIPstream(commsType, fromProcNo, buf_)
UIPstream(commsType, fromProcNo, buf_, externalBufPosition_),
externalBufPosition_(0)
{}
......
......@@ -55,6 +55,9 @@ class IPstream
public UIPstream
{
//- Receive index
label externalBufPosition_;
public:
// Constructors
......
......@@ -51,10 +51,31 @@ Foam::PstreamBuffers::PstreamBuffers
version_(version),
sendBuf_(UPstream::nProcs()),
recvBuf_(UPstream::nProcs()),
recvBufPos_(UPstream::nProcs(), 0),
finishedSendsCalled_(false)
{}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::~PstreamBuffers()
{
// Check that all data has been consumed.
forAll(recvBufPos_, procI)
{
if (recvBufPos_[procI] < recvBuf_[procI].size())
{
FatalErrorIn("PstreamBuffers::~PstreamBuffers()")
<< "Message from processor " << procI
<< " not fully consumed. messageSize:" << recvBuf_[procI].size()
<< " bytes of which only " << recvBufPos_[procI]
<< " consumed."
<< Foam::abort(FatalError);
}
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
void Foam::PstreamBuffers::finishedSends(const bool block)
......
......@@ -106,6 +106,9 @@ class PstreamBuffers
//- receive buffer
List<DynamicList<char> > recvBuf_;
//- read position in recvBuf_
labelList recvBufPos_;
bool finishedSendsCalled_;
// Private member functions
......@@ -129,6 +132,10 @@ public:
IOstream::versionNumber version=IOstream::currentVersion
);
// Destructor
~PstreamBuffers();
// Member functions
......
......@@ -77,20 +77,6 @@ inline void Foam::UIPstream::readFromBuffer
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UIPstream::~UIPstream()
{
if (externalBufPosition_ < messageSize_)
{
FatalErrorIn("UIPstream::~UIPstream()")
<< "Message not fully consumed. messageSize:" << messageSize_
<< " bytes of which only " << externalBufPosition_
<< " consumed." << Foam::abort(FatalError);
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::Istream& Foam::UIPstream::read(token& t)
......
......@@ -63,7 +63,7 @@ class UIPstream
DynamicList<char>& externalBuf_;
label externalBufPosition_;
label& externalBufPosition_;
const int tag_;
......@@ -94,6 +94,7 @@ public:
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
label& externalBufPosition,
const int tag = UPstream::msgType(),
streamFormat format=BINARY,
versionNumber version=currentVersion
......@@ -103,11 +104,6 @@ public:
UIPstream(const int fromProcNo, PstreamBuffers&);
// Destructor
~UIPstream();
// Member functions
// Inquiry
......
......@@ -36,6 +36,7 @@ Foam::UIPstream::UIPstream
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
label& externalBufPosition,
const int tag,
streamFormat format,
versionNumber version
......@@ -45,7 +46,7 @@ Foam::UIPstream::UIPstream
Istream(format, version),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(0),
externalBufPosition_(externalBufPosition),
tag_(tag),
messageSize_(0)
{
......@@ -56,6 +57,7 @@ Foam::UIPstream::UIPstream
"const commsTypes,"
"const int fromProcNo,"
"DynamicList<char>&,"
"label&,"
"const int tag,"
"streamFormat, versionNumber"
")"
......
......@@ -40,6 +40,7 @@ Foam::UIPstream::UIPstream
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
label& externalBufPosition,
const int tag,
streamFormat format,
versionNumber version
......@@ -49,7 +50,7 @@ Foam::UIPstream::UIPstream
Istream(format, version),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(0),
externalBufPosition_(externalBufPosition),
tag_(tag),
messageSize_(0)
{
......@@ -122,7 +123,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
Istream(buffers.format_, buffers.version_),
fromProcNo_(fromProcNo),
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(0),
externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag_),
messageSize_(0)
{
......
......@@ -45,7 +45,7 @@ bool Foam::UOPstream::write
{
if (debug)
{
Pout<< "UIPstream::write : starting write to:" << toProcNo
Pout<< "UOPstream::write : starting write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< Foam::endl;
......@@ -67,7 +67,7 @@ bool Foam::UOPstream::write
if (debug)
{
Pout<< "UIPstream::write : finished write to:" << toProcNo
Pout<< "UOPstream::write : finished write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< Foam::endl;
......@@ -87,7 +87,7 @@ bool Foam::UOPstream::write
if (debug)
{
Pout<< "UIPstream::write : finished write to:" << toProcNo
Pout<< "UOPstream::write : finished write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< Foam::endl;
......@@ -110,7 +110,7 @@ bool Foam::UOPstream::write
if (debug)
{
Pout<< "UIPstream::write : started write to:" << toProcNo
Pout<< "UOPstream::write : started write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< " request:" << PstreamGlobals::outstandingRequests_.size()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment