ParSortableList.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2020 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "ParSortableList.H"
30 #include "SortableList.H"
31 #include "Pstream.H"
32 #include "ListListOps.H"
33 #include "PstreamReduceOps.H"
34 
35 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
36 
37 template<class Type>
39 (
40  const List<Type>& elems,
41  Ostream& os
42 ) const
43 {
44  os << '(';
45 
46  forAll(elems, elemI)
47  {
48  os << ' ' << elems[elemI];
49  }
50  os << ')';
51 }
52 
53 
54 template<class Type>
56 (
57  const List<Type>& values,
58  const labelList& indices,
59  const label fromProcNo,
60  label& destI,
61  List<taggedValue>& dest
62 ) const
63 {
64  forAll(values, elemI)
65  {
66  taggedValue& tagVal = dest[destI];
67 
68  tagVal.value() = values[elemI];
69  tagVal.index() = indices[elemI];
70  tagVal.procID() = fromProcNo;
71 
72  destI++;
73  }
74 }
75 
76 
77 template<class Type>
79 (
80  const List<Type>& elems,
81  List<Type>& pivots
82 ) const
83 {
84  pivots.setSize(Pstream::nProcs());
85 
86  label pivotPos = 0;
87 
88  forAll(pivots, pivotI)
89  {
90  pivots[pivotI] = elems[pivotPos];
91 
92  pivotPos += elems.size()/Pstream::nProcs();
93  }
94 }
95 
96 
97 template<class Type>
99 (
100  List<Type>& values,
101  labelList& indices,
102  const label bufSize,
103  const label destProci
104 ) const
105 {
106  if (destProci != Pstream::myProcNo())
107  {
108  values.setSize(bufSize);
109  indices.setSize(bufSize);
110 
111  if (debug)
112  {
113  Pout<< "Sending to " << destProci << " elements:" << values
114  << endl;
115  }
116 
117  {
118  OPstream toSlave(Pstream::commsTypes::blocking, destProci);
119  toSlave << values << indices;
120  }
121  }
122 }
123 
124 
125 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
126 
127 template<class Type>
129 :
130  List<Type>(values),
131  indices_(0),
132  procs_(0)
133 {
134  sort();
135 }
136 
137 
138 template<class Type>
140 :
141  List<Type>(size),
142  indices_(0),
143  procs_(0)
144 {}
145 
146 
147 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
148 
149 template<class Type>
151 {
152  //
153  // 0. Get total size of dataset.
154  //
155 
156  label n = this->size();
157 
158  reduce(n, sumOp<label>());
159 
160 
161  // 1. Sort list locally
162  SortableList<Type> sorted(*this);
163 
164  // Collect elements at pivot points
165  labelListList sortedGatherList(Pstream::nProcs());
166 
167  labelList& pivots = sortedGatherList[Pstream::myProcNo()];
168 
169  getPivots(sorted, pivots);
170 
171  if (debug)
172  {
173  Pout<< "pivots:";
174  write(pivots, Pout);
175  Pout<< endl;
176  }
177 
178 
179  //
180  // 2. Combine pivotlist per processor onto master, sort, get pivots.
181  //
182 
183  Pstream::gatherList(sortedGatherList);
184 
185  if (Pstream::master())
186  {
187  labelList allPivots =
188  ListListOps::combine<labelList>
189  (
190  sortedGatherList,
192  );
193 
194  SortableList<Type> sortedPivots(allPivots);
195 
196  if (debug)
197  {
198  Pout<< "allPivots:";
199  write(allPivots, Pout);
200  Pout<< endl;
201  }
202 
203  getPivots(sortedPivots, pivots);
204  }
205  Pstream::scatter(pivots);
206 
207  if (debug)
208  {
209  Pout<< "new pivots:";
210  write(pivots, Pout);
211  Pout<< endl;
212  }
213 
214 
215  //
216  // 3. Distribute pivots & distribute.
217  //
218 
219  label pivotI = 1;
220  label destProci = 0;
221 
222  // Buffer for my own data. Keep original index together with value.
223  labelList ownValues(sorted.size());
224  labelList ownIndices(sorted.size());
225  label ownI = 0;
226 
227  // Buffer for sending data
228  labelList sendValues(sorted.size());
229  labelList sendIndices(sorted.size());
230  label sendI = 0;
231 
232  forAll(sorted, sortedI)
233  {
234  if ((pivotI < Pstream::nProcs()) && (sorted[sortedI] > pivots[pivotI]))
235  {
236  checkAndSend(sendValues, sendIndices, sendI, destProci);
237 
238  // Reset buffer.
239  sendValues.setSize(sorted.size());
240  sendIndices.setSize(sorted.size());
241  sendI = 0;
242 
243  pivotI++;
244  destProci++;
245  }
246 
247  if (destProci != Pstream::myProcNo())
248  {
249  sendValues[sendI] = sorted[sortedI];
250  sendIndices[sendI] = sorted.indices()[sortedI];
251  sendI++;
252  }
253  else
254  {
255  ownValues[ownI] = sorted[sortedI];
256  ownIndices[ownI] = sorted.indices()[sortedI];
257  ownI++;
258  }
259  }
260 
261 
262  // Handle trailing send buffer
263  if (sendI != 0)
264  {
265  checkAndSend(sendValues, sendIndices, sendI, destProci);
266  }
267 
268  // Print ownValues
269  ownValues.setSize(ownI);
270  ownIndices.setSize(ownI);
271 
272  if (debug & 2)
273  {
274  Pout<< "Not sending (to myself) elements "
275  << ownValues << endl;
276  }
277 
278  //
279  // 4. Combine pieces from all processors & sort. Use indices() from
280  // SortableList to remember source processor number.
281  //
282 
283  // Allocate receive buffer. Acc. to paper upper bound is 2*n/p
284  // (n=total size, p=nProcs). Resize later on.
285  List<taggedValue> combinedValues(2 * n/Pstream::nProcs());
286 
287  label combinedI = 0;
288 
289  for (const int proci : Pstream::allProcs())
290  {
291  if (proci == Pstream::myProcNo())
292  {
293  if (debug & 2)
294  {
295  Pout<< "Copying from own:" << ownValues << endl;
296  }
297 
298  // Copy ownValues,ownIndices into combined buffer
299  copyInto(ownValues, ownIndices, proci, combinedI, combinedValues);
300  }
301  else
302  {
303  labelList recValues;
304  labelList recIndices;
305 
306  {
307  if (debug)
308  {
309  Pout<< "Receiving from " << proci << endl;
310  }
311 
312  IPstream fromSlave(Pstream::commsTypes::blocking, proci);
313 
314  fromSlave >> recValues >> recIndices;
315 
316  if (debug & 2)
317  {
318  Pout<< "Received from " << proci
319  << " elements:" << recValues << endl;
320  }
321  }
322 
323  if (debug)
324  {
325  Pout<< "Copying starting at:" << combinedI << endl;
326  }
327  copyInto(recValues, recIndices, proci, combinedI, combinedValues);
328  }
329  }
330  combinedValues.setSize(combinedI);
331 
332  // Sort according to values
333  Foam::sort(combinedValues);
334 
335  // Copy into *this
336  this->setSize(combinedI);
337  indices_.setSize(combinedI);
338  procs_.setSize(combinedI);
339 
340  forAll(combinedValues, elemI)
341  {
342  this->operator[](elemI) = combinedValues[elemI].value();
343  indices_[elemI] = combinedValues[elemI].index();
344  procs_[elemI] = combinedValues[elemI].procID();
345  }
346 }
347 
348 
349 // ************************************************************************* //
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::labelList
List< label > labelList
A List of labels.
Definition: List.H:67
setSize
points setSize(newPointi)
Foam::accessOp
Definition: UList.H:668
Foam::HashTableOps::values
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition: HashOps.H:149
ListListOps.H
Foam::UPstream::master
static bool master(const label communicator=worldComm)
Am I the master process.
Definition: UPstream.H:457
Foam::Pstream::scatter
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
Definition: gatherScatter.C:150
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:369
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Foam::sumOp
Definition: ops.H:213
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
n
label n
Definition: TABSMDCalcMethod2.H:31
SortableList.H
Foam::reduce
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
Definition: PstreamReduceOps.H:51
Foam::List::setSize
void setSize(const label n)
Alias for resize()
Definition: List.H:222
Foam::sort
void sort(UList< T > &a)
Definition: UList.C:261
Foam::ParSortableList::sort
void sort()
(stable) sort the list (if changed after construction time)
Definition: ParSortableList.C:150
Foam::ParSortableList
Implementation of PSRS parallel sorting routine.
Definition: ParSortableList.H:71
os
OBJstream os(runTime.globalPath()/outputName)
Foam::SortableList
A list that is sorted upon construction or when explicitly requested with the sort() method.
Definition: List.H:60
Pstream.H
PstreamReduceOps.H
Inter-processor communication reduction functions.
Foam::ParSortableList::ParSortableList
ParSortableList(const UList< Type > &)
Construct from List, sorting the elements.
Definition: ParSortableList.C:128
Foam::Pstream::gatherList
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
Definition: gatherScatterList.C:52
Foam::UPstream::allProcs
static rangeType allProcs(const label communicator=worldComm)
Range of process indices for all processes.
Definition: UPstream.H:508
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:463
Foam::List
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:63
Foam::UList< Type >
Foam::SortableList::indices
const labelList & indices() const
Return the list of sorted indices. Updated every sort.
Definition: SortableList.H:108
Foam::vtk::write
void write(vtk::formatter &fmt, const Type &val, const label n=1)
Component-wise write of a value (N times)
Definition: foamVtkOutputTemplates.C:36
Foam::IPstream
Input inter-processor communications stream.
Definition: IPstream.H:53
ParSortableList.H
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:445
Foam::UPstream::commsTypes::blocking