Annotated excerpts from the MPI implementation of Foam::UPstream: initialisation and shutdown, reduction wrappers, all-to-all/gather/scatter, communicator allocation, and non-blocking request handling.
#if defined(WM_SP) || defined(WM_SPDP)
    #define MPI_SCALAR MPI_FLOAT
#else
    #define MPI_SCALAR MPI_DOUBLE
#endif
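MPI_SCALAR keeps the MPI datatype in step with the precision Foam::scalar was compiled for (WM_SP/WM_SPDP versus WM_DP). A minimal sketch of why this matters at a call site; the helper and variable names are illustrative, not from this file:

#include <mpi.h>
#include "scalar.H"   // Foam::scalar: float or double per WM_ precision flags

// Hypothetical helper: the reduction stays type-consistent because
// MPI_SCALAR tracks the compile-time width of Foam::scalar.
Foam::scalar globalSum(Foam::scalar localValue)
{
    Foam::scalar result = 0;
    MPI_Allreduce(&localValue, &result, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
    return result;
}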
// attachOurBuffers(): optionally override UPstream::mpiBufferSize with the
// MPI_BUFFER_SIZE environment variable, then attach a buffer for MPI_Bsend
int len = 0;
const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));

if (str.empty() || !Foam::read(str, len) || len <= 0)
{
    len = Foam::UPstream::mpiBufferSize;
}

Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';

char* buf = new char[len];

if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
{
    delete[] buf;
    Foam::Pout<< "UPstream::init : could not attach buffer\n";
}

// detachOurBuffers(): detach and free the buffer again
if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
{
    delete[] buf;
}
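The attach/detach pair above manages the storage MPI uses for buffered sends. A standalone sketch of the same lifecycle in plain MPI, assuming a hypothetical 20 MB buffer (cf. UPstream::mpiBufferSize):

#include <mpi.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    // Attach a buffer; MPI_Bsend calls may now complete locally by
    // copying into this storage
    int len = 20000000;
    char* buf = new char[len];
    MPI_Buffer_attach(buf, len);

    // ... buffered sends (MPI_Bsend) ...

    // Detach returns the attached address/size; we free it ourselves
    MPI_Buffer_detach(&buf, &len);
    delete[] buf;

    MPI_Finalize();
    return 0;
}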
// addValidParOptions(): command-line options recognised for parallel runs
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
// initNull(): special-purpose initialisation without a full parallel run
int flag = 0;

MPI_Finalized(&flag);
if (flag)
{
    FatalErrorInFunction
        << "MPI was already finalized - cannot perform MPI_Init\n"
        << Foam::abort(FatalError);
}

MPI_Initialized(&flag);
if (flag)
{
    Pout<< "UPstream::initNull : was already initialized\n";
}
// init(argc, argv, needsThread): full initialisation called from main
int numprocs = 0, myRank = 0;
int provided_thread_support = 0;
int flag = 0;

MPI_Finalized(&flag);
if (flag)
{
    FatalErrorInFunction
        << "MPI was already finalized - cannot perform MPI_Init" << endl
        << Foam::abort(FatalError);
}

MPI_Initialized(&flag);
if (flag)
{
    WarningInFunction
        << "MPI was already initialized - cannot perform MPI_Init" << nl
        << "This could indicate an application programming error!" << endl;

    Pout<< "UPstream::init : was already initialized\n";
}
else
{
    MPI_Init_thread
    (
        &argc,
        &argv,
        (needsThread ? MPI_THREAD_MULTIPLE : MPI_THREAD_SINGLE),
        &provided_thread_support
    );
}

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

Pout<< "UPstream::init : procs=" << numprocs
    << " rank:" << myRank << endl;

if (numprocs <= 1)
{
    FatalErrorInFunction
        << "attempt to run parallel on 1 processor"
        << Foam::abort(FatalError);
}

// Initialise the parallel structure
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
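A sketch of how a driver might hand control to this initialisation; UPstream::init and UPstream::exit are the documented entry points (see the reference list below), while the surrounding main is illustrative:

#include "UPstream.H"

int main(int argc, char** argv)
{
    // needsThread selects MPI_THREAD_MULTIPLE vs MPI_THREAD_SINGLE above
    if (!Foam::UPstream::init(argc, argv, false))
    {
        return 1;
    }

    // ... parallel work ...

    Foam::UPstream::exit(0);   // finalises MPI and exits cleanly
    return 0;
}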
// exit(errnum): shut down the parallel layer
Pout<< "UPstream::exit\n";

int flag = 0;

MPI_Initialized(&flag);
if (!flag)
{
    return;   // nothing to finalize
}

MPI_Finalized(&flag);
if (flag)
{
    WarningInFunction
        << "MPI was already finalized (by a connected program?)\n";
    Pout<< "UPstream::exit : was already finalized\n";
}

const label nOutstanding = PstreamGlobals::outstandingRequests_.size();
if (nOutstanding)
{
    WarningInFunction
        << "There were still " << nOutstanding
        << " outstanding MPI_Requests." << nl
        << "This means your code exited before doing a"
        << " UPstream::waitRequests()." << nl
        << "This should not happen for a normal code exit."
        << endl;
}

// Free communicators owned by this process
forAll(myProcNo_, communicator)
{
    if (myProcNo_[communicator] != -1)
    {
        freePstreamCommunicator(communicator);
    }
}

// Warn if MPI was initialized outside of UPstream
WarningInFunction
    << "Finalizing MPI, but was initialized elsewhere\n";

// A non-zero errnum aborts instead of finalizing:
MPI_Abort(MPI_COMM_WORLD, errnum);

// abort(): unconditional abort
MPI_Abort(MPI_COMM_WORLD, 1);
// Reduction wrappers. Each overload optionally warns (printing a call
// stack) when a communicator other than UPstream::warnComm is used, then
// delegates to allReduce().

void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << endl;
        error::printStack(Pout);
    }

    allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}

// The minOp<scalar> overload is identical apart from the operation:
//     allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
//
// and the sumOp<vector2D> overload reduces two scalars at once:
//     allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);

// sumReduce: sum a value and its count together by packing both into a
// vector2D and performing a single reduction
void Foam::sumReduce
(
    scalar& Value,
    label& Count,
    const int tag,
    const label communicator
)
{
    vector2D twoScalars(Value, scalar(Count));
    reduce(twoScalars, sumOp<vector2D>(), tag, communicator);

    Value = twoScalars.x();
    Count = twoScalars.y();
}
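sumReduce packs a value and its count into one vector2D so a mean needs a single reduction rather than two. A hedged usage sketch (parallelAverage is a hypothetical helper; sumReduce's defaults are Pstream::msgType() and UPstream::worldComm):

#include "PstreamReduceOps.H"

Foam::scalar parallelAverage(Foam::scalar localSum, Foam::label localCount)
{
    // One reduction carries both the sum and the number of samples
    Foam::sumReduce(localSum, localCount);
    return localCount ? localSum/localCount : 0;
}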
// Non-blocking reduce:
//     reduce(scalar& Value, const sumOp<scalar>& bop, const int tag,
//            const label communicator, label& requestID)
// With MPI-3 (MPIX_COMM_TYPE_SHARED used as a proxy) an MPI_Iallreduce is
// issued and its request stored; otherwise fall back to a blocking reduce.
#ifdef MPIX_COMM_TYPE_SHARED
    // ... MPI_Iallreduce, then store the request and set requestID ...
    Pout<< "UPstream::allocateRequest for non-blocking reduce"
        << " : request:" << requestID << endl;
#else
    reduce(Value, bop, tag, communicator);
    requestID = -1;
#endif
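When the MPI-3 branch is compiled in, the reduction becomes an MPI_Iallreduce whose request is stored for a later wait. A standalone sketch of that pattern in plain MPI (startNonBlockingSum is hypothetical); both buffers must outlive the request:

#include <mpi.h>

void startNonBlockingSum
(
    const double* localValue,   // send buffer, kept alive by the caller
    double* result,             // receive buffer, kept alive by the caller
    MPI_Comm comm,
    MPI_Request* req
)
{
    MPI_Iallreduce(localValue, result, 1, MPI_DOUBLE, MPI_SUM, comm, req);
}

// Completing it later mirrors UPstream::waitRequest on the stored request:
//     MPI_Wait(req, MPI_STATUS_IGNORE);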
void Foam::UPstream::allToAll
(
    const labelUList& sendData,
    labelUList& recvData,
    const label communicator
)
{
    label np = nProcs(communicator);

    if (sendData.size() != np || recvData.size() != np)
    {
        FatalErrorInFunction
            << "Size of sendData " << sendData.size()
            << " or size of recvData " << recvData.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        recvData.deepCopy(sendData);
    }
    else if
    (
        MPI_Alltoall
        (
            const_cast<label*>(sendData.begin()), sizeof(label), MPI_BYTE,
            recvData.begin(), sizeof(label), MPI_BYTE,
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Alltoall failed for " << sendData
            << " on communicator " << communicator
            << Foam::abort(FatalError);
    }
}
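A typical use of this overload is the sizing exchange before a variable-length transfer: every processor announces how much it will send to every other one. Sketch (exchangeSizes is a hypothetical helper):

#include "UPstream.H"
#include "labelList.H"

Foam::labelList exchangeSizes(const Foam::labelList& sendSizes)
{
    Foam::labelList recvSizes(Foam::UPstream::nProcs());
    Foam::UPstream::allToAll(sendSizes, recvSizes);   // default communicator
    return recvSizes;
}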
void Foam::UPstream::allToAll
(
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,
    char* recvData,
    const UList<int>& recvSizes,
    const UList<int>& recvOffsets,
    const label communicator
)
{
    label np = nProcs(communicator);

    if
    (
        sendSizes.size() != np
     || sendOffsets.size() != np
     || recvSizes.size() != np
     || recvOffsets.size() != np
    )
    {
        FatalErrorInFunction
            << "Size of sendSize " << sendSizes.size()
            << ", sendOffsets " << sendOffsets.size()
            << ", recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        if (recvSizes[0] != sendSizes[0])
        {
            FatalErrorInFunction
                << "Bytes to send " << sendSizes[0]
                << " does not equal bytes to receive " << recvSizes[0]
                << Foam::abort(FatalError);
        }
        std::memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
    }
    else if
    (
        MPI_Alltoallv
        (
            const_cast<char*>(sendData),
            const_cast<int*>(sendSizes.begin()),
            const_cast<int*>(sendOffsets.begin()),
            MPI_BYTE,
            recvData,
            const_cast<int*>(recvSizes.begin()),
            const_cast<int*>(recvOffsets.begin()),
            MPI_BYTE,
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Alltoallv failed for sendSizes " << sendSizes
            << " recvSizes " << recvSizes
            << Foam::abort(FatalError);
    }
}
void Foam::UPstream::gather
(
    const char* sendData,
    int sendSize,
    char* recvData,
    const UList<int>& recvSizes,
    const UList<int>& recvOffsets,
    const label communicator
)
{
    label np = nProcs(communicator);

    if
    (
        UPstream::master(communicator)
     && (recvSizes.size() != np || recvOffsets.size() < np)
    )
    {
        // Note: recvOffsets may be one larger than np so the result is
        // easy to iterate over
        FatalErrorInFunction
            << "Size of recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        std::memmove(recvData, sendData, sendSize);
    }
    else if
    (
        MPI_Gatherv
        (
            const_cast<char*>(sendData),
            sendSize,
            MPI_BYTE,
            recvData,
            const_cast<int*>(recvSizes.begin()),
            const_cast<int*>(recvOffsets.begin()),
            MPI_BYTE,
            0,
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Gatherv failed for sendSize " << sendSize
            << " recvSizes " << recvSizes
            << " communicator " << communicator
            << Foam::abort(FatalError);
    }
}
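A hedged sketch of calling this gather: the receive sizes/offsets only need to be meaningful on the master, which receives every rank's bytes at the given offsets (gatherChars is a hypothetical helper; the caller sizes recvBuf on the master):

#include "UPstream.H"
#include "List.H"

void gatherChars
(
    const Foam::List<char>& sendBuf,
    Foam::List<char>& recvBuf,
    const Foam::List<int>& recvSizes,
    const Foam::List<int>& recvOffsets
)
{
    Foam::UPstream::gather
    (
        sendBuf.begin(),
        sendBuf.size(),
        recvBuf.begin(),
        recvSizes,
        recvOffsets
    );
}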
void Foam::UPstream::scatter
(
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,
    char* recvData,
    int recvSize,
    const label communicator
)
{
    label np = nProcs(communicator);

    if
    (
        UPstream::master(communicator)
     && (sendSizes.size() != np || sendOffsets.size() != np)
    )
    {
        FatalErrorInFunction
            << "Size of sendSizes " << sendSizes.size()
            << " or sendOffsets " << sendOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        std::memmove(recvData, sendData, recvSize);
    }
    else if
    (
        MPI_Scatterv
        (
            const_cast<char*>(sendData),
            const_cast<int*>(sendSizes.begin()),
            const_cast<int*>(sendOffsets.begin()),
            MPI_BYTE,
            recvData,
            recvSize,
            MPI_BYTE,
            0,
            PstreamGlobals::MPICommunicators_[communicator]
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Scatterv failed for sendSizes " << sendSizes
            << " sendOffsets " << sendOffsets
            << " communicator " << communicator
            << Foam::abort(FatalError);
    }
}
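The scatter is the mirror image: the master owns sendBuf and the size/offset tables, and each rank receives just its slice (scatterChars is a hypothetical helper):

#include "UPstream.H"
#include "List.H"

void scatterChars
(
    const Foam::List<char>& sendBuf,      // only meaningful on the master
    const Foam::List<int>& sendSizes,
    const Foam::List<int>& sendOffsets,
    Foam::List<char>& recvBuf             // pre-sized to this rank's share
)
{
    Foam::UPstream::scatter
    (
        sendBuf.begin(),
        sendSizes,
        sendOffsets,
        recvBuf.begin(),
        recvBuf.size()
    );
}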
void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,
    const label index
)
{
    if (index == PstreamGlobals::MPIGroups_.size())
    {
        // Extend storage with dummy values
        MPI_Group newGroup = MPI_GROUP_NULL;
        PstreamGlobals::MPIGroups_.append(newGroup);
        MPI_Comm newComm = MPI_COMM_NULL;
        PstreamGlobals::MPICommunicators_.append(newComm);
    }
    else if (index > PstreamGlobals::MPIGroups_.size())
    {
        FatalErrorInFunction
            << "PstreamGlobals out of sync with UPstream data. Problem."
            << Foam::exit(FatalError);
    }

    if (parentIndex == -1)
    {
        // World communicator: must live at index UPstream::worldComm
        if (index != UPstream::worldComm)
        {
            FatalErrorInFunction
                << "world communicator should always be index "
                << UPstream::worldComm << Foam::exit(FatalError);
        }
        // ... adopt MPI_COMM_WORLD; the rank list is the identity map
        procIDs_[index].setSize(numProcs);
        forAll(procIDs_[index], i)
        {
            procIDs_[index][i] = i;
        }
    }
    else
    {
        // Sub-communicator: include procIDs_[index] from the parent group
        MPI_Group_incl
        (
            PstreamGlobals::MPIGroups_[parentIndex],
            procIDs_[index].size(),
            procIDs_[index].begin(),
            &PstreamGlobals::MPIGroups_[index]
        );
        // ... create the communicator; a rank outside the group gets
        myProcNo_[index] = -1;
        // while failure to determine the local rank is fatal:
        FatalErrorInFunction
            << "Problem :"
            << " when allocating communicator at " << index
            << " from ranks " << procIDs_[index]
            << " of parent " << parentIndex
            << " cannot find my own rank"
            << Foam::exit(FatalError);
    }
}
void Foam::UPstream::freePstreamCommunicator(const label communicator)
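Underneath, a sub-communicator is carved out of the parent via an MPI group, as in this standalone sketch (makeSubComm is hypothetical; every rank of the parent must take part in MPI_Comm_create):

#include <mpi.h>

MPI_Comm makeSubComm(MPI_Comm parent, const int* ranks, int nRanks)
{
    MPI_Group parentGroup, subGroup;
    MPI_Comm subComm;

    MPI_Comm_group(parent, &parentGroup);
    MPI_Group_incl(parentGroup, nRanks, ranks, &subGroup);

    // Ranks not listed in 'ranks' receive MPI_COMM_NULL
    // (cf. myProcNo_[index] = -1 above)
    MPI_Comm_create(parent, subGroup, &subComm);

    MPI_Group_free(&parentGroup);
    MPI_Group_free(&subGroup);
    return subComm;
}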
// waitRequests(start): block until all requests from 'start' onwards finish
Pout<< "UPstream::waitRequests : starting wait for "
    << PstreamGlobals::outstandingRequests_.size() - start
    << " outstanding requests starting at " << start << endl;

SubList<MPI_Request> waitRequests
(
    PstreamGlobals::outstandingRequests_,
    PstreamGlobals::outstandingRequests_.size() - start,
    start
);

if (MPI_Waitall(waitRequests.size(), waitRequests.begin(), MPI_STATUSES_IGNORE))
{
    FatalErrorInFunction
        << "MPI_Waitall returned with error" << Foam::endl;
}

resetRequests(start);

Pout<< "UPstream::waitRequests : finished wait." << endl;
// waitRequest(i): block until a single request has finished
Pout<< "UPstream::waitRequest : starting wait for request:" << i << endl;

if (i >= PstreamGlobals::outstandingRequests_.size())
{
    FatalErrorInFunction
        << "There are " << PstreamGlobals::outstandingRequests_.size()
        << " outstanding send requests and you are asking for i=" << i
        << nl
        << "Maybe you are mixing blocking/non-blocking comms?"
        << Foam::abort(FatalError);
}

if (MPI_Wait(&PstreamGlobals::outstandingRequests_[i], MPI_STATUS_IGNORE))
{
    FatalErrorInFunction
        << "MPI_Wait returned with error" << Foam::endl;
}

Pout<< "UPstream::waitRequest : finished wait for request:" << i << endl;

// finishedRequest(i): same bounds check, then a non-blocking MPI_Test
Pout<< "UPstream::finishedRequest : checking request:" << i << endl;

// ... bounds check as above ...

int flag;
MPI_Test(&PstreamGlobals::outstandingRequests_[i], &flag, MPI_STATUS_IGNORE);

Pout<< "UPstream::finishedRequest : finished request:" << i << endl;

return flag != 0;
// allocateTag(s): take a tag from the free-list, or mint a new one
int Foam::UPstream::allocateTag(const char* s)
{
    int tag;
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    Pout<< "UPstream::allocateTag " << s << " tag:" << tag << endl;
    return tag;
}

// (An overload taking a Foam::word produces the same debug output.)

// freeTag(s, tag): return the tag to the free-list
void Foam::UPstream::freeTag(const char* s, const int tag)
{
    Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
    PstreamGlobals::freedTags_.append(tag);
}
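Callers are expected to pair the two so tags recycle through freedTags_ instead of growing nTags_ without bound. Sketch (taggedExchange is a hypothetical helper):

#include "UPstream.H"

void taggedExchange()
{
    const int tag = Foam::UPstream::allocateTag("taggedExchange");

    // ... traffic that must not collide with other messages uses 'tag' ...

    Foam::UPstream::freeTag("taggedExchange", tag);
}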
Referenced symbols:

int debug
Static debugging option.
static int allocateTag(const char *)
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
static void resetRequests(const label sz)
Truncate number of outstanding requests.
static void addWaitTime()
Add time increment to waitTime.
static void printStack(Ostream &os)
Helper function to print a stack.
A class for handling words, derived from Foam::string.
constexpr auto begin(C &c) -> decltype(c.begin())
Return iterator to the beginning of the container c.
static void exit(int errnum=1)
Exit program.
bool read(const char *buf, int32_t &val)
Same as readInt32.
static bool & parRun()
Is this a parallel run?
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
static void beginTiming()
Update timer prior to measurement.
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
static void abort()
Abort program.
Ostream & endl(Ostream &os)
Add newline and flush stream.
prefixOSstream Pout
An Ostream wrapper for parallel output to std::cout.
static void allToAll(const labelUList &sendData, labelUList &recvData, const label communicator=0)
Exchange label with all processors (in the communicator).
static void gather(const char *sendData, int sendSize, char *recvData, const UList< int > &recvSizes, const UList< int > &recvOffsets, const label communicator=0)
Receive data from all processors on the master.
iterator begin()
Return an iterator to begin traversing the UList.
string getEnv(const std::string &envName)
Get environment value for given envName.
#define forAll(list, i)
Loop across all elements in list.
static void addScatterTime()
Add time increment to scatterTime.
DynamicList< MPI_Comm > MPICommunicators_
void allReduce(Type &Value, int count, MPI_Datatype MPIType, MPI_Op op, const BinaryOp &bop, const int tag, const label communicator)
Various functions to wrap MPI_Allreduce.
DynamicList< MPI_Group > MPIGroups_
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
Inter-processor communication reduction functions.
static const int mpiBufferSize
MPI buffer-size (bytes)
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
static void addGatherTime()
Add time increment to gatherTime.
DynamicList< T, SizeMin > & append(const T &val)
Append an element to the end of this list.
static void waitRequest(const label i)
Wait until request i has finished.
static void addValidParOptions(HashTable< string > &validParOptions)
static void scatter(const char *sendData, const UList< int > &sendSizes, const UList< int > &sendOffsets, char *recvData, int recvSize, const label communicator=0)
Send data to all processors from the root of the communicator.
errorManip< error > abort(error &err)
static void detachOurBuffers()
errorManipArg< error, int > exit(error &err, const int errNo=1)
static bool master(const label communicator=0)
Am I the master process.
static void addAllToAllTime()
Add time increment to allToAllTime.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static label nRequests()
Get number of outstanding requests.
Vector2D< scalar > vector2D
A 2D vector of scalars obtained from the generic Vector2D.
static bool initNull()
Special purpose initialisation function.
static label worldComm
Default communicator (all processors)
UList< T >
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
T remove()
Remove and return the last element. Fatal on an empty list.
DynamicList< int > freedTags_
Free'd message tags.
void size(const label n) noexcept
Override size to be inconsistent with allocated storage.
static void attachOurBuffers()
void sumReduce(T &Value, label &Count, const int tag=Pstream::msgType(), const label comm=UPstream::worldComm)
UPstream::communicator
Helper class for allocating/freeing communicators.
UList< label > labelUList
A UList of labels.
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
int nTags_
Max outstanding message tag operations.
#define WarningInFunction
Report a warning using Foam::Warning.
static void freeTag(const char *, const int tag)