exchange.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2016 OpenFOAM Foundation
9  Copyright (C) 2016-2021 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Description
28  Exchange data.
29 
30 \*---------------------------------------------------------------------------*/
31 
32 #include "Pstream.H"
33 #include "contiguous.H"
34 #include "PstreamReduceOps.H"
35 
36 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
37 
38 template<class Container, class T>
39 void Foam::Pstream::exchangeContainer
40 (
41  const UList<Container>& sendBufs,
42  const labelUList& recvSizes,
43  List<Container>& recvBufs,
44  const int tag,
45  const label comm,
46  const bool block
47 )
48 {
49  const label startOfRequests = Pstream::nRequests();
50 
51  // Set up receives
52  // ~~~~~~~~~~~~~~~
53 
54  forAll(recvSizes, proci)
55  {
56  if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
57  {
59  (
61  proci,
62  recvBufs[proci].data_bytes(),
63  recvSizes[proci]*sizeof(T),
64  tag,
65  comm
66  );
67  }
68  }
69 
70 
71  // Set up sends
72  // ~~~~~~~~~~~~
73 
74  forAll(sendBufs, proci)
75  {
76  if (proci != Pstream::myProcNo(comm) && sendBufs[proci].size() > 0)
77  {
78  if
79  (
81  (
83  proci,
84  sendBufs[proci].cdata_bytes(),
85  sendBufs[proci].size_bytes(),
86  tag,
87  comm
88  )
89  )
90  {
92  << "Cannot send outgoing message. "
93  << "to:" << proci << " nBytes:"
94  << label(sendBufs[proci].size()*sizeof(T))
96  }
97  }
98  }
99 
100 
101  // Wait for all to finish
102  // ~~~~~~~~~~~~~~~~~~~~~~
103 
104  if (block)
105  {
106  Pstream::waitRequests(startOfRequests);
107  }
108 }
109 
110 
111 template<class T>
112 void Foam::Pstream::exchangeBuf
113 (
114  const labelUList& sendSizes,
115  const UList<const char*>& sendBufs,
116  const labelUList& recvSizes,
117  List<char*>& recvBufs,
118  const int tag,
119  const label comm,
120  const bool block
121 )
122 {
123  const label startOfRequests = Pstream::nRequests();
124 
125  // Set up receives
126  // ~~~~~~~~~~~~~~~
127 
128  forAll(recvSizes, proci)
129  {
130  if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
131  {
133  (
135  proci,
136  recvBufs[proci],
137  recvSizes[proci]*sizeof(T),
138  tag,
139  comm
140  );
141  }
142  }
143 
144 
145  // Set up sends
146  // ~~~~~~~~~~~~
147 
148  forAll(sendBufs, proci)
149  {
150  if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0)
151  {
152  if
153  (
155  (
157  proci,
158  sendBufs[proci],
159  sendSizes[proci]*sizeof(T),
160  tag,
161  comm
162  )
163  )
164  {
166  << "Cannot send outgoing message. "
167  << "to:" << proci << " nBytes:"
168  << label(sendSizes[proci]*sizeof(T))
170  }
171  }
172  }
173 
174 
175  // Wait for all to finish
176  // ~~~~~~~~~~~~~~~~~~~~~~
177 
178  if (block)
179  {
180  Pstream::waitRequests(startOfRequests);
181  }
182 }
183 
184 
185 template<class Container, class T>
187 (
188  const UList<Container>& sendBufs,
189  const labelUList& recvSizes,
190  List<Container>& recvBufs,
191  const int tag,
192  const label comm,
193  const bool block
194 )
195 {
196  // OR static_assert(is_contiguous<T>::value, "Contiguous data only!")
198  {
200  << "Contiguous data only." << sizeof(T) << Foam::abort(FatalError);
201  }
202 
203  if (sendBufs.size() != UPstream::nProcs(comm))
204  {
206  << "Size of list " << sendBufs.size()
207  << " does not equal the number of processors "
208  << UPstream::nProcs(comm)
210  }
211 
212  recvBufs.setSize(sendBufs.size());
213 
214  if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
215  {
216  // Presize all receive buffers
217  forAll(recvSizes, proci)
218  {
219  const label nRecv = recvSizes[proci];
220 
221  if (proci != Pstream::myProcNo(comm) && nRecv > 0)
222  {
223  recvBufs[proci].setSize(nRecv);
224  }
225  }
226 
227  if (Pstream::maxCommsSize <= 0)
228  {
229  // Do the exchanging in one go
230  exchangeContainer<Container, T>
231  (
232  sendBufs,
233  recvSizes,
234  recvBufs,
235  tag,
236  comm,
237  block
238  );
239  }
240  else
241  {
242  // Determine the number of chunks to send. Note that we
243  // only have to look at the sending data since we are
244  // guaranteed that some processor's sending size is some other
245  // processor's receive size. Also we can ignore any local comms.
246 
247  label maxNSend = 0;
248  forAll(sendBufs, proci)
249  {
250  if (proci != Pstream::myProcNo(comm))
251  {
252  maxNSend = max(maxNSend, sendBufs[proci].size());
253  }
254  }
255 
256  const label maxNBytes = sizeof(T)*maxNSend;
257 
258  // We need to send maxNBytes bytes so the number of iterations:
259  // maxNBytes iterations
260  // --------- ----------
261  // 0 0
262  // 1..maxCommsSize 1
263  // maxCommsSize+1..2*maxCommsSize 2
264  // etc.
265 
266  label nIter;
267  if (maxNBytes == 0)
268  {
269  nIter = 0;
270  }
271  else
272  {
273  nIter = (maxNBytes-1)/Pstream::maxCommsSize+1;
274  }
275  reduce(nIter, maxOp<label>(), tag, comm);
276 
277 
278  List<const char*> charSendBufs(sendBufs.size());
279  List<char*> charRecvBufs(sendBufs.size());
280 
281  labelList nRecv(sendBufs.size());
282  labelList startRecv(sendBufs.size(), Zero);
283  labelList nSend(sendBufs.size());
284  labelList startSend(sendBufs.size(), Zero);
285 
286  for (label iter = 0; iter < nIter; iter++)
287  {
288  forAll(sendBufs, proci)
289  {
290  nSend[proci] = min
291  (
293  sendBufs[proci].size()-startSend[proci]
294  );
295  charSendBufs[proci] =
296  (
297  nSend[proci] > 0
298  ? reinterpret_cast<const char*>
299  (
300  &(sendBufs[proci][startSend[proci]])
301  )
302  : nullptr
303  );
304 
305  nRecv[proci] = min
306  (
308  recvBufs[proci].size()-startRecv[proci]
309  );
310 
311  charRecvBufs[proci] =
312  (
313  nRecv[proci] > 0
314  ? reinterpret_cast<char*>
315  (
316  &(recvBufs[proci][startRecv[proci]])
317  )
318  : nullptr
319  );
320  }
321 
322  exchangeBuf<T>
323  (
324  nSend,
325  charSendBufs,
326  nRecv,
327  charRecvBufs,
328  tag,
329  comm,
330  block
331  );
332 
333  forAll(nSend, proci)
334  {
335  startSend[proci] += nSend[proci];
336  startRecv[proci] += nRecv[proci];
337  }
338  }
339  }
340  }
341 
342  // Do myself
343  recvBufs[Pstream::myProcNo(comm)] = sendBufs[Pstream::myProcNo(comm)];
344 }
345 
346 
347 template<class Container>
349 (
350  const Container& sendBufs,
351  labelList& recvSizes,
352  const label comm
353 )
354 {
355  if (sendBufs.size() != UPstream::nProcs(comm))
356  {
358  << "Size of container " << sendBufs.size()
359  << " does not equal the number of processors "
360  << UPstream::nProcs(comm)
362  }
363 
364  labelList sendSizes(sendBufs.size());
365  forAll(sendBufs, proci)
366  {
367  sendSizes[proci] = sendBufs[proci].size();
368  }
369  recvSizes.setSize(sendSizes.size());
370  allToAll(sendSizes, recvSizes, comm);
371 }
372 
373 
374 template<class Container, class T>
376 (
377  const UList<Container>& sendBufs,
378  List<Container>& recvBufs,
379  const int tag,
380  const label comm,
381  const bool block
382 )
383 {
384  labelList recvSizes;
385  exchangeSizes(sendBufs, recvSizes, comm);
386 
387  exchange<Container, T>(sendBufs, recvSizes, recvBufs, tag, comm, block);
388 }
389 
390 
391 // ************************************************************************* //
Foam::maxOp
Definition: ops.H:223
Foam::block
Creates a single block of cells from point coordinates, numbers of cells in each direction and an exp...
Definition: block.H:58
Foam::UOPstream::write
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=UPstream::worldComm)
Write given buffer to given processor.
Definition: UOPwrite.C:36
Foam::Zero
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:131
Foam::Pstream::exchangeSizes
static void exchangeSizes(const Container &sendData, labelList &sizes, const label comm=UPstream::worldComm)
Helper: exchange sizes of sendData. sendData is the data per.
Definition: exchange.C:349
Foam::UPstream::waitRequests
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:262
Foam::UIPstream::read
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=UPstream::worldComm)
Read into given buffer from given processor.
Definition: UIPread.C:81
Foam::min
label min(const labelHashSet &set, label minValue=labelMax)
Find the min value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:33
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
Foam::reduce
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
Definition: PstreamReduceOps.H:51
Foam::List::setSize
void setSize(const label n)
Alias for resize()
Definition: List.H:222
Foam::T
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
Definition: FieldFieldFunctions.C:58
Foam::max
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:47
Foam::FatalError
error FatalError
Foam::Pstream::exchange
static void exchange(const UList< Container > &sendData, const labelUList &recvSizes, List< Container > &recvData, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool block=true)
Helper: exchange contiguous data. Sends sendData, receives into.
Definition: exchange.C:187
Pstream.H
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:144
PstreamReduceOps.H
Inter-processor communication reduction functions.
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
Foam::UPstream::commsTypes::nonBlocking
Foam::UPstream::nRequests
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:252
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:463
Foam::UPstream::maxCommsSize
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:287
contiguous.H
Foam::UPstream::parRun
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:433
Foam::List
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:63
Foam::UList
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:103
Foam::UList::size
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:114
Foam::labelUList
UList< label > labelUList
A UList of labels.
Definition: UList.H:85
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:445
Foam::is_contiguous
A template class to specify that a data type can be considered as being contiguous in memory.
Definition: contiguous.H:75