Go to the documentation of this file.
44 bool Foam::OFstreamCollator::writeFile
48 const fileName& fName,
49 const string& masterData,
51 const PtrList<SubList<char>>& slaveData,
52 IOstream::streamFormat fmt,
53 IOstream::versionNumber ver,
54 IOstream::compressionType cmp,
60 Pout<<
"OFstreamCollator : Writing master " << masterData.size()
61 <<
" bytes to " << fName
62 <<
" using comm " << comm <<
endl;
65 Pout<<
"OFstreamCollator : Slave data" <<
endl;
68 if (slaveData.set(proci))
71 <<
" size:" << slaveData[proci].size()
78 autoPtr<OSstream> osPtr;
97 OSstream& os = osPtr();
114 const_cast<char*
>(masterData.data()),
115 label(masterData.size())
123 List<std::streamoff> start;
133 fileOperations::masterUncollatedFileOperation::
134 maxMasterFileBufferSize == 0
141 if (osPtr && !osPtr->good())
149 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
154 for (
const label recv : recvSizes)
160 std::ostringstream os;
162 Pout<<
" (overall " << os.str() <<
")";
164 Pout<<
" to " << fName
165 <<
" using comm " << comm <<
endl;
172 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
174 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
182 std::lock_guard<std::mutex> guard(handler.mutex_);
183 if (handler.objects_.size())
185 ptr = handler.objects_.pop();
196 PtrList<SubList<char>> slaveData;
197 if (ptr->slaveData_.size())
199 slaveData.setSize(ptr->slaveData_.size());
202 if (ptr->slaveData_.set(proci))
209 ptr->slaveData_[proci],
233 <<
"Failed writing " << ptr->pathName_
244 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
248 std::lock_guard<std::mutex> guard(handler.mutex_);
249 handler.threadRunning_ =
false;
256 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const
264 std::lock_guard<std::mutex> guard(mutex_);
267 totalSize += iter()->size();
274 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
282 std::lock_guard<std::mutex> guard(mutex_);
283 Pout<<
"OFstreamCollator : Waiting for buffer space."
284 <<
" Currently in use:" << totalSize
285 <<
" limit:" << maxBufferSize_
286 <<
" files:" << objects_.size()
299 maxBufferSize_(maxBufferSize),
300 threadRunning_(false),
315 const off_t maxBufferSize,
319 maxBufferSize_(maxBufferSize),
320 threadRunning_(
false),
341 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
347 if (threadComm_ != -1)
358 const word& typeName,
374 label maxLocalSize = 0;
376 for (label proci = 0; proci < recvSizes.size(); proci++)
378 totalSize += recvSizes[proci];
379 maxLocalSize =
max(maxLocalSize, recvSizes[proci]);
385 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
389 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
390 <<
" using local comm " << localComm_ <<
endl;
408 else if (totalSize <= maxBufferSize_)
415 Pout<<
"OFstreamCollator : non-thread gather; thread write of "
421 waitForBufferSpace(totalSize);
447 writeData& fileAndData = fileAndDataPtr();
453 slaveData.
setSize(recvSizes.size());
461 for (label proci = 1; proci < slaveData.size(); proci++)
468 reinterpret_cast<char*
>(slaveData[proci].
data()),
469 slaveData[proci].byteSize(),
483 reinterpret_cast<const char*
>(slice.
cdata()),
491 <<
"Cannot send outgoing message. "
492 <<
"to:" << 0 <<
" nBytes:"
500 std::lock_guard<std::mutex> guard(mutex_);
503 objects_.push(fileAndDataPtr.
ptr());
512 Pout<<
"OFstreamCollator : Waiting for write thread"
520 Pout<<
"OFstreamCollator : Starting write thread"
523 thread_.reset(
new std::thread(writeAll,
this));
524 threadRunning_ =
true;
534 Pout<<
"OFstreamCollator : thread gather and write of " << fName
535 <<
" using communicator " << threadComm_ <<
endl;
541 <<
"mpi does not seem to have thread support."
542 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'"
543 <<
" to at least " << totalSize
544 <<
" to be able to do the collating before threading."
550 waitForBufferSpace(
data.size());
554 std::lock_guard<std::mutex> guard(mutex_);
580 Pout<<
"OFstreamCollator : Waiting for write thread"
588 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
590 thread_.reset(
new std::thread(writeAll,
this));
591 threadRunning_ =
true;
608 Pout<<
"OFstreamCollator : waiting for thread to have consumed all"
611 waitForBufferSpace(-1);
int debug
Static debugging option.
A class for handling words, derived from Foam::string.
A class for handling file names.
static bool haveThreads()
Have support for threads.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
static bool master(const label communicator=worldComm)
Am I the master process.
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns starts of processor blocks.
Ostream & endl(Ostream &os)
Add newline and flush stream.
const T * set(const label i) const
Return const pointer to element (can be nullptr).
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until writethread has space.
#define forAll(list, i)
Loop across all elements in list.
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the message size.
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Write given buffer to given processor.
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Inter-processor communications stream.
Representation of a major/minor version number.
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
void setSize(const label newLen)
Same as resize()
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers....
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
T * ptr() noexcept
Same as release().
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
static void writeData(Ostream &os, const Type &val)
streamFormat
Data format (ascii | binary)
static const string null
An empty string.
errorManip< error > abort(error &err)
void waitAll()
Wait for all thread actions to have finished.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Pointer management similar to std::unique_ptr, with some additional methods and type checking.
static int & msgType()
Message tag of standard messages.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static label nRequests()
Get number of outstanding requests.
forAllConstIters(mixture.phases(), phase)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscript bounds checking, etc.
labelList identity(const label len, label start=0)
Create identity map of the given length with (map[i] == i)
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
std::streamsize byteSize() const
const T * cdata() const
Return a const pointer to the first data element.
compressionType
Compression treatment (UNCOMPRESSED | COMPRESSED)
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
UList< label > labelUList
A UList of labels.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
defineTypeNameAndDebug(combustionModel, 0)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Database for solution data, solver performance and other reduced data.
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
virtual ~OFstreamCollator()
Destructor.