OFstreamCollator.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2017-2018 OpenFOAM Foundation
9 Copyright (C) 2019-2021 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "OFstreamCollator.H"
30#include "OFstream.H"
31#include "decomposedBlockData.H"
32#include "dictionary.H"
34
35// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36
37namespace Foam
38{
40}
41
42
43// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
44
46(
47 const label comm,
48 const word& objectType,
49 const fileName& fName,
50 const string& masterData,
51 const labelUList& recvSizes,
52 const PtrList<SubList<char>>& slaveData, // optional slave data
53 IOstreamOption streamOpt,
54 const bool append,
55 const dictionary& headerEntries
56)
57{
58 if (debug)
59 {
60 Pout<< "OFstreamCollator : Writing master " << label(masterData.size())
61 << " bytes to " << fName << " using comm " << comm
62 << " and " << slaveData.size() << " sub-ranks" << endl;
63
64 forAll(slaveData, proci)
65 {
66 if (slaveData.set(proci))
67 {
68 Pout<< " " << proci
69 << " size:" << slaveData[proci].size()
70 << endl;
71 }
72 }
73 }
74
75 autoPtr<OSstream> osPtr;
76 if (UPstream::master(comm))
77 {
78 Foam::mkDir(fName.path());
79 osPtr.reset(new OFstream(fName, streamOpt, append));
80 auto& os = *osPtr;
81
82 if (!append)
83 {
84 // No IOobject so cannot use IOobject::writeHeader
85
86 // FoamFile
87 decomposedBlockData::writeHeader
88 (
89 os,
90 streamOpt, // streamOpt for container
91 objectType,
92 "", // note
93 "", // location (leave empty instead inaccurate)
94 fName.name(), // object name
95 headerEntries
96 );
97 }
98 }
99
100 // Assuming threaded writing hides any slowness so we
101 // can use scheduled communication to send the data to
102 // the master processor in order. However can be unstable
103 // for some mpi so default is non-blocking.
104 const UPstream::commsTypes myCommsType
105 (
106 (
107 fileOperations::masterUncollatedFileOperation::
108 maxMasterFileBufferSize == 0
109 )
112 );
113
114
115 UList<char> slice
116 (
117 const_cast<char*>(masterData.data()),
118 label(masterData.size())
119 );
120
121 List<std::streamoff> blockOffset;
123 (
124 comm,
125 osPtr,
126 blockOffset,
127 slice,
128 recvSizes,
129 slaveData,
130 myCommsType,
131 false // do not reduce return state
132 );
133
134 if (osPtr && !osPtr->good())
135 {
137 << "Failed writing to " << fName << exit(FatalIOError);
138 }
139
140 if (debug)
141 {
142 Pout<< "OFstreamCollator : Finished writing " << masterData.size()
143 << " bytes";
144 if (UPstream::master(comm))
145 {
146 off_t sum = 0;
147 for (const label recv : recvSizes)
148 {
149 sum += recv;
150 }
151 // Use std::to_string to display long int
152 Pout<< " (overall " << std::to_string(sum) << ')';
153 }
154 Pout<< " to " << fName
155 << " using comm " << comm << endl;
156 }
157
158 return true;
159}
160
161
162void* Foam::OFstreamCollator::writeAll(void *threadarg)
163{
164 OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
165
166 // Consume stack
167 while (true)
168 {
169 writeData* ptr = nullptr;
170
171 {
172 std::lock_guard<std::mutex> guard(handler.mutex_);
173 if (handler.objects_.size())
174 {
175 ptr = handler.objects_.pop();
176 }
177 }
178
179 if (!ptr)
180 {
181 break;
182 }
183 else
184 {
185 // Convert storage to pointers
186 PtrList<SubList<char>> slaveData;
187 if (ptr->slaveData_.size())
188 {
189 slaveData.resize(ptr->slaveData_.size());
190 forAll(slaveData, proci)
191 {
192 if (ptr->slaveData_.set(proci))
193 {
194 slaveData.set
195 (
196 proci,
197 new SubList<char>
198 (
199 ptr->slaveData_[proci],
200 ptr->sizes_[proci]
201 )
202 );
203 }
204 }
205 }
206
207 bool ok = writeFile
208 (
209 ptr->comm_,
210 ptr->objectType_,
211 ptr->pathName_,
212 ptr->data_,
213 ptr->sizes_,
214 slaveData,
215 ptr->streamOpt_,
216 ptr->append_,
217 ptr->headerEntries_
218 );
219 if (!ok)
220 {
221 FatalIOErrorInFunction(ptr->pathName_)
222 << "Failed writing " << ptr->pathName_
223 << exit(FatalIOError);
224 }
225
226 delete ptr;
227 }
228 //sleep(1);
229 }
230
231 if (debug)
232 {
233 Pout<< "OFstreamCollator : Exiting write thread " << endl;
234 }
235
236 {
237 std::lock_guard<std::mutex> guard(handler.mutex_);
238 handler.threadRunning_ = false;
239 }
240
241 return nullptr;
242}
243
244
245void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
246{
247 while (true)
248 {
249 // Count files to be written
250 off_t totalSize = 0;
251
252 {
253 std::lock_guard<std::mutex> guard(mutex_);
254 forAllConstIters(objects_, iter)
255 {
256 totalSize += iter()->size();
257 }
258 }
259
260 if
261 (
262 totalSize == 0
263 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
264 )
265 {
266 break;
267 }
268
269 if (debug)
270 {
271 std::lock_guard<std::mutex> guard(mutex_);
272 Pout<< "OFstreamCollator : Waiting for buffer space."
273 << " Currently in use:" << totalSize
274 << " limit:" << maxBufferSize_
275 << " files:" << objects_.size()
276 << endl;
277 }
278
279 sleep(5);
280 }
281}
282
283
284// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
285
287:
288 maxBufferSize_(maxBufferSize),
289 threadRunning_(false),
290 localComm_(UPstream::worldComm),
291 threadComm_
292 (
293 UPstream::allocateCommunicator
294 (
295 localComm_,
296 identity(UPstream::nProcs(localComm_))
297 )
298 )
299{}
300
301
303(
304 const off_t maxBufferSize,
305 const label comm
306)
307:
308 maxBufferSize_(maxBufferSize),
309 threadRunning_(false),
310 localComm_(comm),
311 threadComm_
312 (
313 UPstream::allocateCommunicator
314 (
315 localComm_,
316 identity(UPstream::nProcs(localComm_))
317 )
318 )
319{}
320
321
322// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
323
325{
326 if (thread_)
327 {
328 if (debug)
329 {
330 Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
331 }
332 thread_->join();
333 thread_.clear();
334 }
335
336 if (threadComm_ != -1)
337 {
338 UPstream::freeCommunicator(threadComm_);
339 }
340}
341
342
343// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
344
346(
347 const word& objectType,
348 const fileName& fName,
349 const string& data,
350 IOstreamOption streamOpt,
351 const bool append,
352 const bool useThread,
353 const dictionary& headerEntries
354)
355{
356 // Determine (on master) sizes to receive. Note: do NOT use thread
357 // communicator
358 labelList recvSizes;
359 decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
360
361 off_t totalSize = 0;
362 label maxLocalSize = 0;
363 {
364 for (const label recvSize : recvSizes)
365 {
366 totalSize += recvSize;
367 maxLocalSize = max(maxLocalSize, recvSize);
368 }
369 Pstream::broadcasts(localComm_, totalSize, maxLocalSize);
370 }
371
372 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
373 {
374 if (debug)
375 {
376 Pout<< "OFstreamCollator : non-thread gather and write of " << fName
377 << " using local comm " << localComm_ << endl;
378 }
379 // Direct collating and writing (so master blocks until all written!)
380 const PtrList<SubList<char>> dummySlaveData;
381 return writeFile
382 (
383 localComm_,
384 objectType,
385 fName,
386 data,
387 recvSizes,
388 dummySlaveData,
389 streamOpt,
390 append,
391 headerEntries
392 );
393 }
394 else if (totalSize <= maxBufferSize_)
395 {
396 // Total size can be stored locally so receive all data now and only
397 // do the writing in the thread
398
399 if (debug)
400 {
401 Pout<< "OFstreamCollator : non-thread gather; thread write of "
402 << fName << endl;
403 }
404
405 if (Pstream::master(localComm_))
406 {
407 waitForBufferSpace(totalSize);
408 }
409
410
411 // Receive in chunks of labelMax (2^31-1) since this is the maximum
412 // size that a List can be
413
414 autoPtr<writeData> fileAndDataPtr
415 (
416 new writeData
417 (
418 threadComm_, // Note: comm not actually used anymore
419 objectType,
420 fName,
421 (
422 Pstream::master(localComm_)
423 ? data // Only used on master
425 ),
426 recvSizes,
427 streamOpt,
428 append,
429 headerEntries
430 )
431 );
432 writeData& fileAndData = fileAndDataPtr();
433
434 PtrList<List<char>>& slaveData = fileAndData.slaveData_;
435
436 UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
437
438 slaveData.setSize(recvSizes.size());
439
440 // Gather all data onto master. Is done in local communicator since
441 // not in write thread. Note that we do not store in contiguous
442 // buffer since that would limit to 2G chars.
443 const label startOfRequests = Pstream::nRequests();
444 if (Pstream::master(localComm_))
445 {
446 for (label proci = 1; proci < slaveData.size(); proci++)
447 {
448 slaveData.set(proci, new List<char>(recvSizes[proci]));
450 (
452 proci,
453 slaveData[proci].data(),
454 slaveData[proci].size_bytes(),
456 localComm_
457 );
458 }
459 }
460 else
461 {
462 if
463 (
465 (
467 0,
468 slice.cdata(),
469 slice.size_bytes(),
471 localComm_
472 )
473 )
474 {
476 << "Cannot send outgoing message. "
477 << "to:" << 0 << " nBytes:"
478 << label(slice.size_bytes())
480 }
481 }
482 Pstream::waitRequests(startOfRequests);
483
484 {
485 std::lock_guard<std::mutex> guard(mutex_);
486
487 // Append to thread buffer
488 objects_.push(fileAndDataPtr.ptr());
489
490 // Start thread if not running
491 if (!threadRunning_)
492 {
493 if (thread_)
494 {
495 if (debug)
496 {
497 Pout<< "OFstreamCollator : Waiting for write thread"
498 << endl;
499 }
500 thread_->join();
501 }
502
503 if (debug)
504 {
505 Pout<< "OFstreamCollator : Starting write thread"
506 << endl;
507 }
508 thread_.reset(new std::thread(writeAll, this));
509 threadRunning_ = true;
510 }
511 }
512
513 return true;
514 }
515 else
516 {
517 if (debug)
518 {
519 Pout<< "OFstreamCollator : thread gather and write of " << fName
520 << " using communicator " << threadComm_ << endl;
521 }
522
524 {
526 << "mpi does not seem to have thread support."
527 << " Make sure to set buffer size 'maxThreadFileBufferSize'"
528 << " to at least " << totalSize
529 << " to be able to do the collating before threading."
530 << exit(FatalError);
531 }
532
533 if (Pstream::master(localComm_))
534 {
535 waitForBufferSpace(data.size());
536 }
537
538 {
539 std::lock_guard<std::mutex> guard(mutex_);
540
541 // Push all file info on buffer. Note that no slave data provided
542 // so it will trigger communication inside the thread
543 objects_.push
544 (
545 new writeData
546 (
547 threadComm_,
548 objectType,
549 fName,
550 data,
551 recvSizes,
552 streamOpt,
553 append,
554 headerEntries
555 )
556 );
557
558 if (!threadRunning_)
559 {
560 if (thread_)
561 {
562 if (debug)
563 {
564 Pout<< "OFstreamCollator : Waiting for write thread"
565 << endl;
566 }
567 thread_->join();
568 }
569
570 if (debug)
571 {
572 Pout<< "OFstreamCollator : Starting write thread" << endl;
573 }
574 thread_.reset(new std::thread(writeAll, this));
575 threadRunning_ = true;
576 }
577 }
578
579 return true;
580 }
581}
582
583
585{
586 // Wait for all buffer space to be available i.e. wait for all jobs
587 // to finish
588 if (Pstream::master(localComm_))
589 {
590 if (debug)
591 {
592 Pout<< "OFstreamCollator : waiting for thread to have consumed all"
593 << endl;
594 }
595 waitForBufferSpace(-1);
596 }
597}
598
599
600// ************************************************************************* //
The IOstreamOption is a simple container for options an IOstream can normally have.
Threaded file writer.
virtual ~OFstreamCollator()
Destructor.
void waitAll()
Wait for all thread actions to have finished.
virtual const fileName & name() const
Read/write access to the name of the stream.
Definition: OSstream.H:107
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers....
Definition: PtrList.H:73
const T * set(const label i) const
Definition: PtrList.H:138
void setSize(const label newLen)
Same as resize()
Definition: PtrList.H:151
virtual bool read()
Re-read model coefficients if they have changed.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: UList.H:94
const T * cdata() const noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:230
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:114
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition: UListI.H:258
Inter-processor communications stream.
Definition: UPstream.H:59
commsTypes
Types of communications.
Definition: UPstream.H:67
@ nonBlocking
"nonBlocking"
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:174
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:556
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:90
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:439
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:100
label size() const noexcept
The number of elements in the list.
Definition: UPtrListI.H:106
Pointer management similar to std::unique_ptr, with some additional methods and type checking.
Definition: autoPtr.H:66
T * ptr() noexcept
Same as release().
Definition: autoPtr.H:172
Database for solution data, solver performance and other reduced data.
Definition: data.H:58
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char > > &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition: dictionary.H:126
A class for handling file names.
Definition: fileName.H:76
virtual bool write()
Write the output fields.
Base class for writing single files from the function objects.
Definition: writeFile.H:120
splitCell * master() const
Definition: splitCell.H:113
A class for handling words, derived from Foam::string.
Definition: word.H:68
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
Definition: className.H:121
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:473
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
OBJstream os(runTime.globalPath()/outputName)
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
Namespace for OpenFOAM.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:47
labelList identity(const label len, label start=0)
Return an identity map of the given length with (map[i] == i)
Definition: labelList.C:38
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: MSwindows.C:515
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Definition: MSwindows.C:1114
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
errorManip< error > abort(error &err)
Definition: errorManip.H:144
IOerror FatalIOError
error FatalError
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static void writeData(Ostream &os, const Type &val)
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:333
#define forAllConstIters(container, iter)
Iterate across all elements of the container object with const access.
Definition: stdFoam.H:278