UPstream.C
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2016-2020 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "Pstream.H"
30 #include "PstreamReduceOps.H"
31 #include "PstreamGlobals.H"
32 #include "profilingPstream.H"
33 #include "SubList.H"
34 #include "allReduce.H"
35 #include "int.H"
36 #include "collatedFileOperation.H"
37 
38 #include <mpi.h>
39 #include <cstring>
40 #include <cstdlib>
41 #include <csignal>
42 
43 #if defined(WM_SP)
44  #define MPI_SCALAR MPI_FLOAT
45  #define MPI_SOLVESCALAR MPI_FLOAT
46 #elif defined(WM_SPDP)
47  #define MPI_SCALAR MPI_FLOAT
48  #define MPI_SOLVESCALAR MPI_DOUBLE
49 #elif defined(WM_DP)
50  #define MPI_SCALAR MPI_DOUBLE
51  #define MPI_SOLVESCALAR MPI_DOUBLE
52 #endif
53 
54 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
55 
56 // The min value and default for MPI buffers length
57 constexpr int minBufLen = 20000000;
58 
59 // Track if we have attached MPI buffers
60 static bool ourBuffers = false;
61 
62 // Track if we initialized MPI
63 static bool ourMpi = false;
64 
65 
66 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
67 
68 static void attachOurBuffers()
69 {
70  if (ourBuffers)
71  {
72  return; // Already attached
73  }
74  ourBuffers = true;
75 
76  // Use UPstream::mpiBufferSize (optimisationSwitch),
77  // but allow override with MPI_BUFFER_SIZE env variable (int value)
78 
79 #ifndef SGIMPI
80  int len = 0;
81 
82  const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
83  if (str.empty() || !Foam::read(str, len) || len <= 0)
84  {
85  len = Foam::UPstream::mpiBufferSize;
86  }
87 
88  if (len < minBufLen)
89  {
90  len = minBufLen;
91  }
92 
93  if (Foam::UPstream::debug)
94  {
95  Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
96  }
97 
98  char* buf = new char[len];
99 
100  if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
101  {
102  delete[] buf;
103  Foam::Pout<< "UPstream::init : could not attach buffer\n";
104  }
105 #endif
106 }
107 
108 
109 static void detachOurBuffers()
110 {
111  if (!ourBuffers)
112  {
113  return; // Nothing to detach
114  }
115  ourBuffers = false;
116 
117  // Some MPI notes suggest that the return code is MPI_SUCCESS when
118  // no buffer is attached.
119  // Be extra careful and require a non-zero size as well.
120 
121 #ifndef SGIMPI
122  int len = 0;
123  char* buf = nullptr;
124 
125  if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
126  {
127  delete[] buf;
128  }
129 #endif
130 }
131 
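// Editor's illustrative sketch (not part of UPstream.C): how an attached
// buffer is consumed by MPI buffered sends. All calls are standard MPI;
// the 1024-byte size and the helper name are example assumptions only.
static void exampleBufferedSend()
{
    char* buf = new char[1024];
    MPI_Buffer_attach(buf, 1024);   // buffered sends now draw from buf

    int value = 42;
    MPI_Bsend(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);  // dest 0, tag 0

    char* oldBuf = nullptr;
    int oldLen = 0;
    MPI_Buffer_detach(&oldBuf, &oldLen);  // waits for buffered sends to drain
    delete[] oldBuf;
}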
132 
133 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
134 
135 // NOTE:
136 // valid parallel options vary between implementations, but flag common ones.
137 // if they are not removed by MPI_Init(), the subsequent argument processing
138 // will notice that they are wrong
139 void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
140 {
141  validParOptions.insert("np", "");
142  validParOptions.insert("p4pg", "PI file");
143  validParOptions.insert("p4wd", "directory");
144  validParOptions.insert("p4amslave", "");
145  validParOptions.insert("p4yourname", "hostname");
146  validParOptions.insert("machinefile", "machine file");
147 }
148 
149 
150 bool Foam::UPstream::initNull()
151 {
152  int flag = 0;
153 
154  MPI_Finalized(&flag);
155  if (flag)
156  {
157  // Already finalized - this is an error
158  FatalErrorInFunction
159  << "MPI was already finalized - cannot perform MPI_Init\n"
160  << Foam::abort(FatalError);
161 
162  return false;
163  }
164 
165  MPI_Initialized(&flag);
166  if (flag)
167  {
168  if (debug)
169  {
170  Pout<< "UPstream::initNull : was already initialized\n";
171  }
172  }
173  else
174  {
175  // Not already initialized
176 
177  MPI_Init_thread
178  (
179  nullptr, // argc
180  nullptr, // argv
181  MPI_THREAD_SINGLE,
182  &flag // provided_thread_support
183  );
184 
185  ourMpi = true;
186  }
187 
188  // Could also attach buffers etc.
189 
190  return true;
191 }
192 
193 
194 bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
195 {
196  int numprocs = 0, myRank = 0;
197  int provided_thread_support = 0;
198  int flag = 0;
199 
200  MPI_Finalized(&flag);
201  if (flag)
202  {
203  // Already finalized - this is an error
204  FatalErrorInFunction
205  << "MPI was already finalized - cannot perform MPI_Init" << endl
206  << Foam::abort(FatalError);
207 
208  return false;
209  }
210 
211  MPI_Initialized(&flag);
212  if (flag)
213  {
214  // Already initialized.
215  // Warn if we've called twice, but skip if initialized externally
216 
217  if (ourMpi)
218  {
219  WarningInFunction
220  << "MPI was already initialized - cannot perform MPI_Init" << nl
221  << "This could indicate an application programming error!"
222  << endl;
223 
224  return true;
225  }
226  else if (debug)
227  {
228  Pout<< "UPstream::init : was already initialized\n";
229  }
230  }
231  else
232  {
233  MPI_Init_thread
234  (
235  &argc,
236  &argv,
237  (
238  needsThread
239  ? MPI_THREAD_MULTIPLE
240  : MPI_THREAD_SINGLE
241  ),
242  &provided_thread_support
243  );
244 
245  ourMpi = true;
246  }
247 
248  // Check argument list for local world
249  label worldIndex = -1;
250  word world;
251  for (int argi = 1; argi < argc; ++argi)
252  {
253  if (strcmp(argv[argi], "-world") == 0)
254  {
255  worldIndex = argi++;
256  if (argi >= argc)
257  {
258  FatalErrorInFunction
259  << "Missing world name to argument \"world\""
260  << Foam::abort(FatalError);
261  }
262  world = argv[argi];
263  break;
264  }
265  }
266 
267  // Filter 'world' option
268  if (worldIndex != -1)
269  {
270  for (label i = worldIndex+2; i < argc; i++)
271  {
272  argv[i-2] = argv[i];
273  }
274  argc -= 2;
275  }
276 
277  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
278  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
279 
280  if (debug)
281  {
282  Pout<< "UPstream::init : procs:" << numprocs
283  << " rank:" << myRank
284  << " world:" << world << endl;
285  }
286 
287  if (worldIndex == -1 && numprocs <= 1)
288  {
289  FatalErrorInFunction
290  << "attempt to run parallel on 1 processor"
291  << Foam::abort(FatalError);
292  }
293 
294  // Initialise parallel structure
295  setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
296 
297  if (worldIndex != -1)
298  {
299  wordList worlds(numprocs);
300  worlds[Pstream::myProcNo()] = world;
301  Pstream::gatherList(worlds);
302  Pstream::scatterList(worlds);
303 
304  // Compact
305  if (Pstream::master())
306  {
307  DynamicList<word> allWorlds(numprocs);
308  for (const auto& world : worlds)
309  {
310  if (!allWorlds.found(world))
311  {
312  allWorlds.append(world);
313  }
314  }
315  allWorlds_ = std::move(allWorlds);
316 
317  worldIDs_.setSize(numprocs);
318  forAll(worlds, proci)
319  {
320  const word& world = worlds[proci];
321  worldIDs_[proci] = allWorlds_.find(world);
322  }
323  }
324  Pstream::scatter(allWorlds_);
325  Pstream::scatter(worldIDs_);
326 
327  DynamicList<label> subRanks;
328  forAll(worlds, proci)
329  {
330  if (worlds[proci] == worlds[Pstream::myProcNo()])
331  {
332  subRanks.append(proci);
333  }
334  }
335 
336  // Allocate new communicator 1 with parent 0 (= mpi_world)
337  const label subComm = allocateCommunicator(0, subRanks, true);
338 
339  // Override worldComm
340  UPstream::worldComm = subComm;
341  // For testing: warn use of non-worldComm
342  UPstream::warnComm = UPstream::worldComm;
343 
344  if (debug)
345  {
346  // Check
347  int subNProcs, subRank;
348  MPI_Comm_size
349  (
350  PstreamGlobals::MPICommunicators_[subComm],
351  &subNProcs
352  );
353  MPI_Comm_rank
354  (
355  PstreamGlobals::MPICommunicators_[subComm],
356  &subRank
357  );
358 
359  Pout<< "UPstream::init : in world:" << world
360  << " using local communicator:" << subComm
361  << " with procs:" << subNProcs
362  << " and rank:" << subRank
363  << endl;
364  }
365 
366  // Override Pout prefix (move to setParRun?)
367  Pout.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
368  Perr.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
369  }
370  else
371  {
372  // All processors use world 0
373  worldIDs_.setSize(numprocs, 0);
374  }
375 
376  attachOurBuffers();
377 
378  return true;
379 }
380 
381 
382 void Foam::UPstream::shutdown(int errNo)
383 {
384  if (debug)
385  {
386  Pout<< "UPstream::shutdown\n";
387  }
388 
389  int flag = 0;
390 
391  MPI_Initialized(&flag);
392  if (!flag)
393  {
394  // No MPI initialized - we are done
395  return;
396  }
397 
398  MPI_Finalized(&flag);
399  if (flag)
400  {
401  // Already finalized elsewhere?
402  if (ourMpi)
403  {
404  WarningInFunction
405  << "MPI was already finalized (by a connected program?)\n";
406  }
407  else if (debug)
408  {
409  Pout<< "UPstream::shutdown : was already finalized\n";
410  }
411  }
412  else
413  {
414  detachOurBuffers();
415  }
416 
417 
418  // Warn about any outstanding requests
419  {
420  label nOutstanding = 0;
421 
422  forAll(PstreamGlobals::outstandingRequests_, requestID)
423  {
424  if (!PstreamGlobals::freedRequests_.found(requestID))
425  {
426  ++nOutstanding;
427  }
428  }
429 
430  PstreamGlobals::outstandingRequests_.clear();
431 
432  if (nOutstanding)
433  {
434  WarningInFunction
435  << "There were still " << nOutstanding
436  << " outstanding MPI_Requests." << nl
437  << "Which means your code exited before doing a "
438  << " UPstream::waitRequests()." << nl
439  << "This should not happen for a normal code exit."
440  << nl;
441  }
442  }
443 
444  // Clean mpi communicators
445  forAll(myProcNo_, communicator)
446  {
447  if (myProcNo_[communicator] != -1)
448  {
449  freePstreamCommunicator(communicator);
450  }
451  }
452 
453  if (!flag)
454  {
455  // MPI not already finalized
456 
457  if (!ourMpi)
458  {
459  WarningInFunction
460  << "Finalizing MPI, but was initialized elsewhere\n";
461  }
462 
463  if (errNo == 0)
464  {
465  MPI_Finalize();
466  }
467  else
468  {
469  // Abort only locally or world?
470  MPI_Abort(MPI_COMM_WORLD, errNo);
471  }
472  }
473 }
474 
475 
476 void Foam::UPstream::exit(int errNo)
477 {
478  UPstream::shutdown(errNo);
479  std::exit(errNo);
480 }
481 
482 
483 void Foam::UPstream::abort()
484 {
485  MPI_Abort(MPI_COMM_WORLD, 1);
486 }
487 
488 
489 void Foam::reduce
490 (
491  scalar& Value,
492  const sumOp<scalar>& bop,
493  const int tag,
494  const label communicator
495 )
496 {
497  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
498  {
499  Pout<< "** reducing:" << Value << " with comm:" << communicator
500  << " warnComm:" << UPstream::warnComm
501  << endl;
502  error::printStack(Pout);
503  }
504  allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
505 }
506 
507 
508 void Foam::reduce
509 (
510  scalar& Value,
511  const minOp<scalar>& bop,
512  const int tag,
513  const label communicator
514 )
515 {
516  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
517  {
518  Pout<< "** reducing:" << Value << " with comm:" << communicator
519  << " warnComm:" << UPstream::warnComm
520  << endl;
521  error::printStack(Pout);
522  }
523  allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
524 }
525 
526 
527 void Foam::reduce
528 (
529  vector2D& Value,
530  const sumOp<vector2D>& bop,
531  const int tag,
532  const label communicator
533 )
534 {
535  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
536  {
537  Pout<< "** reducing:" << Value << " with comm:" << communicator
538  << " warnComm:" << UPstream::warnComm
539  << endl;
540  error::printStack(Pout);
541  }
542  allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
543 }
544 
545 
546 void Foam::sumReduce
547 (
548  scalar& Value,
549  label& Count,
550  const int tag,
551  const label communicator
552 )
553 {
554  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
555  {
556  Pout<< "** sumReduce:" << Value << " with comm:" << communicator
557  << " warnComm:" << UPstream::warnComm
558  << endl;
559  error::printStack(Pout);
560  }
561  vector2D twoScalars(Value, scalar(Count));
562  reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
563 
564  Value = twoScalars.x();
565  Count = twoScalars.y();
566 }
567 
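// Editor's illustrative sketch (not part of UPstream.C): the value+count
// packing used by sumReduce above, expressed as a single plain MPI_Allreduce
// over two doubles. The helper name and the 'comm' argument are assumptions
// for illustration only.
static void exampleSumReduce(double& value, long& count, MPI_Comm comm)
{
    double packed[2] = {value, static_cast<double>(count)};

    // One reduction carries both the sum of values and the total count
    MPI_Allreduce(MPI_IN_PLACE, packed, 2, MPI_DOUBLE, MPI_SUM, comm);

    value = packed[0];
    count = static_cast<long>(packed[1]);
}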
568 
569 void Foam::reduce
570 (
571  scalar& Value,
572  const sumOp<scalar>& bop,
573  const int tag,
574  const label communicator,
575  label& requestID
576 )
577 {
578  iallReduce<scalar>(&Value, 1, MPI_SCALAR, MPI_SUM, communicator, requestID);
579 }
580 
581 
582 void Foam::reduce
583 (
584  scalar values[],
585  const int size,
586  const sumOp<scalar>& bop,
587  const int tag,
588  const label communicator,
589  label& requestID
590 )
591 {
592  iallReduce<scalar>
593  (
594  values,
595  size,
596  MPI_SCALAR,
597  MPI_SUM,
598  communicator,
599  requestID
600  );
601 }
602 
603 
604 #if defined(WM_SPDP)
605 void Foam::reduce
606 (
607  solveScalar& Value,
608  const sumOp<solveScalar>& bop,
609  const int tag,
610  const label communicator
611 )
612 {
613  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
614  {
615  Pout<< "** reducing:" << Value << " with comm:" << communicator
616  << " warnComm:" << UPstream::warnComm
617  << endl;
618  error::printStack(Pout);
619  }
620  allReduce(Value, 1, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);
621 }
622 
623 
624 void Foam::reduce
625 (
626  solveScalar& Value,
627  const minOp<solveScalar>& bop,
628  const int tag,
629  const label communicator
630 )
631 {
632  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
633  {
634  Pout<< "** reducing:" << Value << " with comm:" << communicator
635  << " warnComm:" << UPstream::warnComm
636  << endl;
637  error::printStack(Pout);
638  }
639  allReduce(Value, 1, MPI_SOLVESCALAR, MPI_MIN, bop, tag, communicator);
640 }
641 
642 
643 void Foam::reduce
644 (
645  Vector2D<solveScalar>& Value,
646  const sumOp<Vector2D<solveScalar>>& bop,
647  const int tag,
648  const label communicator
649 )
650 {
651  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
652  {
653  Pout<< "** reducing:" << Value << " with comm:" << communicator
654  << " warnComm:" << UPstream::warnComm
655  << endl;
656  error::printStack(Pout);
657  }
658  allReduce(Value, 2, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);
659 }
660 
661 
662 void Foam::sumReduce
663 (
664  solveScalar& Value,
665  label& Count,
666  const int tag,
667  const label communicator
668 )
669 {
670  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
671  {
672  Pout<< "** reducing:" << Value << " with comm:" << communicator
673  << " warnComm:" << UPstream::warnComm
674  << endl;
675  error::printStack(Pout);
676  }
677  Vector2D<solveScalar> twoScalars(Value, solveScalar(Count));
678  reduce(twoScalars, sumOp<Vector2D<solveScalar>>(), tag, communicator);
679 
680  Value = twoScalars.x();
681  Count = twoScalars.y();
682 }
683 
684 
685 void Foam::reduce
686 (
687  solveScalar& Value,
688  const sumOp<solveScalar>& bop,
689  const int tag,
690  const label communicator,
691  label& requestID
692 )
693 {
694  iallReduce<solveScalar>
695  (
696  &Value,
697  1,
698  MPI_SOLVESCALAR,
699  MPI_SUM,
700  communicator,
701  requestID
702  );
703 }
704 
705 
706 void Foam::reduce
707 (
708  solveScalar values[],
709  const int size,
710  const sumOp<solveScalar>& bop,
711  const int tag,
712  const label communicator,
713  label& requestID
714 )
715 {
716  iallReduce<solveScalar>
717  (
718  values,
719  size,
720  MPI_SOLVESCALAR,
721  MPI_SUM,
722  communicator,
723  requestID
724  );
725 }
726 #endif
727 
728 
729 void Foam::UPstream::allToAll
730 (
731  const labelUList& sendData,
732  labelUList& recvData,
733  const label communicator
734 )
735 {
736  const label np = nProcs(communicator);
737 
738  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
739  {
740  Pout<< "** allToAll :"
741  << " np:" << np
742  << " sendData:" << sendData.size()
743  << " with comm:" << communicator
744  << " warnComm:" << UPstream::warnComm
745  << endl;
746  error::printStack(Pout);
747  }
748 
749  if (sendData.size() != np || recvData.size() != np)
750  {
751  FatalErrorInFunction
752  << "Size of sendData " << sendData.size()
753  << " or size of recvData " << recvData.size()
754  << " is not equal to the number of processors in the domain "
755  << np
756  << Foam::abort(FatalError);
757  }
758 
759  if (!UPstream::parRun())
760  {
761  recvData.deepCopy(sendData);
762  }
763  else
764  {
765  profilingPstream::beginTiming();
766 
767  if
768  (
769  MPI_Alltoall
770  (
771  // NOTE: const_cast is a temporary hack for
772  // backward-compatibility with versions of OpenMPI < 1.7.4
773  const_cast<label*>(sendData.begin()),
774  sizeof(label),
775  MPI_BYTE,
776  recvData.begin(),
777  sizeof(label),
778  MPI_BYTE,
779  PstreamGlobals::MPICommunicators_[communicator]
780  )
781  )
782  {
783  FatalErrorInFunction
784  << "MPI_Alltoall failed for " << sendData
785  << " on communicator " << communicator
786  << Foam::abort(FatalError);
787  }
788 
789  profilingPstream::addAllToAllTime();
790  }
791 }
792 
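// Editor's illustrative sketch (not part of UPstream.C): the semantics of the
// label allToAll above with plain MPI ints - rank r sends send[i] to rank i
// and receives recv[i] from rank i. Both buffers hold one element per rank;
// the helper name is an assumption for illustration.
static void exampleAllToAll(const int* send, int* recv, MPI_Comm comm)
{
    MPI_Alltoall(const_cast<int*>(send), 1, MPI_INT, recv, 1, MPI_INT, comm);
}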
793 
794 void Foam::UPstream::allToAll
795 (
796  const char* sendData,
797  const UList<int>& sendSizes,
798  const UList<int>& sendOffsets,
799 
800  char* recvData,
801  const UList<int>& recvSizes,
802  const UList<int>& recvOffsets,
803 
804  const label communicator
805 )
806 {
807  const label np = nProcs(communicator);
808 
809  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
810  {
811  Pout<< "** allToAll :"
812  << " sendSizes:" << sendSizes
813  << " sendOffsets:" << sendOffsets
814  << " with comm:" << communicator
815  << " warnComm:" << UPstream::warnComm
816  << endl;
817  error::printStack(Pout);
818  }
819 
820  if
821  (
822  sendSizes.size() != np
823  || sendOffsets.size() != np
824  || recvSizes.size() != np
825  || recvOffsets.size() != np
826  )
827  {
828  FatalErrorInFunction
829  << "Size of sendSize " << sendSizes.size()
830  << ", sendOffsets " << sendOffsets.size()
831  << ", recvSizes " << recvSizes.size()
832  << " or recvOffsets " << recvOffsets.size()
833  << " is not equal to the number of processors in the domain "
834  << np
835  << Foam::abort(FatalError);
836  }
837 
838  if (!UPstream::parRun())
839  {
840  if (recvSizes[0] != sendSizes[0])
841  {
842  FatalErrorInFunction
843  << "Bytes to send " << sendSizes[0]
844  << " does not equal bytes to receive " << recvSizes[0]
845  << Foam::abort(FatalError);
846  }
847  std::memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
848  }
849  else
850  {
851  profilingPstream::beginTiming();
852 
853  if
854  (
855  MPI_Alltoallv
856  (
857  const_cast<char*>(sendData),
858  const_cast<int*>(sendSizes.begin()),
859  const_cast<int*>(sendOffsets.begin()),
860  MPI_BYTE,
861  recvData,
862  const_cast<int*>(recvSizes.begin()),
863  const_cast<int*>(recvOffsets.begin()),
864  MPI_BYTE,
865  PstreamGlobals::MPICommunicators_[communicator]
866  )
867  )
868  {
869  FatalErrorInFunction
870  << "MPI_Alltoallv failed for sendSizes " << sendSizes
871  << " recvSizes " << recvSizes
872  << " communicator " << communicator
873  << Foam::abort(FatalError);
874  }
875 
876  profilingPstream::addAllToAllTime();
877  }
878 }
879 
880 
881 void Foam::UPstream::gather
882 (
883  const char* sendData,
884  int sendSize,
885 
886  char* recvData,
887  const UList<int>& recvSizes,
888  const UList<int>& recvOffsets,
889  const label communicator
890 )
891 {
892  const label np = nProcs(communicator);
893 
894  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
895  {
896  Pout<< "** allToAll :"
897  << " np:" << np
898  << " recvSizes:" << recvSizes
899  << " recvOffsets:" << recvOffsets
900  << " with comm:" << communicator
901  << " warnComm:" << UPstream::warnComm
902  << endl;
903  error::printStack(Pout);
904  }
905 
906  if
907  (
908  UPstream::master(communicator)
909  && (recvSizes.size() != np || recvOffsets.size() < np)
910  )
911  {
912  // Note: allow recvOffsets to be e.g. 1 larger than np so we
913  // can easily loop over the result
914 
915  FatalErrorInFunction
916  << "Size of recvSizes " << recvSizes.size()
917  << " or recvOffsets " << recvOffsets.size()
918  << " is not equal to the number of processors in the domain "
919  << np
920  << Foam::abort(FatalError);
921  }
922 
923  if (!UPstream::parRun())
924  {
925  std::memmove(recvData, sendData, sendSize);
926  }
927  else
928  {
929  profilingPstream::beginTiming();
930 
931  if
932  (
933  MPI_Gatherv
934  (
935  const_cast<char*>(sendData),
936  sendSize,
937  MPI_BYTE,
938  recvData,
939  const_cast<int*>(recvSizes.begin()),
940  const_cast<int*>(recvOffsets.begin()),
941  MPI_BYTE,
942  0,
943  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
944  )
945  )
946  {
947  FatalErrorInFunction
948  << "MPI_Gatherv failed for sendSize " << sendSize
949  << " recvSizes " << recvSizes
950  << " communicator " << communicator
951  << Foam::abort(FatalError);
952  }
953 
954  profilingPstream::addGatherTime();
955  }
956 }
957 
958 
959 void Foam::UPstream::scatter
960 (
961  const char* sendData,
962  const UList<int>& sendSizes,
963  const UList<int>& sendOffsets,
964 
965  char* recvData,
966  int recvSize,
967  const label communicator
968 )
969 {
970  const label np = nProcs(communicator);
971 
972  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
973  {
974  Pout<< "** allToAll :"
975  << " np:" << np
976  << " sendSizes:" << sendSizes
977  << " sendOffsets:" << sendOffsets
978  << " with comm:" << communicator
979  << " warnComm:" << UPstream::warnComm
980  << endl;
981  error::printStack(Pout);
982  }
983 
984  if
985  (
986  UPstream::master(communicator)
987  && (sendSizes.size() != np || sendOffsets.size() != np)
988  )
989  {
990  FatalErrorInFunction
991  << "Size of sendSizes " << sendSizes.size()
992  << " or sendOffsets " << sendOffsets.size()
993  << " is not equal to the number of processors in the domain "
994  << np
995  << Foam::abort(FatalError);
996  }
997 
998  if (!UPstream::parRun())
999  {
1000  std::memmove(recvData, sendData, recvSize);
1001  }
1002  else
1003  {
1004  profilingPstream::beginTiming();
1005 
1006  if
1007  (
1008  MPI_Scatterv
1009  (
1010  const_cast<char*>(sendData),
1011  const_cast<int*>(sendSizes.begin()),
1012  const_cast<int*>(sendOffsets.begin()),
1013  MPI_BYTE,
1014  recvData,
1015  recvSize,
1016  MPI_BYTE,
1017  0,
1018  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
1019  )
1020  )
1021  {
1022  FatalErrorInFunction
1023  << "MPI_Scatterv failed for sendSizes " << sendSizes
1024  << " sendOffsets " << sendOffsets
1025  << " communicator " << communicator
1026  << Foam::abort(FatalError);
1027  }
1028 
1029  profilingPstream::addScatterTime();
1030  }
1031 }
1032 
1033 
1034 void Foam::UPstream::allocatePstreamCommunicator
1035 (
1036  const label parentIndex,
1037  const label index
1038 )
1039 {
1040  if (index == PstreamGlobals::MPIGroups_.size())
1041  {
1042  // Extend storage with dummy values
1043  MPI_Group newGroup = MPI_GROUP_NULL;
1044  PstreamGlobals::MPIGroups_.append(newGroup);
1045  MPI_Comm newComm = MPI_COMM_NULL;
1046  PstreamGlobals::MPICommunicators_.append(newComm);
1047  }
1048  else if (index > PstreamGlobals::MPIGroups_.size())
1049  {
1050  FatalErrorInFunction
1051  << "PstreamGlobals out of sync with UPstream data. Problem."
1052  << Foam::exit(FatalError);
1053  }
1054 
1055 
1056  if (parentIndex == -1)
1057  {
1058  // Allocate world communicator
1059 
1060  if (index != UPstream::worldComm)
1061  {
1062  FatalErrorInFunction
1063  << "world communicator should always be index "
1064  << UPstream::worldComm << Foam::exit(FatalError);
1065  }
1066 
1067  PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
1068  MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
1069  MPI_Comm_rank
1070  (
1071  PstreamGlobals::MPICommunicators_[index],
1072  &myProcNo_[index]
1073  );
1074 
1075  // Set the number of processes to the actual number
1076  int numProcs;
1077  MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
1078 
1079  //procIDs_[index] = identity(numProcs);
1080  procIDs_[index].setSize(numProcs);
1081  forAll(procIDs_[index], i)
1082  {
1083  procIDs_[index][i] = i;
1084  }
1085  }
1086  else
1087  {
1088  // Create new group
1089  MPI_Group_incl
1090  (
1091  PstreamGlobals::MPIGroups_[parentIndex],
1092  procIDs_[index].size(),
1093  procIDs_[index].begin(),
1094  &PstreamGlobals::MPIGroups_[index]
1095  );
1096 
1097  // Create new communicator
1098  MPI_Comm_create
1099  (
1100  PstreamGlobals::MPICommunicators_[parentIndex],
1101  PstreamGlobals::MPIGroups_[index],
1102  &PstreamGlobals::MPICommunicators_[index]
1103  );
1104 
1105  if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
1106  {
1107  myProcNo_[index] = -1;
1108  }
1109  else
1110  {
1111  if
1112  (
1113  MPI_Comm_rank
1114  (
1115  PstreamGlobals::MPICommunicators_[index],
1116  &myProcNo_[index]
1117  )
1118  )
1119  {
1120  FatalErrorInFunction
1121  << "Problem :"
1122  << " when allocating communicator at " << index
1123  << " from ranks " << procIDs_[index]
1124  << " of parent " << parentIndex
1125  << " cannot find my own rank"
1126  << Foam::exit(FatalError);
1127  }
1128  }
1129  }
1130 }
1131 
1132 
1133 void Foam::UPstream::freePstreamCommunicator(const label communicator)
1134 {
1135  if (communicator != 0)
1136  {
1137  if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
1138  {
1139  // Free communicator. Sets communicator to MPI_COMM_NULL
1140  MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
1141  }
1142  if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
1143  {
1144  // Free group. Sets group to MPI_GROUP_NULL
1145  MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
1146  }
1147  }
1148 }
1149 
1150 
1151 Foam::label Foam::UPstream::nRequests()
1152 {
1153  return PstreamGlobals::outstandingRequests_.size();
1154 }
1155 
1156 
1157 void Foam::UPstream::resetRequests(const label i)
1158 {
1159  if (i < PstreamGlobals::outstandingRequests_.size())
1160  {
1161  PstreamGlobals::outstandingRequests_.resize(i);
1162  }
1163 }
1164 
1165 
1166 void Foam::UPstream::waitRequests(const label start)
1167 {
1168  if (UPstream::debug)
1169  {
1170  Pout<< "UPstream::waitRequests : starting wait for "
1171  << PstreamGlobals::outstandingRequests_.size()-start
1172  << " outstanding requests starting at " << start << endl;
1173  }
1174 
1175  if (PstreamGlobals::outstandingRequests_.size())
1176  {
1177  SubList<MPI_Request> waitRequests
1178  (
1179  PstreamGlobals::outstandingRequests_,
1180  PstreamGlobals::outstandingRequests_.size() - start,
1181  start
1182  );
1183 
1184  profilingPstream::beginTiming();
1185 
1186  if
1187  (
1188  MPI_Waitall
1189  (
1190  waitRequests.size(),
1191  waitRequests.begin(),
1192  MPI_STATUSES_IGNORE
1193  )
1194  )
1195  {
1196  FatalErrorInFunction
1197  << "MPI_Waitall returned with error" << Foam::endl;
1198  }
1199 
1200  profilingPstream::addWaitTime();
1201 
1202  resetRequests(start);
1203  }
1204 
1205  if (debug)
1206  {
1207  Pout<< "UPstream::waitRequests : finished wait." << endl;
1208  }
1209 }
1210 
1211 
1212 void Foam::UPstream::waitRequest(const label i)
1213 {
1214  if (debug)
1215  {
1216  Pout<< "UPstream::waitRequest : starting wait for request:" << i
1217  << endl;
1218  }
1219 
1220  if (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
1221  {
1222  FatalErrorInFunction
1223  << "There are " << PstreamGlobals::outstandingRequests_.size()
1224  << " outstanding send requests and you are asking for i=" << i
1225  << nl
1226  << "Maybe you are mixing blocking/non-blocking comms?"
1227  << Foam::abort(FatalError);
1228  }
1229 
1230  profilingPstream::beginTiming();
1231 
1232  if
1233  (
1234  MPI_Wait
1235  (
1236  &PstreamGlobals::outstandingRequests_[i],
1237  MPI_STATUS_IGNORE
1238  )
1239  )
1240  {
1241  FatalErrorInFunction
1242  << "MPI_Wait returned with error" << Foam::endl;
1243  }
1244 
1245  profilingPstream::addWaitTime();
1246  // Push index onto free cache
1247  PstreamGlobals::freedRequests_.append(i);
1248 
1249  if (debug)
1250  {
1251  Pout<< "UPstream::waitRequest : finished wait for request:" << i
1252  << endl;
1253  }
1254 }
1255 
1256 
1257 bool Foam::UPstream::finishedRequest(const label i)
1258 {
1259  if (debug)
1260  {
1261  Pout<< "UPstream::finishedRequest : checking request:" << i
1262  << endl;
1263  }
1264 
1265  if (i >= PstreamGlobals::outstandingRequests_.size())
1266  {
1267  FatalErrorInFunction
1268  << "There are " << PstreamGlobals::outstandingRequests_.size()
1269  << " outstanding send requests and you are asking for i=" << i
1270  << nl
1271  << "Maybe you are mixing blocking/non-blocking comms?"
1272  << Foam::abort(FatalError);
1273  }
1274 
1275  int flag;
1276  MPI_Test
1277  (
1278  &PstreamGlobals::outstandingRequests_[i],
1279  &flag,
1280  MPI_STATUS_IGNORE
1281  );
1282 
1283  if (debug)
1284  {
1285  Pout<< "UPstream::finishedRequest : finished request:" << i
1286  << endl;
1287  }
1288 
1289  return flag != 0;
1290 }
1291 
1292 
1293 int Foam::UPstream::allocateTag(const char* s)
1294 {
1295  int tag;
1296  if (PstreamGlobals::freedTags_.size())
1297  {
1298  tag = PstreamGlobals::freedTags_.remove();
1299  }
1300  else
1301  {
1302  tag = PstreamGlobals::nTags_++;
1303  }
1304 
1305  if (debug)
1306  {
1307  //if (UPstream::lateBlocking > 0)
1308  //{
1309  // string& poutp = Pout.prefix();
1310  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
1311  // Perr.prefix() = Pout.prefix();
1312  //}
1313  Pout<< "UPstream::allocateTag " << s
1314  << " : tag:" << tag
1315  << endl;
1316  }
1317 
1318  return tag;
1319 }
1320 
1321 
1322 int Foam::UPstream::allocateTag(const word& s)
1323 {
1324  int tag;
1325  if (PstreamGlobals::freedTags_.size())
1326  {
1327  tag = PstreamGlobals::freedTags_.remove();
1328  }
1329  else
1330  {
1331  tag = PstreamGlobals::nTags_++;
1332  }
1333 
1334  if (debug)
1335  {
1336  //if (UPstream::lateBlocking > 0)
1337  //{
1338  // string& poutp = Pout.prefix();
1339  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
1340  // Perr.prefix() = Pout.prefix();
1341  //}
1342  Pout<< "UPstream::allocateTag " << s
1343  << " : tag:" << tag
1344  << endl;
1345  }
1346 
1347  return tag;
1348 }
1349 
1350 
1351 void Foam::UPstream::freeTag(const char* s, const int tag)
1352 {
1353  if (debug)
1354  {
1355  //if (UPstream::lateBlocking > 0)
1356  //{
1357  // string& poutp = Pout.prefix();
1358  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
1359  // Perr.prefix() = Pout.prefix();
1360  //}
1361  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
1362  }
1363  PstreamGlobals::freedTags_.append(tag);
1364 }
1365 
1366 
1367 void Foam::UPstream::freeTag(const word& s, const int tag)
1368 {
1369  if (debug)
1370  {
1371  //if (UPstream::lateBlocking > 0)
1372  //{
1373  // string& poutp = Pout.prefix();
1374  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
1375  // Perr.prefix() = Pout.prefix();
1376  //}
1377  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
1378  }
1379  PstreamGlobals::freedTags_.append(tag);
1380 }
1381 
1382 
1383 // ************************************************************************* //
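// Editor's usage sketch (not part of UPstream.C): the typical pattern for the
// request bookkeeping implemented above, assuming code that already runs
// inside a parallel OpenFOAM application.
//
//     const Foam::label startRequest = Foam::UPstream::nRequests();
//     // ... post non-blocking sends/receives, which append their
//     //     MPI_Request handles to PstreamGlobals::outstandingRequests_ ...
//     Foam::UPstream::waitRequests(startRequest);  // wait only for those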