UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2015-2021 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UPstream.H"
30 #include "debug.H"
31 #include "registerSwitch.H"
32 #include "dictionary.H"
33 #include "IOstreams.H"
34 
35 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36 
37 namespace Foam
38 {
39  defineTypeNameAndDebug(UPstream, 0);
40 }
41 
42 const Foam::Enum
43 <
45 >
47 ({
48  { commsTypes::blocking, "blocking" },
49  { commsTypes::scheduled, "scheduled" },
50  { commsTypes::nonBlocking, "nonBlocking" },
51 });
52 
53 
54 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
55 
56 void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
57 {
58  if (nProcs == 0)
59  {
60  parRun_ = false;
61  haveThreads_ = haveThreads;
62 
64  label comm = allocateCommunicator(-1, labelList(Foam::one{}, 0), false);
65  if (comm != UPstream::worldComm)
66  {
68  << "problem : comm:" << comm
69  << " UPstream::worldComm:" << UPstream::worldComm
71  }
72 
73  Pout.prefix() = "";
74  Perr.prefix() = "";
75  }
76  else
77  {
78  parRun_ = true;
79  haveThreads_ = haveThreads;
80 
81  // Redo worldComm communicator (this has been created at static
82  // initialisation time)
84  label comm = allocateCommunicator(-1, identity(nProcs), true);
85  if (comm != UPstream::worldComm)
86  {
88  << "problem : comm:" << comm
89  << " UPstream::worldComm:" << UPstream::worldComm
91  }
92 
93  Pout.prefix() = '[' + name(myProcNo(comm)) + "] ";
94  Perr.prefix() = '[' + name(myProcNo(comm)) + "] ";
95  }
96 
97  if (debug)
98  {
99  Pout<< "UPstream::setParRun :"
100  << " nProcs:" << nProcs
101  << " haveThreads:" << haveThreads
102  << endl;
103  }
104 }
105 
106 
108 (
109  const label parentIndex,
110  const labelList& subRanks,
111  const bool doPstream
112 )
113 {
114  label index;
115  if (!freeComms_.empty())
116  {
117  index = freeComms_.remove(); // LIFO pop
118  }
119  else
120  {
121  // Extend storage
122  index = parentCommunicator_.size();
123 
124  myProcNo_.append(-1);
125  procIDs_.append(List<int>());
126  parentCommunicator_.append(-1);
127  linearCommunication_.append(List<commsStruct>());
128  treeCommunication_.append(List<commsStruct>());
129  }
130 
131  if (debug)
132  {
133  Pout<< "Communicators : Allocating communicator " << index << endl
134  << " parent : " << parentIndex << endl
135  << " procs : " << subRanks << endl
136  << endl;
137  }
138 
139  // Initialise; overwritten by allocatePstreamCommunicator
140  myProcNo_[index] = 0;
141 
142  // Convert from label to int
143  procIDs_[index].setSize(subRanks.size());
144  forAll(procIDs_[index], i)
145  {
146  procIDs_[index][i] = subRanks[i];
147 
148  // Enforce incremental order (so index is rank in next communicator)
149  if (i >= 1 && subRanks[i] <= subRanks[i-1])
150  {
152  << "subranks not sorted : " << subRanks
153  << " when allocating subcommunicator from parent "
154  << parentIndex
156  }
157  }
158  parentCommunicator_[index] = parentIndex;
159 
160  // Size but do not fill structure - this is done on-the-fly
161  linearCommunication_[index] = List<commsStruct>(procIDs_[index].size());
162  treeCommunication_[index] = List<commsStruct>(procIDs_[index].size());
163 
164  if (doPstream && parRun())
165  {
166  allocatePstreamCommunicator(parentIndex, index);
167  }
168 
169  return index;
170 }
171 
172 
174 (
175  const label communicator,
176  const bool doPstream
177 )
178 {
179  if (debug)
180  {
181  Pout<< "Communicators : Freeing communicator " << communicator << endl
182  << " parent : " << parentCommunicator_[communicator] << endl
183  << " myProcNo : " << myProcNo_[communicator] << endl
184  << endl;
185  }
186 
187  if (doPstream && parRun())
188  {
189  freePstreamCommunicator(communicator);
190  }
191  myProcNo_[communicator] = -1;
192  //procIDs_[communicator].clear();
193  parentCommunicator_[communicator] = -1;
194  linearCommunication_[communicator].clear();
195  treeCommunication_[communicator].clear();
196 
197  freeComms_.append(communicator); // LIFO push
198 }
199 
200 
201 void Foam::UPstream::freeCommunicators(const bool doPstream)
202 {
203  forAll(myProcNo_, communicator)
204  {
205  if (myProcNo_[communicator] != -1)
206  {
207  freeCommunicator(communicator, doPstream);
208  }
209  }
210 }
211 
212 
213 int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
214 {
215  int procID = myProcID;
216  label comm = myComm;
217 
218  while (parent(comm) != -1)
219  {
220  const List<int>& parentRanks = UPstream::procID(comm);
221  procID = parentRanks[procID];
222  comm = UPstream::parent(comm);
223  }
224 
225  return procID;
226 }
227 
228 
229 Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
230 {
231  const List<int>& parentRanks = procID(myComm);
232  label parentComm = parent(myComm);
233 
234  if (parentComm == -1)
235  {
236  return parentRanks.find(baseProcID);
237  }
238  else
239  {
240  const label parentRank = procNo(parentComm, baseProcID);
241  return parentRanks.find(parentRank);
242  }
243 }
244 
245 
246 Foam::label Foam::UPstream::procNo
247 (
248  const label myComm,
249  const label currentComm,
250  const int currentProcID
251 )
252 {
253  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
254  return procNo(myComm, physProcID);
255 }
256 
257 
258 template<>
261 {
262  UPstream::commsStruct& t = v_[procID];
263 
264  if (t.allBelow().size() + t.allNotBelow().size() + 1 != size())
265  {
266  // Not yet allocated
267 
268  label above(-1);
269  labelList below;
270  labelList allBelow;
271 
272  if (size() < UPstream::nProcsSimpleSum)
273  {
274  // Linear schedule
275 
276  if (procID == 0)
277  {
278  below.setSize(size()-1);
279  for (label procI = 1; procI < size(); procI++)
280  {
281  below[procI-1] = procI;
282  }
283  }
284  else
285  {
286  above = 0;
287  }
288  }
289  else
290  {
291  // Use tree like schedule. For 8 procs:
292  // (level 0)
293  // 0 receives from 1
294  // 2 receives from 3
295  // 4 receives from 5
296  // 6 receives from 7
297  // (level 1)
298  // 0 receives from 2
299  // 4 receives from 6
300  // (level 2)
301  // 0 receives from 4
302  //
303  // The sends/receives for all levels are collected per processor
304  // (one send per processor; multiple receives possible) creating
305  // a table:
306  //
307  // So per processor:
308  // proc receives from sends to
309  // ---- ------------- --------
310  // 0 1,2,4 -
311  // 1 - 0
312  // 2 3 0
313  // 3 - 2
314  // 4 5 0
315  // 5 - 4
316  // 6 7 4
317  // 7 - 6
318 
319  label mod = 0;
320 
321  for (label step = 1; step < size(); step = mod)
322  {
323  mod = step * 2;
324 
325  if (procID % mod)
326  {
327  above = procID - (procID % mod);
328  break;
329  }
330  else
331  {
332  for
333  (
334  label j = procID + step;
335  j < size() && j < procID + mod;
336  j += step
337  )
338  {
339  below.append(j);
340  }
341  for
342  (
343  label j = procID + step;
344  j < size() && j < procID + mod;
345  j++
346  )
347  {
348  allBelow.append(j);
349  }
350  }
351  }
352  }
353  t = UPstream::commsStruct(size(), procID, above, below, allBelow);
354  }
355  return t;
356 }
357 
358 
359 template<>
362 {
363  return const_cast<UList<UPstream::commsStruct>&>(*this).operator[](procID);
364 }
365 
366 
367 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
368 
369 bool Foam::UPstream::parRun_(false);
370 
371 bool Foam::UPstream::haveThreads_(false);
372 
373 int Foam::UPstream::msgType_(1);
374 
375 
376 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
377 
378 Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
379 
380 Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
381 
382 Foam::DynamicList<Foam::label> Foam::UPstream::freeComms_;
383 
384 Foam::wordList Foam::UPstream::allWorlds_(Foam::one{}, "");
385 Foam::labelList Foam::UPstream::worldIDs_(Foam::one{}, 0);
386 
388 Foam::UPstream::linearCommunication_(10);
389 
391 Foam::UPstream::treeCommunication_(10);
392 
393 
394 // Allocate a serial communicator. This gets overwritten in parallel mode
395 // (by UPstream::setParRun())
397 (
398  -1,
400  false
401 );
402 
403 
405 (
406  Foam::debug::optimisationSwitch("floatTransfer", 0)
407 );
409 (
410  "floatTransfer",
411  bool,
413 );
414 
416 (
417  Foam::debug::optimisationSwitch("nProcsSimpleSum", 16)
418 );
420 (
421  "nProcsSimpleSum",
422  int,
424 );
425 
427 (
428  commsTypeNames.get
429  (
430  "commsType",
432  )
433 );
434 
435 namespace Foam
436 {
437  // Register re-reader
439  :
441  {
442  public:
443 
444  addcommsTypeToOpt(const char* name)
445  :
447  {}
448 
449  virtual ~addcommsTypeToOpt() = default;
450 
451  virtual void readData(Foam::Istream& is)
452  {
455  }
456 
457  virtual void writeData(Foam::Ostream& os) const
458  {
460  }
461  };
462 
463  addcommsTypeToOpt addcommsTypeToOpt_("commsType");
464 }
465 
466 Foam::label Foam::UPstream::worldComm(0);
467 
468 Foam::label Foam::UPstream::warnComm(-1);
469 
471 (
472  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
473 );
475 (
476  "nPollProcInterfaces",
477  int,
479 );
480 
481 
483 (
484  Foam::debug::optimisationSwitch("maxCommsSize", 0)
485 );
487 (
488  "maxCommsSize",
489  int,
491 );
492 
493 
495 (
496  Foam::debug::optimisationSwitch("mpiBufferSize", 0)
497 );
498 
499 
500 // ************************************************************************* //
Foam::addcommsTypeToOpt::readData
virtual void readData(Foam::Istream &is)
Read.
Definition: UPstream.C:451
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::UPstream::warnComm
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:296
Foam::labelList
List< label > labelList
A List of labels.
Definition: List.H:67
Foam::Enum
Enum is a wrapper around a list of names/values that represent particular enumeration (or int) values...
Definition: IOstreamOption.H:57
IOstreams.H
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
debug.H
Foam::UPstream::baseProcNo
static int baseProcNo(const label myComm, const int procID)
Definition: UPstream.C:213
UPstream.H
Foam::addcommsTypeToOpt::~addcommsTypeToOpt
virtual ~addcommsTypeToOpt()=default
Foam::DynamicList< int >
Foam::addcommsTypeToOpt
Definition: UPstream.C:438
Foam::one
A class representing the concept of 1 (one) that can be used to avoid manipulating objects known to b...
Definition: one.H:61
Foam::List::append
void append(const T &val)
Append an element at the end of the list.
Definition: ListI.H:175
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:369
Foam::UPstream::defaultCommsType
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:281
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Foam::UPstream::haveThreads
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:439
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
Foam::addcommsTypeToOpt::addcommsTypeToOpt
addcommsTypeToOpt(const char *name)
Definition: UPstream.C:444
Foam::UPstream::mpiBufferSize
static const int mpiBufferSize
MPI buffer-size (bytes)
Definition: UPstream.H:290
Foam::UPstream::allocateCommunicator
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:108
Foam::UPstream::commsStruct::allNotBelow
const labelList & allNotBelow() const noexcept
Definition: UPstream.H:145
Foam::Istream
An Istream is an abstract base class for all input systems (streams, files, token lists etc)....
Definition: Istream.H:61
Foam::List::setSize
void setSize(const label n)
Alias for resize()
Definition: List.H:222
Foam::UPstream::floatTransfer
static bool floatTransfer
Definition: UPstream.H:274
Foam::UPstream::procID
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:474
Foam::UPstream::freeCommunicator
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:174
Foam::addcommsTypeToOpt_
addcommsTypeToOpt addcommsTypeToOpt_("commsType")
Foam::UPstream::commsTypeNames
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:77
Foam::debug::optimisationSwitch
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:237
Foam::FatalError
error FatalError
os
OBJstream os(runTime.globalPath()/outputName)
Foam
Namespace for OpenFOAM.
Definition: atmBoundaryLayer.C:33
registerOptSwitch
registerOptSwitch("floatTransfer", bool, Foam::UPstream::floatTransfer)
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:144
Foam::UPstream::commsStruct
Structure for communicating between processors.
Definition: UPstream.H:83
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Foam::UPstream::nProcsSimpleSum
static int nProcsSimpleSum
Definition: UPstream.H:278
Foam::Perr
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
Foam::UPstream::commsTypes
commsTypes
Types of communications.
Definition: UPstream.H:69
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
Foam::addcommsTypeToOpt::writeData
virtual void writeData(Foam::Ostream &os) const
Write.
Definition: UPstream.C:457
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:463
Foam::UPstream::maxCommsSize
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:287
Foam::UPstream::worldComm
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:293
Foam::List< label >
Foam::UList::operator[]
T & operator[](const label i)
Return element of UList.
Definition: UListI.H:299
Foam::UPstream::procNo
static label procNo(const label comm, const int baseProcID)
Definition: UPstream.C:229
Foam::UList
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:103
dictionary.H
Foam::identity
labelList identity(const label len, label start=0)
Create identity map of the given length with (map[i] == i)
Definition: labelList.C:38
Foam::UPstream::freeCommunicators
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:201
Foam::UPstream::commsStruct::allBelow
const labelList & allBelow() const noexcept
Definition: UPstream.H:140
Foam::name
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
Definition: exprTraits.C:59
serialComm
Foam::UPstream::communicator serialComm(-1, Foam::labelList(Foam::one{}, 0), false)
registerSwitch.H
Foam::Ostream
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition: Ostream.H:56
Foam::UPstream::parent
static label parent(const label communicator)
Definition: UPstream.H:468
Foam::UPstream::communicator
Helper class for allocating/freeing communicators.
Definition: UPstream.H:329
Foam::debug::optimisationSwitches
dictionary & optimisationSwitches()
The OptimisationSwitches sub-dictionary in the central controlDict(s).
Definition: debug.C:219
Foam::Enum::read
EnumType read(Istream &is) const
Read a word from Istream and return the corresponding enumeration.
Definition: Enum.C:109
Foam::defineTypeNameAndDebug
defineTypeNameAndDebug(combustionModel, 0)
Foam::debug::addOptimisationObject
void addOptimisationObject(const char *name, simpleRegIOobject *obj)
Register optimisation switch read/write object.
Definition: debug.C:262
Foam::simpleRegIOobject
Abstract base class for registered object with I/O. Used in debug symbol registration.
Definition: simpleRegIOobject.H:52
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:445
Foam::UPstream::nPollProcInterfaces
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:284
Foam::prefixOSstream::prefix
const string & prefix() const noexcept
Return the stream prefix.
Definition: prefixOSstream.H:101