PstreamExchange.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2016 OpenFOAM Foundation
9 Copyright (C) 2016-2022 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27Description
28 Exchange data.
29
30\*---------------------------------------------------------------------------*/
31
32#include "Pstream.H"
33#include "contiguous.H"
34#include "PstreamReduceOps.H"
35
36// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
37
38template<class Container, class T>
39void Foam::Pstream::exchangeContainer
40(
41 const UList<Container>& sendBufs,
42 const labelUList& recvSizes,
43 List<Container>& recvBufs,
44 const int tag,
45 const label comm,
46 const bool wait
47)
48{
49 const label startOfRequests = UPstream::nRequests();
50
51 // Set up receives
52 // ~~~~~~~~~~~~~~~
53
54 forAll(recvSizes, proci)
55 {
56 if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
57 {
59 (
61 proci,
62 recvBufs[proci].data_bytes(),
63 recvSizes[proci]*sizeof(T),
64 tag,
65 comm
66 );
67 }
68 }
69
70
71 // Set up sends
72 // ~~~~~~~~~~~~
73
74 forAll(sendBufs, proci)
75 {
76 if (proci != Pstream::myProcNo(comm) && sendBufs[proci].size() > 0)
77 {
78 if
79 (
81 (
83 proci,
84 sendBufs[proci].cdata_bytes(),
85 sendBufs[proci].size_bytes(),
86 tag,
87 comm
88 )
89 )
90 {
92 << "Cannot send outgoing message. "
93 << "to:" << proci << " nBytes:"
94 << label(sendBufs[proci].size_bytes())
96 }
97 }
98 }
99
100
101 // Wait for all to finish
102 // ~~~~~~~~~~~~~~~~~~~~~~
103
104 if (wait)
105 {
106 UPstream::waitRequests(startOfRequests);
107 }
108}
109
110
111template<class T>
112void Foam::Pstream::exchangeBuf
113(
114 const labelUList& sendSizes,
115 const UList<const char*>& sendBufs,
116 const labelUList& recvSizes,
117 List<char*>& recvBufs,
118 const int tag,
119 const label comm,
120 const bool wait
121)
122{
123 const label startOfRequests = UPstream::nRequests();
124
125 // Set up receives
126 // ~~~~~~~~~~~~~~~
127
128 forAll(recvSizes, proci)
129 {
130 if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
131 {
133 (
135 proci,
136 recvBufs[proci],
137 recvSizes[proci]*sizeof(T),
138 tag,
139 comm
140 );
141 }
142 }
143
144
145 // Set up sends
146 // ~~~~~~~~~~~~
147
148 forAll(sendBufs, proci)
149 {
150 if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0)
151 {
152 if
153 (
155 (
157 proci,
158 sendBufs[proci],
159 sendSizes[proci]*sizeof(T),
160 tag,
161 comm
162 )
163 )
164 {
166 << "Cannot send outgoing message. "
167 << "to:" << proci << " nBytes:"
168 << label(sendSizes[proci]*sizeof(T))
170 }
171 }
172 }
173
174
175 // Wait for all to finish
176 // ~~~~~~~~~~~~~~~~~~~~~~
177
178 if (wait)
179 {
180 UPstream::waitRequests(startOfRequests);
181 }
182}
183
184
185template<class Container, class T>
187(
188 const UList<Container>& sendBufs,
189 const labelUList& recvSizes,
190 List<Container>& recvBufs,
191 const int tag,
192 const label comm,
193 const bool wait
194)
195{
196 // OR static_assert(is_contiguous<T>::value, "Contiguous data only!")
198 {
200 << "Contiguous data only." << sizeof(T) << Foam::abort(FatalError);
201 }
202
203 if (sendBufs.size() != UPstream::nProcs(comm))
204 {
206 << "Size of list " << sendBufs.size()
207 << " does not equal the number of processors "
208 << UPstream::nProcs(comm)
210 }
211
212 recvBufs.resize_nocopy(sendBufs.size());
213
214 if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
215 {
216 // Presize all receive buffers
217 forAll(recvSizes, proci)
218 {
219 const label nRecv = recvSizes[proci];
220
221 if (proci != Pstream::myProcNo(comm) && nRecv > 0)
222 {
223 recvBufs[proci].resize_nocopy(nRecv);
224 }
225 }
226
227 if (UPstream::maxCommsSize <= 0)
228 {
229 // Do the exchanging in one go
230 exchangeContainer<Container, T>
231 (
232 sendBufs,
233 recvSizes,
234 recvBufs,
235 tag,
236 comm,
237 wait
238 );
239 }
240 else
241 {
242 // Determine the number of chunks to send. Note that we
243 // only have to look at the sending data since we are
244 // guaranteed that some processor's sending size is some other
245 // processor's receive size. Also we can ignore any local comms.
246
247 // We need to send chunks so the number of iterations:
248 // maxChunkSize iterations
249 // ------------ ----------
250 // 0 0
251 // 1..maxChunkSize 1
252 // maxChunkSize+1..2*maxChunkSize 2
253 // ...
254
255 const label maxChunkSize
256 (
257 max
258 (
259 static_cast<label>(1),
260 static_cast<label>(UPstream::maxCommsSize/sizeof(T))
261 )
262 );
263
264 label nChunks(0);
265 {
266 // Get max send count (elements)
267 forAll(sendBufs, proci)
268 {
269 if (proci != Pstream::myProcNo(comm))
270 {
271 nChunks = max(nChunks, sendBufs[proci].size());
272 }
273 }
274
275 // Convert from send count (elements) to number of chunks.
276 // Can normally calculate with (count-1), but add some safety
277 if (nChunks)
278 {
279 nChunks = 1 + (nChunks/maxChunkSize);
280 }
281 reduce(nChunks, maxOp<label>(), tag, comm);
282 }
283
284 labelList nRecv(sendBufs.size());
285 labelList nSend(sendBufs.size());
286 labelList startRecv(sendBufs.size(), Zero);
287 labelList startSend(sendBufs.size(), Zero);
288
289 List<const char*> charPtrSend(sendBufs.size());
290 List<char*> charPtrRecv(sendBufs.size());
291
292 for (label iter = 0; iter < nChunks; ++iter)
293 {
294 forAll(sendBufs, proci)
295 {
296 nSend[proci] = min
297 (
298 maxChunkSize,
299 sendBufs[proci].size()-startSend[proci]
300 );
301 nRecv[proci] = min
302 (
303 maxChunkSize,
304 recvBufs[proci].size()-startRecv[proci]
305 );
306
307 charPtrSend[proci] =
308 (
309 nSend[proci] > 0
310 ? reinterpret_cast<const char*>
311 (
312 &(sendBufs[proci][startSend[proci]])
313 )
314 : nullptr
315 );
316 charPtrRecv[proci] =
317 (
318 nRecv[proci] > 0
319 ? reinterpret_cast<char*>
320 (
321 &(recvBufs[proci][startRecv[proci]])
322 )
323 : nullptr
324 );
325 }
326
330
331 exchangeBuf<T>
332 (
333 nSend,
334 charPtrSend,
335 nRecv,
336 charPtrRecv,
337 tag,
338 comm,
339 wait
340 );
341
342 forAll(nSend, proci)
343 {
344 startSend[proci] += nSend[proci];
345 startRecv[proci] += nRecv[proci];
346 }
347 }
348 }
349 }
350
351 // Do myself
352 recvBufs[Pstream::myProcNo(comm)] = sendBufs[Pstream::myProcNo(comm)];
353}
354
355
356template<class Container>
358(
359 const labelUList& sendProcs,
360 const labelUList& recvProcs,
361 const Container& sendBufs,
362 labelList& recvSizes,
363 const label tag,
364 const label comm
365)
366{
367 if (sendBufs.size() != UPstream::nProcs(comm))
368 {
370 << "Size of container " << sendBufs.size()
371 << " does not equal the number of processors "
372 << UPstream::nProcs(comm)
374 }
375
376 labelList sendSizes(sendProcs.size());
377 forAll(sendProcs, i)
378 {
379 sendSizes[i] = sendBufs[sendProcs[i]].size();
380 }
381
382 recvSizes.resize_nocopy(sendBufs.size());
383 recvSizes = 0; // Ensure non-received entries are properly zeroed
384
385 const label startOfRequests = UPstream::nRequests();
386
387 for (const label proci : recvProcs)
388 {
390 (
392 proci,
393 reinterpret_cast<char*>(&recvSizes[proci]),
394 sizeof(label),
395 tag,
396 comm
397 );
398 }
399
400 forAll(sendProcs, i)
401 {
403 (
405 sendProcs[i],
406 reinterpret_cast<char*>(&sendSizes[i]),
407 sizeof(label),
408 tag,
409 comm
410 );
411 }
412
413 UPstream::waitRequests(startOfRequests);
414}
415
416
431
432
433template<class Container>
435(
436 const Container& sendBufs,
437 labelList& recvSizes,
438 const label comm
439)
440{
441 if (sendBufs.size() != UPstream::nProcs(comm))
442 {
444 << "Size of container " << sendBufs.size()
445 << " does not equal the number of processors "
446 << UPstream::nProcs(comm)
448 }
449
450 labelList sendSizes(sendBufs.size());
451 forAll(sendBufs, proci)
452 {
453 sendSizes[proci] = sendBufs[proci].size();
454 }
455 recvSizes.resize_nocopy(sendSizes.size());
456 UPstream::allToAll(sendSizes, recvSizes, comm);
457}
458
459
460template<class Container, class T>
462(
463 const UList<Container>& sendBufs,
464 List<Container>& recvBufs,
465 const int tag,
466 const label comm,
467 const bool wait
468)
469{
470 labelList recvSizes;
471 exchangeSizes(sendBufs, recvSizes, comm);
472
473 exchange<Container, T>(sendBufs, recvSizes, recvBufs, tag, comm, wait);
474}
475
476
477// ************************************************************************* //
Inter-processor communication reduction functions.
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: List.H:77
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:146
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendData, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
static void exchange(const UList< Container > &sendData, const labelUList &recvSizes, List< Container > &recvData, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool wait=true)
virtual bool read()
Re-read model coefficients if they have changed.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: UList.H:94
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:114
@ nonBlocking
"nonBlocking"
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:90
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:100
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange integer data with all processors (in the communicator).
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:287
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:433
virtual bool write()
Write the output fields.
int myProcNo() const noexcept
Return processor number.
const volScalarField & T
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:47
void reduce(const List< UPstream::commsStruct > &comms, T &value, const BinaryOp &bop, const int tag, const label comm)
label min(const labelHashSet &set, label minValue=labelMax)
Find the min value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:33
errorManip< error > abort(error &err)
Definition: errorManip.H:144
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:131
error FatalError
UList< label > labelUList
A UList of labels.
Definition: UList.H:85
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:333
A template class to specify that a data type can be considered as being contiguous in memory.
Definition: contiguous.H:78