UIPstreamRead.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2019-2021 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "UIPstream.H"
30#include "PstreamGlobals.H"
31#include "profilingPstream.H"
32#include "IOstreams.H"
33
34#include <mpi.h>
35
36// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
37
38void Foam::UIPstream::bufferIPCrecv()
39{
40 // Called by constructor
41 if (debug)
42 {
43 Pout<< "UIPstream IPC read buffer :"
44 << " from:" << fromProcNo_
45 << " tag:" << tag_ << " comm:" << comm_
46 << " wanted size:" << recvBuf_.capacity()
47 << Foam::endl;
48 }
49
50 // No buffer size allocated/specified - probe size of incoming message
51 if (!recvBuf_.capacity())
52 {
54
55 MPI_Status status;
56
57 MPI_Probe
58 (
60 tag_,
62 &status
63 );
64 MPI_Get_count(&status, MPI_BYTE, &messageSize_);
65
66 // Assume these are from gathers ...
68
70
71 if (debug)
72 {
73 Pout<< "UIPstream::UIPstream : probed size:"
75 }
76 }
77
79 (
80 commsType(),
82 recvBuf_.data(),
84 tag_,
85 comm_
86 );
87
88 // Set addressed size. Leave actual allocated memory intact.
90
91 if (!messageSize_)
92 {
93 setEof();
94 }
95}
96
97
98// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
99
100Foam::label Foam::UIPstream::read
101(
102 const commsTypes commsType,
103 const int fromProcNo,
104 char* buf,
105 const std::streamsize bufSize,
106 const int tag,
107 const label communicator
108)
109{
110 if (debug)
111 {
112 Pout<< "UIPstream::read : starting read from:" << fromProcNo
113 << " tag:" << tag << " comm:" << communicator
114 << " wanted size:" << label(bufSize)
115 << " commsType:" << UPstream::commsTypeNames[commsType]
116 << Foam::endl;
117 }
118 if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
119 {
120 Pout<< "UIPstream::read : starting read from:" << fromProcNo
121 << " tag:" << tag << " comm:" << communicator
122 << " wanted size:" << label(bufSize)
123 << " commsType:" << UPstream::commsTypeNames[commsType]
124 << " warnComm:" << UPstream::warnComm
125 << Foam::endl;
127 }
128
130
131 if
132 (
133 commsType == commsTypes::blocking
134 || commsType == commsTypes::scheduled
135 )
136 {
137 MPI_Status status;
138
139 if
140 (
141 MPI_Recv
142 (
143 buf,
144 bufSize,
145 MPI_BYTE,
146 fromProcNo,
147 tag,
149 &status
150 )
151 )
152 {
154 << "MPI_Recv cannot receive incoming message"
156 return 0;
157 }
158
160
161 // Check size of message read
162
163 int messageSize;
164 MPI_Get_count(&status, MPI_BYTE, &messageSize);
165
166 if (debug)
167 {
168 Pout<< "UIPstream::read : finished read from:" << fromProcNo
169 << " tag:" << tag << " read size:" << label(bufSize)
170 << " commsType:" << UPstream::commsTypeNames[commsType]
171 << Foam::endl;
172 }
173
174 if (messageSize > bufSize)
175 {
177 << "buffer (" << label(bufSize)
178 << ") not large enough for incoming message ("
179 << messageSize << ')'
181 }
182
183 return messageSize;
184 }
185 else if (commsType == commsTypes::nonBlocking)
186 {
187 MPI_Request request;
188
189 if
190 (
191 MPI_Irecv
192 (
193 buf,
194 bufSize,
195 MPI_BYTE,
196 fromProcNo,
197 tag,
199 &request
200 )
201 )
202 {
204 << "MPI_Irecv cannot start non-blocking receive"
206
207 return 0;
208 }
209
211
212 if (debug)
213 {
214 Pout<< "UIPstream::read : started read from:" << fromProcNo
215 << " tag:" << tag << " read size:" << label(bufSize)
216 << " commsType:" << UPstream::commsTypeNames[commsType]
217 << " request:" << PstreamGlobals::outstandingRequests_.size()
218 << Foam::endl;
219 }
220
222
223 // Assume the message is completely received.
224 return bufSize;
225 }
226
228 << "Unsupported communications type " << int(commsType)
230
231 return 0;
232}
233
234
235// ************************************************************************* //
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
label capacity() const noexcept
Size of the underlying storage.
Definition: DynamicListI.H:287
void resize(const label len)
Definition: DynamicListI.H:353
void setEof() noexcept
Set stream state as reached 'eof'.
Definition: IOstream.H:357
virtual bool read()
Re-read model coefficients if they have changed.
const int tag_
Definition: UIPstream.H:93
const label comm_
Definition: UIPstream.H:95
DynamicList< char > & recvBuf_
Definition: UIPstream.H:89
T * data() noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:237
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:296
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:74
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:562
static void printStack(Ostream &os)
Helper function to print a stack.
static void beginTiming()
Update timer prior to measurement.
static void addWaitTime()
Add time increment to waitTime.
static void addGatherTime()
Add time increment to gatherTime.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
DynamicList< MPI_Comm > MPICommunicators_
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
errorManip< error > abort(error &err)
Definition: errorManip.H:144
error FatalError
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.