ParSortableList.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2020 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "ParSortableList.H"
30#include "SortableList.H"
31#include "Pstream.H"
32#include "ListListOps.H"
33#include "PstreamReduceOps.H"
34
35// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
36
37template<class Type>
39(
40 const List<Type>& elems,
41 Ostream& os
42) const
43{
44 os << '(';
45
46 forAll(elems, elemI)
47 {
48 os << ' ' << elems[elemI];
49 }
50 os << ')';
51}
52
53
54template<class Type>
56(
57 const List<Type>& values,
58 const labelList& indices,
59 const label fromProcNo,
60 label& destI,
61 List<taggedValue>& dest
62) const
63{
64 forAll(values, elemI)
65 {
66 taggedValue& tagVal = dest[destI];
67
68 tagVal.value() = values[elemI];
69 tagVal.index() = indices[elemI];
70 tagVal.procID() = fromProcNo;
71
72 destI++;
73 }
74}
75
76
77template<class Type>
79(
80 const List<Type>& elems,
81 List<Type>& pivots
82) const
83{
84 pivots.setSize(Pstream::nProcs());
85
86 label pivotPos = 0;
87
88 forAll(pivots, pivotI)
89 {
90 pivots[pivotI] = elems[pivotPos];
91
92 pivotPos += elems.size()/Pstream::nProcs();
93 }
94}
95
96
97template<class Type>
99(
100 List<Type>& values,
101 labelList& indices,
102 const label bufSize,
103 const label destProci
104) const
105{
106 if (destProci != Pstream::myProcNo())
107 {
108 values.setSize(bufSize);
109 indices.setSize(bufSize);
110
111 if (debug)
112 {
113 Pout<< "Sending to " << destProci << " elements:" << values
114 << endl;
115 }
116
117 {
118 OPstream toSlave(Pstream::commsTypes::blocking, destProci);
119 toSlave << values << indices;
120 }
121 }
122}
123
124
125// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
126
127template<class Type>
129:
130 List<Type>(values),
131 indices_(0),
132 procs_(0)
133{
134 sort();
135}
136
137
138template<class Type>
140:
141 List<Type>(size),
142 indices_(0),
143 procs_(0)
144{}
145
146
147// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
148
149template<class Type>
151{
152 //
153 // 0. Get total size of dataset.
154 //
155
156 label n = this->size();
157
159
160
161 // 1. Sort list locally
162 SortableList<Type> sorted(*this);
163
164 // Collect elements at pivot points
165 labelListList sortedGatherList(Pstream::nProcs());
166
167 labelList& pivots = sortedGatherList[Pstream::myProcNo()];
168
169 getPivots(sorted, pivots);
170
171 if (debug)
172 {
173 Pout<< "pivots:";
174 write(pivots, Pout);
175 Pout<< endl;
176 }
177
178
179 //
180 // 2. Combine pivotlist per processor onto master, sort, get pivots.
181 //
182
183 Pstream::gatherList(sortedGatherList);
184
185 if (Pstream::master())
186 {
187 labelList allPivots =
188 ListListOps::combine<labelList>
189 (
190 sortedGatherList,
192 );
193
194 SortableList<Type> sortedPivots(allPivots);
195
196 if (debug)
197 {
198 Pout<< "allPivots:";
199 write(allPivots, Pout);
200 Pout<< endl;
201 }
202
203 getPivots(sortedPivots, pivots);
204 }
205 Pstream::scatter(pivots);
206
207 if (debug)
208 {
209 Pout<< "new pivots:";
210 write(pivots, Pout);
211 Pout<< endl;
212 }
213
214
215 //
216 // 3. Distribute pivots & distribute.
217 //
218
219 label pivotI = 1;
220 label destProci = 0;
221
222 // Buffer for my own data. Keep original index together with value.
223 labelList ownValues(sorted.size());
224 labelList ownIndices(sorted.size());
225 label ownI = 0;
226
227 // Buffer for sending data
228 labelList sendValues(sorted.size());
229 labelList sendIndices(sorted.size());
230 label sendI = 0;
231
232 forAll(sorted, sortedI)
233 {
234 if ((pivotI < Pstream::nProcs()) && (sorted[sortedI] > pivots[pivotI]))
235 {
236 checkAndSend(sendValues, sendIndices, sendI, destProci);
237
238 // Reset buffer.
239 sendValues.setSize(sorted.size());
240 sendIndices.setSize(sorted.size());
241 sendI = 0;
242
243 pivotI++;
244 destProci++;
245 }
246
247 if (destProci != Pstream::myProcNo())
248 {
249 sendValues[sendI] = sorted[sortedI];
250 sendIndices[sendI] = sorted.indices()[sortedI];
251 sendI++;
252 }
253 else
254 {
255 ownValues[ownI] = sorted[sortedI];
256 ownIndices[ownI] = sorted.indices()[sortedI];
257 ownI++;
258 }
259 }
260
261
262 // Handle trailing send buffer
263 if (sendI != 0)
264 {
265 checkAndSend(sendValues, sendIndices, sendI, destProci);
266 }
267
268 // Print ownValues
269 ownValues.setSize(ownI);
270 ownIndices.setSize(ownI);
271
272 if (debug & 2)
273 {
274 Pout<< "Not sending (to myself) elements "
275 << ownValues << endl;
276 }
277
278 //
279 // 4. Combine pieces from all processors & sort. Use indices() from
280 // SortableList to remember source processor number.
281 //
282
283 // Allocate receive buffer. Acc. to paper upper bound is 2*n/p
284 // (n=total size, p=nProcs). Resize later on.
285 List<taggedValue> combinedValues(2 * n/Pstream::nProcs());
286
287 label combinedI = 0;
288
289 for (const int proci : Pstream::allProcs())
290 {
291 if (proci == Pstream::myProcNo())
292 {
293 if (debug & 2)
294 {
295 Pout<< "Copying from own:" << ownValues << endl;
296 }
297
298 // Copy ownValues,ownIndices into combined buffer
299 copyInto(ownValues, ownIndices, proci, combinedI, combinedValues);
300 }
301 else
302 {
303 labelList recValues;
304 labelList recIndices;
305
306 {
307 if (debug)
308 {
309 Pout<< "Receiving from " << proci << endl;
310 }
311
312 IPstream fromSlave(Pstream::commsTypes::blocking, proci);
313
314 fromSlave >> recValues >> recIndices;
315
316 if (debug & 2)
317 {
318 Pout<< "Received from " << proci
319 << " elements:" << recValues << endl;
320 }
321 }
322
323 if (debug)
324 {
325 Pout<< "Copying starting at:" << combinedI << endl;
326 }
327 copyInto(recValues, recIndices, proci, combinedI, combinedValues);
328 }
329 }
330 combinedValues.setSize(combinedI);
331
332 // Sort according to values
333 Foam::sort(combinedValues);
334
335 // Copy into *this
336 this->setSize(combinedI);
337 indices_.setSize(combinedI);
338 procs_.setSize(combinedI);
339
340 forAll(combinedValues, elemI)
341 {
342 this->operator[](elemI) = combinedValues[elemI].value();
343 indices_[elemI] = combinedValues[elemI].index();
344 procs_[elemI] = combinedValues[elemI].procID();
345 }
346}
347
348
349// ************************************************************************* //
Inter-processor communication reduction functions.
label n
Input inter-processor communications stream.
Definition: IPstream.H:57
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: List.H:77
void setSize(const label n)
Alias for resize()
Definition: List.H:218
Implementation of PSRS parallel sorting routine.
void sort()
(stable) sort the list (if changed after construction time)
UPstream::rangeType allProcs() const noexcept
Range of ranks indices associated with PstreamBuffers.
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
static void gatherList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
static void scatter(const List< commsStruct > &comms, T &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
A list that is sorted upon construction or when explicitly requested with the sort() method.
Definition: SortableList.H:63
const labelList & indices() const noexcept
Return the list of sorted indices. Updated every sort.
Definition: SortableList.H:108
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: UList.H:94
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:114
virtual bool write()
Write the output fields.
int myProcNo() const noexcept
Return processor number.
splitCell * master() const
Definition: splitCell.H:113
OBJstream os(runTime.globalPath()/outputName)
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition: HashOps.H:149
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
void reduce(const List< UPstream::commsStruct > &comms, T &value, const BinaryOp &bop, const int tag, const label comm)
void sort(UList< T > &list)
Sort the list.
Definition: UList.C:342
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
runTime write()
points setSize(newPointi)
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:333