UPstream.C
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2016-2021 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "Pstream.H"
30 #include "PstreamReduceOps.H"
31 #include "PstreamGlobals.H"
32 #include "profilingPstream.H"
33 #include "SubList.H"
34 #include "allReduce.H"
35 #include "int.H"
36 #include "collatedFileOperation.H"
37 
38 #include <mpi.h>
39 #include <cstring>
40 #include <cstdlib>
41 #include <csignal>
42 
43 #if defined(WM_SP)
44  #define MPI_SCALAR MPI_FLOAT
45  #define MPI_SOLVESCALAR MPI_FLOAT
46 #elif defined(WM_SPDP)
47  #define MPI_SCALAR MPI_FLOAT
48  #define MPI_SOLVESCALAR MPI_DOUBLE
49 #elif defined(WM_DP)
50  #define MPI_SCALAR MPI_DOUBLE
51  #define MPI_SOLVESCALAR MPI_DOUBLE
52 #endif
53 
54 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
55 
56 // The min value and default for MPI buffer length
57 constexpr int minBufLen = 20000000;
58 
59 // Track if we have attached MPI buffers
60 static bool ourBuffers = false;
61 
62 // Track if we initialized MPI
63 static bool ourMpi = false;
64 
65 
66 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
67 
68 static void attachOurBuffers()
69 {
70  if (ourBuffers)
71  {
72  return; // Already attached
73  }
74  ourBuffers = true;
75 
76  // Use UPstream::mpiBufferSize (optimisationSwitch),
77  // but allow override with MPI_BUFFER_SIZE env variable (int value)
78 
79 #ifndef SGIMPI
80  int len = 0;
81 
82  const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
83  if (str.empty() || !Foam::read(str, len) || len <= 0)
84  {
85  len = Foam::UPstream::mpiBufferSize;
86  }
87 
88  if (len < minBufLen)
89  {
90  len = minBufLen;
91  }
92 
93  if (Foam::UPstream::debug)
94  {
95  Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
96  }
97 
98  char* buf = new char[len];
99 
100  if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
101  {
102  delete[] buf;
103  Foam::Pout<< "UPstream::init : could not attach buffer\n";
104  }
105 #endif
106 }
107 
108 
109 static void detachOurBuffers()
110 {
111  if (!ourBuffers)
112  {
113  return; // Nothing to detach
114  }
115  ourBuffers = false;
116 
117  // Some MPI notes suggest that the return code is MPI_SUCCESS when
118  // no buffer is attached.
119  // Be extra careful and require a non-zero size as well.
120 
121 #ifndef SGIMPI
122  int len = 0;
123  char* buf = nullptr;
124 
125  if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
126  {
127  delete[] buf;
128  }
129 #endif
130 }
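// Usage sketch (illustrative only): the buffer attached above serves MPI's
// buffered-send mode; 'payload', 'nBytes' and 'toProcNo' are assumed names.
//
//     attachOurBuffers();                    // after MPI_Init
//     MPI_Bsend
//     (
//         payload, nBytes, MPI_BYTE, toProcNo,
//         Foam::UPstream::msgType(), MPI_COMM_WORLD
//     );
//     detachOurBuffers();                    // before MPI_Finalize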
131 
132 
133 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
134 
135 // NOTE:
136 // valid parallel options vary between implementations, but flag common ones.
137 // if they are not removed by MPI_Init(), the subsequent argument processing
138 // will notice that they are wrong
139 void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
140 {
141  validParOptions.insert("np", "");
142  validParOptions.insert("p4pg", "PI file");
143  validParOptions.insert("p4wd", "directory");
144  validParOptions.insert("p4amslave", "");
145  validParOptions.insert("p4yourname", "hostname");
146  validParOptions.insert("machinefile", "machine file");
147 }
148 
149 
150 bool Foam::UPstream::initNull()
151 {
152  int flag = 0;
153 
154  MPI_Finalized(&flag);
155  if (flag)
156  {
157  // Already finalized - this is an error
158  FatalErrorInFunction
159  << "MPI was already finalized - cannot perform MPI_Init\n"
160  << Foam::abort(FatalError);
161 
162  return false;
163  }
164 
165  MPI_Initialized(&flag);
166  if (flag)
167  {
168  if (debug)
169  {
170  Pout<< "UPstream::initNull : was already initialized\n";
171  }
172  }
173  else
174  {
175  // Not already initialized
176 
177  MPI_Init_thread
178  (
179  nullptr, // argc
180  nullptr, // argv
181  MPI_THREAD_SINGLE,
182  &flag // provided_thread_support
183  );
184 
185  ourMpi = true;
186  }
187 
188  // Could also attach buffers etc.
189 
190  return true;
191 }
192 
193 
194 bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
195 {
196  int numprocs = 0, myRank = 0;
197  int provided_thread_support = 0;
198  int flag = 0;
199 
200  MPI_Finalized(&flag);
201  if (flag)
202  {
203  // Already finalized - this is an error
204  FatalErrorInFunction
205  << "MPI was already finalized - cannot perform MPI_Init" << endl
206  << Foam::abort(FatalError);
207 
208  return false;
209  }
210 
211  MPI_Initialized(&flag);
212  if (flag)
213  {
214  // Already initialized.
215  // Warn if we've called twice, but skip if initialized externally
216 
217  if (ourMpi)
218  {
219  WarningInFunction
220  << "MPI was already initialized - cannot perform MPI_Init" << nl
221  << "This could indicate an application programming error!"
222  << endl;
223 
224  return true;
225  }
226  else if (debug)
227  {
228  Pout<< "UPstream::init : was already initialized\n";
229  }
230  }
231  else
232  {
233  MPI_Init_thread
234  (
235  &argc,
236  &argv,
237  (
238  needsThread
239  ? MPI_THREAD_MULTIPLE
240  : MPI_THREAD_SINGLE
241  ),
242  &provided_thread_support
243  );
244 
245  ourMpi = true;
246  }
247 
248  // Check argument list for local world
249  label worldIndex = -1;
250  word world;
251  for (int argi = 1; argi < argc; ++argi)
252  {
253  if (strcmp(argv[argi], "-world") == 0)
254  {
255  worldIndex = argi++;
256  if (argi >= argc)
257  {
258  FatalErrorInFunction
259  << "Missing world name to argument \"world\""
260  << Foam::abort(FatalError);
261  }
262  world = argv[argi];
263  break;
264  }
265  }
266 
267  // Filter 'world' option
268  if (worldIndex != -1)
269  {
270  for (label i = worldIndex+2; i < argc; i++)
271  {
272  argv[i-2] = argv[i];
273  }
274  argc -= 2;
275  }
276 
277  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
278  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
279 
280  if (debug)
281  {
282  Pout<< "UPstream::init :"
283  << " thread-support : wanted:" << needsThread
284  << " obtained:"
285  << (
286  provided_thread_support == MPI_THREAD_MULTIPLE
287  ? "MPI_THREAD_MULTIPLE"
288  : "MPI_THREAD_SINGLE"
289  )
290  << " procs:" << numprocs
291  << " rank:" << myRank
292  << " world:" << world << endl;
293  }
294 
295  if (worldIndex == -1 && numprocs <= 1)
296  {
297  FatalErrorInFunction
298  << "attempt to run parallel on 1 processor"
299  << Foam::abort(FatalError);
300  }
301 
302  // Initialise parallel structure
303  setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
304 
305  if (worldIndex != -1)
306  {
307  wordList worlds(numprocs);
308  worlds[Pstream::myProcNo()] = world;
309  Pstream::gatherList(worlds);
310  Pstream::scatterList(worlds);
311 
312  // Compact
313  if (Pstream::master())
314  {
315  DynamicList<word> allWorlds(numprocs);
316  for (const word& world : worlds)
317  {
318  allWorlds.appendUniq(world);
319  }
320  allWorlds_ = std::move(allWorlds);
321 
322  worldIDs_.setSize(numprocs);
323  forAll(worlds, proci)
324  {
325  const word& world = worlds[proci];
326  worldIDs_[proci] = allWorlds_.find(world);
327  }
328  }
329  Pstream::scatter(allWorlds_);
330  Pstream::scatter(worldIDs_);
331 
332  DynamicList<label> subRanks;
333  forAll(worlds, proci)
334  {
335  if (worlds[proci] == worlds[Pstream::myProcNo()])
336  {
337  subRanks.append(proci);
338  }
339  }
340 
341  // Allocate new communicator 1 with parent 0 (= mpi_world)
342  const label subComm = allocateCommunicator(0, subRanks, true);
343 
344  // Override worldComm
345  UPstream::worldComm = subComm;
346  // For testing: warn use of non-worldComm
347  UPstream::warnComm = UPstream::worldComm;
348 
349  if (debug)
350  {
351  // Check
352  int subNProcs, subRank;
353  MPI_Comm_size
354  (
355  PstreamGlobals::MPICommunicators_[subComm],
356  &subNProcs
357  );
358  MPI_Comm_rank
359  (
360  PstreamGlobals::MPICommunicators_[subComm],
361  &subRank
362  );
363 
364  Pout<< "UPstream::init : in world:" << world
365  << " using local communicator:" << subComm
366  << " with procs:" << subNProcs
367  << " and rank:" << subRank
368  << endl;
369  }
370 
371  // Override Pout prefix (move to setParRun?)
372  Pout.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
373  Perr.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
374  }
375  else
376  {
377  // All processors use world 0
378  worldIDs_.setSize(numprocs, 0);
379  }
380 
381  attachOurBuffers();
382 
383  return true;
384 }
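// Illustrative only: with the "-world" handling above, each group of ranks
// can be started with its own "-world <name>" argument (solverA, solverB and
// the world names are assumed examples), e.g.
//
//     mpirun -np 4 solverA -world left : -np 2 solverB -world right
//
// Ranks that share a world name are gathered into one sub-communicator,
// which then replaces UPstream::worldComm for that group.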
385 
386 
387 void Foam::UPstream::shutdown(int errNo)
388 {
389  if (debug)
390  {
391  Pout<< "UPstream::shutdown\n";
392  }
393 
394  int flag = 0;
395 
396  MPI_Initialized(&flag);
397  if (!flag)
398  {
399  // No MPI initialized - we are done
400  return;
401  }
402 
403  MPI_Finalized(&flag);
404  if (flag)
405  {
406  // Already finalized elsewhere?
407  if (ourMpi)
408  {
409  WarningInFunction
410  << "MPI was already finalized (by a connected program?)\n";
411  }
412  else if (debug)
413  {
414  Pout<< "UPstream::shutdown : was already finalized\n";
415  }
416  }
417  else
418  {
419  detachOurBuffers();
420  }
421 
422 
423  // Warn about any outstanding requests
424  {
425  label nOutstanding = 0;
426 
427  forAll(PstreamGlobals::outstandingRequests_, requestID)
428  {
429  if (!PstreamGlobals::freedRequests_.found(requestID))
430  {
431  ++nOutstanding;
432  }
433  }
434 
435  PstreamGlobals::outstandingRequests_.clear();
436 
437  if (nOutstanding)
438  {
439  WarningInFunction
440  << "There were still " << nOutstanding
441  << " outstanding MPI_Requests." << nl
442  << "Which means your code exited before doing a "
443  << " UPstream::waitRequests()." << nl
444  << "This should not happen for a normal code exit."
445  << nl;
446  }
447  }
448 
449  // Clean mpi communicators
450  forAll(myProcNo_, communicator)
451  {
452  if (myProcNo_[communicator] != -1)
453  {
454  freePstreamCommunicator(communicator);
455  }
456  }
457 
458  if (!flag)
459  {
460  // MPI not already finalized
461 
462  if (!ourMpi)
463  {
464  WarningInFunction
465  << "Finalizing MPI, but was initialized elsewhere\n";
466  }
467 
468  if (errNo == 0)
469  {
470  MPI_Finalize();
471  }
472  else
473  {
474  // Abort only locally or world?
475  MPI_Abort(MPI_COMM_WORLD, errNo);
476  }
477  }
478 }
479 
480 
481 void Foam::UPstream::exit(int errNo)
482 {
483  UPstream::shutdown(errNo);
484  std::exit(errNo);
485 }
486 
487 
488 void Foam::UPstream::abort()
489 {
490  MPI_Abort(MPI_COMM_WORLD, 1);
491 }
492 
493 
494 void Foam::reduce
495 (
496  scalar& Value,
497  const sumOp<scalar>& bop,
498  const int tag,
499  const label communicator
500 )
501 {
502  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
503  {
504  Pout<< "** reducing:" << Value << " with comm:" << communicator
505  << " warnComm:" << UPstream::warnComm
506  << endl;
507  error::printStack(Pout);
508  }
509  allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
510 }
511 
512 
513 void Foam::reduce
514 (
515  scalar& Value,
516  const minOp<scalar>& bop,
517  const int tag,
518  const label communicator
519 )
520 {
521  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
522  {
523  Pout<< "** reducing:" << Value << " with comm:" << communicator
524  << " warnComm:" << UPstream::warnComm
525  << endl;
526  error::printStack(Pout);
527  }
528  allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
529 }
530 
531 
532 void Foam::reduce
533 (
534  vector2D& Value,
535  const sumOp<vector2D>& bop,
536  const int tag,
537  const label communicator
538 )
539 {
540  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
541  {
542  Pout<< "** reducing:" << Value << " with comm:" << communicator
543  << " warnComm:" << UPstream::warnComm
544  << endl;
545  error::printStack(Pout);
546  }
547  allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
548 }
549 
550 
551 void Foam::sumReduce
552 (
553  scalar& Value,
554  label& Count,
555  const int tag,
556  const label communicator
557 )
558 {
559  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
560  {
561  Pout<< "** sumReduce:" << Value << " with comm:" << communicator
562  << " warnComm:" << UPstream::warnComm
563  << endl;
564  error::printStack(Pout);
565  }
566  vector2D twoScalars(Value, scalar(Count));
567  reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
568 
569  Value = twoScalars.x();
570  Count = twoScalars.y();
571 }
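// Usage sketch (illustrative values only) of the reductions defined above;
// the tag and communicator default to Pstream::msgType() and
// UPstream::worldComm in the PstreamReduceOps.H declarations.
//
//     scalar vol = ...;                  // local contribution
//     reduce(vol, sumOp<scalar>());      // global sum over all ranks
//
//     scalar v = someLocalValue;         // assumed name
//     label n = 1;
//     sumReduce(v, n);                   // sum value and count together
//     v /= n;                            // e.g. a global average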
572 
573 
574 void Foam::reduce
575 (
576  scalar& Value,
577  const sumOp<scalar>& bop,
578  const int tag,
579  const label communicator,
580  label& requestID
581 )
582 {
583  iallReduce<scalar>(&Value, 1, MPI_SCALAR, MPI_SUM, communicator, requestID);
584 }
585 
586 
587 void Foam::reduce
588 (
589  scalar values[],
590  const int size,
591  const sumOp<scalar>& bop,
592  const int tag,
593  const label communicator,
594  label& requestID
595 )
596 {
597  iallReduce<scalar>
598  (
599  values,
600  size,
601  MPI_SCALAR,
602  MPI_SUM,
603  communicator,
604  requestID
605  );
606 }
607 
608 
609 #if defined(WM_SPDP)
610 void Foam::reduce
611 (
612  solveScalar& Value,
613  const sumOp<solveScalar>& bop,
614  const int tag,
615  const label communicator
616 )
617 {
618  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
619  {
620  Pout<< "** reducing:" << Value << " with comm:" << communicator
621  << " warnComm:" << UPstream::warnComm
622  << endl;
623  error::printStack(Pout);
624  }
625  allReduce(Value, 1, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);
626 }
627 
628 
629 void Foam::reduce
630 (
631  solveScalar& Value,
632  const minOp<solveScalar>& bop,
633  const int tag,
634  const label communicator
635 )
636 {
637  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
638  {
639  Pout<< "** reducing:" << Value << " with comm:" << communicator
640  << " warnComm:" << UPstream::warnComm
641  << endl;
642  error::printStack(Pout);
643  }
644  allReduce(Value, 1, MPI_SOLVESCALAR, MPI_MIN, bop, tag, communicator);
645 }
646 
647 
648 void Foam::reduce
649 (
650  Vector2D<solveScalar>& Value,
651  const sumOp<Vector2D<solveScalar>>& bop,
652  const int tag,
653  const label communicator
654 )
655 {
656  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
657  {
658  Pout<< "** reducing:" << Value << " with comm:" << communicator
659  << " warnComm:" << UPstream::warnComm
660  << endl;
661  error::printStack(Pout);
662  }
663  allReduce(Value, 2, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);
664 }
665 
666 
667 void Foam::sumReduce
668 (
669  solveScalar& Value,
670  label& Count,
671  const int tag,
672  const label communicator
673 )
674 {
675  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
676  {
677  Pout<< "** reducing:" << Value << " with comm:" << communicator
678  << " warnComm:" << UPstream::warnComm
679  << endl;
680  error::printStack(Pout);
681  }
682  Vector2D<solveScalar> twoScalars(Value, solveScalar(Count));
683  reduce(twoScalars, sumOp<Vector2D<solveScalar>>(), tag, communicator);
684 
685  Value = twoScalars.x();
686  Count = twoScalars.y();
687 }
688 
689 
690 void Foam::reduce
691 (
692  solveScalar& Value,
693  const sumOp<solveScalar>& bop,
694  const int tag,
695  const label communicator,
696  label& requestID
697 )
698 {
699  iallReduce<solveScalar>
700  (
701  &Value,
702  1,
703  MPI_SOLVESCALAR,
704  MPI_SUM,
705  communicator,
706  requestID
707  );
708 }
709 
710 
711 void Foam::reduce
712 (
713  solveScalar values[],
714  const int size,
715  const sumOp<solveScalar>& bop,
716  const int tag,
717  const label communicator,
718  label& requestID
719 )
720 {
721  iallReduce<solveScalar>
722  (
723  values,
724  size,
725  MPI_SOLVESCALAR,
726  MPI_SUM,
727  communicator,
728  requestID
729  );
730 }
731 #endif
732 
733 
734 void Foam::UPstream::allToAll
735 (
736  const labelUList& sendData,
737  labelUList& recvData,
738  const label communicator
739 )
740 {
741  const label np = nProcs(communicator);
742 
743  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
744  {
745  Pout<< "** allToAll :"
746  << " np:" << np
747  << " sendData:" << sendData.size()
748  << " with comm:" << communicator
749  << " warnComm:" << UPstream::warnComm
750  << endl;
751  error::printStack(Pout);
752  }
753 
754  if (sendData.size() != np || recvData.size() != np)
755  {
756  FatalErrorInFunction
757  << "Size of sendData " << sendData.size()
758  << " or size of recvData " << recvData.size()
759  << " is not equal to the number of processors in the domain "
760  << np
761  << Foam::abort(FatalError);
762  }
763 
764  if (!UPstream::parRun())
765  {
766  recvData.deepCopy(sendData);
767  }
768  else
769  {
770  profilingPstream::beginTiming();
771 
772  if
773  (
774  MPI_Alltoall
775  (
776  // NOTE: const_cast is a temporary hack for
777  // backward-compatibility with versions of OpenMPI < 1.7.4
778  const_cast<label*>(sendData.cdata()),
779  sizeof(label),
780  MPI_BYTE,
781  recvData.data(),
782  sizeof(label),
783  MPI_BYTE,
784  PstreamGlobals::MPICommunicators_[communicator]
785  )
786  )
787  {
788  FatalErrorInFunction
789  << "MPI_Alltoall failed for " << sendData
790  << " on communicator " << communicator
791  << Foam::abort(FatalError);
792  }
793 
794  profilingPstream::addAllToAllTime();
795  }
796 }
797 
798 
799 void Foam::UPstream::allToAll
800 (
801  const char* sendData,
802  const UList<int>& sendSizes,
803  const UList<int>& sendOffsets,
804 
805  char* recvData,
806  const UList<int>& recvSizes,
807  const UList<int>& recvOffsets,
808 
809  const label communicator
810 )
811 {
812  const label np = nProcs(communicator);
813 
814  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
815  {
816  Pout<< "** MPI_Alltoallv :"
817  << " sendSizes:" << sendSizes
818  << " sendOffsets:" << sendOffsets
819  << " with comm:" << communicator
820  << " warnComm:" << UPstream::warnComm
821  << endl;
822  error::printStack(Pout);
823  }
824 
825  if
826  (
827  sendSizes.size() != np
828  || sendOffsets.size() != np
829  || recvSizes.size() != np
830  || recvOffsets.size() != np
831  )
832  {
833  FatalErrorInFunction
834  << "Size of sendSize " << sendSizes.size()
835  << ", sendOffsets " << sendOffsets.size()
836  << ", recvSizes " << recvSizes.size()
837  << " or recvOffsets " << recvOffsets.size()
838  << " is not equal to the number of processors in the domain "
839  << np
840  << Foam::abort(FatalError);
841  }
842 
843  if (!UPstream::parRun())
844  {
845  if (recvSizes[0] != sendSizes[0])
846  {
847  FatalErrorInFunction
848  << "Bytes to send " << sendSizes[0]
849  << " does not equal bytes to receive " << recvSizes[0]
850  << Foam::abort(FatalError);
851  }
852  std::memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
853  }
854  else
855  {
856  profilingPstream::beginTiming();
857 
858  if
859  (
860  MPI_Alltoallv
861  (
862  const_cast<char*>(sendData),
863  const_cast<int*>(sendSizes.cdata()),
864  const_cast<int*>(sendOffsets.cdata()),
865  MPI_BYTE,
866  recvData,
867  const_cast<int*>(recvSizes.cdata()),
868  const_cast<int*>(recvOffsets.cdata()),
869  MPI_BYTE,
870  PstreamGlobals::MPICommunicators_[communicator]
871  )
872  )
873  {
874  FatalErrorInFunction
875  << "MPI_Alltoallv failed for sendSizes " << sendSizes
876  << " recvSizes " << recvSizes
877  << " communicator " << communicator
878  << Foam::abort(FatalError);
879  }
880 
881  profilingPstream::addAllToAllTime();
882  }
883 }
884 
885 
886 void Foam::UPstream::mpiGather
887 (
888  const char* sendData,
889  int sendSize,
890 
891  char* recvData,
892  int recvSize,
893  const label communicator
894 )
895 {
896  const label np = nProcs(communicator);
897 
898  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
899  {
900  Pout<< "** MPI_Gather :"
901  << " np:" << np
902  << " recvSize:" << recvSize
903  << " with comm:" << communicator
904  << " warnComm:" << UPstream::warnComm
905  << endl;
906  error::printStack(Pout);
907  }
908 
909  if (!UPstream::parRun())
910  {
911  std::memmove(recvData, sendData, recvSize);
912  }
913  else
914  {
915  profilingPstream::beginTiming();
916 
917  if
918  (
919  MPI_Gather
920  (
921  const_cast<char*>(sendData),
922  sendSize,
923  MPI_BYTE,
924  recvData,
925  recvSize,
926  MPI_BYTE,
927  0,
928  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
929  )
930  )
931  {
932  FatalErrorInFunction
933  << "MPI_Gather failed for sendSize " << sendSize
934  << " recvSize " << recvSize
935  << " communicator " << communicator
936  << Foam::abort(FatalError);
937  }
938 
939  profilingPstream::addGatherTime();
940  }
941 }
942 
943 
944 void Foam::UPstream::mpiScatter
945 (
946  const char* sendData,
947  int sendSize,
948 
949  char* recvData,
950  int recvSize,
951  const label communicator
952 )
953 {
954  const label np = nProcs(communicator);
955 
956  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
957  {
958  Pout<< "** MPI_Scatter :"
959  << " np:" << np
960  << " recvSize:" << recvSize
961  << " with comm:" << communicator
962  << " warnComm:" << UPstream::warnComm
963  << endl;
964  error::printStack(Pout);
965  }
966 
967  if (!UPstream::parRun())
968  {
969  std::memmove(recvData, sendData, recvSize);
970  }
971  else
972  {
973  profilingPstream::beginTiming();
974 
975  if
976  (
977  MPI_Scatter
978  (
979  const_cast<char*>(sendData),
980  sendSize,
981  MPI_BYTE,
982  recvData,
983  recvSize,
984  MPI_BYTE,
985  0,
986  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
987  )
988  )
989  {
990  FatalErrorInFunction
991  << "MPI_Scatter failed for sendSize " << sendSize
992  << " recvSize " << recvSize
993  << " communicator " << communicator
994  << Foam::abort(FatalError);
995  }
996 
997  profilingPstream::addScatterTime();
998  }
999 }
1000 
1001 
1002 void Foam::UPstream::gather
1003 (
1004  const char* sendData,
1005  int sendSize,
1006 
1007  char* recvData,
1008  const UList<int>& recvSizes,
1009  const UList<int>& recvOffsets,
1010  const label communicator
1011 )
1012 {
1013  const label np = nProcs(communicator);
1014 
1015  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
1016  {
1017  Pout<< "** MPI_Gatherv :"
1018  << " np:" << np
1019  << " recvSizes:" << recvSizes
1020  << " recvOffsets:" << recvOffsets
1021  << " with comm:" << communicator
1022  << " warnComm:" << UPstream::warnComm
1023  << endl;
1024  error::printStack(Pout);
1025  }
1026 
1027  if
1028  (
1029  UPstream::master(communicator)
1030  && (recvSizes.size() != np || recvOffsets.size() < np)
1031  )
1032  {
1033  // Note: allow recvOffsets to be e.g. 1 larger than np so we
1034  // can easily loop over the result
1035 
1036  FatalErrorInFunction
1037  << "Size of recvSizes " << recvSizes.size()
1038  << " or recvOffsets " << recvOffsets.size()
1039  << " is not equal to the number of processors in the domain "
1040  << np
1041  << Foam::abort(FatalError);
1042  }
1043 
1044  if (!UPstream::parRun())
1045  {
1046  // recvSizes[0] may be invalid - use sendSize instead
1047  std::memmove(recvData, sendData, sendSize);
1048  }
1049  else
1050  {
1051  profilingPstream::beginTiming();
1052 
1053  if
1054  (
1055  MPI_Gatherv
1056  (
1057  const_cast<char*>(sendData),
1058  sendSize,
1059  MPI_BYTE,
1060  recvData,
1061  const_cast<int*>(recvSizes.cdata()),
1062  const_cast<int*>(recvOffsets.cdata()),
1063  MPI_BYTE,
1064  0,
1065  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
1066  )
1067  )
1068  {
1069  FatalErrorInFunction
1070  << "MPI_Gatherv failed for sendSize " << sendSize
1071  << " recvSizes " << recvSizes
1072  << " communicator " << communicator
1073  << Foam::abort(FatalError);
1074  }
1075 
1076  profilingPstream::addGatherTime();
1077  }
1078 }
1079 
1080 
1081 void Foam::UPstream::scatter
1082 (
1083  const char* sendData,
1084  const UList<int>& sendSizes,
1085  const UList<int>& sendOffsets,
1086 
1087  char* recvData,
1088  int recvSize,
1089  const label communicator
1090 )
1091 {
1092  const label np = nProcs(communicator);
1093 
1094  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
1095  {
1096  Pout<< "** MPI_Scatterv :"
1097  << " np:" << np
1098  << " sendSizes:" << sendSizes
1099  << " sendOffsets:" << sendOffsets
1100  << " with comm:" << communicator
1101  << " warnComm:" << UPstream::warnComm
1102  << endl;
1103  error::printStack(Pout);
1104  }
1105 
1106  if
1107  (
1108  UPstream::master(communicator)
1109  && (sendSizes.size() != np || sendOffsets.size() != np)
1110  )
1111  {
1112  FatalErrorInFunction
1113  << "Size of sendSizes " << sendSizes.size()
1114  << " or sendOffsets " << sendOffsets.size()
1115  << " is not equal to the number of processors in the domain "
1116  << np
1117  << Foam::abort(FatalError);
1118  }
1119 
1120  if (!UPstream::parRun())
1121  {
1122  std::memmove(recvData, sendData, recvSize);
1123  }
1124  else
1125  {
1126  profilingPstream::beginTiming();
1127 
1128  if
1129  (
1130  MPI_Scatterv
1131  (
1132  const_cast<char*>(sendData),
1133  const_cast<int*>(sendSizes.cdata()),
1134  const_cast<int*>(sendOffsets.cdata()),
1135  MPI_BYTE,
1136  recvData,
1137  recvSize,
1138  MPI_BYTE,
1139  0,
1140  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
1141  )
1142  )
1143  {
1144  FatalErrorInFunction
1145  << "MPI_Scatterv failed for sendSizes " << sendSizes
1146  << " sendOffsets " << sendOffsets
1147  << " communicator " << communicator
1148  << Foam::abort(FatalError);
1149  }
1150 
1151  profilingPstream::addScatterTime();
1152  }
1153 }
1154 
1155 
1156 void Foam::UPstream::allocatePstreamCommunicator
1157 (
1158  const label parentIndex,
1159  const label index
1160 )
1161 {
1162  if (index == PstreamGlobals::MPIGroups_.size())
1163  {
1164  // Extend storage with dummy values
1165  MPI_Group newGroup = MPI_GROUP_NULL;
1166  PstreamGlobals::MPIGroups_.append(newGroup);
1167  MPI_Comm newComm = MPI_COMM_NULL;
1168  PstreamGlobals::MPICommunicators_.append(newComm);
1169  }
1170  else if (index > PstreamGlobals::MPIGroups_.size())
1171  {
1172  FatalErrorInFunction
1173  << "PstreamGlobals out of sync with UPstream data. Problem."
1174  << Foam::exit(FatalError);
1175  }
1176 
1177 
1178  if (parentIndex == -1)
1179  {
1180  // Allocate world communicator
1181 
1182  if (index != UPstream::worldComm)
1183  {
1184  FatalErrorInFunction
1185  << "world communicator should always be index "
1186  << UPstream::worldComm << Foam::exit(FatalError);
1187  }
1188 
1189  PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
1190  MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
1191  MPI_Comm_rank
1192  (
1193  PstreamGlobals::MPICommunicators_[index],
1194  &myProcNo_[index]
1195  );
1196 
1197  // Set the number of processes to the actual number
1198  int numProcs;
1199  MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
1200 
1201  //procIDs_[index] = identity(numProcs);
1202  procIDs_[index].setSize(numProcs);
1203  forAll(procIDs_[index], i)
1204  {
1205  procIDs_[index][i] = i;
1206  }
1207  }
1208  else
1209  {
1210  // Create new group
1211  MPI_Group_incl
1212  (
1213  PstreamGlobals::MPIGroups_[parentIndex],
1214  procIDs_[index].size(),
1215  procIDs_[index].cdata(),
1216  &PstreamGlobals::MPIGroups_[index]
1217  );
1218 
1219  #if defined(MSMPI_VER)
1220  // ms-mpi (10.0 and others?) does not have MPI_Comm_create_group
1221  MPI_Comm_create
1222  (
1223  PstreamGlobals::MPICommunicators_[parentIndex],
1224  PstreamGlobals::MPIGroups_[index],
1225  &PstreamGlobals::MPICommunicators_[index]
1226  );
1227  #else
1228  // Create new communicator for this group
1229  MPI_Comm_create_group
1230  (
1231  PstreamGlobals::MPICommunicators_[parentIndex],
1232  PstreamGlobals::MPIGroups_[index],
1233  Pstream::msgType(),
1234  &PstreamGlobals::MPICommunicators_[index]
1235  );
1236  #endif
1237 
1238  if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
1239  {
1240  myProcNo_[index] = -1;
1241  }
1242  else
1243  {
1244  if
1245  (
1246  MPI_Comm_rank
1247  (
1248  PstreamGlobals::MPICommunicators_[index],
1249  &myProcNo_[index]
1250  )
1251  )
1252  {
1253  FatalErrorInFunction
1254  << "Problem :"
1255  << " when allocating communicator at " << index
1256  << " from ranks " << procIDs_[index]
1257  << " of parent " << parentIndex
1258  << " cannot find my own rank"
1259  << Foam::exit(FatalError);
1260  }
1261  }
1262  }
1263 }
1264 
1265 
1266 void Foam::UPstream::freePstreamCommunicator(const label communicator)
1267 {
1268  if (communicator != 0)
1269  {
1270  if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
1271  {
1272  // Free communicator. Sets communicator to MPI_COMM_NULL
1273  MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
1274  }
1275  if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
1276  {
1277  // Free group. Sets group to MPI_GROUP_NULL
1278  MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
1279  }
1280  }
1281 }
1282 
1283 
1284 Foam::label Foam::UPstream::nRequests()
1285 {
1286  return PstreamGlobals::outstandingRequests_.size();
1287 }
1288 
1289 
1290 void Foam::UPstream::resetRequests(const label i)
1291 {
1292  if (i < PstreamGlobals::outstandingRequests_.size())
1293  {
1294  PstreamGlobals::outstandingRequests_.resize(i);
1295  }
1296 }
1297 
1298 
1299 void Foam::UPstream::waitRequests(const label start)
1300 {
1301  if (UPstream::debug)
1302  {
1303  Pout<< "UPstream::waitRequests : starting wait for "
1304  << PstreamGlobals::outstandingRequests_.size()-start
1305  << " outstanding requests starting at " << start << endl;
1306  }
1307 
1308  if (PstreamGlobals::outstandingRequests_.size())
1309  {
1310  SubList<MPI_Request> waitRequests
1311  (
1312  PstreamGlobals::outstandingRequests_,
1313  PstreamGlobals::outstandingRequests_.size() - start,
1314  start
1315  );
1316 
1317  profilingPstream::beginTiming();
1318 
1319  if
1320  (
1321  MPI_Waitall
1322  (
1323  waitRequests.size(),
1324  waitRequests.data(),
1325  MPI_STATUSES_IGNORE
1326  )
1327  )
1328  {
1329  FatalErrorInFunction
1330  << "MPI_Waitall returned with error" << Foam::endl;
1331  }
1332 
1333  profilingPstream::addWaitTime();
1334 
1335  resetRequests(start);
1336  }
1337 
1338  if (debug)
1339  {
1340  Pout<< "UPstream::waitRequests : finished wait." << endl;
1341  }
1342 }
1343 
1344 
1345 void Foam::UPstream::waitRequest(const label i)
1346 {
1347  if (debug)
1348  {
1349  Pout<< "UPstream::waitRequest : starting wait for request:" << i
1350  << endl;
1351  }
1352 
1353  if (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
1354  {
1356  << "There are " << PstreamGlobals::outstandingRequests_.size()
1357  << " outstanding send requests and you are asking for i=" << i
1358  << nl
1359  << "Maybe you are mixing blocking/non-blocking comms?"
1360  << Foam::abort(FatalError);
1361  }
1362 
1363  profilingPstream::beginTiming();
1364 
1365  if
1366  (
1367  MPI_Wait
1368  (
1369  &PstreamGlobals::outstandingRequests_[i],
1370  MPI_STATUS_IGNORE
1371  )
1372  )
1373  {
1374  FatalErrorInFunction
1375  << "MPI_Wait returned with error" << Foam::endl;
1376  }
1377 
1378  profilingPstream::addWaitTime();
1379  // Push index onto free cache
1380  PstreamGlobals::freedRequests_.append(i);
1381 
1382  if (debug)
1383  {
1384  Pout<< "UPstream::waitRequest : finished wait for request:" << i
1385  << endl;
1386  }
1387 }
1388 
1389 
1390 bool Foam::UPstream::finishedRequest(const label i)
1391 {
1392  if (debug)
1393  {
1394  Pout<< "UPstream::finishedRequest : checking request:" << i
1395  << endl;
1396  }
1397 
1398  if (i >= PstreamGlobals::outstandingRequests_.size())
1399  {
1400  FatalErrorInFunction
1401  << "There are " << PstreamGlobals::outstandingRequests_.size()
1402  << " outstanding send requests and you are asking for i=" << i
1403  << nl
1404  << "Maybe you are mixing blocking/non-blocking comms?"
1405  << Foam::abort(FatalError);
1406  }
1407 
1408  int flag;
1409  MPI_Test
1410  (
1411  &PstreamGlobals::outstandingRequests_[i],
1412  &flag,
1413  MPI_STATUS_IGNORE
1414  );
1415 
1416  if (debug)
1417  {
1418  Pout<< "UPstream::finishedRequest : finished request:" << i
1419  << endl;
1420  }
1421 
1422  return flag != 0;
1423 }
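// Usage sketch (illustrative only) of the request bookkeeping above:
//
//     const label startReq = UPstream::nRequests();
//     // ... issue non-blocking sends/receives; each appends an MPI_Request
//     // to PstreamGlobals::outstandingRequests_ ...
//     UPstream::waitRequests(startReq);   // block until all have completed
//
// Alternatively, finishedRequest(i) or waitRequest(i) can be used to poll or
// wait on a single request index i.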
1424 
1425 
1426 int Foam::UPstream::allocateTag(const char* s)
1427 {
1428  int tag;
1429  if (PstreamGlobals::freedTags_.size())
1430  {
1431  tag = PstreamGlobals::freedTags_.remove();
1432  }
1433  else
1434  {
1435  tag = PstreamGlobals::nTags_++;
1436  }
1437 
1438  if (debug)
1439  {
1440  //if (UPstream::lateBlocking > 0)
1441  //{
1442  // string& poutp = Pout.prefix();
1443  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
1444  // Perr.prefix() = Pout.prefix();
1445  //}
1446  Pout<< "UPstream::allocateTag " << s
1447  << " : tag:" << tag
1448  << endl;
1449  }
1450 
1451  return tag;
1452 }
1453 
1454 
1455 int Foam::UPstream::allocateTag(const word& s)
1456 {
1457  int tag;
1458  if (PstreamGlobals::freedTags_.size())
1459  {
1460  tag = PstreamGlobals::freedTags_.remove();
1461  }
1462  else
1463  {
1464  tag = PstreamGlobals::nTags_++;
1465  }
1466 
1467  if (debug)
1468  {
1469  //if (UPstream::lateBlocking > 0)
1470  //{
1471  // string& poutp = Pout.prefix();
1472  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
1473  // Perr.prefix() = Pout.prefix();
1474  //}
1475  Pout<< "UPstream::allocateTag " << s
1476  << " : tag:" << tag
1477  << endl;
1478  }
1479 
1480  return tag;
1481 }
1482 
1483 
1484 void Foam::UPstream::freeTag(const char* s, const int tag)
1485 {
1486  if (debug)
1487  {
1488  //if (UPstream::lateBlocking > 0)
1489  //{
1490  // string& poutp = Pout.prefix();
1491  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
1492  // Perr.prefix() = Pout.prefix();
1493  //}
1494  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
1495  }
1496  PstreamGlobals::freedTags_.append(tag);
1497 }
1498 
1499 
1500 void Foam::UPstream::freeTag(const word& s, const int tag)
1501 {
1502  if (debug)
1503  {
1504  //if (UPstream::lateBlocking > 0)
1505  //{
1506  // string& poutp = Pout.prefix();
1507  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
1508  // Perr.prefix() = Pout.prefix();
1509  //}
1510  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
1511  }
1512  PstreamGlobals::freedTags_.append(tag);
1513 }
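// Usage sketch (illustrative only): tags are recycled via
// PstreamGlobals::freedTags_, so each allocateTag() should be paired with a
// matching freeTag():
//
//     const int tag = UPstream::allocateTag("exampleUse");
//     // ... point-to-point communication using 'tag' ...
//     UPstream::freeTag("exampleUse", tag);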
1514 
1515 
1516 // ************************************************************************* //