PstreamBuffers.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2021-2022 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "PstreamBuffers.H"
30#include "bitSet.H"
31
32// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
33
34void Foam::PstreamBuffers::finalExchange
35(
36 labelList& recvSizes,
37 const bool wait
38)
39{
40 // Could also check that it is not called twice
41 // but that is used for overlapping send/recv (eg, overset)
42 finishedSendsCalled_ = true;
43
44 if (commsType_ == UPstream::commsTypes::nonBlocking)
45 {
46 // all-to-all
47 Pstream::exchangeSizes(sendBuf_, recvSizes, comm_);
48
49 Pstream::exchange<DynamicList<char>, char>
50 (
51 sendBuf_,
52 recvSizes,
53 recvBuf_,
54 tag_,
55 comm_,
56 wait
57 );
58 }
59}
60
61
62void Foam::PstreamBuffers::finalExchange
63(
64 const labelUList& sendProcs,
65 const labelUList& recvProcs,
66 labelList& recvSizes,
67 const bool wait
68)
69{
70 // Could also check that it is not called twice
71 // but that is used for overlapping send/recv (eg, overset)
72 finishedSendsCalled_ = true;
73
74 if (commsType_ == UPstream::commsTypes::nonBlocking)
75 {
77 (
78 sendProcs,
79 recvProcs,
80 sendBuf_,
81 recvSizes,
82 tag_,
83 comm_
84 );
85
86 Pstream::exchange<DynamicList<char>, char>
87 (
88 sendBuf_,
89 recvSizes,
90 recvBuf_,
91 tag_,
92 comm_,
93 wait
94 );
95 }
96}
97
98
99void Foam::PstreamBuffers::finalExchangeGatherScatter
100(
101 const bool isGather,
102 const bool wait
103)
104{
105 // Could also check that it is not called twice
106 // but that is used for overlapping send/recv (eg, overset)
107 finishedSendsCalled_ = true;
108
109 if (commsType_ == UPstream::commsTypes::nonBlocking)
110 {
111 labelList recvSizes;
112
113 if (isGather)
114 {
115 // gather mode (all-to-one): master [0] <- everyone
116
117 recvSizes = UPstream::listGatherValues(sendBuf_[0].size(), comm_);
118
119 if (!UPstream::master(comm_))
120 {
121 recvSizes.resize_nocopy(recvBuf_.size());
122 recvSizes = Zero;
123 }
124 }
125 else
126 {
127 // scatter mode (one-to-all): master [0] -> everyone
128
129 recvSizes.resize_nocopy(sendBuf_.size());
130
131 if (UPstream::master(comm_))
132 {
133 forAll(sendBuf_, proci)
134 {
135 recvSizes[proci] = sendBuf_[proci].size();
136 }
137 }
138
139 const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
140
141 recvSizes = Zero;
142 recvSizes[0] = myRecv;
143 }
144
145
146 Pstream::exchange<DynamicList<char>, char>
147 (
148 sendBuf_,
149 recvSizes,
150 recvBuf_,
151 tag_,
152 comm_,
153 wait
154 );
155 }
156}
157
158
159// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
160
162(
163 const UPstream::commsTypes commsType,
164 const int tag,
165 const label comm,
167)
168:
169 finishedSendsCalled_(false),
170 allowClearRecv_(true),
171 format_(fmt),
172 commsType_(commsType),
173 tag_(tag),
174 comm_(comm),
175 sendBuf_(UPstream::nProcs(comm_)),
176 recvBuf_(UPstream::nProcs(comm_)),
177 recvBufPos_(UPstream::nProcs(comm_), Zero)
178{}
179
180
182(
183 const label comm,
184 const UPstream::commsTypes commsType,
185 const int tag,
187)
188:
189 finishedSendsCalled_(false),
190 allowClearRecv_(true),
191 format_(fmt),
192 commsType_(commsType),
193 tag_(tag),
194 comm_(comm),
195 sendBuf_(UPstream::nProcs(comm_)),
196 recvBuf_(UPstream::nProcs(comm_)),
197 recvBufPos_(UPstream::nProcs(comm_), Zero)
198{}
199
200
201// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
202
204{
205 // Check that all data has been consumed.
206 forAll(recvBufPos_, proci)
207 {
208 if (recvBufPos_[proci] < recvBuf_[proci].size())
209 {
211 << "Message from processor " << proci
212 << " Only consumed " << recvBufPos_[proci] << " of "
213 << recvBuf_[proci].size() << " bytes" << nl
215 }
216 }
217}
218
219
220// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
221
223{
224 for (DynamicList<char>& buf : sendBuf_)
225 {
226 buf.clear();
227 }
228 for (DynamicList<char>& buf : recvBuf_)
229 {
230 buf.clear();
231 }
232 recvBufPos_ = 0;
233
234 finishedSendsCalled_ = false;
235}
236
237
238void Foam::PstreamBuffers::clearRecv(const label proci)
239{
240 recvBuf_[proci].clear();
241 recvBufPos_[proci] = 0;
242}
243
244
246{
247 // Could also clear out entire sendBuf_, recvBuf_ and reallocate.
248 // Not sure if it makes much difference
249 for (DynamicList<char>& buf : sendBuf_)
250 {
251 buf.clearStorage();
252 }
253 for (DynamicList<char>& buf : recvBuf_)
254 {
255 buf.clearStorage();
256 }
257 recvBufPos_ = 0;
258
259 finishedSendsCalled_ = false;
260}
261
262
264{
265 for (const DynamicList<char>& buf : sendBuf_)
266 {
267 if (!buf.empty())
268 {
269 return true;
270 }
271 }
272 return false;
273}
274
275
277{
278 if (finishedSendsCalled_)
279 {
280 forAll(recvBufPos_, proci)
281 {
282 if (recvBuf_[proci].size() > recvBufPos_[proci])
283 {
284 return true;
285 }
286 }
287 }
288 #ifdef FULLDEBUG
289 else
290 {
292 << "Call finishedSends first" << exit(FatalError);
293 }
294 #endif
295
296 return false;
297}
298
299
300Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
301{
302 return sendBuf_[proci].size();
303}
304
305
306Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
307{
308 if (finishedSendsCalled_)
309 {
310 const label len(recvBuf_[proci].size() > recvBufPos_[proci]);
311
312 if (len > 0)
313 {
314 return len;
315 }
316 }
317 #ifdef FULLDEBUG
318 else
319 {
321 << "Call finishedSends first" << exit(FatalError);
322 }
323 #endif
324
325 return 0;
326}
327
328
330{
331 labelList counts(recvBuf_.size(), Zero);
332
333 if (finishedSendsCalled_)
334 {
335 forAll(recvBufPos_, proci)
336 {
337 const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
338
339 if (len > 0)
340 {
341 counts[proci] = len;
342 }
343 }
344 }
345 #ifdef FULLDEBUG
346 else
347 {
349 << "Call finishedSends first" << exit(FatalError);
350 }
351 #endif
352
353 return counts;
354}
355
356
359{
360 if (finishedSendsCalled_)
361 {
362 const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
363
364 if (len > 0)
365 {
366 return UList<char>
367 (
368 const_cast<char*>(&recvBuf_[proci][recvBufPos_[proci]]),
369 len
370 );
371 }
372 }
373 #ifdef FULLDEBUG
374 else
375 {
377 << "Call finishedSends first" << exit(FatalError);
378 }
379 #endif
380
381 return UList<char>();
382}
383
384
386{
387 bool old(allowClearRecv_);
388 allowClearRecv_ = on;
389 return old;
390}
391
392
394{
395 labelList recvSizes;
396 finalExchange(recvSizes, wait);
397}
398
399
401(
402 labelList& recvSizes,
403 const bool wait
404)
405{
406 finalExchange(recvSizes, wait);
407
408 if (commsType_ != UPstream::commsTypes::nonBlocking)
409 {
411 << "Obtaining sizes not supported in "
412 << UPstream::commsTypeNames[commsType_] << endl
413 << " since transfers already in progress. Use non-blocking instead."
414 << exit(FatalError);
415
416 // Note: maybe possible only if using different tag from write started
417 // by ~UOPstream. Needs some work.
418 }
419}
420
421
423(
424 const labelUList& sendProcs,
425 const labelUList& recvProcs,
426 const bool wait
427)
428{
429 labelList recvSizes;
430 finalExchange(sendProcs, recvProcs, recvSizes, wait);
431}
432
433
435(
436 const labelUList& sendProcs,
437 const labelUList& recvProcs,
438 labelList& recvSizes,
439 const bool wait
440)
441{
442 finalExchange(sendProcs, recvProcs, recvSizes, wait);
443
444 if (commsType_ != UPstream::commsTypes::nonBlocking)
445 {
447 << "Obtaining sizes not supported in "
448 << UPstream::commsTypeNames[commsType_] << endl
449 << " since transfers already in progress. Use non-blocking instead."
450 << exit(FatalError);
451
452 // Note: maybe possible only if using different tag from write started
453 // by ~UOPstream. Needs some work.
454 }
455}
456
457
459(
460 bitSet& sendConnections,
461 DynamicList<label>& sendProcs,
462 DynamicList<label>& recvProcs,
463 const bool wait
464)
465{
466 bool changed = (sendConnections.size() != nProcs());
467
468 if (changed)
469 {
470 sendConnections.resize(nProcs());
471 }
472
473 // Update send connections
474 // - reasonable to assume there are no self-sends on UPstream::myProcNo
475 forAll(sendBuf_, proci)
476 {
477 // ie, sendDataCount(proci) != 0
478 if (sendConnections.set(proci, !sendBuf_[proci].empty()))
479 {
480 // The state changed
481 changed = true;
482 }
483 }
484
485 reduce(changed, orOp<bool>());
486
487 if (changed)
488 {
489 // Create send/recv topology
490
491 // The send ranks
492 sendProcs.clear();
493 forAll(sendBuf_, proci)
494 {
495 // ie, sendDataCount(proci) != 0
496 if (!sendBuf_[proci].empty())
497 {
498 sendProcs.append(proci);
499 }
500 }
501
502 finishedSends(wait); // All-to-all
503
504 // The recv ranks
505 recvProcs.clear();
506 forAll(recvBuf_, proci)
507 {
508 // ie, recvDataCount(proci)
509 if (!recvBuf_[proci].empty())
510 {
511 recvProcs.append(proci);
512 }
513 }
514 }
515 else
516 {
517 // Use existing send/recv ranks
518
519 finishedSends(sendProcs, recvProcs, wait);
520 }
521
522 return changed;
523}
524
525
527{
528 finalExchangeGatherScatter(true, wait);
529}
530
531
533{
534 finalExchangeGatherScatter(false, wait);
535}
536
537
539(
540 labelList& recvSizes,
541 const bool wait
542)
543{
544 finalExchangeGatherScatter(true, wait);
545
546 if (commsType_ != UPstream::commsTypes::nonBlocking)
547 {
549 << "Obtaining sizes not supported in "
550 << UPstream::commsTypeNames[commsType_] << endl
551 << " since transfers already in progress. Use non-blocking instead."
552 << exit(FatalError);
553
554 // Note: maybe possible only if using different tag from write started
555 // by ~UOPstream. Needs some work.
556 }
557
558 // For nonBlocking mode, simply recover received sizes
559 // from the buffers themselves.
560
561 recvSizes = recvDataCounts();
562}
563
564
566(
567 labelList& recvSizes,
568 const bool wait
569)
570{
571 finalExchangeGatherScatter(false, wait);
572
573 if (commsType_ != UPstream::commsTypes::nonBlocking)
574 {
576 << "Obtaining sizes not supported in "
577 << UPstream::commsTypeNames[commsType_] << endl
578 << " since transfers already in progress. Use non-blocking instead."
579 << exit(FatalError);
580
581 // Note: maybe possible only if using different tag from write started
582 // by ~UOPstream. Needs some work.
583 }
584
585 // For nonBlocking mode, simply recover received sizes
586 // from the buffers themselves.
587
588 recvSizes = recvDataCounts();
589}
590
591
592// ************************************************************************* //
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition: DynamicList.H:72
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
Definition: DynamicListI.H:391
void append(const T &val)
Copy append an element to the end of this list.
Definition: DynamicListI.H:503
streamFormat
Data format (ascii | binary)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
Definition: PackedListI.H:409
label size() const noexcept
Number of entries.
Definition: PackedListI.H:377
Buffers for inter-processor communications streams (UOPstream, UIPstream).
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
const UList< char > peekRecvData(const label proci) const
bool allowClearRecv() const noexcept
void clearStorage()
Clear individual buffer storage and reset states.
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
bool hasSendData() const
True if any (local) send buffers have data.
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
labelList recvDataCounts() const
bool hasRecvData() const
label recvDataCount(const label proci) const
void finishedSends(const bool wait=true)
Mark sends as done.
void clear()
Clear individual buffers and reset states.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendData, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: UList.H:94
Inter-processor communications stream.
Definition: UPstream.H:59
commsTypes
Types of communications.
Definition: UPstream.H:67
@ nonBlocking
"nonBlocking"
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:74
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:66
void set(const bitSet &bitset)
Set specified bits from another bitset.
Definition: bitSetI.H:590
splitCell * master() const
Definition: splitCell.H:113
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
List< label > labelList
A List of labels.
Definition: List.H:66
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
void reduce(const List< UPstream::commsStruct > &comms, T &value, const BinaryOp &bop, const int tag, const label comm)
errorManip< error > abort(error &err)
Definition: errorManip.H:144
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:131
error FatalError
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
UList< label > labelUList
A UList of labels.
Definition: UList.H:85
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:53
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:333