UIPread.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2019-2021 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Description
28  Read from UIPstream
29 
30 \*---------------------------------------------------------------------------*/
31 
32 #include "UIPstream.H"
33 #include "PstreamGlobals.H"
34 #include "profilingPstream.H"
35 #include "IOstreams.H"
36 
37 #include <mpi.h>
38 
39 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
40 
42 (
43  const commsTypes commsType,
44  const int fromProcNo,
45  DynamicList<char>& receiveBuf,
46  label& receiveBufPosition,
47  const int tag,
48  const label comm,
49  const bool clearAtEnd,
50  IOstreamOption::streamFormat fmt
51 )
52 :
53  UPstream(commsType),
54  Istream(fmt, IOstreamOption::currentVersion),
55  fromProcNo_(fromProcNo),
56  recvBuf_(receiveBuf),
57  recvBufPos_(receiveBufPosition),
58  tag_(tag),
59  comm_(comm),
60  clearAtEnd_(clearAtEnd),
61  messageSize_(0)
62 {
63  setOpened();
64  setGood();
65 
67  {
68  // Message is already received into buffer
69  }
70  else
71  {
72  if (debug)
73  {
74  Pout<< "UIPstream::UIPstream :"
75  << " read from:" << fromProcNo
76  << " tag:" << tag_ << " comm:" << comm_
77  << " wanted size:" << recvBuf_.capacity()
78  << Foam::endl;
79  }
80 
81  // No buffer size allocated/specified - probe size of incoming message
82  if (!recvBuf_.capacity())
83  {
85 
86  MPI_Status status;
87 
88  MPI_Probe
89  (
90  fromProcNo_,
91  tag_,
93  &status
94  );
95  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
96 
97  // Assume these are from gathers ...
99 
100  recvBuf_.resize(messageSize_);
101 
102  if (debug)
103  {
104  Pout<< "UIPstream::UIPstream : probed size:"
105  << messageSize_ << Foam::endl;
106  }
107  }
108 
109  messageSize_ = UIPstream::read
110  (
111  commsType,
112  fromProcNo_,
113  recvBuf_.data(),
114  recvBuf_.capacity(),
115  tag_,
116  comm_
117  );
118 
119  // Set addressed size. Leave actual allocated memory intact.
120  recvBuf_.resize(messageSize_);
121 
122  if (!messageSize_)
123  {
124  setEof();
125  }
126  }
127 }
128 
129 
130 Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
131 :
132  UPstream(buffers.commsType_),
133  Istream(buffers.format_, IOstreamOption::currentVersion),
134  fromProcNo_(fromProcNo),
135  recvBuf_(buffers.recvBuf_[fromProcNo]),
136  recvBufPos_(buffers.recvBufPos_[fromProcNo]),
137  tag_(buffers.tag_),
138  comm_(buffers.comm_),
139  clearAtEnd_(true),
140  messageSize_(0)
141 {
142  if
143  (
145  && !buffers.finishedSendsCalled_
146  )
147  {
149  << "PstreamBuffers::finishedSends() never called." << endl
150  << "Please call PstreamBuffers::finishedSends() after doing"
151  << " all your sends (using UOPstream) and before doing any"
152  << " receives (using UIPstream)" << Foam::exit(FatalError);
153  }
154 
155  setOpened();
156  setGood();
157 
159  {
160  // Message is already received into buffer
161  messageSize_ = recvBuf_.size();
162 
163  if (debug)
164  {
165  Pout<< "UIPstream::UIPstream PstreamBuffers :"
166  << " fromProcNo:" << fromProcNo
167  << " tag:" << tag_ << " comm:" << comm_
168  << " receive buffer size:" << messageSize_
169  << Foam::endl;
170  }
171  }
172  else
173  {
174  if (debug)
175  {
176  Pout<< "UIPstream::UIPstream PstreamBuffers :"
177  << " read from:" << fromProcNo
178  << " tag:" << tag_ << " comm:" << comm_
179  << " wanted size:" << recvBuf_.capacity()
180  << Foam::endl;
181  }
182 
183  // No buffer size allocated/specified - probe size of incoming message
184  if (!recvBuf_.capacity())
185  {
187 
188  MPI_Status status;
189 
190  MPI_Probe
191  (
192  fromProcNo_,
193  tag_,
195  &status
196  );
197  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
198 
199  // Assume these are from gathers ...
201 
202  recvBuf_.resize(messageSize_);
203 
204  if (debug)
205  {
206  Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
207  << messageSize_ << Foam::endl;
208  }
209  }
210 
211  messageSize_ = UIPstream::read
212  (
213  commsType(),
214  fromProcNo_,
215  recvBuf_.data(),
216  recvBuf_.capacity(),
217  tag_,
218  comm_
219  );
220 
221  // Set addressed size. Leave actual allocated memory intact.
222  recvBuf_.resize(messageSize_);
223 
224  if (!messageSize_)
225  {
226  setEof();
227  }
228  }
229 }
230 
231 
232 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
233 
234 Foam::label Foam::UIPstream::read
235 (
236  const commsTypes commsType,
237  const int fromProcNo,
238  char* buf,
239  const std::streamsize bufSize,
240  const int tag,
241  const label communicator
242 )
243 {
244  if (debug)
245  {
246  Pout<< "UIPstream::read : starting read from:" << fromProcNo
247  << " tag:" << tag << " comm:" << communicator
248  << " wanted size:" << label(bufSize)
249  << " commsType:" << UPstream::commsTypeNames[commsType]
250  << Foam::endl;
251  }
252  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
253  {
254  Pout<< "UIPstream::read : starting read from:" << fromProcNo
255  << " tag:" << tag << " comm:" << communicator
256  << " wanted size:" << label(bufSize)
257  << " commsType:" << UPstream::commsTypeNames[commsType]
258  << " warnComm:" << UPstream::warnComm
259  << Foam::endl;
261  }
262 
264 
265  if
266  (
267  commsType == commsTypes::blocking
268  || commsType == commsTypes::scheduled
269  )
270  {
271  MPI_Status status;
272 
273  if
274  (
275  MPI_Recv
276  (
277  buf,
278  bufSize,
279  MPI_BYTE,
280  fromProcNo,
281  tag,
282  PstreamGlobals::MPICommunicators_[communicator],
283  &status
284  )
285  )
286  {
288  << "MPI_Recv cannot receive incoming message"
290 
291  return 0;
292  }
293 
295 
296  // Check size of message read
297 
298  int messageSize;
299  MPI_Get_count(&status, MPI_BYTE, &messageSize);
300 
301  if (debug)
302  {
303  Pout<< "UIPstream::read : finished read from:" << fromProcNo
304  << " tag:" << tag << " read size:" << label(bufSize)
305  << " commsType:" << UPstream::commsTypeNames[commsType]
306  << Foam::endl;
307  }
308 
309  if (messageSize > bufSize)
310  {
312  << "buffer (" << label(bufSize)
313  << ") not large enough for incoming message ("
314  << messageSize << ')'
316  }
317 
318  return messageSize;
319  }
320  else if (commsType == commsTypes::nonBlocking)
321  {
322  MPI_Request request;
323 
324  if
325  (
326  MPI_Irecv
327  (
328  buf,
329  bufSize,
330  MPI_BYTE,
331  fromProcNo,
332  tag,
333  PstreamGlobals::MPICommunicators_[communicator],
334  &request
335  )
336  )
337  {
339  << "MPI_Recv cannot start non-blocking receive"
341 
342  return 0;
343  }
344 
346 
347  if (debug)
348  {
349  Pout<< "UIPstream::read : started read from:" << fromProcNo
350  << " tag:" << tag << " read size:" << label(bufSize)
351  << " commsType:" << UPstream::commsTypeNames[commsType]
352  << " request:" << PstreamGlobals::outstandingRequests_.size()
353  << Foam::endl;
354  }
355 
356  PstreamGlobals::outstandingRequests_.append(request);
357 
358  // Assume the message is completely received.
359  return bufSize;
360  }
361 
363  << "Unsupported communications type " << int(commsType)
365 
366  return 0;
367 }
368 
369 
370 // ************************************************************************* //
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::UPstream::warnComm
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:296
Foam::DynamicList::resize
void resize(const label len)
Definition: DynamicListI.H:353
Foam::UPstream::commsType
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:547
Foam::profilingPstream::addWaitTime
static void addWaitTime()
Add time increment to waitTime.
Definition: profilingPstream.H:174
Foam::error::printStack
static void printStack(Ostream &os)
Helper function to print a stack.
Definition: dummyPrintStack.C:36
IOstreams.H
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Foam::profilingPstream::beginTiming
static void beginTiming()
Update timer prior to measurement.
Definition: profilingPstream.H:138
Foam::PstreamGlobals::outstandingRequests_
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
Definition: PstreamGlobals.C:32
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:369
Foam::DynamicList::capacity
label capacity() const noexcept
Size of the underlying storage.
Definition: DynamicListI.H:287
Foam::IOstream::setOpened
void setOpened() noexcept
Set stream opened.
Definition: IOstream.H:129
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Foam::UIPstream::read
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=UPstream::worldComm)
Read into given buffer from given processor.
Definition: UIPread.C:81
Foam::IOstream::setGood
void setGood() noexcept
Set stream state to be good.
Definition: IOstream.H:147
Foam::PstreamGlobals::MPICommunicators_
DynamicList< MPI_Comm > MPICommunicators_
Definition: PstreamGlobals.C:39
Foam::IOstream::setEof
void setEof() noexcept
Set stream state as reached 'eof'.
Definition: IOstream.H:357
Foam::UIPstream::UIPstream
UIPstream(const commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Definition: UIPread.C:37
Foam::profilingPstream::addGatherTime
static void addGatherTime()
Add time increment to gatherTime.
Definition: profilingPstream.H:156
Foam::UPstream::commsTypeNames
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:77
Foam::FatalError
error FatalError
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:144
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
PstreamGlobals.H
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
Foam::UPstream::commsTypes::nonBlocking
Foam::UPstream::commsTypes::scheduled
UIPstream.H
profilingPstream.H