Go to the documentation of this file.
45 bool Foam::OFstreamCollator::writeFile
48 const word& objectType,
49 const fileName& fName,
50 const string& masterData,
52 const PtrList<SubList<char>>& slaveData,
53 IOstreamOption streamOpt,
55 const dictionary& headerEntries
60 Pout<<
"OFstreamCollator : Writing master " << masterData.size()
61 <<
" bytes to " << fName
62 <<
" using comm " << comm <<
endl;
66 Pout<<
"OFstreamCollator : Slave data" <<
endl;
69 if (slaveData.set(proci))
72 <<
" size:" << slaveData[proci].size()
79 autoPtr<OSstream> osPtr;
83 osPtr.reset(
new OFstream(fName, streamOpt,
append));
107 const_cast<char*
>(masterData.data()),
108 label(masterData.size())
116 List<std::streamoff> blockOffset;
126 fileOperations::masterUncollatedFileOperation::
127 maxMasterFileBufferSize == 0
134 if (osPtr && !osPtr->good())
142 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
147 for (
const label recv : recvSizes)
152 Pout<<
" (overall " << std::to_string(
sum) <<
')';
154 Pout<<
" to " << fName
155 <<
" using comm " << comm <<
endl;
162 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
164 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
172 std::lock_guard<std::mutex> guard(handler.mutex_);
173 if (handler.objects_.size())
175 ptr = handler.objects_.pop();
186 PtrList<SubList<char>> slaveData;
187 if (ptr->slaveData_.size())
189 slaveData.resize(ptr->slaveData_.size());
192 if (ptr->slaveData_.set(proci))
199 ptr->slaveData_[proci],
222 <<
"Failed writing " << ptr->pathName_
233 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
237 std::lock_guard<std::mutex> guard(handler.mutex_);
238 handler.threadRunning_ =
false;
245 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const
253 std::lock_guard<std::mutex> guard(mutex_);
256 totalSize += iter()->size();
263 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
271 std::lock_guard<std::mutex> guard(mutex_);
272 Pout<<
"OFstreamCollator : Waiting for buffer space."
273 <<
" Currently in use:" << totalSize
274 <<
" limit:" << maxBufferSize_
275 <<
" files:" << objects_.size()
288 maxBufferSize_(maxBufferSize),
289 threadRunning_(false),
304 const off_t maxBufferSize,
308 maxBufferSize_(maxBufferSize),
309 threadRunning_(
false),
330 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
336 if (threadComm_ != -1)
347 const word& objectType,
352 const bool useThread,
362 label maxLocalSize = 0;
364 for (
const label recvSize : recvSizes)
366 totalSize += recvSize;
367 maxLocalSize =
max(maxLocalSize, recvSize);
373 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
377 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
378 <<
" using local comm " << localComm_ <<
endl;
395 else if (totalSize <= maxBufferSize_)
402 Pout<<
"OFstreamCollator : non-thread gather; thread write of "
408 waitForBufferSpace(totalSize);
433 writeData& fileAndData = fileAndDataPtr();
439 slaveData.
setSize(recvSizes.size());
447 for (label proci = 1; proci < slaveData.size(); proci++)
454 slaveData[proci].
data(),
455 slaveData[proci].size_bytes(),
477 <<
"Cannot send outgoing message. "
478 <<
"to:" << 0 <<
" nBytes:"
486 std::lock_guard<std::mutex> guard(mutex_);
489 objects_.push(fileAndDataPtr.
ptr());
498 Pout<<
"OFstreamCollator : Waiting for write thread"
506 Pout<<
"OFstreamCollator : Starting write thread"
509 thread_.reset(
new std::thread(writeAll,
this));
510 threadRunning_ =
true;
520 Pout<<
"OFstreamCollator : thread gather and write of " << fName
521 <<
" using communicator " << threadComm_ <<
endl;
527 <<
"mpi does not seem to have thread support."
528 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'"
529 <<
" to at least " << totalSize
530 <<
" to be able to do the collating before threading."
536 waitForBufferSpace(
data.size());
540 std::lock_guard<std::mutex> guard(mutex_);
565 Pout<<
"OFstreamCollator : Waiting for write thread"
573 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
575 thread_.reset(
new std::thread(writeAll,
this));
576 threadRunning_ =
true;
593 Pout<<
"OFstreamCollator : waiting for thread to have consumed all"
596 waitForBufferSpace(-1);
int debug
Static debugging option.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string ¬e, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
const T * cdata() const noexcept
Return pointer to the underlying array serving as data storage.
A class for handling words, derived from Foam::string.
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=UPstream::worldComm)
Write given buffer to given processor.
A class for handling file names.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
static bool master(const label communicator=worldComm)
Am I the master process.
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
Ostream & endl(Ostream &os)
Add newline and flush stream.
const T * set(const label i) const
Return const pointer to element (can be nullptr),.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=UPstream::worldComm)
Read into given buffer from given processor.
static bool haveThreads() noexcept
Have support for threads.
#define forAll(list, i)
Loop across all elements in list.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Inter-processor communications stream.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
The IOstreamOption is a simple container for options an IOstream can normally have.
void setSize(const label newLen)
Same as resize()
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers....
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
T * ptr() noexcept
Same as release().
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
static void writeData(Ostream &os, const Type &val)
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
static const string null
An empty string.
OBJstream os(runTime.globalPath()/outputName)
errorManip< error > abort(error &err)
void waitAll()
Wait for all thread actions to have finished.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Pointer management similar to std::unique_ptr, with some additional methods and type checking.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static int & msgType() noexcept
Message tag of standard messages.
static label nRequests()
Get number of outstanding requests.
forAllConstIters(mixture.phases(), phase)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
labelList identity(const label len, label start=0)
Create identity map of the given length with (map[i] == i)
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, const bool append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
UList< label > labelUList
A UList of labels.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
defineTypeNameAndDebug(combustionModel, 0)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Database for solution data, solver performance and other reduced data.
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
virtual ~OFstreamCollator()
Destructor.