48 const word& objectType,
49 const fileName& fName,
50 const string& masterData,
51 const labelUList& recvSizes,
52 const PtrList<SubList<char>>& slaveData,
53 IOstreamOption streamOpt,
55 const dictionary& headerEntries
60 Pout<<
"OFstreamCollator : Writing master " << label(masterData.size())
61 <<
" bytes to " << fName <<
" using comm " << comm
62 <<
" and " << slaveData.size() <<
" sub-ranks" <<
endl;
66 if (slaveData.set(proci))
69 <<
" size:" << slaveData[proci].size()
75 autoPtr<OSstream> osPtr;
79 osPtr.reset(
new OFstream(fName, streamOpt,
append));
87 decomposedBlockData::writeHeader
107 fileOperations::masterUncollatedFileOperation::
108 maxMasterFileBufferSize == 0
117 const_cast<char*
>(masterData.data()),
118 label(masterData.size())
121 List<std::streamoff> blockOffset;
134 if (osPtr && !osPtr->good())
142 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
147 for (
const label recv : recvSizes)
152 Pout<<
" (overall " << std::to_string(
sum) <<
')';
154 Pout<<
" to " << fName
155 <<
" using comm " << comm <<
endl;
162void* Foam::OFstreamCollator::writeAll(
void *threadarg)
164 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
172 std::lock_guard<std::mutex> guard(handler.mutex_);
173 if (handler.objects_.size())
175 ptr = handler.objects_.pop();
186 PtrList<SubList<char>> slaveData;
187 if (ptr->slaveData_.size())
189 slaveData.resize(ptr->slaveData_.size());
192 if (ptr->slaveData_.set(proci))
199 ptr->slaveData_[proci],
222 <<
"Failed writing " << ptr->pathName_
233 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
237 std::lock_guard<std::mutex> guard(handler.mutex_);
238 handler.threadRunning_ =
false;
245void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const
253 std::lock_guard<std::mutex> guard(mutex_);
256 totalSize += iter()->size();
263 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
271 std::lock_guard<std::mutex> guard(mutex_);
272 Pout<<
"OFstreamCollator : Waiting for buffer space."
273 <<
" Currently in use:" << totalSize
274 <<
" limit:" << maxBufferSize_
275 <<
" files:" << objects_.size()
288 maxBufferSize_(maxBufferSize),
289 threadRunning_(false),
304 const off_t maxBufferSize,
308 maxBufferSize_(maxBufferSize),
309 threadRunning_(false),
330 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
336 if (threadComm_ != -1)
347 const word& objectType,
352 const bool useThread,
362 label maxLocalSize = 0;
364 for (
const label recvSize : recvSizes)
366 totalSize += recvSize;
367 maxLocalSize =
max(maxLocalSize, recvSize);
372 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
376 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
377 <<
" using local comm " << localComm_ <<
endl;
394 else if (totalSize <= maxBufferSize_)
401 Pout<<
"OFstreamCollator : non-thread gather; thread write of "
407 waitForBufferSpace(totalSize);
432 writeData& fileAndData = fileAndDataPtr();
446 for (label proci = 1; proci < slaveData.
size(); proci++)
453 slaveData[proci].
data(),
454 slaveData[proci].size_bytes(),
476 <<
"Cannot send outgoing message. "
477 <<
"to:" << 0 <<
" nBytes:"
485 std::lock_guard<std::mutex> guard(mutex_);
488 objects_.push(fileAndDataPtr.
ptr());
497 Pout<<
"OFstreamCollator : Waiting for write thread"
505 Pout<<
"OFstreamCollator : Starting write thread"
508 thread_.reset(
new std::thread(writeAll,
this));
509 threadRunning_ =
true;
519 Pout<<
"OFstreamCollator : thread gather and write of " << fName
520 <<
" using communicator " << threadComm_ <<
endl;
526 <<
"mpi does not seem to have thread support."
527 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'"
528 <<
" to at least " << totalSize
529 <<
" to be able to do the collating before threading."
535 waitForBufferSpace(
data.size());
539 std::lock_guard<std::mutex> guard(mutex_);
564 Pout<<
"OFstreamCollator : Waiting for write thread"
572 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
574 thread_.reset(
new std::thread(writeAll,
this));
575 threadRunning_ =
true;
592 Pout<<
"OFstreamCollator : waiting for thread to have consumed all"
595 waitForBufferSpace(-1);
The IOstreamOption is a simple container for options an IOstream can normally have.
virtual ~OFstreamCollator()
Destructor.
void waitAll()
Wait for all thread actions to have finished.
virtual const fileName & name() const
Read/write access to the name of the stream.
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers....
const T * set(const label i) const
void setSize(const label newLen)
Same as resize()
virtual bool read()
Re-read model coefficients if they have changed.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
const T * cdata() const noexcept
Return pointer to the underlying array serving as data storage.
void size(const label n)
Older name for setAddressableSize.
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Inter-processor communications stream.
commsTypes
Types of communications.
@ nonBlocking
"nonBlocking"
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
static int & msgType() noexcept
Message tag of standard messages.
static label nRequests()
Get number of outstanding requests.
static bool haveThreads() noexcept
Have support for threads.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
label size() const noexcept
The number of elements in the list.
Pointer management similar to std::unique_ptr, with some additional methods and type checking.
T * ptr() noexcept
Same as release().
Database for solution data, solver performance and other reduced data.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char > > &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
A class for handling file names.
virtual bool write()
Write the output fields.
Base class for writing single files from the function objects.
splitCell * master() const
A class for handling words, derived from Foam::string.
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
OBJstream os(runTime.globalPath()/outputName)
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
labelList identity(const label len, label start=0)
Return an identity map of the given length with (map[i] == i)
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Ostream & endl(Ostream &os)
Add newline and flush stream.
errorManip< error > abort(error &err)
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static void writeData(Ostream &os, const Type &val)
errorManipArg< error, int > exit(error &err, const int errNo=1)
#define forAll(list, i)
Loop across all elements in list.
#define forAllConstIters(container, iter)
Iterate across all elements of the container object with const access.