UPstream.C
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2016-2020 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "Pstream.H"
30 #include "PstreamReduceOps.H"
31 #include "PstreamGlobals.H"
32 #include "profilingPstream.H"
33 #include "SubList.H"
34 #include "allReduce.H"
35 #include "int.H"
36 #include "collatedFileOperation.H"
37 
38 #include <mpi.h>
39 #include <cstring>
40 #include <cstdlib>
41 #include <csignal>
42 
43 #if defined(WM_SP)
44  #define MPI_SCALAR MPI_FLOAT
45  #define MPI_SOLVESCALAR MPI_FLOAT
46 #elif defined(WM_SPDP)
47  #define MPI_SCALAR MPI_FLOAT
48  #define MPI_SOLVESCALAR MPI_DOUBLE
49 #elif defined(WM_DP)
50  #define MPI_SCALAR MPI_DOUBLE
51  #define MPI_SOLVESCALAR MPI_DOUBLE
52 #endif
53 
54 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
55 
56 // The min value and default for MPI buffer length
57 constexpr int minBufLen = 20000000;
58 
59 // Track if we have attached MPI buffers
60 static bool ourBuffers = false;
61 
62 // Track if we initialized MPI
63 static bool ourMpi = false;
64 
65 
66 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
67 
68 static void attachOurBuffers()
69 {
70  if (ourBuffers)
71  {
72  return; // Already attached
73  }
74  ourBuffers = true;
75 
76  // Use UPstream::mpiBufferSize (optimisationSwitch),
77  // but allow override with MPI_BUFFER_SIZE env variable (int value)
78 
79 #ifndef SGIMPI
80  int len = 0;
81 
82  const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
83  if (str.empty() || !Foam::read(str, len) || len <= 0)
84  {
85  len = Foam::UPstream::mpiBufferSize;
86  }
87 
88  if (len < minBufLen)
89  {
90  len = minBufLen;
91  }
92 
93  if (Foam::UPstream::debug)
94  {
95  Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
96  }
97 
98  char* buf = new char[len];
99 
100  if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
101  {
102  delete[] buf;
103  Foam::Pout<< "UPstream::init : could not attach buffer\n";
104  }
105 #endif
106 }
107 
108 
109 static void detachOurBuffers()
110 {
111  if (!ourBuffers)
112  {
113  return; // Nothing to detach
114  }
115  ourBuffers = false;
116 
117  // Some MPI notes suggest that the return code is MPI_SUCCESS when
118  // no buffer is attached.
119  // Be extra careful and require a non-zero size as well.
120 
121 #ifndef SGIMPI
122  int len = 0;
123  char* buf = nullptr;
124 
125  if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
126  {
127  delete[] buf;
128  }
129 #endif
130 }
131 
132 
133 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
134 
135 // NOTE:
136 // valid parallel options vary between implementations, but flag common ones.
137 // if they are not removed by MPI_Init(), the subsequent argument processing
138 // will notice that they are wrong
139 void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
140 {
141  validParOptions.insert("np", "");
142  validParOptions.insert("p4pg", "PI file");
143  validParOptions.insert("p4wd", "directory");
144  validParOptions.insert("p4amslave", "");
145  validParOptions.insert("p4yourname", "hostname");
146  validParOptions.insert("machinefile", "machine file");
147 }
148 
149 
150 bool Foam::UPstream::initNull()
151 {
152  int flag = 0;
153 
154  MPI_Finalized(&flag);
155  if (flag)
156  {
157  // Already finalized - this is an error
158  FatalErrorInFunction
159  << "MPI was already finalized - cannot perform MPI_Init\n"
160  << Foam::abort(FatalError);
161 
162  return false;
163  }
164 
165  MPI_Initialized(&flag);
166  if (flag)
167  {
168  if (debug)
169  {
170  Pout<< "UPstream::initNull : was already initialized\n";
171  }
172  }
173  else
174  {
175  // Not already initialized
176 
177  MPI_Init_thread
178  (
179  nullptr, // argc
180  nullptr, // argv
181  MPI_THREAD_SINGLE,
182  &flag // provided_thread_support
183  );
184 
185  ourMpi = true;
186  }
187 
188  // Could also attach buffers etc.
189 
190  return true;
191 }
192 
193 
194 bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
195 {
196  int numprocs = 0, myRank = 0;
197  int provided_thread_support = 0;
198  int flag = 0;
199 
200  MPI_Finalized(&flag);
201  if (flag)
202  {
203  // Already finalized - this is an error
204  FatalErrorInFunction
205  << "MPI was already finalized - cannot perform MPI_Init" << endl
206  << Foam::abort(FatalError);
207 
208  return false;
209  }
210 
211  MPI_Initialized(&flag);
212  if (flag)
213  {
214  // Already initialized.
215  // Warn if we've called twice, but skip if initialized externally
216 
217  if (ourMpi)
218  {
219  WarningInFunction
220  << "MPI was already initialized - cannot perform MPI_Init" << nl
221  << "This could indicate an application programming error!"
222  << endl;
223 
224  return true;
225  }
226  else if (debug)
227  {
228  Pout<< "UPstream::init : was already initialized\n";
229  }
230  }
231  else
232  {
233  MPI_Init_thread
234  (
235  &argc,
236  &argv,
237  (
238  needsThread
239  ? MPI_THREAD_MULTIPLE
240  : MPI_THREAD_SINGLE
241  ),
242  &provided_thread_support
243  );
244 
245  ourMpi = true;
246  }
247 
248  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
249  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
250 
251  if (debug)
252  {
253  Pout<< "UPstream::init : procs=" << numprocs
254  << " rank:" << myRank << endl;
255  }
256 
257  if (numprocs <= 1)
258  {
259  FatalErrorInFunction
260  << "attempt to run parallel on 1 processor"
261  << Foam::abort(FatalError);
262  }
263 
264  // Initialise parallel structure
265  setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
266 
267  attachOurBuffers();
268 
269  return true;
270 }
271 
272 
273 void Foam::UPstream::shutdown(int errNo)
274 {
275  if (debug)
276  {
277  Pout<< "UPstream::shutdown\n";
278  }
279 
280  int flag = 0;
281 
282  MPI_Initialized(&flag);
283  if (!flag)
284  {
285  // No MPI initialized - we are done
286  return;
287  }
288 
289  MPI_Finalized(&flag);
290  if (flag)
291  {
292  // Already finalized elsewhere?
293  if (ourMpi)
294  {
295  WarningInFunction
296  << "MPI was already finalized (by a connected program?)\n";
297  }
298  else if (debug)
299  {
300  Pout<< "UPstream::shutdown : was already finalized\n";
301  }
302  }
303  else
304  {
305  detachOurBuffers();
306  }
307 
308 
309  // Warn about any outstanding requests
310  {
311  label nOutstanding = 0;
312 
313  forAll(PstreamGlobals::outstandingRequests_, requestID)
314  {
315  if (!PstreamGlobals::freedRequests_.found(requestID))
316  {
317  ++nOutstanding;
318  }
319  }
320 
321  PstreamGlobals::outstandingRequests_.clear();
322 
323  if (nOutstanding)
324  {
325  WarningInFunction
326  << "There were still " << nOutstanding
327  << " outstanding MPI_Requests." << nl
328  << "Which means your code exited before doing a "
329  << " UPstream::waitRequests()." << nl
330  << "This should not happen for a normal code exit."
331  << nl;
332  }
333  }
334 
335  // Clean mpi communicators
336  forAll(myProcNo_, communicator)
337  {
338  if (myProcNo_[communicator] != -1)
339  {
340  freePstreamCommunicator(communicator);
341  }
342  }
343 
344  if (!flag)
345  {
346  // MPI not already finalized
347 
348  if (!ourMpi)
349  {
350  WarningInFunction
351  << "Finalizing MPI, but was initialized elsewhere\n";
352  }
353 
354  if (errNo == 0)
355  {
356  MPI_Finalize();
357  }
358  else
359  {
360  MPI_Abort(MPI_COMM_WORLD, errNo);
361  }
362  }
363 }
364 
365 
366 void Foam::UPstream::exit(int errNo)
367 {
368  UPstream::shutdown(errNo);
369  std::exit(errNo);
370 }
371 
372 
373 void Foam::UPstream::abort()
374 {
375  MPI_Abort(MPI_COMM_WORLD, 1);
376 }
377 
378 
379 void Foam::reduce
380 (
381  scalar& Value,
382  const sumOp<scalar>& bop,
383  const int tag,
384  const label communicator
385 )
386 {
387  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
388  {
389  Pout<< "** reducing:" << Value << " with comm:" << communicator
390  << " warnComm:" << UPstream::warnComm
391  << endl;
392  error::printStack(Pout);
393  }
394  allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
395 }
396 
397 
398 void Foam::reduce
399 (
400  scalar& Value,
401  const minOp<scalar>& bop,
402  const int tag,
403  const label communicator
404 )
405 {
406  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
407  {
408  Pout<< "** reducing:" << Value << " with comm:" << communicator
409  << " warnComm:" << UPstream::warnComm
410  << endl;
411  error::printStack(Pout);
412  }
413  allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
414 }
415 
416 
417 void Foam::reduce
418 (
419  vector2D& Value,
420  const sumOp<vector2D>& bop,
421  const int tag,
422  const label communicator
423 )
424 {
425  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
426  {
427  Pout<< "** reducing:" << Value << " with comm:" << communicator
428  << " warnComm:" << UPstream::warnComm
429  << endl;
430  error::printStack(Pout);
431  }
432  allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
433 }
434 
435 
436 void Foam::sumReduce
437 (
438  scalar& Value,
439  label& Count,
440  const int tag,
441  const label communicator
442 )
443 {
444  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
445  {
446  Pout<< "** reducing:" << Value << " with comm:" << communicator
447  << " warnComm:" << UPstream::warnComm
448  << endl;
449  error::printStack(Pout);
450  }
451  vector2D twoScalars(Value, scalar(Count));
452  reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
453 
454  Value = twoScalars.x();
455  Count = twoScalars.y();
456 }
457 
458 
459 void Foam::reduce
460 (
461  scalar& Value,
462  const sumOp<scalar>& bop,
463  const int tag,
464  const label communicator,
465  label& requestID
466 )
467 {
468  iallReduce<scalar>(&Value, 1, MPI_SCALAR, MPI_SUM, communicator, requestID);
469 }
470 
471 
472 void Foam::reduce
473 (
474  scalar values[],
475  const int size,
476  const sumOp<scalar>& bop,
477  const int tag,
478  const label communicator,
479  label& requestID
480 )
481 {
482  iallReduce<scalar>
483  (
484  values,
485  size,
486  MPI_SCALAR,
487  MPI_SUM,
488  communicator,
489  requestID
490  );
491 }
492 
493 
494 #if defined(WM_SPDP)
495 void Foam::reduce
496 (
497  solveScalar& Value,
498  const sumOp<solveScalar>& bop,
499  const int tag,
500  const label communicator
501 )
502 {
503  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
504  {
505  Pout<< "** reducing:" << Value << " with comm:" << communicator
506  << " warnComm:" << UPstream::warnComm
507  << endl;
508  error::printStack(Pout);
509  }
510  allReduce(Value, 1, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);
511 }
512 
513 
514 void Foam::reduce
515 (
516  solveScalar& Value,
517  const minOp<solveScalar>& bop,
518  const int tag,
519  const label communicator
520 )
521 {
522  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
523  {
524  Pout<< "** reducing:" << Value << " with comm:" << communicator
525  << " warnComm:" << UPstream::warnComm
526  << endl;
527  error::printStack(Pout);
528  }
529  allReduce(Value, 1, MPI_SOLVESCALAR, MPI_MIN, bop, tag, communicator);
530 }
531 
532 
533 void Foam::reduce
534 (
535  Vector2D<solveScalar>& Value,
536  const sumOp<Vector2D<solveScalar>>& bop,
537  const int tag,
538  const label communicator
539 )
540 {
541  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
542  {
543  Pout<< "** reducing:" << Value << " with comm:" << communicator
544  << " warnComm:" << UPstream::warnComm
545  << endl;
546  error::printStack(Pout);
547  }
548  allReduce(Value, 2, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);
549 }
550 
551 
552 void Foam::sumReduce
553 (
554  solveScalar& Value,
555  label& Count,
556  const int tag,
557  const label communicator
558 )
559 {
560  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
561  {
562  Pout<< "** reducing:" << Value << " with comm:" << communicator
563  << " warnComm:" << UPstream::warnComm
564  << endl;
565  error::printStack(Pout);
566  }
567  Vector2D<solveScalar> twoScalars(Value, solveScalar(Count));
568  reduce(twoScalars, sumOp<Vector2D<solveScalar>>(), tag, communicator);
569 
570  Value = twoScalars.x();
571  Count = twoScalars.y();
572 }
573 
574 
575 void Foam::reduce
576 (
577  solveScalar& Value,
578  const sumOp<solveScalar>& bop,
579  const int tag,
580  const label communicator,
581  label& requestID
582 )
583 {
584  iallReduce<solveScalar>
585  (
586  &Value,
587  1,
588  MPI_SOLVESCALAR,
589  MPI_SUM,
590  communicator,
591  requestID
592  );
593 }
594 
595 
596 void Foam::reduce
597 (
598  solveScalar values[],
599  const int size,
600  const sumOp<solveScalar>& bop,
601  const int tag,
602  const label communicator,
603  label& requestID
604 )
605 {
606  iallReduce<solveScalar>
607  (
608  values,
609  size,
610  MPI_SOLVESCALAR,
611  MPI_SUM,
612  communicator,
613  requestID
614  );
615 }
616 #endif
617 
618 
619 void Foam::UPstream::allToAll
620 (
621  const labelUList& sendData,
622  labelUList& recvData,
623  const label communicator
624 )
625 {
626  label np = nProcs(communicator);
627 
628  if (sendData.size() != np || recvData.size() != np)
629  {
630  FatalErrorInFunction
631  << "Size of sendData " << sendData.size()
632  << " or size of recvData " << recvData.size()
633  << " is not equal to the number of processors in the domain "
634  << np
635  << Foam::abort(FatalError);
636  }
637 
638  if (!UPstream::parRun())
639  {
640  recvData.deepCopy(sendData);
641  }
642  else
643  {
644  profilingPstream::beginTiming();
645 
646  if
647  (
648  MPI_Alltoall
649  (
650  // NOTE: const_cast is a temporary hack for
651  // backward-compatibility with versions of OpenMPI < 1.7.4
652  const_cast<label*>(sendData.begin()),
653  sizeof(label),
654  MPI_BYTE,
655  recvData.begin(),
656  sizeof(label),
657  MPI_BYTE,
658  PstreamGlobals::MPICommunicators_[communicator]
659  )
660  )
661  {
662  FatalErrorInFunction
663  << "MPI_Alltoall failed for " << sendData
664  << " on communicator " << communicator
665  << Foam::abort(FatalError);
666  }
667 
668  profilingPstream::addAllToAllTime();
669  }
670 }
671 
672 
673 void Foam::UPstream::allToAll
674 (
675  const char* sendData,
676  const UList<int>& sendSizes,
677  const UList<int>& sendOffsets,
678 
679  char* recvData,
680  const UList<int>& recvSizes,
681  const UList<int>& recvOffsets,
682 
683  const label communicator
684 )
685 {
686  label np = nProcs(communicator);
687 
688  if
689  (
690  sendSizes.size() != np
691  || sendOffsets.size() != np
692  || recvSizes.size() != np
693  || recvOffsets.size() != np
694  )
695  {
696  FatalErrorInFunction
697  << "Size of sendSize " << sendSizes.size()
698  << ", sendOffsets " << sendOffsets.size()
699  << ", recvSizes " << recvSizes.size()
700  << " or recvOffsets " << recvOffsets.size()
701  << " is not equal to the number of processors in the domain "
702  << np
703  << Foam::abort(FatalError);
704  }
705 
706  if (!UPstream::parRun())
707  {
708  if (recvSizes[0] != sendSizes[0])
709  {
710  FatalErrorInFunction
711  << "Bytes to send " << sendSizes[0]
712  << " does not equal bytes to receive " << recvSizes[0]
713  << Foam::abort(FatalError);
714  }
715  std::memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
716  }
717  else
718  {
719  profilingPstream::beginTiming();
720 
721  if
722  (
723  MPI_Alltoallv
724  (
725  const_cast<char*>(sendData),
726  const_cast<int*>(sendSizes.begin()),
727  const_cast<int*>(sendOffsets.begin()),
728  MPI_BYTE,
729  recvData,
730  const_cast<int*>(recvSizes.begin()),
731  const_cast<int*>(recvOffsets.begin()),
732  MPI_BYTE,
733  PstreamGlobals::MPICommunicators_[communicator]
734  )
735  )
736  {
737  FatalErrorInFunction
738  << "MPI_Alltoallv failed for sendSizes " << sendSizes
739  << " recvSizes " << recvSizes
740  << " communicator " << communicator
741  << Foam::abort(FatalError);
742  }
743 
744  profilingPstream::addAllToAllTime();
745  }
746 }
747 
748 
749 void Foam::UPstream::gather
750 (
751  const char* sendData,
752  int sendSize,
753 
754  char* recvData,
755  const UList<int>& recvSizes,
756  const UList<int>& recvOffsets,
757  const label communicator
758 )
759 {
760  label np = nProcs(communicator);
761 
762  if
763  (
764  UPstream::master(communicator)
765  && (recvSizes.size() != np || recvOffsets.size() < np)
766  )
767  {
768  // Note: allow recvOffsets to be e.g. 1 larger than np so we
769  // can easily loop over the result
770 
771  FatalErrorInFunction
772  << "Size of recvSizes " << recvSizes.size()
773  << " or recvOffsets " << recvOffsets.size()
774  << " is not equal to the number of processors in the domain "
775  << np
776  << Foam::abort(FatalError);
777  }
778 
779  if (!UPstream::parRun())
780  {
781  std::memmove(recvData, sendData, sendSize);
782  }
783  else
784  {
785  profilingPstream::beginTiming();
786 
787  if
788  (
789  MPI_Gatherv
790  (
791  const_cast<char*>(sendData),
792  sendSize,
793  MPI_BYTE,
794  recvData,
795  const_cast<int*>(recvSizes.begin()),
796  const_cast<int*>(recvOffsets.begin()),
797  MPI_BYTE,
798  0,
799  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
800  )
801  )
802  {
803  FatalErrorInFunction
804  << "MPI_Gatherv failed for sendSize " << sendSize
805  << " recvSizes " << recvSizes
806  << " communicator " << communicator
807  << Foam::abort(FatalError);
808  }
809 
810  profilingPstream::addGatherTime();
811  }
812 }
813 
814 
815 void Foam::UPstream::scatter
816 (
817  const char* sendData,
818  const UList<int>& sendSizes,
819  const UList<int>& sendOffsets,
820 
821  char* recvData,
822  int recvSize,
823  const label communicator
824 )
825 {
826  label np = nProcs(communicator);
827 
828  if
829  (
830  UPstream::master(communicator)
831  && (sendSizes.size() != np || sendOffsets.size() != np)
832  )
833  {
834  FatalErrorInFunction
835  << "Size of sendSizes " << sendSizes.size()
836  << " or sendOffsets " << sendOffsets.size()
837  << " is not equal to the number of processors in the domain "
838  << np
839  << Foam::abort(FatalError);
840  }
841 
842  if (!UPstream::parRun())
843  {
844  std::memmove(recvData, sendData, recvSize);
845  }
846  else
847  {
848  profilingPstream::beginTiming();
849 
850  if
851  (
852  MPI_Scatterv
853  (
854  const_cast<char*>(sendData),
855  const_cast<int*>(sendSizes.begin()),
856  const_cast<int*>(sendOffsets.begin()),
857  MPI_BYTE,
858  recvData,
859  recvSize,
860  MPI_BYTE,
861  0,
862  MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
863  )
864  )
865  {
866  FatalErrorInFunction
867  << "MPI_Scatterv failed for sendSizes " << sendSizes
868  << " sendOffsets " << sendOffsets
869  << " communicator " << communicator
870  << Foam::abort(FatalError);
871  }
872 
873  profilingPstream::addScatterTime();
874  }
875 }
876 
877 
878 void Foam::UPstream::allocatePstreamCommunicator
879 (
880  const label parentIndex,
881  const label index
882 )
883 {
884  if (index == PstreamGlobals::MPIGroups_.size())
885  {
886  // Extend storage with dummy values
887  MPI_Group newGroup = MPI_GROUP_NULL;
888  PstreamGlobals::MPIGroups_.append(newGroup);
889  MPI_Comm newComm = MPI_COMM_NULL;
890  PstreamGlobals::MPICommunicators_.append(newComm);
891  }
892  else if (index > PstreamGlobals::MPIGroups_.size())
893  {
894  FatalErrorInFunction
895  << "PstreamGlobals out of sync with UPstream data. Problem."
896  << Foam::exit(FatalError);
897  }
898 
899 
900  if (parentIndex == -1)
901  {
902  // Allocate world communicator
903 
904  if (index != UPstream::worldComm)
905  {
906  FatalErrorInFunction
907  << "world communicator should always be index "
908  << UPstream::worldComm << Foam::exit(FatalError);
909  }
910 
911  PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
912  MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
913  MPI_Comm_rank
914  (
915  PstreamGlobals::MPICommunicators_[index],
916  &myProcNo_[index]
917  );
918 
919  // Set the number of processes to the actual number
920  int numProcs;
921  MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
922 
923  //procIDs_[index] = identity(numProcs);
924  procIDs_[index].setSize(numProcs);
925  forAll(procIDs_[index], i)
926  {
927  procIDs_[index][i] = i;
928  }
929  }
930  else
931  {
932  // Create new group
933  MPI_Group_incl
934  (
935  PstreamGlobals::MPIGroups_[parentIndex],
936  procIDs_[index].size(),
937  procIDs_[index].begin(),
938  &PstreamGlobals::MPIGroups_[index]
939  );
940 
941  // Create new communicator
942  MPI_Comm_create
943  (
944  PstreamGlobals::MPICommunicators_[parentIndex],
945  PstreamGlobals::MPIGroups_[index],
946  &PstreamGlobals::MPICommunicators_[index]
947  );
948 
949  if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
950  {
951  myProcNo_[index] = -1;
952  }
953  else
954  {
955  if
956  (
957  MPI_Comm_rank
958  (
959  PstreamGlobals::MPICommunicators_[index],
960  &myProcNo_[index]
961  )
962  )
963  {
964  FatalErrorInFunction
965  << "Problem :"
966  << " when allocating communicator at " << index
967  << " from ranks " << procIDs_[index]
968  << " of parent " << parentIndex
969  << " cannot find my own rank"
970  << Foam::exit(FatalError);
971  }
972  }
973  }
974 }
975 
976 
977 void Foam::UPstream::freePstreamCommunicator(const label communicator)
978 {
979  if (communicator != UPstream::worldComm)
980  {
981  if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
982  {
983  // Free communicator. Sets communicator to MPI_COMM_NULL
984  MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
985  }
986  if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
987  {
988  // Free group. Sets group to MPI_GROUP_NULL
989  MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
990  }
991  }
992 }
993 
994 
995 Foam::label Foam::UPstream::nRequests()
996 {
997  return PstreamGlobals::outstandingRequests_.size();
998 }
999 
1000 
1001 void Foam::UPstream::resetRequests(const label i)
1002 {
1003  if (i < PstreamGlobals::outstandingRequests_.size())
1004  {
1005  PstreamGlobals::outstandingRequests_.resize(i);
1006  }
1007 }
1008 
1009 
1010 void Foam::UPstream::waitRequests(const label start)
1011 {
1012  if (UPstream::debug)
1013  {
1014  Pout<< "UPstream::waitRequests : starting wait for "
1015  << PstreamGlobals::outstandingRequests_.size()-start
1016  << " outstanding requests starting at " << start << endl;
1017  }
1018 
1019  if (PstreamGlobals::outstandingRequests_.size())
1020  {
1021  SubList<MPI_Request> waitRequests
1022  (
1023  PstreamGlobals::outstandingRequests_,
1024  PstreamGlobals::outstandingRequests_.size() - start,
1025  start
1026  );
1027 
1028  profilingPstream::beginTiming();
1029 
1030  if
1031  (
1032  MPI_Waitall
1033  (
1034  waitRequests.size(),
1035  waitRequests.begin(),
1036  MPI_STATUSES_IGNORE
1037  )
1038  )
1039  {
1040  FatalErrorInFunction
1041  << "MPI_Waitall returned with error" << Foam::endl;
1042  }
1043 
1044  profilingPstream::addWaitTime();
1045 
1046  resetRequests(start);
1047  }
1048 
1049  if (debug)
1050  {
1051  Pout<< "UPstream::waitRequests : finished wait." << endl;
1052  }
1053 }
1054 
1055 
1056 void Foam::UPstream::waitRequest(const label i)
1057 {
1058  if (debug)
1059  {
1060  Pout<< "UPstream::waitRequest : starting wait for request:" << i
1061  << endl;
1062  }
1063 
1064  if (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
1065  {
1066  FatalErrorInFunction
1067  << "There are " << PstreamGlobals::outstandingRequests_.size()
1068  << " outstanding send requests and you are asking for i=" << i
1069  << nl
1070  << "Maybe you are mixing blocking/non-blocking comms?"
1071  << Foam::abort(FatalError);
1072  }
1073 
1074  profilingPstream::beginTiming();
1075 
1076  if
1077  (
1078  MPI_Wait
1079  (
1080  &PstreamGlobals::outstandingRequests_[i],
1081  MPI_STATUS_IGNORE
1082  )
1083  )
1084  {
1085  FatalErrorInFunction
1086  << "MPI_Wait returned with error" << Foam::endl;
1087  }
1088 
1089  profilingPstream::addWaitTime();
1090  // Push index onto free cache
1091  PstreamGlobals::freedRequests_.append(i);
1092 
1093  if (debug)
1094  {
1095  Pout<< "UPstream::waitRequest : finished wait for request:" << i
1096  << endl;
1097  }
1098 }
1099 
1100 
1101 bool Foam::UPstream::finishedRequest(const label i)
1102 {
1103  if (debug)
1104  {
1105  Pout<< "UPstream::finishedRequest : checking request:" << i
1106  << endl;
1107  }
1108 
1109  if (i >= PstreamGlobals::outstandingRequests_.size())
1110  {
1111  FatalErrorInFunction
1112  << "There are " << PstreamGlobals::outstandingRequests_.size()
1113  << " outstanding send requests and you are asking for i=" << i
1114  << nl
1115  << "Maybe you are mixing blocking/non-blocking comms?"
1116  << Foam::abort(FatalError);
1117  }
1118 
1119  int flag;
1120  MPI_Test
1121  (
1122  &PstreamGlobals::outstandingRequests_[i],
1123  &flag,
1124  MPI_STATUS_IGNORE
1125  );
1126 
1127  if (debug)
1128  {
1129  Pout<< "UPstream::finishedRequest : finished request:" << i
1130  << endl;
1131  }
1132 
1133  return flag != 0;
1134 }
1135 
1136 
1137 int Foam::UPstream::allocateTag(const char* s)
1138 {
1139  int tag;
1140  if (PstreamGlobals::freedTags_.size())
1141  {
1142  tag = PstreamGlobals::freedTags_.remove();
1143  }
1144  else
1145  {
1146  tag = PstreamGlobals::nTags_++;
1147  }
1148 
1149  if (debug)
1150  {
1151  //if (UPstream::lateBlocking > 0)
1152  //{
1153  // string& poutp = Pout.prefix();
1154  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
1155  // Perr.prefix() = Pout.prefix();
1156  //}
1157  Pout<< "UPstream::allocateTag " << s
1158  << " : tag:" << tag
1159  << endl;
1160  }
1161 
1162  return tag;
1163 }
1164 
1165 
1166 int Foam::UPstream::allocateTag(const word& s)
1167 {
1168  int tag;
1169  if (PstreamGlobals::freedTags_.size())
1170  {
1171  tag = PstreamGlobals::freedTags_.remove();
1172  }
1173  else
1174  {
1175  tag = PstreamGlobals::nTags_++;
1176  }
1177 
1178  if (debug)
1179  {
1180  //if (UPstream::lateBlocking > 0)
1181  //{
1182  // string& poutp = Pout.prefix();
1183  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
1184  // Perr.prefix() = Pout.prefix();
1185  //}
1186  Pout<< "UPstream::allocateTag " << s
1187  << " : tag:" << tag
1188  << endl;
1189  }
1190 
1191  return tag;
1192 }
1193 
1194 
1195 void Foam::UPstream::freeTag(const char* s, const int tag)
1196 {
1197  if (debug)
1198  {
1199  //if (UPstream::lateBlocking > 0)
1200  //{
1201  // string& poutp = Pout.prefix();
1202  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
1203  // Perr.prefix() = Pout.prefix();
1204  //}
1205  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
1206  }
1207  PstreamGlobals::freedTags_.append(tag);
1208 }
1209 
1210 
1211 void Foam::UPstream::freeTag(const word& s, const int tag)
1212 {
1213  if (debug)
1214  {
1215  //if (UPstream::lateBlocking > 0)
1216  //{
1217  // string& poutp = Pout.prefix();
1218  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
1219  // Perr.prefix() = Pout.prefix();
1220  //}
1221  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
1222  }
1223  PstreamGlobals::freedTags_.append(tag);
1224 }
1225 
1226 
1227 // ************************************************************************* //
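
The non-blocking reduce overloads above only start the reduction (via iallReduce) and hand back a request index; the caller must later complete it through UPstream::waitRequest or UPstream::waitRequests. A minimal caller-side sketch of that pattern follows. It is illustrative only and not part of UPstream.C; the function and variable names (exampleNonBlockingSum, localValue) are invented for the example, and it assumes the request index is only set when a request was actually started.

// Illustrative sketch (not part of UPstream.C): start a non-blocking
// sum-reduction, overlap other work, then complete the request.
#include "UPstream.H"
#include "PstreamReduceOps.H"

void exampleNonBlockingSum(Foam::scalar& localValue)
{
    using namespace Foam;

    label requestID = -1;

    // Non-blocking reduce overload defined in this file (wraps iallReduce)
    reduce
    (
        localValue,
        sumOp<scalar>(),
        UPstream::msgType(),
        UPstream::worldComm,
        requestID
    );

    // ... other independent work can be overlapped here ...

    if (requestID >= 0)
    {
        // Block until the reduction started above has completed
        UPstream::waitRequest(requestID);
    }
}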