From 6b691f1776634135adb16540dd4ddb0edeeb9d4d Mon Sep 17 00:00:00 2001 From: mattijs <mattijs> Date: Tue, 11 Aug 2009 23:08:00 +0100 Subject: [PATCH] provision for non-blocking comms --- src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C | 10 +++++++- src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H | 25 +++++++++++++++++++ src/Pstream/mpi/IPread.C | 23 ++++++++++++++--- 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C index 3d889af9615..4ad3f6bbece 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C @@ -78,7 +78,15 @@ inline void Foam::IPstream::readFromBuffer // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // Foam::IPstream::~IPstream() -{} +{ + if (bufPosition_ < messageSize_) + { + FatalErrorIn("IPstream::~IPstream()") + << "Message not fully consumed. messageSize:" << messageSize_ + << " bytes of which only " << bufPosition_ + << " consumed." << Foam::abort(FatalError); + } +} diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index dff94c259f7..5e8e5cbbb78 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -350,6 +350,31 @@ public: return oldCommsType; } + //- Transfer buffer + const List<char>& buf() const + { + return buf_; + } + + //- Transfer buffer + List<char>& buf() + { + return buf_; + } + + //- Current buffer read/write location + int bufPosition() const + { + return bufPosition_; + } + + //- Current buffer read/write location + int& bufPosition() + { + return bufPosition_; + } + + //- Exit program static void exit(int errnum = 1); diff --git a/src/Pstream/mpi/IPread.C b/src/Pstream/mpi/IPread.C index 2b6953765ab..c08c6b56ab2 100644 --- a/src/Pstream/mpi/IPread.C +++ b/src/Pstream/mpi/IPread.C @@ -61,24 +61,38 @@ Foam::IPstream::IPstream MPI_Status status; + // Cannot use buf_.size() since appends a few bytes extra + label realBufSize = bufSize; + // If the buffer size is not specified, probe the incomming message // and set it if (!bufSize) { + if (commsType == nonBlocking) + { + FatalErrorIn + ( + "IPstream::IPstream(const commsTypes, const int, " + "const label, streamFormat, versionNumber)" + ) << "Can use nonBlocking mode only with pre-allocated buffers" + << Foam::abort(FatalError); + } + MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status); MPI_Get_count(&status, MPI_BYTE, &messageSize_); buf_.setSize(messageSize_); + realBufSize = buf_.size(); } - messageSize_ = read(commsType, fromProcNo_, buf_.begin(), buf_.size()); + messageSize_ = read(commsType, fromProcNo_, buf_.begin(), realBufSize); if (!messageSize_) { FatalErrorIn ( - "IPstream::IPstream(const int fromProcNo, " - "const label bufSize, streamFormat format, versionNumber version)" + "IPstream::IPstream(const commsTypes, const int, " + "const label, streamFormat, versionNumber)" ) << "read failed" << Foam::abort(FatalError); } @@ -173,7 +187,8 @@ Foam::label Foam::IPstream::read IPstream_outstandingRequests_.append(request); - return 1; + // Assume the message is completely received. + return bufSize; } else { -- GitLab