UIPread.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2019 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Description
28  Read from UIPstream
29 
30 \*---------------------------------------------------------------------------*/
31 
32 #include "UIPstream.H"
33 #include "PstreamGlobals.H"
34 #include "profilingPstream.H"
35 #include "IOstreams.H"
36 
37 #include <mpi.h>
38 
39 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
40 
42 (
43  const commsTypes commsType,
44  const int fromProcNo,
45  DynamicList<char>& externalBuf,
46  label& externalBufPosition,
47  const int tag,
48  const label comm,
49  const bool clearAtEnd,
50  streamFormat format,
51  versionNumber version
52 )
53 :
54  UPstream(commsType),
55  Istream(format, version),
56  fromProcNo_(fromProcNo),
57  externalBuf_(externalBuf),
58  externalBufPosition_(externalBufPosition),
59  tag_(tag),
60  comm_(comm),
61  clearAtEnd_(clearAtEnd),
62  messageSize_(0)
63 {
64  setOpened();
65  setGood();
66 
68  {
69  // Message is already received into externalBuf
70  }
71  else
72  {
73  MPI_Status status;
74 
75  label wantedSize = externalBuf_.capacity();
76 
77  if (debug)
78  {
79  Pout<< "UIPstream::UIPstream : read from:" << fromProcNo
80  << " tag:" << tag << " comm:" << comm_
81  << " wanted size:" << wantedSize
82  << Foam::endl;
83  }
84 
85 
86  // If the buffer size is not specified, probe the incoming message
87  // and set it
88  if (!wantedSize)
89  {
91 
92  MPI_Probe
93  (
94  fromProcNo_,
95  tag_,
97  &status
98  );
99  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
100 
101  // Assume these are from gathers ...
103 
104  externalBuf_.setCapacity(messageSize_);
105  wantedSize = messageSize_;
106 
107  if (debug)
108  {
109  Pout<< "UIPstream::UIPstream : probed size:" << wantedSize
110  << Foam::endl;
111  }
112  }
113 
114  messageSize_ = UIPstream::read
115  (
116  commsType,
117  fromProcNo_,
118  externalBuf_.begin(),
119  wantedSize,
120  tag_,
121  comm_
122  );
123 
124  // Set addressed size. Leave actual allocated memory intact.
125  externalBuf_.setSize(messageSize_);
126 
127  if (!messageSize_)
128  {
129  setEof();
130  }
131  }
132 }
133 
134 
135 Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
136 :
137  UPstream(buffers.commsType_),
138  Istream(buffers.format_, buffers.version_),
139  fromProcNo_(fromProcNo),
140  externalBuf_(buffers.recvBuf_[fromProcNo]),
141  externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
142  tag_(buffers.tag_),
143  comm_(buffers.comm_),
144  clearAtEnd_(true),
145  messageSize_(0)
146 {
147  if
148  (
150  && !buffers.finishedSendsCalled_
151  )
152  {
154  << "PstreamBuffers::finishedSends() never called." << endl
155  << "Please call PstreamBuffers::finishedSends() after doing"
156  << " all your sends (using UOPstream) and before doing any"
157  << " receives (using UIPstream)" << Foam::exit(FatalError);
158  }
159 
160  setOpened();
161  setGood();
162 
164  {
165  // Message is already received into externalBuf
166  messageSize_ = buffers.recvBuf_[fromProcNo].size();
167 
168  if (debug)
169  {
170  Pout<< "UIPstream::UIPstream PstreamBuffers :"
171  << " fromProcNo:" << fromProcNo
172  << " tag:" << tag_ << " comm:" << comm_
173  << " receive buffer size:" << messageSize_
174  << Foam::endl;
175  }
176  }
177  else
178  {
179  MPI_Status status;
180 
181  label wantedSize = externalBuf_.capacity();
182 
183  if (debug)
184  {
185  Pout<< "UIPstream::UIPstream PstreamBuffers :"
186  << " read from:" << fromProcNo
187  << " tag:" << tag_ << " comm:" << comm_
188  << " wanted size:" << wantedSize
189  << Foam::endl;
190  }
191 
192  // If the buffer size is not specified, probe the incoming message
193  // and set it
194  if (!wantedSize)
195  {
197 
198  MPI_Probe
199  (
200  fromProcNo_,
201  tag_,
203  &status
204  );
205  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
206 
207  // Assume these are from gathers ...
209 
210  externalBuf_.setCapacity(messageSize_);
211  wantedSize = messageSize_;
212 
213  if (debug)
214  {
215  Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
216  << wantedSize << Foam::endl;
217  }
218  }
219 
220  messageSize_ = UIPstream::read
221  (
222  commsType(),
223  fromProcNo_,
224  externalBuf_.begin(),
225  wantedSize,
226  tag_,
227  comm_
228  );
229 
230  // Set addressed size. Leave actual allocated memory intact.
231  externalBuf_.setSize(messageSize_);
232 
233  if (!messageSize_)
234  {
235  setEof();
236  }
237  }
238 }
239 
240 
241 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
242 
243 Foam::label Foam::UIPstream::read
244 (
245  const commsTypes commsType,
246  const int fromProcNo,
247  char* buf,
248  const std::streamsize bufSize,
249  const int tag,
250  const label communicator
251 )
252 {
253  if (debug)
254  {
255  Pout<< "UIPstream::read : starting read from:" << fromProcNo
256  << " tag:" << tag << " comm:" << communicator
257  << " wanted size:" << label(bufSize)
258  << " commsType:" << UPstream::commsTypeNames[commsType]
259  << Foam::endl;
260  }
261  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
262  {
263  Pout<< "UIPstream::read : starting read from:" << fromProcNo
264  << " tag:" << tag << " comm:" << communicator
265  << " wanted size:" << label(bufSize)
266  << " commsType:" << UPstream::commsTypeNames[commsType]
267  << " warnComm:" << UPstream::warnComm
268  << Foam::endl;
270  }
271 
273 
274  if (commsType == commsTypes::blocking || commsType == commsTypes::scheduled)
275  {
276  MPI_Status status;
277 
278  if
279  (
280  MPI_Recv
281  (
282  buf,
283  bufSize,
284  MPI_BYTE,
285  fromProcNo,
286  tag,
287  PstreamGlobals::MPICommunicators_[communicator],
288  &status
289  )
290  )
291  {
293  << "MPI_Recv cannot receive incoming message"
295 
296  return 0;
297  }
298 
300 
301  // Check size of message read
302 
303  int messageSize;
304  MPI_Get_count(&status, MPI_BYTE, &messageSize);
305 
306  if (debug)
307  {
308  Pout<< "UIPstream::read : finished read from:" << fromProcNo
309  << " tag:" << tag << " read size:" << label(bufSize)
310  << " commsType:" << UPstream::commsTypeNames[commsType]
311  << Foam::endl;
312  }
313 
314  if (messageSize > bufSize)
315  {
317  << "buffer (" << label(bufSize)
318  << ") not large enough for incoming message ("
319  << messageSize << ')'
321  }
322 
323  return messageSize;
324  }
325  else if (commsType == commsTypes::nonBlocking)
326  {
327  MPI_Request request;
328 
329  if
330  (
331  MPI_Irecv
332  (
333  buf,
334  bufSize,
335  MPI_BYTE,
336  fromProcNo,
337  tag,
338  PstreamGlobals::MPICommunicators_[communicator],
339  &request
340  )
341  )
342  {
344  << "MPI_Recv cannot start non-blocking receive"
346 
347  return 0;
348  }
349 
351 
352  if (debug)
353  {
354  Pout<< "UIPstream::read : started read from:" << fromProcNo
355  << " tag:" << tag << " read size:" << label(bufSize)
356  << " commsType:" << UPstream::commsTypeNames[commsType]
357  << " request:" << PstreamGlobals::outstandingRequests_.size()
358  << Foam::endl;
359  }
360 
361  PstreamGlobals::outstandingRequests_.append(request);
362 
363  // Assume the message is completely received.
364  return bufSize;
365  }
366 
368  << "Unsupported communications type " << int(commsType)
370 
371  return 0;
372 }
373 
374 
375 // ************************************************************************* //
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::UPstream::warnComm
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:298
Foam::UPstream::commsTypes::nonBlocking
Foam::profilingPstream::addWaitTime
static void addWaitTime()
Add time increment to waitTime.
Definition: profilingPstream.H:174
Foam::error::printStack
static void printStack(Ostream &os)
Helper function to print a stack.
Definition: dummyPrintStack.C:36
IOstreams.H
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Foam::IOstream::setEof
void setEof()
Set stream to have reached eof.
Definition: IOstream.H:348
Foam::profilingPstream::beginTiming
static void beginTiming()
Update timer prior to measurement.
Definition: profilingPstream.H:138
Foam::PstreamGlobals::outstandingRequests_
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
Definition: PstreamGlobals.C:32
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:350
Foam::DynamicList::capacity
label capacity() const noexcept
Size of the underlying storage.
Definition: DynamicListI.H:224
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Foam::IOstream::setOpened
void setOpened()
Set stream opened.
Definition: IOstream.H:123
Foam::PstreamGlobals::MPICommunicators_
DynamicList< MPI_Comm > MPICommunicators_
Definition: PstreamGlobals.C:39
Foam::UIPstream::read
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the.
Definition: UIPread.C:81
format
word format(conversionProperties.get< word >("format"))
Foam::profilingPstream::addGatherTime
static void addGatherTime()
Add time increment to gatherTime.
Definition: profilingPstream.H:156
Foam::UPstream::commsType
commsTypes commsType() const
Get the communications type of the stream.
Definition: UPstream.H:548
Foam::UPstream::commsTypeNames
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:77
Foam::UPstream::commsTypes::scheduled
Foam::FatalError
error FatalError
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:144
Foam::DynamicList::setSize
void setSize(const label nElem)
Alter addressable list size.
Definition: DynamicListI.H:282
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Foam::DynamicList::setCapacity
void setCapacity(const label nElem)
Alter the size of the underlying storage.
Definition: DynamicListI.H:232
Foam::foamVersion::version
const std::string version
OpenFOAM version (name or stringified number) as a std::string.
PstreamGlobals.H
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:381
Foam::IOstream::setGood
void setGood()
Set stream to be good.
Definition: IOstream.H:141
Foam::UIPstream::UIPstream
UIPstream(const commsTypes commsType, const int fromProcNo, DynamicList< char > &externalBuf, label &externalBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, streamFormat format=BINARY, versionNumber version=currentVersion)
Construct given process index to read from and optional buffer size,.
Definition: UIPread.C:36
UIPstream.H
profilingPstream.H