UPstream.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2015-2020 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::UPstream
29 
30 Description
31  Inter-processor communications stream
32 
33 SourceFiles
34  UPstream.C
35  UPstreamCommsStruct.C
36  gatherScatter.C
37  combineGatherScatter.C
38  gatherScatterList.C
39 
40 \*---------------------------------------------------------------------------*/
41 
42 #ifndef UPstream_H
43 #define UPstream_H
44 
45 #include "labelList.H"
46 #include "DynamicList.H"
47 #include "HashTable.H"
48 #include "string.H"
49 #include "Enum.H"
50 #include "ListOps.H"
51 #include "LIFOStack.H"
52 
53 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
54 
55 namespace Foam
56 {
57 
58 /*---------------------------------------------------------------------------*\
59  Class UPstream Declaration
60 \*---------------------------------------------------------------------------*/
61 
62 class UPstream
63 {
64 public:
65 
66  //- Types of communications
67  enum class commsTypes
68  {
69  blocking,
70  scheduled,
72  };
73 
74  //- Names of the communication types
75  static const Enum<commsTypes> commsTypeNames;
76 
77 
78  // Public classes
79 
80  //- Structure for communicating between processors
81  class commsStruct
82  {
83  // Private data
84 
85  //- procID of above processor
86  label above_;
87 
88  //- procIDs of processors directly below me
89  labelList below_;
90 
91  //- procIDs of all processors below (so not just directly below)
92  labelList allBelow_;
93 
94  //- procIDs of all processors not below.
95  // (inverse set of allBelow_ and minus myProcNo)
96  labelList allNotBelow_;
97 
98 
99  public:
100 
101  // Constructors
102 
103  //- Construct null
104  commsStruct();
105 
106  //- Construct from components
108  (
109  const label above,
110  const labelList& below,
111  const labelList& allBelow,
112  const labelList& allNotBelow
113  );
114 
115  //- Construct from components; construct allNotBelow_
117  (
118  const label nProcs,
119  const label myProcID,
120  const label above,
121  const labelList& below,
122  const labelList& allBelow
123  );
124 
125 
126  // Member Functions
127 
128  // Access
129 
130  label above() const
131  {
132  return above_;
133  }
134 
135  const labelList& below() const
136  {
137  return below_;
138  }
139 
140  const labelList& allBelow() const
141  {
142  return allBelow_;
143  }
144 
145  const labelList& allNotBelow() const
146  {
147  return allNotBelow_;
148  }
149 
150 
151  // Member operators
152 
153  bool operator==(const commsStruct&) const;
154 
155  bool operator!=(const commsStruct&) const;
156 
157 
158  // Ostream Operator
159 
160  friend Ostream& operator<<(Ostream&, const commsStruct&);
161  };
162 
163 
164  //- combineReduce operator for lists. Used for counting.
165  struct listEq
166  {
167  template<class T>
168  void operator()(T& x, const T& y) const
169  {
170  forAll(y, i)
171  {
172  if (y[i].size())
173  {
174  x[i] = y[i];
175  }
176  }
177  }
178  };
179 
180 
181 private:
182 
183  // Private data
184 
185  //- By default this is not a parallel run
186  static bool parRun_;
187 
188  //- Have support for threads?
189  static bool haveThreads_;
190 
191  //- Standard transfer message type
192  static int msgType_;
193 
194  // Communicator specific data
195 
196  //- Free communicators
197  static LIFOStack<label> freeComms_;
198 
199  //- My processor number
200  static DynamicList<int> myProcNo_;
201 
202  //- List of process IDs
203  static DynamicList<List<int>> procIDs_;
204 
205  //- Parent communicator
206  static DynamicList<label> parentCommunicator_;
207 
208  //- Linear communication schedule
209  static DynamicList<List<commsStruct>> linearCommunication_;
210 
211  //- Multi level communication schedule
212  static DynamicList<List<commsStruct>> treeCommunication_;
213 
214 
215  // Private Member Functions
216 
217  //- Set data for parallel running
218  static void setParRun(const label nProcs, const bool haveThreads);
219 
220  //- Calculate linear communication schedule
221  static List<commsStruct> calcLinearComm(const label nProcs);
222 
223  //- Calculate tree communication schedule
224  static List<commsStruct> calcTreeComm(const label nProcs);
225 
226  //- Helper function for tree communication schedule determination
227  // Collects all processorIDs below a processor
228  static void collectReceives
229  (
230  const label procID,
231  const List<DynamicList<label>>& receives,
232  DynamicList<label>& allReceives
233  );
234 
235  //- Allocate a communicator with index
236  static void allocatePstreamCommunicator
237  (
238  const label parentIndex,
239  const label index
240  );
241 
242  //- Free a communicator
243  static void freePstreamCommunicator
244  (
245  const label index
246  );
247 
248 
249 protected:
250 
251  // Protected data
252 
253  //- Communications type of this stream
255 
256 public:
257 
258  // Declare name of the class and its debug switch
259  ClassName("UPstream");
260 
261 
262  // Static data
263 
264  //- Should compact transfer be used in which floats replace doubles
265  //- reducing the bandwidth requirement at the expense of some loss
266  //- in accuracy
267  static bool floatTransfer;
268 
269  //- Number of processors at which the sum algorithm changes from linear
270  //- to tree
271  static int nProcsSimpleSum;
272 
273  //- Default commsType
275 
276  //- Number of polling cycles in processor updates
277  static int nPollProcInterfaces;
278 
279  //- Optional maximum message size (bytes)
280  static int maxCommsSize;
281 
282  //- MPI buffer-size (bytes)
283  static const int mpiBufferSize;
284 
285  //- Default communicator (all processors)
286  static label worldComm;
287 
288  //- Debugging: warn for use of any communicator differing from warnComm
289  static label warnComm;
290 
291 
292  // Constructors
293 
294  //- Construct given optional buffer size
296  :
298  {}
299 
300 
301  // Member functions
302 
303  //- Allocate a new communicator
304  static label allocateCommunicator
305  (
306  const label parent,
307  const labelList& subRanks,
308  const bool doPstream = true
309  );
310 
311  //- Free a previously allocated communicator
312  static void freeCommunicator
313  (
314  const label communicator,
315  const bool doPstream = true
316  );
317 
318  //- Free all communicators
319  static void freeCommunicators(const bool doPstream);
320 
321  //- Helper class for allocating/freeing communicators
322  class communicator
323  {
324  label comm_;
325 
326  //- No copy construct
327  communicator(const communicator&) = delete;
328 
329  //- No copy assignment
330  void operator=(const communicator&) = delete;
331 
332  public:
333 
335  (
336  const label parent,
337  const labelList& subRanks,
338  const bool doPstream
339  )
340  :
341  comm_(allocateCommunicator(parent, subRanks, doPstream))
342  {}
343 
344  ~communicator()
345  {
346  freeCommunicator(comm_);
347  }
348 
349  operator label() const
350  {
351  return comm_;
352  }
353  };
354 
355  //- Return physical processor number (i.e. processor number in
356  //- worldComm) given communicator and procssor
357  static int baseProcNo(const label myComm, const int procID);
358 
359  //- Return processor number in communicator (given physical processor
360  //- number) (= reverse of baseProcNo)
361  static label procNo(const label comm, const int baseProcID);
362 
363  //- Return processor number in communicator (given processor number
364  //- and communicator)
365  static label procNo
366  (
367  const label myComm,
368  const label currentComm,
369  const int currentProcID
370  );
371 
372  //- Add the valid option this type of communications library
373  //- adds/requires on the command line
374  static void addValidParOptions(HashTable<string>& validParOptions);
375 
376  //- Initialisation function called from main
377  // Spawns slave processes and initialises inter-communication
378  static bool init(int& argc, char**& argv, const bool needsThread);
379 
380  //- Special purpose initialisation function.
381  // Performs a basic MPI_Init without any other setup.
382  // Only used for applications that need MPI communication when
383  // OpenFOAM is running in a non-parallel mode.
384  // \note Behaves as a no-op if MPI has already been initialized.
385  // Fatal if MPI has already been finalized.
386  static bool initNull();
387 
388 
389  // Non-blocking comms
390 
391  //- Get number of outstanding requests
392  static label nRequests();
393 
394  //- Truncate number of outstanding requests
395  static void resetRequests(const label sz);
396 
397  //- Wait until all requests (from start onwards) have finished.
398  static void waitRequests(const label start = 0);
399 
400  //- Wait until request i has finished.
401  static void waitRequest(const label i);
402 
403  //- Non-blocking comms: has request i finished?
404  static bool finishedRequest(const label i);
405 
406  static int allocateTag(const char*);
407 
408  static int allocateTag(const word&);
409 
410  static void freeTag(const char*, const int tag);
411 
412  static void freeTag(const word&, const int tag);
413 
414 
415  //- Is this a parallel run?
416  static bool& parRun()
417  {
418  return parRun_;
419  }
420 
421  //- Have support for threads
422  static bool haveThreads()
423  {
424  return haveThreads_;
425  }
426 
427  //- Number of processes in parallel run
428  static label nProcs(const label communicator = 0)
429  {
430  return procIDs_[communicator].size();
431  }
432 
433  //- Process index of the master
434  static constexpr int masterNo() noexcept
435  {
436  return 0;
437  }
438 
439  //- Am I the master process
440  static bool master(const label communicator = 0)
441  {
442  return myProcNo_[communicator] == masterNo();
443  }
444 
445  //- Number of this process (starting from masterNo() = 0)
446  static int myProcNo(const label communicator = 0)
447  {
448  return myProcNo_[communicator];
449  }
450 
451  static label parent(const label communicator)
452  {
453  return parentCommunicator_(communicator);
454  }
455 
456  //- Process ID of given process index
457  static List<int>& procID(label communicator)
458  {
459  return procIDs_[communicator];
460  }
461 
462  //- Process index of first slave
463  static constexpr int firstSlave() noexcept
464  {
465  return 1;
466  }
467 
468  //- Process index of last slave
469  static int lastSlave(const label communicator = 0)
470  {
471  return nProcs(communicator) - 1;
472  }
473 
474  //- Communication schedule for linear all-to-master (proc 0)
476  (
477  const label communicator = 0
478  )
479  {
480  return linearCommunication_[communicator];
481  }
482 
483  //- Communication schedule for tree all-to-master (proc 0)
485  (
486  const label communicator = 0
487  )
488  {
489  return treeCommunication_[communicator];
490  }
491 
492  //- Message tag of standard messages
493  static int& msgType()
494  {
495  return msgType_;
496  }
497 
498 
499  //- Get the communications type of the stream
500  commsTypes commsType() const
501  {
502  return commsType_;
503  }
504 
505  //- Set the communications type of the stream
507  {
508  commsTypes oldCommsType = commsType_;
509  commsType_ = ct;
510  return oldCommsType;
511  }
512 
513 
514  //- Shutdown (finalize) MPI as required.
515  // Uses MPI_Abort instead of MPI_Finalize if errNo is non-zero
516  static void shutdown(int errNo = 0);
517 
518  //- Call MPI_Abort with no other checks or cleanup
519  static void abort();
520 
521  //- Shutdown (finalize) MPI as required and exit program with errNo.
522  static void exit(int errNo = 1);
523 
524  //- Exchange label with all processors (in the communicator).
525  // sendData[proci] is the label to send to proci.
526  // After return recvData contains the data from the other processors.
527  static void allToAll
528  (
529  const labelUList& sendData,
530  labelUList& recvData,
531  const label communicator = 0
532  );
533 
534  //- Exchange data with all processors (in the communicator)
535  // sendSizes, sendOffsets give (per processor) the slice of
536  // sendData to send, similarly recvSizes, recvOffsets give the slice
537  // of recvData to receive
538  static void allToAll
539  (
540  const char* sendData,
541  const UList<int>& sendSizes,
542  const UList<int>& sendOffsets,
543 
544  char* recvData,
545  const UList<int>& recvSizes,
546  const UList<int>& recvOffsets,
547 
548  const label communicator = 0
549  );
550 
551  //- Receive data from all processors on the master
552  static void gather
553  (
554  const char* sendData,
555  int sendSize,
556 
557  char* recvData,
558  const UList<int>& recvSizes,
559  const UList<int>& recvOffsets,
560  const label communicator = 0
561  );
562 
563  //- Send data to all processors from the root of the communicator
564  static void scatter
565  (
566  const char* sendData,
567  const UList<int>& sendSizes,
568  const UList<int>& sendOffsets,
569 
570  char* recvData,
571  int recvSize,
572  const label communicator = 0
573  );
574 };
575 
576 
577 Ostream& operator<<(Ostream&, const UPstream::commsStruct&);
578 
579 // Template specialisation for access of commsStruct
580 template<>
581 UPstream::commsStruct&
583 
584 template<>
585 const UPstream::commsStruct&
587 
588 
589 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
590 
591 } // End namespace Foam
592 
593 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
594 
595 #endif
596 
597 // ************************************************************************* //
Foam::UPstream::allocateTag
static int allocateTag(const char *)
Definition: UPstream.C:1137
Foam::UPstream::warnComm
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:288
Foam::UPstream::commsTypes::blocking
Foam::UPstream::resetRequests
static void resetRequests(const label sz)
Truncate number of outstanding requests.
Definition: UPstream.C:230
Foam::UPstream::commsTypes::nonBlocking
Foam::Enum< commsTypes >
Foam::UPstream::masterNo
static constexpr int masterNo() noexcept
Process index of the master.
Definition: UPstream.H:433
Foam::word
A class for handling words, derived from Foam::string.
Definition: word.H:62
HashTable.H
Foam::UPstream::communicator::~communicator
~communicator()
Definition: UPstream.H:343
Foam::UPstream::commsStruct::commsStruct
commsStruct()
Construct null.
Definition: UPstreamCommsStruct.C:33
Foam::UPstream::baseProcNo
static int baseProcNo(const label myComm, const int procID)
Definition: UPstream.C:205
Foam::UPstream::haveThreads
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:421
Foam::DynamicList< int >
Foam::UPstream::nProcs
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:427
Foam::UPstream::parRun
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:415
Foam::UPstream::listEq
combineReduce operator for lists. Used for counting.
Definition: UPstream.H:164
Foam::UPstream::commsStruct::operator<<
friend Ostream & operator<<(Ostream &, const commsStruct &)
Foam::UPstream::waitRequests
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:234
string.H
Foam::UPstream::abort
static void abort()
Call MPI_Abort with no other checks or cleanup.
Definition: UPstream.C:70
Foam::UPstream::defaultCommsType
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:273
Foam::UPstream::allToAll
static void allToAll(const labelUList &sendData, labelUList &recvData, const label communicator=0)
Exchange label with all processors (in the communicator).
Definition: UPstream.C:172
Foam::UPstream::gather
static void gather(const char *sendData, int sendSize, char *recvData, const UList< int > &recvSizes, const UList< int > &recvOffsets, const label communicator=0)
Receive data from all processors on the master.
Definition: UPstream.C:183
Foam::UPstream::UPstream
UPstream(const commsTypes commsType)
Construct for the given communications type.
Definition: UPstream.H:294
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
Foam::UPstream::commsStruct::operator==
bool operator==(const commsStruct &) const
Definition: UPstreamCommsStruct.C:95
Foam::UPstream::commsStruct::allBelow
const labelList & allBelow() const
Definition: UPstream.H:139
Foam::operator<<
Ostream & operator<<(Ostream &, const boundaryPatch &p)
Write boundaryPatch as dictionary entries (without surrounding braces)
Definition: boundaryPatch.C:83
Foam::UPstream::mpiBufferSize
static const int mpiBufferSize
MPI buffer-size (bytes)
Definition: UPstream.H:282
Foam::UPstream::allocateCommunicator
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:100
Foam::UPstream
Inter-processor communications stream.
Definition: UPstream.H:61
labelList.H
Foam::UPstream::lastSlave
static int lastSlave(const label communicator=0)
Process index of last slave.
Definition: UPstream.H:468
Foam::UPstream::floatTransfer
static bool floatTransfer
Definition: UPstream.H:266
Foam::T
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
Definition: FieldFieldFunctions.C:58
Foam::UPstream::waitRequest
static void waitRequest(const label i)
Wait until request i has finished.
Definition: UPstream.C:238
Foam::UPstream::commsType
commsTypes commsType() const
Get the communications type of the stream.
Definition: UPstream.H:499
Foam::UPstream::addValidParOptions
static void addValidParOptions(HashTable< string > &validParOptions)
Definition: UPstream.C:34
Foam::UPstream::procID
static List< int > & procID(label communicator)
Process IDs of the given communicator.
Definition: UPstream.H:456
Foam::UPstream::freeCommunicator
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:166
Foam::UPstream::commsTypeNames
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:74
Foam::UPstream::commsTypes::scheduled
Foam::UPstream::scatter
static void scatter(const char *sendData, const UList< int > &sendSizes, const UList< int > &sendOffsets, char *recvData, int recvSize, const label communicator=0)
Send data to all processors from the root of the communicator.
Definition: UPstream.C:198
Foam::UPstream::commsStruct::allNotBelow
const labelList & allNotBelow() const
Definition: UPstream.H:144
Foam
Namespace for OpenFOAM.
Definition: atmBoundaryLayer.C:33
Foam::UPstream::commsStruct::above
label above() const
Definition: UPstream.H:129
Foam::UPstream::commsStruct
Structure for communicating between processors.
Definition: UPstream.H:80
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:445
Foam::UPstream::master
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:439
Foam::UPstream::nProcsSimpleSum
static int nProcsSimpleSum
Definition: UPstream.H:270
Foam::UPstream::firstSlave
static constexpr int firstSlave() noexcept
Process index of first slave.
Definition: UPstream.H:462
Foam::HashTable
A HashTable similar to std::unordered_map.
Definition: HashTable.H:105
Foam::UPstream::msgType
static int & msgType()
Message tag of standard messages.
Definition: UPstream.H:492
Foam::UPstream::commsTypes
commsTypes
Types of communications.
Definition: UPstream.H:66
Foam::UPstream::nRequests
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:224
Foam::UPstream::listEq::operator()
void operator()(T &x, const T &y) const
Definition: UPstream.H:167
Foam::UPstream::commsType_
commsTypes commsType_
Communications type of this stream.
Definition: UPstream.H:253
Foam::UPstream::initNull
static bool initNull()
Special purpose initialisation function.
Definition: UPstream.C:38
Foam::UPstream::maxCommsSize
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:279
Foam::UPstream::worldComm
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:285
Foam::List< label >
Foam::UList::operator[]
T & operator[](const label i)
Return element of UList.
Definition: UListI.H:246
Foam::UPstream::procNo
static label procNo(const label comm, const int baseProcID)
Definition: UPstream.C:221
Foam::UList< label >
Foam::UPstream::ClassName
ClassName("UPstream")
Foam::UPstream::commsType
commsTypes commsType(const commsTypes ct)
Set the communications type of the stream.
Definition: UPstream.H:505
Foam::UPstream::shutdown
static void shutdown(int errNo=0)
Shutdown (finalize) MPI as required.
Definition: UPstream.C:59
Foam::UPstream::finishedRequest
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
Definition: UPstream.C:242
x
x
Definition: LISASMDCalcMethod2.H:52
Foam::UPstream::freeCommunicators
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:193
DynamicList.H
ListOps.H
Various functions to operate on Lists.
Foam::Ostream
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition: Ostream.H:56
Foam::UPstream::parent
static label parent(const label communicator)
Definition: UPstream.H:450
Foam::UPstream::communicator
Helper class for allocating/freeing communicators.
Definition: UPstream.H:321
Foam::UPstream::linearCommunication
static const List< commsStruct > & linearCommunication(const label communicator=0)
Communication schedule for linear all-to-master (proc 0)
Definition: UPstream.H:475
Foam::LIFOStack< label >
Foam::UPstream::init
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
Definition: UPstream.C:48
Foam::UPstream::commsStruct::operator!=
bool operator!=(const commsStruct &) const
Definition: UPstreamCommsStruct.C:107
LIFOStack.H
Foam::UPstream::nPollProcInterfaces
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:276
Foam::UPstream::commsStruct::below
const labelList & below() const
Definition: UPstream.H:134
Foam::UPstream::treeCommunication
static const List< commsStruct > & treeCommunication(const label communicator=0)
Communication schedule for tree all-to-master (proc 0)
Definition: UPstream.H:484
y
scalar y
Definition: LISASMDCalcMethod1.H:14
Enum.H
Foam::UPstream::exit
static void exit(int errNo=1)
Shutdown (finalize) MPI as required and exit program with errNo.
Definition: UPstream.C:63
Foam::UPstream::freeTag
static void freeTag(const char *, const int tag)
Definition: UPstream.C:1195