UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2015-2020 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UPstream.H"
30 #include "debug.H"
31 #include "registerSwitch.H"
32 #include "dictionary.H"
33 #include "IOstreams.H"
34 
35 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36 
37 namespace Foam
38 {
39  defineTypeNameAndDebug(UPstream, 0);
40 }
41 
42 const Foam::Enum
43 <
45 >
47 ({
48  { commsTypes::blocking, "blocking" },
49  { commsTypes::scheduled, "scheduled" },
50  { commsTypes::nonBlocking, "nonBlocking" },
51 });
52 
53 
54 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
55 
56 void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
57 {
58  if (nProcs == 0)
59  {
60  parRun_ = false;
61  haveThreads_ = haveThreads;
62 
64  label comm = allocateCommunicator(-1, labelList(Foam::one{}, 0), false);
65  if (comm != UPstream::worldComm)
66  {
68  << "problem : comm:" << comm
69  << " UPstream::worldComm:" << UPstream::worldComm
71  }
72 
73  Pout.prefix() = "";
74  Perr.prefix() = "";
75  }
76  else
77  {
78  parRun_ = true;
79  haveThreads_ = haveThreads;
80 
81  // Redo worldComm communicator (this has been created at static
82  // initialisation time)
84  label comm = allocateCommunicator(-1, identity(nProcs), true);
85  if (comm != UPstream::worldComm)
86  {
88  << "problem : comm:" << comm
89  << " UPstream::worldComm:" << UPstream::worldComm
91  }
92 
93  Pout.prefix() = '[' + name(myProcNo(comm)) + "] ";
94  Perr.prefix() = '[' + name(myProcNo(comm)) + "] ";
95  }
96 }
97 
98 
100 (
101  const label parentIndex,
102  const labelList& subRanks,
103  const bool doPstream
104 )
105 {
106  label index;
107  if (!freeComms_.empty())
108  {
109  index = freeComms_.pop();
110  }
111  else
112  {
113  // Extend storage
114  index = parentCommunicator_.size();
115 
116  myProcNo_.append(-1);
117  procIDs_.append(List<int>(0));
118  parentCommunicator_.append(-1);
119  linearCommunication_.append(List<commsStruct>(0));
120  treeCommunication_.append(List<commsStruct>(0));
121  }
122 
123  if (debug)
124  {
125  Pout<< "Communicators : Allocating communicator " << index << endl
126  << " parent : " << parentIndex << endl
127  << " procs : " << subRanks << endl
128  << endl;
129  }
130 
131  // Initialise; overwritten by allocatePstreamCommunicator
132  myProcNo_[index] = 0;
133 
134  // Convert from label to int
135  procIDs_[index].setSize(subRanks.size());
136  forAll(procIDs_[index], i)
137  {
138  procIDs_[index][i] = subRanks[i];
139 
140  // Enforce incremental order (so index is rank in next communicator)
141  if (i >= 1 && subRanks[i] <= subRanks[i-1])
142  {
144  << "subranks not sorted : " << subRanks
145  << " when allocating subcommunicator from parent "
146  << parentIndex
148  }
149  }
150  parentCommunicator_[index] = parentIndex;
151 
152  // Size but do not fill structure - this is done on-the-fly
153  linearCommunication_[index] = List<commsStruct>(procIDs_[index].size());
154  treeCommunication_[index] = List<commsStruct>(procIDs_[index].size());
155 
156  if (doPstream && parRun())
157  {
158  allocatePstreamCommunicator(parentIndex, index);
159  }
160 
161  return index;
162 }
163 
164 
166 (
167  const label communicator,
168  const bool doPstream
169 )
170 {
171  if (debug)
172  {
173  Pout<< "Communicators : Freeing communicator " << communicator << endl
174  << " parent : " << parentCommunicator_[communicator] << endl
175  << " myProcNo : " << myProcNo_[communicator] << endl
176  << endl;
177  }
178 
179  if (doPstream && parRun())
180  {
181  freePstreamCommunicator(communicator);
182  }
183  myProcNo_[communicator] = -1;
184  //procIDs_[communicator].clear();
185  parentCommunicator_[communicator] = -1;
186  linearCommunication_[communicator].clear();
187  treeCommunication_[communicator].clear();
188 
189  freeComms_.push(communicator);
190 }
191 
192 
193 void Foam::UPstream::freeCommunicators(const bool doPstream)
194 {
195  forAll(myProcNo_, communicator)
196  {
197  if (myProcNo_[communicator] != -1)
198  {
199  freeCommunicator(communicator, doPstream);
200  }
201  }
202 }
203 
204 
205 int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
206 {
207  int procID = myProcID;
208  label comm = myComm;
209 
210  while (parent(comm) != -1)
211  {
212  const List<int>& parentRanks = UPstream::procID(comm);
213  procID = parentRanks[procID];
214  comm = UPstream::parent(comm);
215  }
216 
217  return procID;
218 }
219 
220 
221 Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
222 {
223  const List<int>& parentRanks = procID(myComm);
224  label parentComm = parent(myComm);
225 
226  if (parentComm == -1)
227  {
228  return parentRanks.find(baseProcID);
229  }
230  else
231  {
232  const label parentRank = procNo(parentComm, baseProcID);
233  return parentRanks.find(parentRank);
234  }
235 }
236 
237 
238 Foam::label Foam::UPstream::procNo
239 (
240  const label myComm,
241  const label currentComm,
242  const int currentProcID
243 )
244 {
245  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
246  return procNo(myComm, physProcID);
247 }
248 
249 
250 template<>
253 {
254  UPstream::commsStruct& t = v_[procID];
255 
256  if (t.allBelow().size() + t.allNotBelow().size() + 1 != size())
257  {
258  // Not yet allocated
259 
260  label above(-1);
261  labelList below;
262  labelList allBelow;
263 
264  if (size() < UPstream::nProcsSimpleSum)
265  {
266  // Linear schedule
267 
268  if (procID == 0)
269  {
270  below.setSize(size()-1);
271  for (label procI = 1; procI < size(); procI++)
272  {
273  below[procI-1] = procI;
274  }
275  }
276  else
277  {
278  above = 0;
279  }
280  }
281  else
282  {
283  // Use tree like schedule. For 8 procs:
284  // (level 0)
285  // 0 receives from 1
286  // 2 receives from 3
287  // 4 receives from 5
288  // 6 receives from 7
289  // (level 1)
290  // 0 receives from 2
291  // 4 receives from 6
292  // (level 2)
293  // 0 receives from 4
294  //
295  // The sends/receives for all levels are collected per processor
296  // (one send per processor; multiple receives possible) creating
297  // a table:
298  //
299  // So per processor:
300  // proc receives from sends to
301  // ---- ------------- --------
302  // 0 1,2,4 -
303  // 1 - 0
304  // 2 3 0
305  // 3 - 2
306  // 4 5 0
307  // 5 - 4
308  // 6 7 4
309  // 7 - 6
310 
311  label mod = 0;
312 
313  for (label step = 1; step < size(); step = mod)
314  {
315  mod = step * 2;
316 
317  if (procID % mod)
318  {
319  above = procID - (procID % mod);
320  break;
321  }
322  else
323  {
324  for
325  (
326  label j = procID + step;
327  j < size() && j < procID + mod;
328  j += step
329  )
330  {
331  below.append(j);
332  }
333  for
334  (
335  label j = procID + step;
336  j < size() && j < procID + mod;
337  j++
338  )
339  {
340  allBelow.append(j);
341  }
342  }
343  }
344  }
345  t = UPstream::commsStruct(size(), procID, above, below, allBelow);
346  }
347  return t;
348 }
349 
350 
351 template<>
354 {
355  return const_cast<UList<UPstream::commsStruct>&>(*this).operator[](procID);
356 }
357 
358 
359 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
360 
361 bool Foam::UPstream::parRun_(false);
362 
363 bool Foam::UPstream::haveThreads_(false);
364 
365 int Foam::UPstream::msgType_(1);
366 
367 
368 Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
369 
370 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
371 
372 Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
373 
374 Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
375 
376 Foam::wordList Foam::UPstream::allWorlds_(Foam::one{}, "");
377 Foam::labelList Foam::UPstream::worldIDs_(Foam::one{}, 0);
378 
380 Foam::UPstream::linearCommunication_(10);
381 
383 Foam::UPstream::treeCommunication_(10);
384 
385 
386 // Allocate a serial communicator. This gets overwritten in parallel mode
387 // (by UPstream::setParRun())
389 (
390  -1,
392  false
393 );
394 
395 
397 (
398  Foam::debug::optimisationSwitch("floatTransfer", 0)
399 );
401 (
402  "floatTransfer",
403  bool,
405 );
406 
408 (
409  Foam::debug::optimisationSwitch("nProcsSimpleSum", 16)
410 );
412 (
413  "nProcsSimpleSum",
414  int,
416 );
417 
419 (
420  commsTypeNames.get
421  (
422  "commsType",
424  )
425 );
426 
427 namespace Foam
428 {
429  // Register re-reader
431  :
433  {
434  public:
435 
436  addcommsTypeToOpt(const char* name)
437  :
439  {}
440 
441  virtual ~addcommsTypeToOpt() = default;
442 
443  virtual void readData(Foam::Istream& is)
444  {
447  }
448 
449  virtual void writeData(Foam::Ostream& os) const
450  {
452  }
453  };
454 
455  addcommsTypeToOpt addcommsTypeToOpt_("commsType");
456 }
457 
458 Foam::label Foam::UPstream::worldComm(0);
459 
460 Foam::label Foam::UPstream::warnComm(-1);
461 
463 (
464  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
465 );
467 (
468  "nPollProcInterfaces",
469  int,
471 );
472 
473 
475 (
476  Foam::debug::optimisationSwitch("maxCommsSize", 0)
477 );
479 (
480  "maxCommsSize",
481  int,
483 );
484 
485 
487 (
488  Foam::debug::optimisationSwitch("mpiBufferSize", 0)
489 );
490 
491 
492 // ************************************************************************* //
Foam::addcommsTypeToOpt::readData
virtual void readData(Foam::Istream &is)
Read.
Definition: UPstream.C:443
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::UPstream::warnComm
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:298
Foam::labelList
List< label > labelList
A List of labels.
Definition: List.H:71
Foam::Enum
Enum is a wrapper around a list of names/values that represent particular enumeration (or int) values...
Definition: IOstreamOption.H:57
IOstreams.H
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
debug.H
Foam::UPstream::baseProcNo
static int baseProcNo(const label myComm, const int procID)
Definition: UPstream.C:205
Foam::UPstream::haveThreads
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:440
UPstream.H
Foam::addcommsTypeToOpt::~addcommsTypeToOpt
virtual ~addcommsTypeToOpt()=default
Foam::DynamicList< int >
Foam::addcommsTypeToOpt
Definition: UPstream.C:430
Foam::one
A class representing the concept of 1 (one) that can be used to avoid manipulating objects known to b...
Definition: one.H:61
Foam::List::append
void append(const T &val)
Append an element at the end of the list.
Definition: ListI.H:182
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:350
Foam::UPstream::defaultCommsType
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:283
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
Foam::addcommsTypeToOpt::addcommsTypeToOpt
addcommsTypeToOpt(const char *name)
Definition: UPstream.C:436
Foam::UPstream::commsStruct::allBelow
const labelList & allBelow() const
Definition: UPstream.H:142
Foam::UPstream::mpiBufferSize
static const int mpiBufferSize
MPI buffer-size (bytes)
Definition: UPstream.H:292
Foam::UPstream::allocateCommunicator
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:100
Foam::Istream
An Istream is an abstract base class for all input systems (streams, files, token lists etc)....
Definition: Istream.H:61
Foam::name
word name(const complex &c)
Return string representation of complex.
Definition: complex.C:76
Foam::UPstream::floatTransfer
static bool floatTransfer
Definition: UPstream.H:276
Foam::UPstream::procID
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:475
Foam::UPstream::freeCommunicator
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:166
Foam::addcommsTypeToOpt_
addcommsTypeToOpt addcommsTypeToOpt_("commsType")
Foam::UPstream::commsTypeNames
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:77
Foam::debug::optimisationSwitch
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:237
Foam::FatalError
error FatalError
Foam::UPstream::commsStruct::allNotBelow
const labelList & allNotBelow() const
Definition: UPstream.H:147
Foam
Namespace for OpenFOAM.
Definition: atmBoundaryLayer.C:33
registerOptSwitch
registerOptSwitch("floatTransfer", bool, Foam::UPstream::floatTransfer)
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:144
Foam::UPstream::commsStruct
Structure for communicating between processors.
Definition: UPstream.H:83
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Foam::UPstream::nProcsSimpleSum
static int nProcsSimpleSum
Definition: UPstream.H:280
Foam::Perr
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
Foam::UPstream::commsTypes
commsTypes
Types of communications.
Definition: UPstream.H:69
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:381
Foam::addcommsTypeToOpt::writeData
virtual void writeData(Foam::Ostream &os) const
Write.
Definition: UPstream.C:449
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:464
Foam::UPstream::maxCommsSize
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:289
Foam::UPstream::worldComm
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:295
Foam::List< label >
Foam::UList::operator[]
T & operator[](const label i)
Return element of UList.
Definition: UListI.H:246
Foam::UPstream::procNo
static label procNo(const label comm, const int baseProcID)
Definition: UPstream.C:221
Foam::UList
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:103
dictionary.H
Foam::identity
labelList identity(const label len, label start=0)
Create identity map of the given length with (map[i] == i)
Definition: labelList.C:38
Foam::UPstream::freeCommunicators
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:193
serialComm
Foam::UPstream::communicator serialComm(-1, Foam::labelList(Foam::one{}, 0), false)
registerSwitch.H
Foam::Ostream
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition: Ostream.H:56
Foam::UPstream::parent
static label parent(const label communicator)
Definition: UPstream.H:469
Foam::UPstream::communicator
Helper class for allocating/freeing communicators.
Definition: UPstream.H:331
Foam::LIFOStack
A LIFO stack based on a singly-linked list.
Definition: LIFOStack.H:51
Foam::List::setSize
void setSize(const label newSize)
Alias for resize(const label)
Definition: ListI.H:146
Foam::debug::optimisationSwitches
dictionary & optimisationSwitches()
The OptimisationSwitches sub-dictionary in the central controlDict(s).
Definition: debug.C:219
Foam::Enum::read
EnumType read(Istream &is) const
Read a word from Istream and return the corresponding enumeration.
Definition: Enum.C:109
Foam::defineTypeNameAndDebug
defineTypeNameAndDebug(combustionModel, 0)
Foam::prefixOSstream::prefix
const string & prefix() const
Return the stream prefix.
Definition: prefixOSstream.H:101
Foam::debug::addOptimisationObject
void addOptimisationObject(const char *name, simpleRegIOobject *obj)
Register optimisation switch read/write object.
Definition: debug.C:262
Foam::simpleRegIOobject
Abstract base class for registered object with I/O. Used in debug symbol registration.
Definition: simpleRegIOobject.H:52
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:446
Foam::UPstream::nPollProcInterfaces
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:286