UPstream.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2015-2022 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "UPstream.H"
30#include "debug.H"
31#include "registerSwitch.H"
32#include "dictionary.H"
33#include "IOstreams.H"
34
35// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36
37namespace Foam
38{
40}
41
42const Foam::Enum
43<
45>
47({
48 { commsTypes::blocking, "blocking" },
49 { commsTypes::scheduled, "scheduled" },
50 { commsTypes::nonBlocking, "nonBlocking" },
51});
52
53
54// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
55
56void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
57{
58 if (nProcs == 0)
59 {
60 parRun_ = false;
61 haveThreads_ = haveThreads;
62
64 label comm = allocateCommunicator(-1, labelList(Foam::one{}, 0), false);
65 if (comm != UPstream::worldComm)
66 {
68 << "problem : comm:" << comm
69 << " UPstream::worldComm:" << UPstream::worldComm
71 }
72
73 Pout.prefix() = "";
74 Perr.prefix() = "";
75 }
76 else
77 {
78 parRun_ = true;
79 haveThreads_ = haveThreads;
80
81 // Redo worldComm communicator (this has been created at static
82 // initialisation time)
84 label comm = allocateCommunicator(-1, identity(nProcs), true);
85 if (comm != UPstream::worldComm)
86 {
88 << "problem : comm:" << comm
89 << " UPstream::worldComm:" << UPstream::worldComm
91 }
92
93 Pout.prefix() = '[' + name(myProcNo(comm)) + "] ";
94 Perr.prefix() = '[' + name(myProcNo(comm)) + "] ";
95 }
96
97 if (debug)
98 {
99 Pout<< "UPstream::setParRun :"
100 << " nProcs:" << nProcs
101 << " haveThreads:" << haveThreads
102 << endl;
103 }
104}
105
106
108(
109 const label parentIndex,
110 const labelList& subRanks,
111 const bool doPstream
112)
113{
114 label index;
115 if (!freeComms_.empty())
116 {
117 index = freeComms_.remove(); // LIFO pop
118 }
119 else
120 {
121 // Extend storage
122 index = parentCommunicator_.size();
123
124 myProcNo_.append(-1);
125 procIDs_.append(List<int>());
126 parentCommunicator_.append(-1);
127 linearCommunication_.append(List<commsStruct>());
128 treeCommunication_.append(List<commsStruct>());
129 }
130
131 if (debug)
132 {
133 Pout<< "Communicators : Allocating communicator " << index << endl
134 << " parent : " << parentIndex << endl
135 << " procs : " << subRanks << endl
136 << endl;
137 }
138
139 // Initialise; overwritten by allocatePstreamCommunicator
140 myProcNo_[index] = 0;
141
142 // Convert from label to int
143 procIDs_[index].setSize(subRanks.size());
144 forAll(procIDs_[index], i)
145 {
146 procIDs_[index][i] = subRanks[i];
147
148 // Enforce incremental order (so index is rank in next communicator)
149 if (i >= 1 && subRanks[i] <= subRanks[i-1])
150 {
152 << "subranks not sorted : " << subRanks
153 << " when allocating subcommunicator from parent "
154 << parentIndex
156 }
157 }
158 parentCommunicator_[index] = parentIndex;
159
160 // Size but do not fill structure - this is done on-the-fly
161 linearCommunication_[index] = List<commsStruct>(procIDs_[index].size());
162 treeCommunication_[index] = List<commsStruct>(procIDs_[index].size());
163
164 if (doPstream && parRun())
165 {
166 allocatePstreamCommunicator(parentIndex, index);
167 }
168
169 return index;
170}
171
172
174(
175 const label communicator,
176 const bool doPstream
177)
178{
179 if (debug)
180 {
181 Pout<< "Communicators : Freeing communicator " << communicator << endl
182 << " parent : " << parentCommunicator_[communicator] << endl
183 << " myProcNo : " << myProcNo_[communicator] << endl
184 << endl;
185 }
186
187 if (doPstream && parRun())
188 {
189 freePstreamCommunicator(communicator);
190 }
191 myProcNo_[communicator] = -1;
192 //procIDs_[communicator].clear();
193 parentCommunicator_[communicator] = -1;
194 linearCommunication_[communicator].clear();
195 treeCommunication_[communicator].clear();
196
197 freeComms_.append(communicator); // LIFO push
198}
199
200
201void Foam::UPstream::freeCommunicators(const bool doPstream)
202{
203 forAll(myProcNo_, communicator)
204 {
205 if (myProcNo_[communicator] != -1)
206 {
207 freeCommunicator(communicator, doPstream);
208 }
209 }
210}
211
212
213int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
214{
215 int procID = myProcID;
216 label comm = myComm;
217
218 while (parent(comm) != -1)
219 {
220 const List<int>& parentRanks = UPstream::procID(comm);
221 procID = parentRanks[procID];
222 comm = UPstream::parent(comm);
223 }
224
225 return procID;
226}
227
228
229Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
230{
231 const List<int>& parentRanks = procID(myComm);
232 label parentComm = parent(myComm);
233
234 if (parentComm == -1)
235 {
236 return parentRanks.find(baseProcID);
237 }
238 else
239 {
240 const label parentRank = procNo(parentComm, baseProcID);
241 return parentRanks.find(parentRank);
242 }
243}
244
245
247(
248 const label myComm,
249 const label currentComm,
250 const int currentProcID
251)
252{
253 label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
254 return procNo(myComm, physProcID);
255}
256
257
258template<>
261{
262 UPstream::commsStruct& t = v_[procID];
263
264 if (t.allBelow().size() + t.allNotBelow().size() + 1 != size())
265 {
266 // Not yet allocated
267
268 label above(-1);
269 labelList below;
270 labelList allBelow;
271
272 if (size() < UPstream::nProcsSimpleSum)
273 {
274 // Linear schedule
275
276 if (procID == 0)
277 {
278 below.setSize(size()-1);
279 for (label procI = 1; procI < size(); procI++)
280 {
281 below[procI-1] = procI;
282 }
283 }
284 else
285 {
286 above = 0;
287 }
288 }
289 else
290 {
291 // Use tree like schedule. For 8 procs:
292 // (level 0)
293 // 0 receives from 1
294 // 2 receives from 3
295 // 4 receives from 5
296 // 6 receives from 7
297 // (level 1)
298 // 0 receives from 2
299 // 4 receives from 6
300 // (level 2)
301 // 0 receives from 4
302 //
303 // The sends/receives for all levels are collected per processor
304 // (one send per processor; multiple receives possible) creating
305 // a table:
306 //
307 // So per processor:
308 // proc receives from sends to
309 // ---- ------------- --------
310 // 0 1,2,4 -
311 // 1 - 0
312 // 2 3 0
313 // 3 - 2
314 // 4 5 0
315 // 5 - 4
316 // 6 7 4
317 // 7 - 6
318
319 label mod = 0;
320
321 for (label step = 1; step < size(); step = mod)
322 {
323 mod = step * 2;
324
325 if (procID % mod)
326 {
327 above = procID - (procID % mod);
328 break;
329 }
330 else
331 {
332 for
333 (
334 label j = procID + step;
335 j < size() && j < procID + mod;
336 j += step
337 )
338 {
339 below.append(j);
340 }
341 for
342 (
343 label j = procID + step;
344 j < size() && j < procID + mod;
345 j++
346 )
347 {
348 allBelow.append(j);
349 }
350 }
351 }
352 }
353 t = UPstream::commsStruct(size(), procID, above, below, allBelow);
354 }
355 return t;
356}
357
358
359template<>
362{
363 return const_cast<UList<UPstream::commsStruct>&>(*this).operator[](procID);
364}
365
366
367// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
368
369bool Foam::UPstream::parRun_(false);
370
371bool Foam::UPstream::haveThreads_(false);
372
373int Foam::UPstream::msgType_(1);
374
375
376Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
377
378Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
379
380Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
381
382Foam::DynamicList<Foam::label> Foam::UPstream::freeComms_;
383
384Foam::wordList Foam::UPstream::allWorlds_(Foam::one{}, "");
385Foam::labelList Foam::UPstream::worldIDs_(Foam::one{}, 0);
386
388Foam::UPstream::linearCommunication_(10);
389
391Foam::UPstream::treeCommunication_(10);
392
393
394// Allocate a serial communicator. This gets overwritten in parallel mode
395// (by UPstream::setParRun())
397(
398 -1,
400 false
401);
402
403
405(
406 Foam::debug::optimisationSwitch("floatTransfer", 0)
407);
409(
410 "floatTransfer",
411 bool,
413);
414
416(
417 Foam::debug::optimisationSwitch("nProcsSimpleSum", 0)
418);
420(
421 "nProcsSimpleSum",
422 int,
424);
425
427(
428 commsTypeNames.get
429 (
430 "commsType",
432 )
433);
434
435namespace Foam
436{
437 // Register re-reader
439 :
441 {
442 public:
443
445 :
446 ::Foam::simpleRegIOobject(Foam::debug::addOptimisationObject, name)
447 {}
448
449 virtual ~addcommsTypeToOpt() = default;
450
451 virtual void readData(Foam::Istream& is)
452 {
455 }
456
457 virtual void writeData(Foam::Ostream& os) const
458 {
460 }
461 };
462
464}
465
466Foam::label Foam::UPstream::worldComm(0);
467
468Foam::label Foam::UPstream::warnComm(-1);
469
471(
472 Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
473);
475(
476 "nPollProcInterfaces",
477 int,
479);
480
481
483(
484 Foam::debug::optimisationSwitch("maxCommsSize", 0)
485);
487(
488 "maxCommsSize",
489 int,
491);
492
493
495(
496 Foam::debug::optimisationSwitch("mpiBufferSize", 0)
497);
498
499
500// ************************************************************************* //
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Foam::UPstream::communicator serialComm(-1, Foam::labelList(Foam::one{}, 0), false)
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition: DynamicList.H:72
Enum is a wrapper around a list of names/values that represent particular enumeration (or int) values...
Definition: Enum.H:61
EnumType read(Istream &is) const
Read a word from Istream and return the corresponding enumeration.
Definition: Enum.C:109
An Istream is an abstract base class for all input systems (streams, files, token lists etc)....
Definition: Istream.H:64
void setSize(const label n)
Alias for resize()
Definition: List.H:218
void append(const T &val)
Append an element at the end of the list.
Definition: ListI.H:175
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition: Ostream.H:62
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: UList.H:94
label find(const T &val, label pos=0) const
Find index of the first occurrence of the value.
Definition: UList.C:212
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:114
T & operator[](const label i)
Return element of UList.
Definition: UListI.H:299
Structure for communicating between processors.
Definition: UPstream.H:81
const labelList & allBelow() const noexcept
Definition: UPstream.H:141
const labelList & allNotBelow() const noexcept
Definition: UPstream.H:148
Helper class for allocating/freeing communicators.
Definition: UPstream.H:330
Inter-processor communications stream.
Definition: UPstream.H:59
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:463
commsTypes
Types of communications.
Definition: UPstream.H:67
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:296
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:201
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
Definition: UPstream.H:74
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:174
static bool floatTransfer
Definition: UPstream.H:275
static const int mpiBufferSize
MPI buffer-size (bytes)
Definition: UPstream.H:290
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:445
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:293
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:439
static int nProcsSimpleSum
Number of processors to change from linear to tree communication.
Definition: UPstream.H:278
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:284
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:108
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:287
static label procNo(const label comm, const int baseProcID)
Definition: UPstream.C:229
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:281
static int baseProcNo(const label myComm, const int procID)
Definition: UPstream.C:213
virtual void readData(Foam::Istream &is)
Read.
Definition: UPstream.C:451
virtual void writeData(Foam::Ostream &os) const
Write.
Definition: UPstream.C:457
virtual ~addcommsTypeToOpt()=default
addcommsTypeToOpt(const char *name)
Definition: UPstream.C:444
A class representing the concept of 1 (one) that can be used to avoid manipulating objects known to b...
Definition: one.H:62
const string & prefix() const noexcept
Return the stream prefix.
Abstract base class for registered object with I/O. Used in debug symbol registration.
splitCell * parent() const
Definition: splitCell.H:103
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
Definition: className.H:121
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
OBJstream os(runTime.globalPath()/outputName)
dictionary & optimisationSwitches()
The OptimisationSwitches sub-dictionary in the central controlDict(s).
Definition: debug.C:219
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:237
Namespace for OpenFOAM.
labelList identity(const label len, label start=0)
Return an identity map of the given length with (map[i] == i)
Definition: labelList.C:38
List< label > labelList
A List of labels.
Definition: List.H:66
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
addcommsTypeToOpt addcommsTypeToOpt_("commsType")
errorManip< error > abort(error &err)
Definition: errorManip.H:144
error FatalError
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
Definition: exprTraits.C:59
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
#define registerOptSwitch(Name, Type, SwitchVar)
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:333