PstreamBuffers.H
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2021-2022 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27Class
28 Foam::PstreamBuffers
29
30Description
31 Buffers for inter-processor communications streams (UOPstream, UIPstream).
32
33 Use UOPstream to stream data into buffers, call finishedSends() to
34 notify that data is in buffers and then use UIPstream to get data out
35 of received buffers. Works with both blocking and nonBlocking. Does
36 not make much sense with scheduled since there you would not need these
37 explicit buffers.
38
39 Example usage:
40 \code
41 PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
42
43 for (const int proci : Pstream::allProcs())
44 {
45 if (proci != Pstream::myProcNo())
46 {
47 someObject vals;
48
49 UOPstream send(proci, pBufs);
50 send << vals;
51 }
52 }
53
54 pBufs.finishedSends(); // no-op for blocking
55
56 for (const int proci : Pstream::allProcs())
57 {
58 if (proci != Pstream::myProcNo())
59 {
60 UIPstream recv(proci, pBufs);
61 someObject vals(recv);
62 }
63 }
64 \endcode
65
66 There are additional special versions of finishedSends() for
67 restricted neighbour communication as well as for special
68 one-to-all and all-to-one communication patterns.
69 For example,
70 \code
71 PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
72
73 if (Pstream::master())
74 {
75 someObject vals;
76 for (const int proci : Pstream::subProcs())
77 {
78 UOPstream send(proci, pBufs);
79 send << vals;
80 }
81 }
82
83 pBufs.finishedScatters();
84
85 if (!Pstream::master())
86 {
87 UIPstream recv(Pstream::masterNo(), pBufs);
88 someObject vals(recv);
89 }
90 \endcode
91
92SourceFiles
93 PstreamBuffers.C
94
95\*---------------------------------------------------------------------------*/
96
97#include "Pstream.H"
98
99#ifndef Foam_PstreamBuffers_H
100#define Foam_PstreamBuffers_H
101
102#include "DynamicList.H"
103#include "UPstream.H"
104#include "IOstream.H"
105
106// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
107
108namespace Foam
109{
110
111// Forward Declarations
112class bitSet;
113
114/*---------------------------------------------------------------------------*\
115 Class PstreamBuffers Declaration
116\*---------------------------------------------------------------------------*/
118class PstreamBuffers
119{
120 // Friendship
121 friend class UOPstreamBase; // Access to sendBuf_
122 friend class UIPstreamBase; // Access to recvBuf_, recvBufPos_
123
124
125 // Private Data
126
127 //- Track if sends are complete
128 bool finishedSendsCalled_;
129
130 //- Permit clear of individual receive buffer by external access
131 bool allowClearRecv_;
132
133 //- Buffer format (ascii | binary)
134 const IOstreamOption::streamFormat format_;
135
136 //- Communications type of this stream
137 const UPstream::commsTypes commsType_;
138
139 //- The transfer message type
140 const int tag_;
141
142 //- Communicator
143 const label comm_;
144
145 //- Send buffer. Size is nProcs()
146 List<DynamicList<char>> sendBuf_;
147
148 //- Receive buffer. Size is nProcs()
149 List<DynamicList<char>> recvBuf_;
150
151 //- Current read positions within recvBuf_. Size is nProcs()
152 labelList recvBufPos_;
153
154
155 // Private Member Functions
156
157 //- Mark all sends as having been done.
158 // This will start receives (nonBlocking comms).
159 void finalExchange(labelList& recvSizes, const bool wait);
160
161 //- Mark sends as done.
162 // Only exchange sizes using the sendProcs/recvProcs subset
163 // (nonBlocking comms).
164 void finalExchange
165 (
166 const labelUList& sendProcs,
167 const labelUList& recvProcs,
168 labelList& recvSizes,
169 const bool wait
170 );
171
172 //- For all-to-one or one-to-all
173 void finalExchangeGatherScatter(const bool isGather, const bool wait);
174
175
176public:
177
178 // Constructors
179
180 //- Construct given comms type, message tag, communicator, IO format
181 explicit PstreamBuffers
182 (
184 const int tag = UPstream::msgType(),
185 const label comm = UPstream::worldComm,
187 );
188
189 //- Construct given communicator, comms type, message tag, IO format
190 explicit PstreamBuffers
191 (
192 const label comm,
194 const int tag = UPstream::msgType(),
196 );
197
198
199 //- Destructor - checks that all data have been consumed
201
202
203 // Member Functions
204
205 // Access
206
207 //- The associated buffer format (ascii | binary)
209 {
210 return format_;
211 }
212
213 //- The communications type of the stream
215 {
216 return commsType_;
217 }
218
219 //- The transfer message type
220 int tag() const noexcept
221 {
222 return tag_;
223 }
224
225 //- Communicator
226 label comm() const noexcept
227 {
228 return comm_;
229 }
230
231
232 // Sizing
233
234 //- Number of ranks associated with PstreamBuffers
235 label nProcs() const noexcept
236 {
237 return recvBufPos_.size();
238 }
239
240 //- Range of ranks indices associated with PstreamBuffers
242 {
243 // Proc 0 -> nProcs (int value)
244 return UPstream::rangeType(static_cast<int>(nProcs()));
245 }
246
247 //- Range of sub-processes indices associated with PstreamBuffers
249 {
250 // Proc 1 -> nProcs (int value)
251 return UPstream::rangeType(1, static_cast<int>(nProcs()-1));
252 }
253
254
255 // Queries
256
257 //- True if finishedSends() or finishedNeighbourSends() has been called
258 bool finished() const noexcept
259 {
260 return finishedSendsCalled_;
261 }
262
263 //- Is clearStorage of individual receive buffer by external hooks
264 //- allowed? (default: true)
265 bool allowClearRecv() const noexcept
266 {
267 return allowClearRecv_;
268 }
269
270 //- True if any (local) send buffers have data
271 bool hasSendData() const;
272
273 //- True if any (local) recv buffers have unconsumed data.
274 //- Must call finishedSends() or other finished.. method first!
275 bool hasRecvData() const;
276
277 //- Number of send bytes for the specified processor.
278 label sendDataCount(const label proci) const;
279
280 //- Number of unconsumed receive bytes for the specified processor.
281 //- Must call finishedSends() or other finished.. method first!
282 label recvDataCount(const label proci) const;
283
284 //- Number of unconsumed receive bytes for all processors.
285 //- Must call finishedSends() or other finished.. method first!
287
288 //- Number of unconsumed receive bytes for the specified processor.
289 //- Must call finishedSends() or other finished.. method first!
290 // The method is only useful in limited situations, such as when
291 // PstreamBuffers has been used to fill contiguous data
292 // (eg, using OPstream::write).
293 const UList<char> peekRecvData(const label proci) const;
294
295
296 // Edit
297
298 //- Clear individual buffers and reset states.
299 // Does not remove the buffer storage.
300 void clear();
301
302 //- Clear an individual receive buffer (eg, data not required)
303 // Does not remove the buffer storage.
304 void clearRecv(const label proci);
305
306 //- Clear individual buffer storage and reset states.
307 void clearStorage();
308
309 //- Change allowClearRecv, return previous value
310 bool allowClearRecv(bool on) noexcept;
311
312
313 // Regular Functions
314
315 //- Mark sends as done
316 //
317 // Non-blocking mode: populates receive buffers (all-to-all).
318 // \param wait wait for requests to complete (in nonBlocking mode)
319 void finishedSends(const bool wait = true);
320
321 //- Mark sends as done.
322 //- Recovers the sizes (bytes) received.
323 //
324 // Non-blocking mode: populates receive buffers (all-to-all).
325 // \param[out] recvSizes the sizes (bytes) received
326 // \param wait wait for requests to complete (in nonBlocking mode)
327 //
328 // \warning currently only valid for nonBlocking comms.
329 void finishedSends(labelList& recvSizes, const bool wait = true);
330
331
332 // Functions with restricted neighbours
333
334 //- Mark sends as done using subset of send/recv ranks
335 //- to exchange data on.
336 //
337 // Non-blocking mode: populates receive buffers.
338 // \param sendProcs ranks used for sends
339 // \param recvProcs ranks used for recvs
340 // \param wait wait for requests to complete (in nonBlocking mode)
341 //
342 // \warning currently only valid for nonBlocking comms.
343 void finishedSends
344 (
345 const labelUList& sendProcs,
346 const labelUList& recvProcs,
347 const bool wait = true
348 );
349
350 //- Mark sends as done using subset of send/recv ranks
351 //- to exchange data on. Recovers the sizes (bytes) received.
352 //
353 // Non-blocking mode: populates receive buffers.
354 //
355 // \param sendProcs ranks used for sends
356 // \param recvProcs ranks used for recvs
357 // \param[out] recvSizes the sizes (bytes) received
358 // \param wait wait for requests to complete (in nonBlocking mode)
359 //
360 // \warning currently only valid for nonBlocking comms.
361 void finishedSends
362 (
363 const labelUList& sendProcs,
364 const labelUList& recvProcs,
365 labelList& recvSizes,
366 const bool wait = true
367 );
368
369 //- A caching version that uses a limited send/recv connectivity.
370 //
371 // Non-blocking mode: populates receive buffers.
372 // \param sendConnections on/off for sending ranks
373 // \param sendProcs ranks used for sends
374 // \param recvProcs ranks used for recvs
375 // \param wait wait for requests to complete (in nonBlocking mode)
376 //
377 // \return True if the send/recv connectivity changed
378 //
379 // \warning currently only valid for nonBlocking comms.
380 bool finishedSends
381 (
382 bitSet& sendConnections,
383 DynamicList<label>& sendProcs,
384 DynamicList<label>& recvProcs,
385 const bool wait = true
386 );
387
388 //- Mark sends as done using subset of send/recv ranks
389 //- and recover the sizes (bytes) received.
390 //
391 // Non-blocking mode: populates receive buffers.
392 //
393 // \param neighProcs ranks used for sends/recvs
394 // \param wait wait for requests to complete (in nonBlocking mode)
395 //
396 // \warning currently only valid for nonBlocking comms.
397 // \note Same as finishedSends with identical sendProcs/recvProcs
399 (
400 const labelUList& neighProcs,
401 const bool wait = true
402 )
403 {
404 finishedSends(neighProcs, neighProcs, wait);
405 }
406
407 //- Mark sends as done using subset of send/recv ranks
408 //- and recover the sizes (bytes) received.
409 //
410 // Non-blocking mode: it will populate receive buffers.
411 //
412 // \param neighProcs ranks used for sends/recvs
413 // \param[out] recvSizes the sizes (bytes) received
414 // \param wait wait for requests to complete (in nonBlocking mode)
415 //
416 // \warning currently only valid for nonBlocking mode.
418 (
419 const labelUList& neighProcs,
420 labelList& recvSizes,
421 const bool wait = true
422 )
423 {
424 finishedSends(neighProcs, neighProcs, recvSizes, wait);
425 }
426
427
428 // Gather/scatter modes
429
430 //- Mark all sends to master as done.
431 //
432 // Non-blocking mode: populates receive buffers.
433 // Can use recvDataCounts() method to recover sizes received.
434 //
435 // \param wait wait for requests to complete (in nonBlocking mode)
436 //
437 // \warning currently only valid for nonBlocking comms.
438 void finishedGathers(const bool wait = true);
439
440 //- Mark all sends to master as done.
441 //- Recovers the sizes (bytes) received.
442 //
443 // Non-blocking mode: populates receive buffers (all-to-one).
444 // \param[out] recvSizes the sizes (bytes) received
445 // \param wait wait for requests to complete (in nonBlocking mode)
446 //
447 // \warning currently only valid for nonBlocking comms.
448 void finishedGathers(labelList& recvSizes, const bool wait = true);
449
450 //- Mark all sends to sub-procs as done.
451 //
452 // Non-blocking mode: populates receive buffers.
453 // Can use recvDataCounts() method to recover sizes received.
454 //
455 // \param wait wait for requests to complete (in nonBlocking mode)
456 //
457 // \warning currently only valid for nonBlocking comms.
458 void finishedScatters(const bool wait = true);
459
460 //- Mark all sends to sub-procs as done.
461 //- Recovers the sizes (bytes) received.
462 //
463 // Non-blocking mode: populates receive buffers (all-to-one).
464 // \param[out] recvSizes the sizes (bytes) received
465 // \param wait wait for requests to complete (in nonBlocking mode)
466 //
467 // \warning currently only valid for nonBlocking comms.
468 void finishedScatters(labelList& recvSizes, const bool wait = true);
469};
470
471
472// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
473
474} // End namespace Foam
475
476// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
477
478#endif
479
480// ************************************************************************* //
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition: DynamicList.H:72
streamFormat
Data format (ascii | binary)
An interval of (signed) integers defined by a start and a size.
Definition: IntRange.H:64
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: List.H:77
Buffers for inter-processor communications streams (UOPstream, UIPstream).
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
const UList< char > peekRecvData(const label proci) const
UPstream::rangeType allProcs() const noexcept
Range of ranks indices associated with PstreamBuffers.
UPstream::commsTypes commsType() const noexcept
The communications type of the stream.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
bool allowClearRecv() const noexcept
int tag() const noexcept
The transfer message type.
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
void clearStorage()
Clear individual buffer storage and reset states.
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
bool hasSendData() const
True if any (local) send buffers have data.
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
labelList recvDataCounts() const
bool hasRecvData() const
label recvDataCount(const label proci) const
void finishedSends(const bool wait=true)
Mark sends as done.
label comm() const noexcept
Communicator.
void clear()
Clear individual buffers and reset states.
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
UPstream::rangeType subProcs() const noexcept
Range of sub-processes indices associated with PstreamBuffers.
IOstreamOption::streamFormat format() const noexcept
The associated buffer format (ascii | binary)
Base class for input inter-processor communications stream (ie, parallel streams)....
Definition: UIPstream.H:62
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:114
Base class for output inter-processor communications stream (ie, parallel streams)....
Definition: UOPstream.H:62
commsTypes
Types of communications.
Definition: UPstream.H:67
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:556
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:293
IntRange< int > rangeType
Int ranges are used for MPI ranks (processes)
Definition: UPstream.H:63
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:66
Namespace for OpenFOAM.
List< label > labelList
A List of labels.
Definition: List.H:66
const direction noexcept
Definition: Scalar.H:223
UList< label > labelUList
A UList of labels.
Definition: UList.H:85