UPstream.C
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2011-2017 OpenFOAM Foundation
    Copyright (C) 2016-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "SubList.H"
#include "UPstreamWrapping.H"
#include "int.H"

#include <mpi.h>
#include <cstring>
#include <cstdlib>
#include <csignal>

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

// The minimum value and default for MPI buffer length
constexpr int minBufLen = 20000000;

// Track if we have attached MPI buffers
static bool ourBuffers = false;

// Track if we initialized MPI
static bool ourMpi = false;


// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //

static void attachOurBuffers()
{
    if (ourBuffers)
    {
        return;  // Already attached
    }
    ourBuffers = true;

    // Use UPstream::mpiBufferSize (optimisationSwitch),
    // but allow override with MPI_BUFFER_SIZE env variable (int value)

#ifndef SGIMPI
    int len = 0;

    const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
    if (str.empty() || !Foam::read(str, len) || len <= 0)
    {
        len = Foam::UPstream::mpiBufferSize;
    }

    if (len < minBufLen)
    {
        len = minBufLen;
    }

    if (Foam::UPstream::debug)
    {
        Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
    }

    char* buf = new char[len];

    if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
    {
        delete[] buf;
        Foam::Pout<< "UPstream::init : could not attach buffer\n";
    }
#endif
}
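
// Illustrative use of the attached buffer (a sketch, not code from this
// file): buffered sends copy their payload into the attached buffer and
// return immediately, e.g.
//
//     MPI_Bsend(data, count, MPI_BYTE, destRank, tag, comm);
//
// and fail with MPI_ERR_BUFFER if a message exceeds the remaining space.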


static void detachOurBuffers()
{
    if (!ourBuffers)
    {
        return;  // Nothing to detach
    }
    ourBuffers = false;

    // Some MPI notes suggest that the return code is MPI_SUCCESS when
    // no buffer is attached.
    // Be extra careful and require a non-zero size as well.

#ifndef SGIMPI
    int len = 0;
    char* buf = nullptr;

    if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
    {
        delete[] buf;
    }
#endif
}
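
// Note: MPI_Buffer_detach blocks until all messages currently held in the
// buffer have been transmitted, so detaching before MPI_Finalize (see
// shutdown() below) is safe even with buffered sends still in flight.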


// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

// NOTE:
// Valid parallel options vary between implementations, but flag common ones.
// If they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong.
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("machinefile", "machine file");
}


bool Foam::UPstream::initNull()
{
    int flag = 0;

    MPI_Finalized(&flag);
    if (flag)
    {
        // Already finalized - this is an error
        FatalErrorInFunction
            << "MPI was already finalized - cannot perform MPI_Init\n"
            << Foam::abort(FatalError);

        return false;
    }

    MPI_Initialized(&flag);
    if (flag)
    {
        if (debug)
        {
            Pout<< "UPstream::initNull : was already initialized\n";
        }
    }
    else
    {
        // Not already initialized

        MPI_Init_thread
        (
            nullptr,    // argc
            nullptr,    // argv
            MPI_THREAD_SINGLE,
            &flag       // provided_thread_support
        );

        ourMpi = true;
    }

    // Could also attach buffers etc.

    return true;
}


bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
{
    int numprocs = 0, myRank = 0;
    int provided_thread_support = 0;
    int flag = 0;

    MPI_Finalized(&flag);
    if (flag)
    {
        // Already finalized - this is an error
        FatalErrorInFunction
            << "MPI was already finalized - cannot perform MPI_Init" << endl
            << Foam::abort(FatalError);

        return false;
    }

    MPI_Initialized(&flag);
    if (flag)
    {
        // Already initialized.
        // Warn if we've called twice, but skip if initialized externally

        if (ourMpi)
        {
            WarningInFunction
                << "MPI was already initialized - cannot perform MPI_Init" << nl
                << "This could indicate an application programming error!"
                << endl;

            return true;
        }
        else if (debug)
        {
            Pout<< "UPstream::init : was already initialized\n";
        }
    }
    else
    {
        MPI_Init_thread
        (
            &argc,
            &argv,
            (
                needsThread
              ? MPI_THREAD_MULTIPLE
              : MPI_THREAD_SINGLE
            ),
            &provided_thread_support
        );

        ourMpi = true;
    }

    // Check argument list for local world
    label worldIndex = -1;
    word world;
    for (int argi = 1; argi < argc; ++argi)
    {
        if (strcmp(argv[argi], "-world") == 0)
        {
            worldIndex = argi++;
            if (argi >= argc)
            {
                FatalErrorInFunction
                    << "Missing world name to argument \"world\""
                    << Foam::abort(FatalError);
            }
            world = argv[argi];
            break;
        }
    }

    // Filter 'world' option
    if (worldIndex != -1)
    {
        for (label i = worldIndex+2; i < argc; i++)
        {
            argv[i-2] = argv[i];
        }
        argc -= 2;
    }
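
    // Example (illustrative): an invocation such as
    //     app -world left -case foo
    // arrives at the subsequent argument parsing as
    //     app -case foo
    // i.e. the "-world <name>" pair is shifted out of argv and argc
    // is reduced by 2.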

    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    if (debug)
    {
        Pout<< "UPstream::init :"
            << " thread-support : wanted:" << needsThread
            << " obtained:"
            << (
                   provided_thread_support == MPI_THREAD_MULTIPLE
                 ? "MPI_THREAD_MULTIPLE"
                 : "MPI_THREAD_SINGLE"
               )
            << " procs:" << numprocs
            << " rank:" << myRank
            << " world:" << world << endl;
    }

    if (worldIndex == -1 && numprocs <= 1)
    {
        FatalErrorInFunction
            << "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }

    // Initialise parallel structure
    setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);

    if (worldIndex != -1)
    {
        wordList worlds(numprocs);
        worlds[Pstream::myProcNo()] = world;
        Pstream::gatherList(worlds);
        Pstream::broadcast(worlds);

        // Compact
        if (Pstream::master())
        {
            DynamicList<word> allWorlds(numprocs);
            for (const word& world : worlds)
            {
                allWorlds.appendUniq(world);
            }
            allWorlds_ = std::move(allWorlds);

            worldIDs_.setSize(numprocs);
            forAll(worlds, proci)
            {
                const word& world = worlds[proci];
                worldIDs_[proci] = allWorlds_.find(world);
            }
        }
        Pstream::broadcasts(UPstream::worldComm, allWorlds_, worldIDs_);

        DynamicList<label> subRanks;
        forAll(worlds, proci)
        {
            if (worlds[proci] == worlds[Pstream::myProcNo()])
            {
                subRanks.append(proci);
            }
        }

        // Allocate new communicator 1 with parent 0 (= mpi_world)
        const label subComm = allocateCommunicator(0, subRanks, true);

        // Override worldComm
        UPstream::worldComm = subComm;
        // For testing: warn use of non-worldComm
        UPstream::warnComm = UPstream::worldComm;

        if (debug)
        {
            // Check
            int subNProcs, subRank;
            MPI_Comm_size
            (
                PstreamGlobals::MPICommunicators_[subComm],
                &subNProcs
            );
            MPI_Comm_rank
            (
                PstreamGlobals::MPICommunicators_[subComm],
                &subRank
            );

            Pout<< "UPstream::init : in world:" << world
                << " using local communicator:" << subComm
                << " with procs:" << subNProcs
                << " and rank:" << subRank
                << endl;
        }

        // Override Pout prefix (move to setParRun?)
        Pout.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
        Perr.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
    }
    else
    {
        // All processors use world 0
        worldIDs_.setSize(numprocs, 0);
    }

    attachOurBuffers();

    return true;
}
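
// Illustrative lifecycle (a sketch, not a prescribed contract): an
// application initialises and tears down MPI through this class roughly as
//
//     int main(int argc, char* argv[])
//     {
//         Foam::UPstream::init(argc, argv, /*needsThread=*/false);
//         // ... parallel work ...
//         Foam::UPstream::exit(0);  // calls shutdown(), then std::exit()
//     }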


void Foam::UPstream::shutdown(int errNo)
{
    if (debug)
    {
        Pout<< "UPstream::shutdown\n";
    }

    int flag = 0;

    MPI_Initialized(&flag);
    if (!flag)
    {
        // No MPI initialized - we are done
        return;
    }

    MPI_Finalized(&flag);
    if (flag)
    {
        // Already finalized elsewhere?
        if (ourMpi)
        {
            WarningInFunction
                << "MPI was already finalized (by a connected program?)\n";
        }
        else if (debug)
        {
            Pout<< "UPstream::shutdown : was already finalized\n";
        }
    }
    else
    {
        detachOurBuffers();
    }


    // Warn about any outstanding requests
    {
        label nOutstanding = 0;

        for (MPI_Request request : PstreamGlobals::outstandingRequests_)
        {
            if (MPI_REQUEST_NULL != request)
            {
                ++nOutstanding;
            }
        }

        PstreamGlobals::outstandingRequests_.clear();

        if (nOutstanding)
        {
            WarningInFunction
                << "There were still " << nOutstanding
                << " outstanding MPI_Requests." << nl
                << "Which means your code exited before doing a"
                << " UPstream::waitRequests()." << nl
                << "This should not happen for a normal code exit."
                << nl;
        }
    }

    // Clean mpi communicators
    forAll(myProcNo_, communicator)
    {
        if (myProcNo_[communicator] != -1)
        {
            freePstreamCommunicator(communicator);
        }
    }

    if (!flag)
    {
        // MPI not already finalized

        if (!ourMpi)
        {
            WarningInFunction
                << "Finalizing MPI, but was initialized elsewhere\n";
        }

        if (errNo == 0)
        {
            MPI_Finalize();
        }
        else
        {
            // Abort only locally or world?
            MPI_Abort(MPI_COMM_WORLD, errNo);
        }
    }
}


void Foam::UPstream::exit(int errNo)
{
    UPstream::shutdown(errNo);
    std::exit(errNo);
}


void Foam::UPstream::abort()
{
    MPI_Abort(MPI_COMM_WORLD, 1);
}


// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,
    const label index
)
{
    if (index == PstreamGlobals::MPIGroups_.size())
    {
        // Extend storage with dummy values
        MPI_Group newGroup = MPI_GROUP_NULL;
        PstreamGlobals::MPIGroups_.append(newGroup);
        MPI_Comm newComm = MPI_COMM_NULL;
        PstreamGlobals::MPICommunicators_.append(newComm);
    }
    else if (index > PstreamGlobals::MPIGroups_.size())
    {
        FatalErrorInFunction
            << "PstreamGlobals out of sync with UPstream data. Problem."
            << Foam::exit(FatalError);
    }


    if (parentIndex == -1)
    {
        // Allocate world communicator

        if (index != UPstream::worldComm)
        {
            FatalErrorInFunction
                << "world communicator should always be index "
                << UPstream::worldComm << Foam::exit(FatalError);
        }

        PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
        MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
        MPI_Comm_rank
        (
            PstreamGlobals::MPICommunicators_[index],
            &myProcNo_[index]
        );

        // Set the number of processes to the actual number
        int numProcs;
        MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);

        //procIDs_[index] = identity(numProcs);
        procIDs_[index].setSize(numProcs);
        forAll(procIDs_[index], i)
        {
            procIDs_[index][i] = i;
        }
    }
    else
    {
        // Create new group
        MPI_Group_incl
        (
            PstreamGlobals::MPIGroups_[parentIndex],
            procIDs_[index].size(),
            procIDs_[index].cdata(),
           &PstreamGlobals::MPIGroups_[index]
        );

        #if defined(MSMPI_VER)
        // ms-mpi (10.0 and others?) does not have MPI_Comm_create_group
        MPI_Comm_create
        (
            PstreamGlobals::MPICommunicators_[parentIndex],
            PstreamGlobals::MPIGroups_[index],
           &PstreamGlobals::MPICommunicators_[index]
        );
        #else
        // Create new communicator for this group
        MPI_Comm_create_group
        (
            PstreamGlobals::MPICommunicators_[parentIndex],
            PstreamGlobals::MPIGroups_[index],
            UPstream::msgType(),
           &PstreamGlobals::MPICommunicators_[index]
        );
        #endif

        if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
        {
            myProcNo_[index] = -1;
        }
        else
        {
            if
            (
                MPI_Comm_rank
                (
                    PstreamGlobals::MPICommunicators_[index],
                   &myProcNo_[index]
                )
            )
            {
                FatalErrorInFunction
                    << "Problem :"
                    << " when allocating communicator at " << index
                    << " from ranks " << procIDs_[index]
                    << " of parent " << parentIndex
                    << " cannot find my own rank"
                    << Foam::exit(FatalError);
            }
        }
    }
}
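
// Illustrative call chain (a sketch of how this is reached): init() above
// builds the list of ranks sharing a world and then calls
//
//     const label subComm = allocateCommunicator(0, subRanks, true);
//
// which presumably records subRanks in procIDs_[subComm] before invoking
// allocatePstreamCommunicator() to create the MPI group/communicator pair
// held in PstreamGlobals.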


void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
    if (communicator != 0)
    {
        if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
        {
            // Free communicator. Sets communicator to MPI_COMM_NULL
            MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
        }
        if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
        {
            // Free group. Sets group to MPI_GROUP_NULL
            MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
        }
    }
}
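
// Note: communicator index 0 wraps MPI_COMM_WORLD (allocated in the
// parentIndex == -1 branch above) and is therefore deliberately never
// freed here.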


Foam::label Foam::UPstream::nRequests()
{
    return PstreamGlobals::outstandingRequests_.size();
}


void Foam::UPstream::resetRequests(const label i)
{
    if (i < PstreamGlobals::outstandingRequests_.size())
    {
        PstreamGlobals::outstandingRequests_.resize(i);
    }
}


void Foam::UPstream::waitRequests(const label start)
{
    if (UPstream::debug)
    {
        Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size()-start
            << " outstanding requests starting at " << start << endl;
    }

    if (PstreamGlobals::outstandingRequests_.size())
    {
        SubList<MPI_Request> waitRequests
        (
            PstreamGlobals::outstandingRequests_,
            PstreamGlobals::outstandingRequests_.size() - start,
            start
        );

        profilingPstream::beginTiming();

        if
        (
            MPI_Waitall
            (
                waitRequests.size(),
                waitRequests.data(),
                MPI_STATUSES_IGNORE
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Waitall returned with error" << Foam::endl;
        }

        profilingPstream::addWaitTime();

        resetRequests(start);
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequests : finished wait." << endl;
    }
}
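
// Typical non-blocking usage (a sketch based on the functions above):
//
//     const label startReq = UPstream::nRequests();
//     // ... start non-blocking sends/receives, which append their
//     // MPI_Request handles to PstreamGlobals::outstandingRequests_ ...
//     UPstream::waitRequests(startReq);  // block until all have completed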


void Foam::UPstream::waitRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequest : starting wait for request:" << i
            << endl;
    }

    if (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorInFunction
            << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    profilingPstream::beginTiming();

    if
    (
        MPI_Wait
        (
           &PstreamGlobals::outstandingRequests_[i],
            MPI_STATUS_IGNORE
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Wait returned with error" << Foam::endl;
    }

    profilingPstream::addWaitTime();
    // Push index onto free cache
    PstreamGlobals::freedRequests_.append(i);

    if (debug)
    {
        Pout<< "UPstream::waitRequest : finished wait for request:" << i
            << endl;
    }
}


bool Foam::UPstream::finishedRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::finishedRequest : checking request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorInFunction
            << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    int flag;
    MPI_Test
    (
       &PstreamGlobals::outstandingRequests_[i],
       &flag,
        MPI_STATUS_IGNORE
    );

    if (debug)
    {
        Pout<< "UPstream::finishedRequest : finished request:" << i
            << endl;
    }

    return flag != 0;
}
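
// Note: when MPI_Test reports completion it also frees the request and
// resets the handle to MPI_REQUEST_NULL; MPI_Waitall skips null handles,
// so a finished request is not waited on again by waitRequests().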


int Foam::UPstream::allocateTag(const char* s)
{
    int tag;
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    if (debug)
    {
        Pout<< "UPstream::allocateTag "
            << s << " : tag:" << tag << endl;
    }

    return tag;
}


int Foam::UPstream::allocateTag(const std::string& s)
{
    int tag;
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    if (debug)
    {
        Pout<< "UPstream::allocateTag "
            << s.c_str() << " : tag:" << tag << endl;
    }

    return tag;
}


void Foam::UPstream::freeTag(const char* s, const int tag)
{
    if (debug)
    {
        Pout<< "UPstream::freeTag "
            << s << " tag:" << tag << endl;
    }
    PstreamGlobals::freedTags_.append(tag);
}


void Foam::UPstream::freeTag(const std::string& s, const int tag)
{
    if (debug)
    {
        Pout<< "UPstream::freeTag "
            << s.c_str() << " tag:" << tag << endl;
    }
    PstreamGlobals::freedTags_.append(tag);
}
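
// Illustrative pairing (a sketch; "myExchange" is only a debug label):
// tags are recycled through PstreamGlobals::freedTags_, so allocation and
// release should be matched:
//
//     const int tag = Foam::UPstream::allocateTag("myExchange");
//     // ... use tag for point-to-point messaging ...
//     Foam::UPstream::freeTag("myExchange", tag);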


// ************************************************************************* //