collatedFileOperation.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2017-2018 OpenFOAM Foundation
9 Copyright (C) 2020-2021 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
31#include "Pstream.H"
32#include "Time.H"
34#include "decomposedBlockData.H"
35#include "registerSwitch.H"
36#include "masterOFstream.H"
37#include "OFstream.H"
38#include "foamVersion.H"
39
40/* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
41
42namespace Foam
43{
44namespace fileOperations
45{
48 (
51 word
52 );
53
55 (
56 debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
57 );
59 (
60 "maxThreadFileBufferSize",
61 float,
63 );
64
65 // Mark as needing threaded mpi
67 (
70 word,
71 collated
72 );
73}
74}
75
76
77// * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
78
80(
81 const bool printRanks
82) const
83{
85 << "I/O : " << this->type();
86
88 {
90 << " [unthreaded] (maxThreadFileBufferSize = 0)." << nl
91 << " Writing may be slow for large file sizes."
92 << endl;
93 }
94 else
95 {
97 << " [threaded] (maxThreadFileBufferSize = "
98 << maxThreadFileBufferSize << ")." << nl
99 << " Requires buffer large enough to collect all data"
100 " or thread support" << nl
101 << " enabled in MPI. If MPI thread support cannot be"
102 " enabled, deactivate" << nl
103 << " threading by setting maxThreadFileBufferSize"
104 " to 0 in" << nl
105 << " OpenFOAM etc/controlDict" << endl;
106 }
107
108 if (printRanks)
109 {
110 // Information about the ranks
113 {
114 hosts[Pstream::myProcNo()] = hostName();
115 }
116 Pstream::gatherList(hosts);
117
118 DynamicList<label> offsetMaster(Pstream::nProcs());
119
120 forAll(hosts, ranki)
121 {
122 if (!hosts[ranki].empty())
123 {
124 offsetMaster.append(ranki);
125 }
126 }
127
128 if (offsetMaster.size() > 1)
129 {
131 << "IO nodes:" << nl << '(' << nl;
132
133 offsetMaster.append(Pstream::nProcs());
134
135 for (label group = 1; group < offsetMaster.size(); ++group)
136 {
137 const label beg = offsetMaster[group-1];
138 const label end = offsetMaster[group];
139
141 << " (" << hosts[beg].c_str() << ' '
142 << (end-beg) << ')' << nl;
143 }
145 << ')' << nl;
146 }
147 }
148
149 // if (IOobject::fileModificationChecking == IOobject::timeStampMaster)
150 // {
151 // WarningInFunction
152 // << "Resetting fileModificationChecking to timeStamp" << endl;
153 // }
154 // else if (IOobject::fileModificationChecking == IOobject::inotifyMaster)
155 // {
156 // WarningInFunction
157 // << "Resetting fileModificationChecking to inotify" << endl;
158 // }
159}
160
161
163(
164 const label proci
165)
166const
167{
168 if (Pstream::parRun())
169 {
170 return Pstream::master(comm_);
171 }
172 else if (ioRanks_.size())
173 {
174 // Found myself in IO rank
175 return ioRanks_.found(proci);
176 }
177 else
178 {
179 // Assume all in single communicator
180 return proci == 0;
181 }
182}
183
184
186(
187 const regIOobject& io,
188 const fileName& pathName,
189 IOstreamOption streamOpt
190) const
191{
192 // Append to processorsNN/ file
193
194 const label proci = detectProcessorPath(io.objectPath());
195
196 if (debug)
197 {
198 Pout<< "collatedFileOperation::writeObject :"
199 << " For local object : " << io.name()
200 << " appending processor " << proci
201 << " data to " << pathName << endl;
202 }
203 if (proci == -1)
204 {
206 << "Invalid processor path: " << pathName
207 << exit(FatalError);
208 }
209
210 const bool isMaster = isMasterRank(proci);
211
212 // Update meta-data for current state
213 if (isMaster)
214 {
215 const_cast<regIOobject&>(io).updateMetaData();
216 }
217
218 // Note: cannot do append + compression. This is a limitation
219 // of ogzstream (or rather most compressed formats)
220
222 (
223 pathName,
224 IOstreamOption(IOstream::BINARY, streamOpt.version()), // UNCOMPRESSED
225 !isMaster // append slaves
226 );
227
228 if (!os.good())
229 {
231 << "Cannot open for appending"
232 << exit(FatalIOError);
233 }
234
235 if (isMaster)
236 {
237 decomposedBlockData::writeHeader(os, streamOpt, io);
238 }
239
240 std::streamoff blockOffset = decomposedBlockData::writeBlockEntry
241 (
242 os,
243 streamOpt,
244 io,
245 proci,
246 // With FoamFile header on master?
247 isMaster
248 );
249
250 return (blockOffset >= 0) && os.good();
251}
252
253
254// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
255
257(
258 bool verbose
259)
260:
262 (
263 (
264 ioRanks().size()
265 ? UPstream::allocateCommunicator
266 (
267 UPstream::worldComm,
268 subRanks(Pstream::nProcs())
269 )
270 : UPstream::worldComm
271 ),
272 false
273 ),
274 myComm_(comm_),
275 writer_(mag(maxThreadFileBufferSize), comm_),
276 nProcs_(Pstream::nProcs()),
277 ioRanks_(ioRanks())
278{
279 if (verbose && Foam::infoDetailLevel > 0)
280 {
281 this->printBanner(ioRanks_.size());
282 }
283}
284
285
287(
288 const label comm,
289 const labelList& ioRanks,
290 const word& typeName,
291 bool verbose
292)
293:
295 myComm_(-1),
296 writer_(mag(maxThreadFileBufferSize), comm),
297 nProcs_(Pstream::nProcs()),
298 ioRanks_(ioRanks)
299{
300 if (verbose && Foam::infoDetailLevel > 0)
301 {
302 this->printBanner(ioRanks_.size());
303 }
304}
305
306
307// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
308
310{
311 // Wait for any outstanding file operations
312 flush();
313
314 if (myComm_ != -1 && myComm_ != UPstream::worldComm)
315 {
317 }
318}
319
320
321// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
322
324(
325 const IOobject& io,
326 const word& typeName
327) const
328{
329 // Replacement for objectPath
330 if (io.time().processorCase())
331 {
333 (
334 io,
336 "dummy", // not used for processorsobject
337 io.instance()
338 );
339 }
340 else
341 {
343 (
344 io,
347 io.instance()
348 );
349 }
350}
351
352
354(
355 const regIOobject& io,
356 IOstreamOption streamOpt,
357 const bool valid
358) const
359{
360 const Time& tm = io.time();
361 const fileName& inst = io.instance();
362
363 // Update meta-data for current state
364 const_cast<regIOobject&>(io).updateMetaData();
365
366 if (inst.isAbsolute() || !tm.processorCase())
367 {
368 mkDir(io.path());
369 fileName pathName(io.objectPath());
370
371 if (debug)
372 {
373 Pout<< "collatedFileOperation::writeObject :"
374 << " For object : " << io.name()
375 << " falling back to master-only output to " << io.path()
376 << endl;
377 }
378
380 (
381 pathName,
382 streamOpt,
383 false, // append=false
384 valid
385 );
386
387 // If any of these fail, return
388 // (leave error handling to Ostream class)
389
390 const bool ok =
391 (
392 os.good()
393 && io.writeHeader(os)
394 && io.writeData(os)
395 );
396
397 if (ok)
398 {
400 }
401
402 return ok;
403 }
404 else
405 {
406 // Construct the equivalent processors/ directory
407 fileName path(processorsPath(io, inst, processorsDir(io)));
408
409 mkDir(path);
410 fileName pathName(path/io.name());
411
412 if (io.global())
413 {
414 if (debug)
415 {
416 Pout<< "collatedFileOperation::writeObject :"
417 << " For global object : " << io.name()
418 << " falling back to master-only output to " << pathName
419 << endl;
420 }
421
423 (
424 pathName,
425 streamOpt,
426 false, // append=false
427 valid
428 );
429
430 // If any of these fail, return
431 // (leave error handling to Ostream class)
432
433 const bool ok =
434 (
435 os.good()
436 && io.writeHeader(os)
437 && io.writeData(os)
438 );
439
440 if (ok)
441 {
443 }
444
445 return ok;
446 }
447 else if (!Pstream::parRun())
448 {
449 // Special path for e.g. decomposePar. Append to
450 // processorsDDD/ file
451 if (debug)
452 {
453 Pout<< "collatedFileOperation::writeObject :"
454 << " For object : " << io.name()
455 << " appending to " << pathName << endl;
456 }
457
458 return appendObject(io, pathName, streamOpt);
459 }
460 else
461 {
462 // Re-check static maxThreadFileBufferSize variable to see
463 // if needs to use threading
464 const bool useThread = (maxThreadFileBufferSize != 0);
465
466 if (debug)
467 {
468 Pout<< "collatedFileOperation::writeObject :"
469 << " For object : " << io.name()
470 << " starting collating output to " << pathName
471 << " useThread:" << useThread << endl;
472 }
473
474 if (!useThread)
475 {
476 writer_.waitAll();
477 }
478
480 (
481 writer_,
482 pathName,
483 streamOpt,
484 useThread
485 );
486
487 bool ok = os.good();
488
489 if (Pstream::master(comm_))
490 {
491 // Suppress comment banner
492 const bool old = IOobject::bannerEnabled(false);
493
494 ok = ok && io.writeHeader(os);
495
497
498 // Additional header content
501 (
502 dict,
503 streamOpt,
504 io
505 );
506 os.setHeaderEntries(dict);
507 }
508
509 ok = ok && io.writeData(os);
510 // No end divider for collated output
511
512 return ok;
513 }
514 }
515}
516
518{
519 if (debug)
520 {
521 Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
522 << endl;
523 }
525 // Wait for thread to finish (note: also removes thread)
526 writer_.waitAll();
527}
528
529
531(
532 const fileName& fName
533) const
534{
535 if (Pstream::parRun())
536 {
537 const List<int>& procs(UPstream::procID(comm_));
538
539 word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
540
541 if (procs.size() != Pstream::nProcs())
542 {
543 procDir +=
544 + "_"
545 + Foam::name(procs.first())
546 + "-"
547 + Foam::name(procs.last());
548 }
549 return procDir;
550 }
551 else
552 {
553 word procDir(processorsBaseDir+Foam::name(nProcs_));
554
555 if (ioRanks_.size())
556 {
557 // Detect current processor number
558 label proci = detectProcessorPath(fName);
559
560 if (proci != -1)
561 {
562 // Find lowest io rank
563 label minProc = 0;
564 label maxProc = nProcs_-1;
565 for (const label ranki : ioRanks_)
566 {
567 if (ranki >= nProcs_)
568 {
569 break;
570 }
571 else if (ranki <= proci)
572 {
573 minProc = ranki;
574 }
575 else
576 {
577 maxProc = ranki-1;
578 break;
579 }
580 }
581 procDir +=
582 + "_"
583 + Foam::name(minProc)
584 + "-"
585 + Foam::name(maxProc);
586 }
587 }
588
589 return procDir;
590 }
591}
592
593
595(
596 const IOobject& io
597) const
598{
599 return processorsDir(io.objectPath());
600}
601
602
604{
605 nProcs_ = nProcs;
606
607 if (debug)
608 {
609 Pout<< "collatedFileOperation::setNProcs :"
610 << " Setting number of processors to " << nProcs_ << endl;
611 }
612}
613
614
615// ************************************************************************* //
Macros for easy insertion into run-time selection tables.
#define addNamedToRunTimeSelectionTable(baseType, thisType, argNames, lookupName)
Add to construction table with 'lookupName' as the key.
#define addToRunTimeSelectionTable(baseType, thisType, argNames)
Add to construction table with typeName as the key.
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition: DynamicList.H:72
void append(const T &val)
Copy append an element to the end of this list.
Definition: DynamicListI.H:503
Defines the attributes of an object for which implicit objectRegistry management is supported,...
Definition: IOobject.H:170
static bool bannerEnabled() noexcept
Status of output file banner.
Definition: IOobject.H:315
static Ostream & writeEndDivider(Ostream &os)
Write the standard end file divider.
fileName objectPath() const
The complete path + object name.
Definition: IOobjectI.H:214
The IOstreamOption is a simple container for options an IOstream can normally have.
versionNumber version() const noexcept
Get the stream version.
Output to file stream, using an OSstream.
Definition: OFstream.H:57
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
Inter-processor communications stream.
Definition: Pstream.H:63
static void gatherList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
bool processorCase() const noexcept
Return true if this is a processor case.
Definition: TimePathsI.H:36
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:80
T & first()
Return the first element of the list.
Definition: UListI.H:202
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:114
T & last()
Return the last element of the list.
Definition: UListI.H:216
Inter-processor communications stream.
Definition: UPstream.H:59
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:174
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:293
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:433
static std::streamoff writeBlockEntry(OSstream &os, const label blocki, const UList< char > &charData)
Helper: write block of (binary) character data.
static void writeExtraHeaderContent(dictionary &dict, IOstreamOption streamOptData, const IOobject &io)
Helper: generate additional entries for FoamFile header.
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition: dictionary.H:126
A class for handling file names.
Definition: fileName.H:76
static bool isAbsolute(const std::string &str)
Definition: fileNameI.H:136
An encapsulation of filesystem-related operations.
Definition: fileOperation.H:69
const label comm_
Communicator to use.
@ OBJECT
io.objectPath() exists
Definition: fileOperation.H:77
@ PROCOBJECT
objectPath exists in 'processorsNN_first-last'
Definition: fileOperation.H:86
Version of masterUncollatedFileOperation that collates regIOobjects into a container in the processor...
virtual void setNProcs(const label nProcs)
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
const labelList ioRanks_
Ranks of IO handlers.
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
bool appendObject(const regIOobject &io, const fileName &pathName, IOstreamOption streamOpt) const
Append to processorsNN/ file.
virtual bool writeObject(const regIOobject &, IOstreamOption streamOpt=IOstreamOption(), const bool valid=true) const
Writes a regIOobject (so header, contents and divider).
void printBanner(const bool printRanks=false) const
Print banner information, optionally with io ranks.
fileOperations that performs all file operations on the master processor. Requires the calls to be pa...
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
virtual fileName::Type type(const fileName &, const bool followLink=true) const
Return the file type: DIRECTORY, FILE or SYMLINK.
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
Master-only drop-in replacement for OFstream.
int myProcNo() const noexcept
Return processor number.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:76
splitCell * master() const
Definition: splitCell.H:113
Master-only drop-in replacement for OFstream.
A class for handling words, derived from Foam::string.
Definition: word.H:68
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
Definition: className.H:121
fileName path(UMean.rootPath()/UMean.caseName()/"graphs"/UMean.instance())
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:473
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
#define DetailInfo
Definition: evalEntry.C:37
OBJstream os(runTime.globalPath()/outputName)
IOobject io("surfaceFilmProperties", mesh.time().constant(), mesh, IOobject::READ_IF_PRESENT, IOobject::NO_WRITE, false)
float floatOptimisationSwitch(const char *name, const float deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:243
Namespace for OpenFOAM.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: MSwindows.C:515
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
string hostName()
Return the system's host name, as per hostname(1)
Definition: MSwindows.C:410
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
IOerror FatalIOError
int infoDetailLevel
Global for selective suppression of Info output.
error FatalError
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
Definition: exprTraits.C:59
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Ostream & flush(Ostream &os)
Flush stream.
Definition: Ostream.H:364
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:53
#define registerOptSwitch(Name, Type, SwitchVar)
dictionary dict
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:333