34void Foam::PstreamBuffers::finalExchange
42 finishedSendsCalled_ =
true;
49 Pstream::exchange<DynamicList<char>,
char>
62void Foam::PstreamBuffers::finalExchange
72 finishedSendsCalled_ =
true;
86 Pstream::exchange<DynamicList<char>,
char>
99void Foam::PstreamBuffers::finalExchangeGatherScatter
107 finishedSendsCalled_ =
true;
121 recvSizes.resize_nocopy(recvBuf_.size());
129 recvSizes.resize_nocopy(sendBuf_.size());
135 recvSizes[proci] = sendBuf_[proci].size();
142 recvSizes[0] = myRecv;
146 Pstream::exchange<DynamicList<char>,
char>
169 finishedSendsCalled_(false),
170 allowClearRecv_(true),
172 commsType_(commsType),
189 finishedSendsCalled_(false),
190 allowClearRecv_(true),
192 commsType_(commsType),
206 forAll(recvBufPos_, proci)
208 if (recvBufPos_[proci] < recvBuf_[proci].size())
211 <<
"Message from processor " << proci
212 <<
" Only consumed " << recvBufPos_[proci] <<
" of "
213 << recvBuf_[proci].size() <<
" bytes" <<
nl
234 finishedSendsCalled_ =
false;
240 recvBuf_[proci].clear();
241 recvBufPos_[proci] = 0;
259 finishedSendsCalled_ =
false;
278 if (finishedSendsCalled_)
280 forAll(recvBufPos_, proci)
282 if (recvBuf_[proci].size() > recvBufPos_[proci])
302 return sendBuf_[proci].size();
308 if (finishedSendsCalled_)
310 const label len(recvBuf_[proci].size() - recvBufPos_[proci]);  // unconsumed bytes = received size minus read position (a '>' comparison here would only ever yield 0 or 1)
333 if (finishedSendsCalled_)
335 forAll(recvBufPos_, proci)
337 const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
360 if (finishedSendsCalled_)
362 const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
368 const_cast<char*
>(&recvBuf_[proci][recvBufPos_[proci]]),
387 bool old(allowClearRecv_);
388 allowClearRecv_ = on;
396 finalExchange(recvSizes, wait);
406 finalExchange(recvSizes, wait);
411 <<
"Obtaining sizes not supported in "
413 <<
" since transfers already in progress. Use non-blocking instead."
430 finalExchange(sendProcs, recvProcs, recvSizes, wait);
442 finalExchange(sendProcs, recvProcs, recvSizes, wait);
447 <<
"Obtaining sizes not supported in "
449 <<
" since transfers already in progress. Use non-blocking instead."
466 bool changed = (sendConnections.
size() != nProcs());
470 sendConnections.
resize(nProcs());
478 if (sendConnections.
set(proci, !sendBuf_[proci].empty()))
496 if (!sendBuf_[proci].empty())
509 if (!recvBuf_[proci].empty())
519 finishedSends(sendProcs, recvProcs, wait);
528 finalExchangeGatherScatter(
true, wait);
534 finalExchangeGatherScatter(
false, wait);
544 finalExchangeGatherScatter(
true, wait);
549 <<
"Obtaining sizes not supported in "
551 <<
" since transfers already in progress. Use non-blocking instead."
561 recvSizes = recvDataCounts();
571 finalExchangeGatherScatter(
false, wait);
576 <<
"Obtaining sizes not supported in "
578 <<
" since transfers already in progress. Use non-blocking instead."
588 recvSizes = recvDataCounts();
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
void append(const T &val)
Copy append an element to the end of this list.
streamFormat
Data format (ascii | binary)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
label size() const noexcept
Number of entries.
Buffers for inter-processor communications streams (UOPstream, UIPstream).
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
const UList< char > peekRecvData(const label proci) const
bool allowClearRecv() const noexcept
void clearStorage()
Clear individual buffer storage and reset states.
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
bool hasSendData() const
True if any (local) send buffers have data.
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
labelList recvDataCounts() const
label recvDataCount(const label proci) const
void finishedSends(const bool wait=true)
Mark sends as done.
void clear()
Clear individual buffers and reset states.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendData, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscript bounds checking, etc.
Inter-processor communications stream.
commsTypes
Types of communications.
@ nonBlocking
"nonBlocking"
static const Enum< commsTypes > commsTypeNames
Names of the communication types.
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety of bit-set operations.
void set(const bitSet &bitset)
Set specified bits from another bitset.
splitCell * master() const
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
List< label > labelList
A List of labels.
Ostream & endl(Ostream &os)
Add newline and flush stream.
void reduce(const List< UPstream::commsStruct > &comms, T &value, const BinaryOp &bop, const int tag, const label comm)
errorManip< error > abort(error &err)
static constexpr const zero Zero
Global zero (0)
errorManipArg< error, int > exit(error &err, const int errNo=1)
UList< label > labelUList
A UList of labels.
constexpr char nl
The newline '\n' character (0x0a)
#define forAll(list, i)
Loop across all elements in list.