From 7bf1ea1643a378b046d6659a04c03ef2785ed083 Mon Sep 17 00:00:00 2001
From: Mark Olesen <Mark.Olesen@esi-group.com>
Date: Tue, 22 Feb 2022 16:59:02 +0100
Subject: [PATCH] ENH: avoid all-to-all communication in isoAdvection (#2371)

- the data front for isoAdvection can be particularly sparse and at
  higher processor counts there is an advantage to avoiding all-to-all
  communication for the PstreamBuffers exchange

Based on code changes from T.Aoyagi(RIST), A.Azami(RIST)
---
 .../polyMesh/syncTools/syncToolsTemplates.C   |  35 ++++--
 .../fvMesh/zoneDistribute/zoneDistribute.C    | 119 ++++++++++++++----
 .../fvMesh/zoneDistribute/zoneDistribute.H    |  36 +++---
 .../fvMesh/zoneDistribute/zoneDistributeI.H   | 101 ++++++++++-----
 .../isoAdvection/isoAdvection.C               |  15 ++-
 5 files changed, 219 insertions(+), 87 deletions(-)

diff --git a/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C b/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C
index 7491722996f..2384e7b678c 100644
--- a/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C
+++ b/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C
@@ -127,6 +127,7 @@ void Foam::syncTools::syncPointMap
 
     if (Pstream::parRun())
     {
+        DynamicList<label> sendRecvProcs;
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
 
         // Send
@@ -137,6 +138,7 @@ void Foam::syncTools::syncPointMap
             if (ppp && pp.nPoints())
             {
                 const auto& procPatch = *ppp;
+                const label nbrProci = procPatch.neighbProcNo();
 
                 // Get data per patchPoint in neighbouring point numbers.
 
@@ -157,12 +159,14 @@ void Foam::syncTools::syncPointMap
                     }
                 }
 
-                UOPstream toNeighb(procPatch.neighbProcNo(), pBufs);
-                toNeighb << patchInfo;
+                sendRecvProcs.append(nbrProci);
+                UOPstream toNbr(nbrProci, pBufs);
+                toNbr << patchInfo;
             }
         }
 
-        pBufs.finishedSends();
+        // Limit exchange to involved procs
+        pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
 
         // Receive and combine.
         for (const polyPatch& pp : patches)
@@ -172,8 +176,9 @@ void Foam::syncTools::syncPointMap
             if (ppp && pp.nPoints())
             {
                 const auto& procPatch = *ppp;
+                const label nbrProci = procPatch.neighbProcNo();
 
-                UIPstream fromNbr(procPatch.neighbProcNo(), pBufs);
+                UIPstream fromNbr(nbrProci, pBufs);
                 Map<T> nbrPatchInfo(fromNbr);
 
                 // Transform
@@ -377,6 +382,7 @@ void Foam::syncTools::syncEdgeMap
 
     if (Pstream::parRun())
     {
+        DynamicList<label> sendRecvProcs;
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
 
         // Send
@@ -387,6 +393,7 @@ void Foam::syncTools::syncEdgeMap
             if (ppp && pp.nEdges())
             {
                 const auto& procPatch = *ppp;
+                const label nbrProci = procPatch.neighbProcNo();
 
                 // Get data per patch edge in neighbouring edge.
 
@@ -409,12 +416,15 @@ void Foam::syncTools::syncEdgeMap
                     }
                 }
 
-                UOPstream toNeighb(procPatch.neighbProcNo(), pBufs);
-                toNeighb << patchInfo;
+                sendRecvProcs.append(nbrProci);
+                UOPstream toNbr(nbrProci, pBufs);
+                toNbr << patchInfo;
             }
         }
 
-        pBufs.finishedSends();
+        // Limit exchange to involved procs
+        pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
+
 
         // Receive and combine.
         for (const polyPatch& pp : patches)
@@ -1113,6 +1123,7 @@ void Foam::syncTools::syncBoundaryFaceList
         }
         else
         {
+            DynamicList<label> sendRecvProcs;
             PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
 
             // Send
@@ -1123,6 +1134,7 @@ void Foam::syncTools::syncBoundaryFaceList
                 if (ppp && pp.size())
                 {
                     const auto& procPatch = *ppp;
+                    const label nbrProci = procPatch.neighbProcNo();
 
                     const SubList<T> fld
                     (
@@ -1131,12 +1143,15 @@ void Foam::syncTools::syncBoundaryFaceList
                         pp.start()-boundaryOffset
                     );
 
-                    UOPstream toNbr(procPatch.neighbProcNo(), pBufs);
-                    toNbr << fld;;
+                    sendRecvProcs.append(nbrProci);
+                    UOPstream toNbr(nbrProci, pBufs);
+                    toNbr << fld;
                 }
             }
 
-            pBufs.finishedSends();
+            // Limit exchange to involved procs
+            pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
+
 
             // Receive and combine.
             for (const polyPatch& pp : patches)
diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C
index aad80178c01..d6762911702 100644
--- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C
+++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.C
@@ -6,7 +6,7 @@
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2020 DLR
-    Copyright (C) 2020 OpenCFD Ltd.
+    Copyright (C) 2020-2022 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -93,11 +93,12 @@ Foam::zoneDistribute::zoneDistribute(const fvMesh& mesh)
 :
     MeshObject<fvMesh, Foam::TopologicalMeshObject, zoneDistribute>(mesh),
     coupledBoundaryPoints_(coupledFacesPatch()().meshPoints()),
-    send_(Pstream::nProcs()),
+    send_(UPstream::nProcs()),
     stencil_(zoneCPCStencil::New(mesh)),
-    gblIdx_(stencil_.globalNumbering())
-{
-}
+    globalNumbering_(stencil_.globalNumbering()),
+    sendTo_(),   // Initial zero-sized
+    recvFrom_()  // Initial zero-sized
+{}
 
 
 // * * * * * * * * * * * * * * * * Selectors  * * * * * * * * * * * * * * //
@@ -124,7 +125,11 @@ void Foam::zoneDistribute::updateStencil(const boolList& zone)
 }
 
 
-void Foam::zoneDistribute::setUpCommforZone(const boolList& zone,bool updateStencil)
+void Foam::zoneDistribute::setUpCommforZone
+(
+    const boolList& zone,
+    bool updateStencil
+)
 {
     zoneCPCStencil& stencil = zoneCPCStencil::New(mesh_);
 
@@ -133,60 +138,120 @@ void Foam::zoneDistribute::setUpCommforZone(const boolList& zone,bool updateSten
         stencil.updateStencil(zone);
     }
 
-    const labelHashSet comms = stencil.needsComm();
+    if (UPstream::parRun())
+    {
+        if (sendTo_.empty())
+        {
+            // First time
+            sendTo_.resize(UPstream::nProcs());
+            recvFrom_.resize(UPstream::nProcs());
+            sendTo_ = false;
+            recvFrom_ = false;
+        }
+
+        const labelHashSet& comms = stencil.needsComm();
 
-    List<labelHashSet> needed(Pstream::nProcs());
+        List<labelHashSet> needed(UPstream::nProcs());
 
-    if (Pstream::parRun())
-    {
         for (const label celli : comms)
         {
             if (zone[celli])
             {
                 for (const label gblIdx : stencil_[celli])
                 {
-                    if (!gblIdx_.isLocal(gblIdx))
+                    if (!globalNumbering_.isLocal(gblIdx))
                     {
-                        const label procID = gblIdx_.whichProcID (gblIdx);
+                        const label procID =
+                            globalNumbering_.whichProcID(gblIdx);
                         needed[procID].insert(gblIdx);
                     }
                 }
             }
         }
 
-        PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
+
+        PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
+        labelList recvSizes;
 
         // Stream data into buffer
-        for (const int domain : Pstream::allProcs())
+        for (const int proci : UPstream::allProcs())
         {
-            if (domain != Pstream::myProcNo())
+            if (proci != UPstream::myProcNo() && !needed[proci].empty())
             {
                 // Put data into send buffer
-                UOPstream toDomain(domain, pBufs);
+                UOPstream toProc(proci, pBufs);
 
-                toDomain << needed[domain];
+                toProc << needed[proci].sortedToc();
             }
         }
 
-        // wait until everything is written.
-        pBufs.finishedSends();
 
-        for (const int domain : Pstream::allProcs())
+        // Need update, or use existing partial communication info?
+        bool fullUpdate = false;
+        for (const int proci : UPstream::allProcs())
         {
-            send_[domain].clear();
+            if
+            (
+                proci != UPstream::myProcNo()
+             && (!sendTo_[proci] && !needed[proci].empty())
+            )
+            {
+                // Changed state sendTo_ from false -> true
+                sendTo_[proci] = true;
+                fullUpdate = true;
+            }
+        }
+
 
-            if (domain != Pstream::myProcNo())
+        if (returnReduce(fullUpdate, orOp<bool>()))
+        {
+            pBufs.finishedSends(recvSizes);
+
+            // Update which ones receive
+            for (const int proci : UPstream::allProcs())
             {
-                // get data from send buffer
-                UIPstream fromDomain(domain, pBufs);
+                recvFrom_[proci] = (recvSizes[proci] > 0);
+            }
+        }
+        else
+        {
+            // No change in senders...
+            // - can communicate with a subset of processors
+            DynamicList<label> sendProcs;
+            DynamicList<label> recvProcs;
 
-                fromDomain >> send_[domain];
+            for (const int proci : UPstream::allProcs())
+            {
+                if (sendTo_[proci])
+                {
+                    sendProcs.append(proci);
+                }
+                if (recvFrom_[proci])
+                {
+                    recvProcs.append(proci);
+                }
             }
+
+            // Wait until everything is written
+            pBufs.finishedSends(sendProcs, recvProcs, recvSizes);
         }
-    }
-}
 
+        for (const int proci : UPstream::allProcs())
+        {
+            send_[proci].clear();
 
+            if
+            (
+                proci != UPstream::myProcNo()
+             && recvFrom_[proci]   // Or: (recvSizes[proci] > 0)
+            )
+            {
+                UIPstream fromProc(proci, pBufs);
+                fromProc >> send_[proci];
+            }
+        }
+    }
+}
 
 
 // ************************************************************************* //
diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H
index 4edaf8b62b5..0dbb7b2f3b3 100644
--- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H
+++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistribute.H
@@ -6,6 +6,7 @@
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2020 DLR
+    Copyright (C) 2022 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -37,12 +38,13 @@ Description
     Original code supplied by Henning Scheufler, DLR (2019)
 
 SourceFiles
+    zoneDistributeI.H
     zoneDistribute.C
 
 \*---------------------------------------------------------------------------*/
 
-#ifndef zoneDistribute_H
-#define zoneDistribute_H
+#ifndef Foam_zoneDistribute_H
+#define Foam_zoneDistribute_H
 
 #include "fvMesh.H"
 #include "globalIndex.H"
@@ -52,7 +54,6 @@ SourceFiles
 #include "IOobject.H"
 #include "MeshObject.H"
 
-
 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
 
 namespace Foam
@@ -68,20 +69,30 @@ class zoneDistribute
 {
     // Private Data
 
-
         //- labels of the points on coupled patches
         labelList coupledBoundaryPoints_;
 
-        //- storage of the addressing for processor-to-processor comms
-        List<labelHashSet> send_;
+        //- Addressing for processor-to-processor comms
+        List<labelList> send_;
 
         //- Return patch of all coupled faces.
         autoPtr<indirectPrimitivePatch> coupledFacesPatch() const;
 
         zoneCPCStencil& stencil_;
 
-        const globalIndex& gblIdx_;
+        //- Global number into cells and faces
+        const globalIndex& globalNumbering_;
+
+        //- Parallel: send data to processors (true/false)
+        //  updated in setUpCommforZone
+        boolList sendTo_;
+
+        //- Parallel: receive data from processors (true/false)
+        //  updated in setUpCommforZone
+        boolList recvFrom_;
+
 
+    // Private Member Functions
 
         //- Gives patchNumber and patchFaceNumber for a given
         //- Geometric volume field
@@ -114,15 +125,12 @@ public:
         //- Construct from fvMesh
         explicit zoneDistribute(const fvMesh&);
 
-
-    // Selectors
-
+        //- Selector
         static zoneDistribute& New(const fvMesh&);
 
 
     //- Destructor
-
-        virtual ~zoneDistribute() = default;
+    virtual ~zoneDistribute() = default;
 
 
     // Member Functions
@@ -142,7 +150,7 @@ public:
         //- Addressing reference
         const globalIndex& globalNumbering() const noexcept
         {
-            return gblIdx_;
+            return globalNumbering_;
         }
 
         //- Gives patchNumber and patchFaceNumber for a given
@@ -170,8 +178,6 @@ public:
             const boolList& zone,
             const GeometricField<Type, fvPatchField, volMesh>& phi
         );
-
-
 };
 
 
diff --git a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H
index 4dd0309075e..81b33c565b5 100644
--- a/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H
+++ b/src/finiteVolume/fvMesh/zoneDistribute/zoneDistributeI.H
@@ -6,7 +6,7 @@
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2019-2020 DLR
-    Copyright (C) 2020 OpenCFD Ltd.
+    Copyright (C) 2020-2022 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -85,10 +85,10 @@ Type Foam::zoneDistribute::getValue
     const label gblIdx
 ) const
 {
-    if (gblIdx_.isLocal(gblIdx))
+    if (globalNumbering_.isLocal(gblIdx))
     {
-        const label idx = gblIdx_.toLocal(gblIdx);
-        return getLocalValue(phi,idx);
+        const label localIdx = globalNumbering_.toLocal(gblIdx);
+        return getLocalValue(phi,localIdx);
     }
     else // from other proc
     {
@@ -111,7 +111,6 @@ Foam::Map<Foam::Field<Type>> Foam::zoneDistribute::getFields
             << "size of phi:" <<  phi.size()
             << "do not match. Did the mesh change?"
             << exit(FatalError);
-
     }
 
 
@@ -120,20 +119,20 @@ Foam::Map<Foam::Field<Type>> Foam::zoneDistribute::getFields
 
     Map<Field<Type>> stencilWithValues;
 
-    DynamicField<Type> tmpField(100);
+    DynamicField<Type> tmpField(128);
 
-    forAll(zone,celli)
+    forAll(zone, celli)
     {
         if (zone[celli])
         {
-            tmpField.clearStorage();
+            tmpField.clear();
 
             for (const label gblIdx : stencil_[celli])
             {
                 tmpField.append(getValue(phi,neiValues,gblIdx));
             }
 
-            stencilWithValues.insert(celli,tmpField);
+            stencilWithValues.emplace(celli, tmpField);
         }
     }
 
@@ -155,55 +154,97 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
             << "size of phi:" <<  phi.size()
             << "do not match. Did the mesh change?"
             << exit(FatalError);
-
     }
 
 
     // Get values from other proc
     Map<Type> neiValues;
-    List<Map<Type>> sendValues(Pstream::nProcs());
 
-    if (Pstream::parRun())
+    if (UPstream::parRun())
     {
-        forAll(send_, domaini)
+        const bool commsPrepared = (sendTo_.size() == UPstream::nProcs());
+
+        if (!commsPrepared)
+        {
+            FatalErrorInFunction
+                << "The send/recv information not initialized - "
+                << "likely that setUpCommforZone() was not called"
+                << endl;
+
+            // But don't exit/abort for now
+        }
+
+
+        List<Map<Type>> sendValues(UPstream::nProcs());
+
+        forAll(send_, proci)
         {
-            for (const label sendIdx : send_[domaini])
+            for (const label sendIdx : send_[proci])
             {
-                sendValues[domaini].insert
+                sendValues[proci].insert
                 (
                     sendIdx,
-                    getLocalValue(phi,gblIdx_.toLocal(sendIdx))
+                    getLocalValue(phi, globalNumbering_.toLocal(sendIdx))
                 );
             }
         }
 
-        PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
+
+        PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
+        labelList recvSizes;
 
         // Stream data into buffer
-        for (const int domain : Pstream::allProcs())
+        for (const int proci : UPstream::allProcs())
         {
-            if (domain != Pstream::myProcNo())
+            if (proci != UPstream::myProcNo() && !sendValues[proci].empty())
             {
                 // Put data into send buffer
-                UOPstream toDomain(domain, pBufs);
+                UOPstream toProc(proci, pBufs);
+                toProc << sendValues[proci];
+            }
+        }
+
+
+        if (commsPrepared)
+        {
+            DynamicList<label> sendProcs;
+            DynamicList<label> recvProcs;
 
-                toDomain << sendValues[domain];
+            for (const int proci : UPstream::allProcs())
+            {
+                if (sendTo_[proci])
+                {
+                    sendProcs.append(proci);
+                }
+                if (recvFrom_[proci])
+                {
+                    recvProcs.append(proci);
+                }
             }
+
+            pBufs.finishedSends(sendProcs, recvProcs, recvSizes);
         }
+        else
+        {
+            // Fallback to all-to-all communication
 
-        // Wait until everything is written.
-        pBufs.finishedSends();
+            pBufs.finishedSends(recvSizes);
+        }
 
-        Map<Type> tmpValue;
 
-        for (const int domain : Pstream::allProcs())
+        for (const int proci : UPstream::allProcs())
         {
-            if (domain != Pstream::myProcNo())
+            // Do not use recvFrom_[proci] here
+            // - could be incorrect if comms are not prepared!
+
+            if
+            (
+                proci != UPstream::myProcNo()
+             && (recvSizes[proci] > 0)
+            )
             {
-                // Get data from send buffer
-                UIPstream fromDomain(domain, pBufs);
-
-                fromDomain >> tmpValue;
+                UIPstream fromProc(proci, pBufs);
+                Map<Type> tmpValue(fromProc);
 
                 neiValues += tmpValue;
             }
diff --git a/src/transportModels/geometricVoF/advectionSchemes/isoAdvection/isoAdvection.C b/src/transportModels/geometricVoF/advectionSchemes/isoAdvection/isoAdvection.C
index dc5cbeaeced..145962b805d 100644
--- a/src/transportModels/geometricVoF/advectionSchemes/isoAdvection/isoAdvection.C
+++ b/src/transportModels/geometricVoF/advectionSchemes/isoAdvection/isoAdvection.C
@@ -6,7 +6,7 @@
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2016-2017 DHI
-    Modified code Copyright (C) 2016-2017 OpenCFD Ltd.
+    Modified code Copyright (C) 2016-2022 OpenCFD Ltd.
     Modified code Copyright (C) 2019-2020 DLR
     Modified code Copyright (C) 2018, 2021 Johan Roenby
 -------------------------------------------------------------------------------
@@ -497,6 +497,7 @@ Foam::DynamicList<Foam::label>  Foam::isoAdvection::syncProcPatches
 
     if (Pstream::parRun())
     {
+        DynamicList<label> sendRecvProcs;
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
 
         // Send
@@ -504,10 +505,12 @@ Foam::DynamicList<Foam::label>  Foam::isoAdvection::syncProcPatches
         {
             const processorPolyPatch& procPatch =
                 refCast<const processorPolyPatch>(patches[patchi]);
+            const label nbrProci = procPatch.neighbProcNo();
 
-            UOPstream toNbr(procPatch.neighbProcNo(), pBufs);
-            const scalarField& pFlux = dVf.boundaryField()[patchi];
+            sendRecvProcs.append(nbrProci);
+            UOPstream toNbr(nbrProci, pBufs);
 
+            const scalarField& pFlux = dVf.boundaryField()[patchi];
             const List<label>& surfCellFacesOnProcPatch =
                 surfaceCellFacesOnProcPatches_[patchi];
 
@@ -520,7 +523,8 @@ Foam::DynamicList<Foam::label>  Foam::isoAdvection::syncProcPatches
             toNbr << surfCellFacesOnProcPatch << dVfPatch;
         }
 
-        pBufs.finishedSends();
+        // Limit exchange to involved procs
+        pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
 
 
         // Receive and combine
@@ -528,8 +532,9 @@ Foam::DynamicList<Foam::label>  Foam::isoAdvection::syncProcPatches
         {
             const processorPolyPatch& procPatch =
                 refCast<const processorPolyPatch>(patches[patchi]);
+            const label nbrProci = procPatch.neighbProcNo();
 
-            UIPstream fromNeighb(procPatch.neighbProcNo(), pBufs);
+            UIPstream fromNeighb(nbrProci, pBufs);
             List<label> faceIDs;
             List<scalar> nbrdVfs;
 
-- 
GitLab