Excerpts from the MPI implementation of Foam::UPstream (UPstream.C): initialisation and shutdown, the reduce/sumReduce wrappers, all-to-all and gather/scatter exchanges, communicator allocation, and non-blocking request handling; '// ...' marks elided code.
// Precision of scalar and solveScalar as MPI datatypes
#if defined(WM_SP)
    #define MPI_SCALAR MPI_FLOAT
    #define MPI_SOLVESCALAR MPI_FLOAT
#elif defined(WM_SPDP)
    #define MPI_SCALAR MPI_FLOAT
    #define MPI_SOLVESCALAR MPI_DOUBLE
#elif defined(WM_DP)
    #define MPI_SCALAR MPI_DOUBLE
    #define MPI_SOLVESCALAR MPI_DOUBLE
#endif
// attachOurBuffers() : buffer size from the MPI_BUFFER_SIZE environment
// variable, falling back to the UPstream::mpiBufferSize optimisation switch
if (str.empty() || !Foam::read(str, len) || len <= 0)
{
    len = Foam::UPstream::mpiBufferSize;
}
// ...
Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';

char* buf = new char[len];

if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
{
    delete[] buf;
    Foam::Pout<< "UPstream::init : could not attach buffer\n";
}

// detachOurBuffers() : reclaim the buffer before finalizing
if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
{
    delete[] buf;
}
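The attached buffer exists so that buffered sends (MPI_Bsend, used for the "blocking" commsType) always have space to copy outgoing messages into. A minimal, self-contained sketch of the same attach/send/detach pattern, independent of OpenFOAM and with an arbitrary buffer size, assuming only standard MPI:

#include <mpi.h>
#include <vector>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank = 0, nProcs = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

    // Attach a user buffer; MPI_Bsend copies into it and completes locally
    std::vector<char> buf(4*1024*1024 + MPI_BSEND_OVERHEAD);
    MPI_Buffer_attach(buf.data(), static_cast<int>(buf.size()));

    double value = 1.0 + rank;
    if (rank != 0)
    {
        MPI_Bsend(&value, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
    }
    else
    {
        for (int i = 1; i < nProcs; ++i)
        {
            double recv = 0;
            MPI_Recv(&recv, 1, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            value += recv;
        }
    }

    // Detaching blocks until all buffered sends have been delivered
    void* oldBuf = nullptr;
    int oldLen = 0;
    MPI_Buffer_detach(&oldBuf, &oldLen);

    MPI_Finalize();
    return 0;
}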
// UPstream::addValidParOptions(HashTable<string>& validParOptions)
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
// UPstream::initNull()
int flag = 0;

MPI_Finalized(&flag);
if (flag)
{
    FatalErrorInFunction
        << "MPI was already finalized - cannot perform MPI_Init\n"
        << Foam::abort(FatalError);
}

MPI_Initialized(&flag);
if (flag)
{
    Pout<< "UPstream::initNull : was already initialized\n";
}
// UPstream::init(int& argc, char**& argv, const bool needsThread)
int numprocs = 0, myRank = 0;
int provided_thread_support = 0;
int flag = 0;

MPI_Finalized(&flag);
if (flag)
{
    FatalErrorInFunction
        << "MPI was already finalized - cannot perform MPI_Init" << endl
        << Foam::abort(FatalError);
}

MPI_Initialized(&flag);
if (flag)
{
    WarningInFunction
        << "MPI was already initialized - cannot perform MPI_Init" << nl
        << "This could indicate an application programming error!" << endl;
    // ...
    Pout<< "UPstream::init : was already initialized\n";
}
else
{
    MPI_Init_thread
    (
        &argc,
        &argv,
        (needsThread ? MPI_THREAD_MULTIPLE : MPI_THREAD_SINGLE),
        &provided_thread_support
    );
}
// Check the argument list for a local world (-world <name>)
label worldIndex = -1;
word world;
for (int argi = 1; argi < argc; ++argi)
{
    if (strcmp(argv[argi], "-world") == 0)
    {
        worldIndex = argi;
        if (argi+1 >= argc)
        {
            FatalErrorInFunction
                << "Missing world name to argument \"world\""
                << Foam::abort(FatalError);
        }
        world = argv[argi+1];
        break;
    }
}

// Strip the '-world <name>' pair out of argv again
if (worldIndex != -1)
{
    for (label i = worldIndex+2; i < argc; i++)
    {
        argv[i-2] = argv[i];
    }
    argc -= 2;
}

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

if (debug)
{
    Pout<< "UPstream::init :"
        << " thread-support : wanted:" << needsThread
        << " obtained:"
        << (
               provided_thread_support == MPI_THREAD_MULTIPLE
             ? "MPI_THREAD_MULTIPLE"
             : "MPI_THREAD_SINGLE"
           )
        << " procs:" << numprocs
        << " rank:" << myRank
        << " world:" << world << endl;
}

if (worldIndex == -1 && numprocs <= 1)
{
    FatalErrorInFunction
        << "attempt to run parallel on 1 processor"
        << Foam::abort(FatalError);
}

// Initialise the parallel structure
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
if (worldIndex != -1)
{
    // Gather each rank's world name, then compact to a unique list
    DynamicList<word> allWorlds(numprocs);
    for (const word& world : worlds)
    {
        allWorlds.appendUniq(world);
    }
    allWorlds_ = std::move(allWorlds);

    worldIDs_.setSize(numprocs);
    forAll(worlds, proci)
    {
        const word& world = worlds[proci];
        worldIDs_[proci] = allWorlds_.find(world);
    }

    // The ranks belonging to my own world
    DynamicList<label> subRanks;
    forAll(worlds, proci)
    {
        if (worlds[proci] == world)
        {
            subRanks.append(proci);
        }
    }

    // Allocate a communicator for this world, with parent 0 (= MPI_COMM_WORLD)
    const label subComm = allocateCommunicator(0, subRanks, true);

    if (debug)
    {
        int subNProcs, subRank;
        // ...
        Pout<< "UPstream::init : in world:" << world
            << " using local communicator:" << subComm
            << " with procs:" << subNProcs
            << " and rank:" << subRank
            << endl;
    }

    // Prefix Pout/Perr output with "[world/rank] "
    Pout.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
    Perr.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
}
else
{
    // No -world given: all ranks belong to world 0
    worldIDs_.setSize(numprocs, 0);
}
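The same allocateCommunicator() call can be used from application code to carve a sub-communicator out of the default one and reduce only across those ranks. A hypothetical sketch (the helper name evenRankSum and the choice of even ranks are illustrative only; the UPstream calls are the public API listed in the index below):

#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "DynamicList.H"

using namespace Foam;

void evenRankSum(scalar& value)
{
    // Every second rank of the default (world) communicator
    DynamicList<label> subRanks;
    for (label proci = 0; proci < UPstream::nProcs(); proci += 2)
    {
        subRanks.append(proci);
    }

    // Child communicator of worldComm containing only the listed ranks
    const label subComm =
        UPstream::allocateCommunicator(UPstream::worldComm, subRanks, true);

    if (UPstream::myProcNo(subComm) != -1)
    {
        // This rank is part of the sub-communicator: sum within it only
        reduce(value, sumOp<scalar>(), UPstream::msgType(), subComm);
    }

    UPstream::freeCommunicator(subComm);
}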
// UPstream::shutdown(int errNo)  (debug tracing shown without its if (debug) guards)
Pout<< "UPstream::shutdown\n";

MPI_Initialized(&flag);
if (!flag)
{
    return;     // MPI never initialized: nothing to do
}

MPI_Finalized(&flag);
if (flag)
{
    // Finalized by a connected program, or already shut down
    WarningInFunction
        << "MPI was already finalized (by a connected program?)\n";
    // ...
    Pout<< "UPstream::shutdown : was already finalized\n";
}

// Warn about requests that were never waited for
label nOutstanding = 0;
// ...
if (nOutstanding)
{
    WarningInFunction
        << "There were still " << nOutstanding
        << " outstanding MPI_Requests." << nl
        << "Which means your code exited before doing a "
        << " UPstream::waitRequests()." << nl
        << "This should not happen for a normal code exit."
        << nl;
}

// Free the communicators that were allocated
forAll(myProcNo_, communicator)
{
    if (myProcNo_[communicator] != -1)
    {
        freePstreamCommunicator(communicator);
    }
}

// If MPI was initialized elsewhere, finalizing it here deserves a warning
WarningInFunction
    << "Finalizing MPI, but was initialized elsewhere\n";

// A non-zero errNo aborts instead of calling MPI_Finalize()
MPI_Abort(MPI_COMM_WORLD, errNo);

// UPstream::abort()
MPI_Abort(MPI_COMM_WORLD, 1);
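Application code does not normally call UPstream::init() or UPstream::shutdown() directly; both are reached through argList construction and normal program exit. A minimal sketch of that entry path, assuming a case already decomposed for parallel running and argList defaults for everything beyond argc/argv:

#include "argList.H"
#include "IOstreams.H"
#include "Pstream.H"

using namespace Foam;

int main(int argc, char* argv[])
{
    // Parses -parallel (and the options registered by addValidParOptions);
    // for a parallel run this ends up in UPstream::init(argc, argv, ...)
    argList args(argc, argv);

    if (Pstream::parRun())
    {
        Pout<< "rank " << UPstream::myProcNo()
            << " of " << UPstream::nProcs() << endl;
    }

    return 0;   // destruction/exit triggers UPstream::shutdown()
}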
// Blocking reductions: each specialisation forwards to allReduce()
// (see allReduce.H) with the matching MPI datatype and operation
void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}

// The remaining blocking specialisations follow the same pattern:
//   minOp<scalar>   : allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
//   sumOp<vector2D> : allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);

void Foam::sumReduce
(
    scalar& Value,
    label& Count,
    const int tag,
    const label communicator
)
{
    // (same warnComm check as above, printing "** sumReduce:" ...)
    // Pack value and count into a vector2D so one reduction handles both
    vector2D twoScalars(Value, scalar(Count));
    reduce(twoScalars, sumOp<vector2D>(), tag, communicator);

    Value = twoScalars.x();
    Count = twoScalars.y();
}

// Non-blocking variants return a requestID and forward to iallReduce;
// an array overload (scalar values[], const int size) does the same
void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator,
    label& requestID
)
{
    iallReduce<scalar>(&Value, 1, MPI_SCALAR, MPI_SUM, communicator, requestID);
}
// The solveScalar overloads mirror the scalar ones but use MPI_SOLVESCALAR
void Foam::reduce
(
    solveScalar& Value,
    const sumOp<solveScalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 1, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);
}

//   minOp<solveScalar>           : allReduce(Value, 1, MPI_SOLVESCALAR, MPI_MIN, bop, tag, communicator);
//   sumOp<Vector2D<solveScalar>> : allReduce(Value, 2, MPI_SOLVESCALAR, MPI_SUM, bop, tag, communicator);

void Foam::sumReduce
(
    solveScalar& Value,
    label& Count,
    const int tag,
    const label communicator
)
{
    Vector2D<solveScalar> twoScalars(Value, solveScalar(Count));
    reduce(twoScalars, sumOp<Vector2D<solveScalar>>(), tag, communicator);

    Value = twoScalars.x();
    Count = twoScalars.y();
}

// Non-blocking (requestID) overloads, single value and array, delegate to
// iallReduce<solveScalar>(..., MPI_SOLVESCALAR, MPI_SUM, communicator, requestID)
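These wrappers are what solver-level reduce() and sumReduce() calls resolve to. A small usage sketch of the calling side, using only the reduction API listed in the index below (the quantities being reduced are illustrative):

#include "PstreamReduceOps.H"

using namespace Foam;

// Global mean from per-rank partial sums: one reduction handles sum and count
scalar globalMean(scalar localSum, label localCount)
{
    sumReduce(localSum, localCount);
    return (localCount ? localSum/localCount : 0);
}

// Smallest value over all ranks (the minOp specialisation above)
void limitToGlobalMin(scalar& value)
{
    reduce(value, minOp<scalar>());
}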
void Foam::UPstream::allToAll
(
    const labelUList& sendData,
    labelUList& recvData,
    const label communicator
)
{
    const label np = nProcs(communicator);

    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** allToAll :"
            << " sendData:" << sendData.size()
            << " with comm:" << communicator
            << endl;
    }

    if (sendData.size() != np || recvData.size() != np)
    {
        FatalErrorInFunction
            << "Size of sendData " << sendData.size()
            << " or size of recvData " << recvData.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        recvData.deepCopy(sendData);
    }
    else if
    (
        MPI_Alltoall
        (
            const_cast<label*>(sendData.cdata()),
            sizeof(label), MPI_BYTE,
            recvData.data(),
            sizeof(label), MPI_BYTE,
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Alltoall failed for " << sendData
            << " on communicator " << communicator
            << Foam::abort(FatalError);
    }
}
void Foam::UPstream::allToAll
(
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,
    char* recvData,
    const UList<int>& recvSizes,
    const UList<int>& recvOffsets,
    const label communicator
)
{
    const label np = nProcs(communicator);

    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** MPI_Alltoallv :"
            << " sendSizes:" << sendSizes
            << " sendOffsets:" << sendOffsets
            << " with comm:" << communicator
            << endl;
    }

    if
    (
        sendSizes.size() != np
     || sendOffsets.size() != np
     || recvSizes.size() != np
     || recvOffsets.size() != np
    )
    {
        FatalErrorInFunction
            << "Size of sendSize " << sendSizes.size()
            << ", sendOffsets " << sendOffsets.size()
            << ", recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        // Serial: plain copy, but the byte counts must still agree
        if (recvSizes[0] != sendSizes[0])
        {
            FatalErrorInFunction
                << "Bytes to send " << sendSizes[0]
                << " does not equal bytes to receive " << recvSizes[0]
                << Foam::abort(FatalError);
        }
        std::memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
    }
    else if
    (
        MPI_Alltoallv
        (
            const_cast<char*>(sendData),
            const_cast<int*>(sendSizes.cdata()),
            const_cast<int*>(sendOffsets.cdata()),
            MPI_BYTE,
            recvData,
            const_cast<int*>(recvSizes.cdata()),
            const_cast<int*>(recvOffsets.cdata()),
            MPI_BYTE,
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Alltoallv failed for sendSizes " << sendSizes
            << " recvSizes " << recvSizes
            << Foam::abort(FatalError);
    }
}
void Foam::UPstream::mpiGather
(
    const char* sendData,
    int sendSize,
    char* recvData,
    int recvSize,
    const label communicator
)
{
    const label np = nProcs(communicator);

    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** MPI_Gather :"
            << " recvSize:" << recvSize
            << " with comm:" << communicator
            << endl;
    }

    if (!UPstream::parRun())
    {
        std::memmove(recvData, sendData, recvSize);
    }
    else if
    (
        MPI_Gather
        (
            const_cast<char*>(sendData), sendSize, MPI_BYTE,
            recvData, recvSize, MPI_BYTE,
            0,  // root = master
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Gather failed for sendSize " << sendSize
            << " recvSize " << recvSize
            << " communicator " << communicator
            << Foam::abort(FatalError);
    }
}

// UPstream::mpiScatter mirrors mpiGather, distributing from the master
void Foam::UPstream::mpiScatter
(
    const char* sendData,
    int sendSize,
    char* recvData,
    int recvSize,
    const label communicator
)
{
    const label np = nProcs(communicator);

    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** MPI_Scatter :"
            << " recvSize:" << recvSize
            << " with comm:" << communicator
            << endl;
    }

    if (!UPstream::parRun())
    {
        std::memmove(recvData, sendData, recvSize);
    }
    else if
    (
        MPI_Scatter
        (
            const_cast<char*>(sendData), sendSize, MPI_BYTE,
            recvData, recvSize, MPI_BYTE,
            0,  // root = master
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Scatter failed for sendSize " << sendSize
            << " recvSize " << recvSize
            << " communicator " << communicator
            << Foam::abort(FatalError);
    }
}
void Foam::UPstream::gather
(
    const char* sendData,
    int sendSize,
    char* recvData,
    const UList<int>& recvSizes,
    const UList<int>& recvOffsets,
    const label communicator
)
{
    const label np = nProcs(communicator);

    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** MPI_Gatherv :"
            << " recvSizes:" << recvSizes
            << " recvOffsets:" << recvOffsets
            << " with comm:" << communicator
            << endl;
    }

    if
    (
        UPstream::master(communicator)
     && (recvSizes.size() != np || recvOffsets.size() < np)
    )
    {
        // recvOffsets may be larger than np (e.g. to simplify later looping)
        FatalErrorInFunction
            << "Size of recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        std::memmove(recvData, sendData, sendSize);
    }
    else if
    (
        MPI_Gatherv
        (
            const_cast<char*>(sendData), sendSize, MPI_BYTE,
            recvData,
            const_cast<int*>(recvSizes.cdata()),
            const_cast<int*>(recvOffsets.cdata()),
            MPI_BYTE,
            0,  // root = master
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Gatherv failed for sendSize " << sendSize
            << " recvSizes " << recvSizes
            << " communicator " << communicator
            << Foam::abort(FatalError);
    }
}
void Foam::UPstream::scatter
(
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,
    char* recvData,
    int recvSize,
    const label communicator
)
{
    const label np = nProcs(communicator);

    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** MPI_Scatterv :"
            << " sendSizes:" << sendSizes
            << " sendOffsets:" << sendOffsets
            << " with comm:" << communicator
            << endl;
    }

    if
    (
        UPstream::master(communicator)
     && (sendSizes.size() != np || sendOffsets.size() != np)
    )
    {
        FatalErrorInFunction
            << "Size of sendSizes " << sendSizes.size()
            << " or sendOffsets " << sendOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        std::memmove(recvData, sendData, recvSize);
    }
    else if
    (
        MPI_Scatterv
        (
            const_cast<char*>(sendData),
            const_cast<int*>(sendSizes.cdata()),
            const_cast<int*>(sendOffsets.cdata()),
            MPI_BYTE,
            recvData, recvSize, MPI_BYTE,
            0,  // root = master
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Scatterv failed for sendSizes " << sendSizes
            << " sendOffsets " << sendOffsets
            << " communicator " << communicator
            << Foam::abort(FatalError);
    }
}
void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,
    const label index
)
{
    // Extend the PstreamGlobals lists with placeholders if required
    MPI_Group newGroup = MPI_GROUP_NULL;
    MPI_Comm newComm = MPI_COMM_NULL;
    // ... an index beyond the current storage is fatal:
    //     "PstreamGlobals out of sync with UPstream data. Problem."

    if (parentIndex == -1)
    {
        // The first communicator maps directly onto MPI_COMM_WORLD and is fatal
        // otherwise: "world communicator should always be index " << worldComm
        // ...
        procIDs_[index].setSize(numProcs);
        forAll(procIDs_[index], i)
        {
            procIDs_[index][i] = i;
        }
    }
    else
    {
        // Group containing the requested ranks of the parent communicator
        MPI_Group_incl
        (
            PstreamGlobals::MPIGroups_[parentIndex],
            procIDs_[index].size(),
            procIDs_[index].cdata(),
            &PstreamGlobals::MPIGroups_[index]
        );

        #if defined(MSMPI_VER)
        // MS-MPI lacks MPI_Comm_create_group: fall back to MPI_Comm_create
        #else
        MPI_Comm_create_group
        (
            PstreamGlobals::MPICommunicators_[parentIndex],
            PstreamGlobals::MPIGroups_[index],
            UPstream::msgType(),
            &PstreamGlobals::MPICommunicators_[index]
        );
        #endif

        if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
        {
            myProcNo_[index] = -1;   // this rank is not in the new group
        }
        else if
        (
            MPI_Comm_rank
            (
                PstreamGlobals::MPICommunicators_[index],
                &myProcNo_[index]
            )
        )
        {
            FatalErrorInFunction
                << " when allocating communicator at " << index
                << " from ranks " << procIDs_[index]
                << " of parent " << parentIndex
                << " cannot find my own rank"
                << Foam::exit(FatalError);
        }
    }
}
void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
    // Never free the predefined world communicator (index 0)
    if (communicator != 0)
    {
        // ... free the MPI communicator and group if they were allocated
    }
}
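Outside of OpenFOAM, the same group-based construction used above looks like this in plain MPI. A minimal sketch, assuming standard MPI-3 (MPI_Comm_create_group) and an arbitrary choice of ranks:

#include <mpi.h>
#include <vector>

// Build a communicator containing only the listed world ranks.
// Note: MPI_Comm_create_group must only be called by ranks that are in 'ranks'.
MPI_Comm createSubComm(const std::vector<int>& ranks)
{
    MPI_Group worldGroup = MPI_GROUP_NULL;
    MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);

    MPI_Group subGroup = MPI_GROUP_NULL;
    MPI_Group_incl(worldGroup, static_cast<int>(ranks.size()), ranks.data(), &subGroup);

    // Collective only over the ranks in subGroup; the tag disambiguates
    // concurrent communicator creations
    MPI_Comm subComm = MPI_COMM_NULL;
    MPI_Comm_create_group(MPI_COMM_WORLD, subGroup, 0, &subComm);

    MPI_Group_free(&subGroup);
    MPI_Group_free(&worldGroup);

    return subComm;   // MPI_COMM_NULL on ranks not listed in 'ranks'
}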
// UPstream::waitRequests(const label start)  (debug guards omitted)
Pout<< "UPstream::waitRequests : starting wait for "
    << PstreamGlobals::outstandingRequests_.size() - start
    << " outstanding requests starting at " << start << endl;

// View of the still-outstanding requests
SubList<MPI_Request> waitRequests
(
    PstreamGlobals::outstandingRequests_,
    PstreamGlobals::outstandingRequests_.size() - start,
    start
);

if
(
    MPI_Waitall
    (
        waitRequests.size(),
        waitRequests.data(),
        MPI_STATUSES_IGNORE
    )
)
{
    FatalErrorInFunction
        << "MPI_Waitall returned with error" << Foam::endl;
}

resetRequests(start);

Pout<< "UPstream::waitRequests : finished wait." << endl;
// UPstream::waitRequest(const label i)  (debug guards omitted)
Pout<< "UPstream::waitRequest : starting wait for request:" << i << endl;

if (i >= PstreamGlobals::outstandingRequests_.size())
{
    FatalErrorInFunction
        << "There are " << PstreamGlobals::outstandingRequests_.size()
        << " outstanding send requests and you are asking for i=" << i << nl
        << "Maybe you are mixing blocking/non-blocking comms?"
        << Foam::abort(FatalError);
}

if (MPI_Wait(&PstreamGlobals::outstandingRequests_[i], MPI_STATUS_IGNORE))
{
    FatalErrorInFunction
        << "MPI_Wait returned with error" << Foam::endl;
}

Pout<< "UPstream::waitRequest : finished wait for request:" << i << endl;
// UPstream::finishedRequest(const label i)  (debug guards omitted)
Pout<< "UPstream::finishedRequest : checking request:" << i << endl;

// Same range check as waitRequest():
//     "... outstanding send requests and you are asking for i=" << i
//     "Maybe you are mixing blocking/non-blocking comms?"

int flag;
MPI_Test(&PstreamGlobals::outstandingRequests_[i], &flag, MPI_STATUS_IGNORE);

Pout<< "UPstream::finishedRequest : finished request:" << i << endl;
// Debug tracing in the message-tag helpers (const char* and word overloads)
int Foam::UPstream::allocateTag(const char* s)
{
    // ... reuse a tag from freedTags_ or take the next nTags_
    if (debug)
    {
        Pout<< "UPstream::allocateTag " << s << " tag:" << tag << endl;
    }
    return tag;
}

// (identical tracing in allocateTag(const word& s))

void Foam::UPstream::freeTag(const char* s, const int tag)
{
    if (debug)
    {
        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
    }
    // ... return the tag to freedTags_
}

// (identical tracing in freeTag(const word& s, const int tag))

Referenced symbols:
int debug
Static debugging option.
static int allocateTag(const char *)
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
static void resetRequests(const label sz)
Truncate number of outstanding requests.
static void addWaitTime()
Add time increment to waitTime.
static void printStack(Ostream &os)
Helper function to print a stack.
const T * cdata() const noexcept
Return pointer to the underlying array serving as data storage.
A class for handling words, derived from Foam::string.
DynamicList< label > freedRequests_
static void mpiGather(const char *sendData, int sendSize, char *recvData, int recvSize, const label communicator=worldComm)
Receive data from all processors on the master (low-level)
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
static void scatterList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Scatter data. Reverse of gatherList.
bool read(const char *buf, int32_t &val)
Same as readInt32.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
static bool master(const label communicator=worldComm)
Am I the master process.
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
static void beginTiming()
Update timer prior to measurement.
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
static void abort()
Call MPI_Abort with no other checks or cleanup.
Ostream & endl(Ostream &os)
Add newline and flush stream.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
string getEnv(const std::string &envName)
Get environment value for given envName.
#define forAll(list, i)
Loop across all elements in list.
static void addScatterTime()
Add time increment to scatterTime.
DynamicList< MPI_Comm > MPICommunicators_
List< word > wordList
A List of words.
void allReduce(Type &Value, int count, MPI_Datatype MPIType, MPI_Op op, const BinaryOp &bop, const int tag, const label communicator)
DynamicList< MPI_Group > MPIGroups_
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
static const int mpiBufferSize
MPI buffer-size (bytes)
static void addGatherTime()
Add time increment to gatherTime.
DynamicList< T, SizeMin > & append(const T &val)
Append an element to the end of this list.
static void waitRequest(const label i)
Wait until request i has finished.
static void mpiScatter(const char *sendData, int sendSize, char *recvData, int recvSize, const label communicator=worldComm)
Send data to all processors from master (low-level)
static void addValidParOptions(HashTable< string > &validParOptions)
errorManip< error > abort(error &err)
static void detachOurBuffers()
Inter-processor communication reduction functions.
Various functions to wrap MPI_Allreduce.
errorManipArg< error, int > exit(error &err, const int errNo=1)
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
static void addAllToAllTime()
Add time increment to allToAllTime.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
static int & msgType() noexcept
Message tag of standard messages.
static label nRequests()
Get number of outstanding requests.
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
static void scatter(const char *sendData, const UList< int > &sendSizes, const UList< int > &sendOffsets, char *recvData, int recvSize, const label communicator=worldComm)
Send data to all processors from the root of the communicator.
Vector2D< scalar > vector2D
A 2D vector of scalars obtained from the generic Vector2D.
static bool initNull()
Special purpose initialisation function.
static label worldComm
Default communicator (all processors)
static bool & parRun() noexcept
Test if this a parallel run.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscript bounds checking, etc.
static void shutdown(int errNo=0)
Shutdown (finalize) MPI as required.
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
T remove()
Remove and return the last element. Fatal on an empty list.
DynamicList< int > freedTags_
Free'd message tags.
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
void size(const label n)
Older name for setAddressableSize.
static void attachOurBuffers()
void sumReduce(T &Value, label &Count, const int tag=Pstream::msgType(), const label comm=UPstream::worldComm)
Helper class for allocating/freeing communicators.
UList< label > labelUList
A UList of labels.
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
static void allToAll(const labelUList &sendData, labelUList &recvData, const label communicator=worldComm)
Exchange label with all processors (in the communicator).
int nTags_
Max outstanding message tag operations.
#define WarningInFunction
Report a warning using Foam::Warning.
static void gather(const char *sendData, int sendSize, char *recvData, const UList< int > &recvSizes, const UList< int > &recvOffsets, const label communicator=worldComm)
Receive data from all processors on the master.
const string & prefix() const noexcept
Return the stream prefix.
static void exit(int errNo=1)
Shutdown (finalize) MPI as required and exit program with errNo.
static void freeTag(const char *, const int tag)