PstreamCombineGather.C
Go to the documentation of this file.
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2011-2017 OpenFOAM Foundation
    Copyright (C) 2019-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

Description
    Variant of gather, scatter.
    Normal gather uses:
    - default construct and read (>>) from Istream
    - binary operator and assignment operator to combine values

    combineGather uses:
    - construct from Istream
    - modify operator which modifies its lhs

\*---------------------------------------------------------------------------*/
38
39#include "OPstream.H"
40#include "IPstream.H"
41#include "IOstreams.H"
42#include "contiguous.H"
43
// * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //
45
46template<class T, class CombineOp>
48(
49 const List<UPstream::commsStruct>& comms,
50 T& value,
51 const CombineOp& cop,
52 const int tag,
53 const label comm
54)
55{
56 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
57 {
58 // My communication order
59 const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
60
61 // Receive from my downstairs neighbours
62 for (const label belowID : myComm.below())
63 {
65 {
66 T received;
67
69 (
71 belowID,
72 reinterpret_cast<char*>(&received),
73 sizeof(T),
74 tag,
75 comm
76 );
77
78 if (debug & 2)
79 {
80 Pout<< " received from "
81 << belowID << " data:" << received << endl;
82 }
83
84 cop(value, received);
85 }
86 else
87 {
88 IPstream fromBelow
89 (
91 belowID,
92 0,
93 tag,
94 comm
95 );
96 T received(fromBelow);
97
98 if (debug & 2)
99 {
100 Pout<< " received from "
101 << belowID << " data:" << received << endl;
102 }
103
104 cop(value, received);
105 }
106 }
107
108 // Send up value
109 if (myComm.above() != -1)
110 {
111 if (debug & 2)
112 {
113 Pout<< " sending to " << myComm.above()
114 << " data:" << value << endl;
115 }
116
118 {
120 (
122 myComm.above(),
123 reinterpret_cast<const char*>(&value),
124 sizeof(T),
125 tag,
126 comm
127 );
128 }
129 else
130 {
131 OPstream toAbove
132 (
134 myComm.above(),
135 0,
136 tag,
137 comm
138 );
139 toAbove << value;
140 }
141 }
142 }
143}
144
145
146template<class T>
148(
149 const List<UPstream::commsStruct>& comms,
150 T& value,
151 const int tag,
152 const label comm
153)
154{
155 #ifndef Foam_Pstream_scatter_nobroadcast
156 Pstream::broadcast(value, comm);
157 #else
158 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
159 {
160 // My communication order
161 const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
162
163 // Receive from up
164 if (myComm.above() != -1)
165 {
167 {
169 (
171 myComm.above(),
172 reinterpret_cast<char*>(&value),
173 sizeof(T),
174 tag,
175 comm
176 );
177 }
178 else
179 {
180 IPstream fromAbove
181 (
183 myComm.above(),
184 0,
185 tag,
186 comm
187 );
188 value = T(fromAbove);
189 }
190 }
191
192 // Send to my downstairs neighbours
193 forAllReverse(myComm.below(), belowI)
194 {
195 const label belowID = myComm.below()[belowI];
196
198 {
200 (
202 belowID,
203 reinterpret_cast<const char*>(&value),
204 sizeof(T),
205 tag,
206 comm
207 );
208 }
209 else
210 {
211 OPstream toBelow
212 (
214 belowID,
215 0,
216 tag,
217 comm
218 );
219 toBelow << value;
220 }
221 }
222 }
223 #endif
224}
225
226
227template<class T, class CombineOp>
229(
230 T& value,
231 const CombineOp& cop,
232 const int tag,
233 const label comm
234)
235{
237 (
239 value,
240 cop,
241 tag,
242 comm
243 );
244}
245
246
247template<class T>
249(
250 T& value,
251 const int tag,
252 const label comm
253)
254{
255 #ifndef Foam_Pstream_scatter_nobroadcast
256 Pstream::broadcast(value, comm);
257 #else
258 combineScatter(UPstream::whichCommunication(comm), value, tag, comm);
259 #endif
260}
261
262
263template<class T, class CombineOp>
265(
266 const List<UPstream::commsStruct>& comms,
267 T& value,
268 const CombineOp& cop,
269 const int tag,
270 const label comm
271)
272{
273 Pstream::combineGather(comms, value, cop, tag, comm);
274 Pstream::broadcast(value, comm);
275}
276
277
278template<class T, class CombineOp>
280(
281 T& value,
282 const CombineOp& cop,
283 const int tag,
284 const label comm
285)
286{
287 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
288 {
289 const auto& comms = UPstream::whichCommunication(comm);
290
291 Pstream::combineGather(comms, value, cop, tag, comm);
292 Pstream::broadcast(value, comm);
293 }
294}
295
296
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
298
299template<class T, class CombineOp>
301(
302 const List<UPstream::commsStruct>& comms,
303 List<T>& values,
304 const CombineOp& cop,
305 const int tag,
306 const label comm
307)
308{
309 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
310 {
311 // My communication order
312 const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
313
314 // Receive from my downstairs neighbours
315 for (const label belowID : myComm.below())
316 {
318 {
319 List<T> received(values.size());
320
322 (
324 belowID,
325 received.data_bytes(),
326 received.size_bytes(),
327 tag,
328 comm
329 );
330
331 if (debug & 2)
332 {
333 Pout<< " received from "
334 << belowID << " data:" << received << endl;
335 }
336
337 forAll(values, i)
338 {
339 cop(values[i], received[i]);
340 }
341 }
342 else
343 {
344 IPstream fromBelow
345 (
347 belowID,
348 0,
349 tag,
350 comm
351 );
352 List<T> received(fromBelow);
353
354 if (debug & 2)
355 {
356 Pout<< " received from "
357 << belowID << " data:" << received << endl;
358 }
359
360 forAll(values, i)
361 {
362 cop(values[i], received[i]);
363 }
364 }
365 }
366
367 // Send up values
368 if (myComm.above() != -1)
369 {
370 if (debug & 2)
371 {
372 Pout<< " sending to " << myComm.above()
373 << " data:" << values << endl;
374 }
375
377 {
379 (
381 myComm.above(),
382 values.cdata_bytes(),
383 values.size_bytes(),
384 tag,
385 comm
386 );
387 }
388 else
389 {
390 OPstream toAbove
391 (
393 myComm.above(),
394 0,
395 tag,
396 comm
397 );
398 toAbove << values;
399 }
400 }
401 }
402}
403
404
405template<class T>
407(
408 const List<UPstream::commsStruct>& comms,
409 List<T>& values,
410 const int tag,
411 const label comm
412)
413{
414 #ifndef Foam_Pstream_scatter_nobroadcast
415 Pstream::broadcast(values, comm);
416 #else
417 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
418 {
419 // My communication order
420 const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
421
422 // Receive from up
423 if (myComm.above() != -1)
424 {
426 {
428 (
430 myComm.above(),
431 values.data_bytes(),
432 values.size_bytes(),
433 tag,
434 comm
435 );
436 }
437 else
438 {
439 IPstream fromAbove
440 (
442 myComm.above(),
443 0,
444 tag,
445 comm
446 );
447 fromAbove >> values;
448 }
449 }
450
451 // Send to my downstairs neighbours
452 forAllReverse(myComm.below(), belowI)
453 {
454 const label belowID = myComm.below()[belowI];
455
457 {
459 (
461 belowID,
462 values.cdata_bytes(),
463 values.size_bytes(),
464 tag,
465 comm
466 );
467 }
468 else
469 {
470 OPstream toBelow
471 (
473 belowID,
474 0,
475 tag,
476 comm
477 );
478 toBelow << values;
479 }
480 }
481 }
482 #endif
483}
484
485
486template<class T, class CombineOp>
488(
489 List<T>& values,
490 const CombineOp& cop,
491 const int tag,
492 const label comm
493)
494{
496 (
498 values,
499 cop,
500 tag,
501 comm
502 );
503}
504
505
506template<class T>
508(
509 List<T>& values,
510 const int tag,
511 const label comm
512)
513{
514 #ifndef Foam_Pstream_scatter_nobroadcast
515 Pstream::broadcast(values, comm);
516 #else
518 (
520 values,
521 tag,
522 comm
523 );
524 #endif
525}
526
527
528template<class T, class CombineOp>
530(
531 const List<UPstream::commsStruct>& comms,
532 List<T>& values,
533 const CombineOp& cop,
534 const int tag,
535 const label comm
536)
537{
538 Pstream::listCombineGather(comms, values, cop, tag, comm);
539 Pstream::broadcast(values, comm);
540}
541
542
543template<class T, class CombineOp>
545(
546 List<T>& values,
547 const CombineOp& cop,
548 const int tag,
549 const label comm
550)
551{
552 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
553 {
554 const auto& comms = UPstream::whichCommunication(comm);
555
556 Pstream::listCombineGather(comms, values, cop, tag, comm);
557 Pstream::broadcast(values, comm);
558 }
559}
560
561
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
563
564template<class Container, class CombineOp>
566(
567 const List<UPstream::commsStruct>& comms,
568 Container& values,
569 const CombineOp& cop,
570 const int tag,
571 const label comm
572)
573{
574 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
575 {
576 // My communication order
577 const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
578
579 // Receive from my downstairs neighbours
580 for (const label belowID : myComm.below())
581 {
582 // Map/HashTable: non-contiguous
583
584 IPstream fromBelow
585 (
587 belowID,
588 0,
589 tag,
590 comm
591 );
592 Container received(fromBelow);
593
594 if (debug & 2)
595 {
596 Pout<< " received from "
597 << belowID << " data:" << received << endl;
598 }
599
600 for
601 (
602 auto recvIter = received.cbegin();
603 recvIter != received.cend();
604 ++recvIter
605 )
606 {
607 auto masterIter = values.find(recvIter.key());
608
609 if (masterIter != values.end()) // == found()
610 {
611 // Combine with existing
612 cop(masterIter.val(), recvIter.val());
613 }
614 else
615 {
616 // Insert new key/value
617 values.insert(recvIter.key(), recvIter.val());
618 }
619 }
620 }
621
622 // Send up values
623 if (myComm.above() != -1)
624 {
625 if (debug & 2)
626 {
627 Pout<< " sending to " << myComm.above()
628 << " data:" << values << endl;
629 }
630
631 OPstream toAbove
632 (
634 myComm.above(),
635 0,
636 tag,
637 comm
638 );
639 toAbove << values;
640 }
641 }
642}
643
644
645template<class Container>
647(
648 const List<UPstream::commsStruct>& comms,
649 Container& values,
650 const int tag,
651 const label comm
652)
653{
654 #ifndef Foam_Pstream_scatter_nobroadcast
655 Pstream::broadcast(values, comm);
656 #else
657 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
658 {
659 // My communication order
660 const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
661
662 // Receive from up
663 if (myComm.above() != -1)
664 {
665 IPstream fromAbove
666 (
668 myComm.above(),
669 0,
670 tag,
671 comm
672 );
673 fromAbove >> values;
674
675 if (debug & 2)
676 {
677 Pout<< " received from "
678 << myComm.above() << " data:" << values << endl;
679 }
680 }
681
682 // Send to my downstairs neighbours
683 forAllReverse(myComm.below(), belowI)
684 {
685 const label belowID = myComm.below()[belowI];
686
687 if (debug & 2)
688 {
689 Pout<< " sending to " << belowID << " data:" << values << endl;
690 }
691
692 OPstream toBelow
693 (
695 belowID,
696 0,
697 tag,
698 comm
699 );
700 toBelow << values;
701 }
702 }
703 #endif
704}
705
706
707template<class Container, class CombineOp>
709(
710 Container& values,
711 const CombineOp& cop,
712 const int tag,
713 const label comm
714)
715{
717 (
719 values,
720 cop,
721 tag,
722 comm
723 );
724}
725
726
727template<class Container>
729(
730 Container& values,
731 const int tag,
732 const label comm
733)
734{
735 #ifndef Foam_Pstream_scatter_nobroadcast
736 Pstream::broadcast(values, comm);
737 #else
739 (
741 values,
742 tag,
743 comm
744 );
745 #endif
746}
747
748
749template<class Container, class CombineOp>
751(
752 const List<UPstream::commsStruct>& comms,
753 Container& values,
754 const CombineOp& cop,
755 const int tag,
756 const label comm
757)
758{
759 Pstream::mapCombineGather(comms, values, cop, tag, comm);
760 Pstream::broadcast(values, comm);
761}
762
763
764template<class Container, class CombineOp>
766(
767 Container& values,
768 const CombineOp& cop,
769 const int tag,
770 const label comm
771)
772{
773 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
774 {
775 const auto& comms = UPstream::whichCommunication(comm);
776
777 Pstream::mapCombineGather(comms, values, cop, tag, comm);
778 Pstream::broadcast(values, comm);
779 }
780}
781
782
// ************************************************************************* //
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Input inter-processor communications stream.
Definition: IPstream.H:57
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: List.H:77
Output inter-processor communications stream.
Definition: OPstream.H:57
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
static void listCombineAllGather(const List< commsStruct > &comms, List< T > &values, const CombineOp &cop, const int tag, const label comm)
After completion all processors have the same data.
static void combineGather(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag, const label comm)
static void combineScatter(const List< commsStruct > &comms, T &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
static void combineAllGather(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
static void listCombineScatter(const List< commsStruct > &comms, List< T > &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
static void mapCombineScatter(const List< commsStruct > &comms, Container &values, const int tag, const label comm)
Broadcast data: Distribute without modification.
static void broadcast(Type &value, const label comm=UPstream::worldComm)
static void mapCombineGather(const List< commsStruct > &comms, Container &values, const CombineOp &cop, const int tag, const label comm)
static void listCombineGather(const List< commsStruct > &comms, List< T > &values, const CombineOp &cop, const int tag, const label comm)
static void mapCombineAllGather(const List< commsStruct > &comms, Container &values, const CombineOp &cop, const int tag, const label comm)
After completion all processors have the same data.
virtual bool read()
Re-read model coefficients if they have changed.
char * data_bytes() noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:251
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition: UListI.H:258
Structure for communicating between processors.
Definition: UPstream.H:81
const labelList & below() const noexcept
The procIDs of all processors directly below.
Definition: UPstream.H:134
label above() const noexcept
The procID of the processor directly above.
Definition: UPstream.H:128
static const List< commsStruct > & whichCommunication(const label communicator=worldComm)
Definition: UPstream.H:542
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:433
virtual bool write()
Write the output fields.
int myProcNo() const noexcept
Return processor number.
const volScalarField & T
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:333
#define forAllReverse(list, i)
Reverse loop across all elements in list.
Definition: stdFoam.H:346
A template class to specify that a data type can be considered as being contiguous in memory.
Definition: contiguous.H:78