collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2020-2021 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "collatedFileOperation.H"
31 #include "Pstream.H"
32 #include "Time.H"
34 #include "decomposedBlockData.H"
35 #include "registerSwitch.H"
36 #include "masterOFstream.H"
37 #include "OFstream.H"
38 #include "foamVersion.H"
39 
40 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
41 
42 namespace Foam
43 {
44 namespace fileOperations
45 {
46  defineTypeNameAndDebug(collatedFileOperation, 0);
48  (
49  fileOperation,
50  collatedFileOperation,
51  word
52  );
53 
55  (
56  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
57  );
59  (
60  "maxThreadFileBufferSize",
61  float,
63  );
64 
65  // Mark as needing threaded mpi
67  (
69  collatedFileOperationInitialise,
70  word,
71  collated
72  );
73 }
74 }
75 
76 
77 // * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
78 
80 (
81  const bool printRanks
82 ) const
83 {
85  << "I/O : " << this->type();
86 
87  if (maxThreadFileBufferSize == 0)
88  {
90  << " [unthreaded] (maxThreadFileBufferSize = 0)." << nl
91  << " Writing may be slow for large file sizes."
92  << endl;
93  }
94  else
95  {
97  << " [threaded] (maxThreadFileBufferSize = "
98  << maxThreadFileBufferSize << ")." << nl
99  << " Requires buffer large enough to collect all data"
100  " or thread support" << nl
101  << " enabled in MPI. If MPI thread support cannot be"
102  " enabled, deactivate" << nl
103  << " threading by setting maxThreadFileBufferSize"
104  " to 0 in" << nl
105  << " OpenFOAM etc/controlDict" << endl;
106  }
107 
108  if (printRanks)
109  {
110  // Information about the ranks
111  stringList hosts(Pstream::nProcs());
112  if (Pstream::master(comm_))
113  {
114  // Don't usually need the pid
115  // hosts[Pstream::myProcNo()] = hostName()+"."+name(pid());
116  hosts[Pstream::myProcNo()] = hostName();
117  }
118  Pstream::gatherList(hosts);
119 
120  DynamicList<label> offsetMaster(Pstream::nProcs());
121 
122  forAll(hosts, ranki)
123  {
124  if (!hosts[ranki].empty())
125  {
126  offsetMaster.append(ranki);
127  }
128  }
129 
130  if (offsetMaster.size() > 1)
131  {
132  DetailInfo
133  << "IO nodes:" << nl << '(' << nl;
134 
135  offsetMaster.append(Pstream::nProcs());
136 
137  for (label group = 1; group < offsetMaster.size(); ++group)
138  {
139  const label beg = offsetMaster[group-1];
140  const label end = offsetMaster[group];
141 
142  DetailInfo
143  << " (" << hosts[beg].c_str() << ' '
144  << (end-beg) << ')' << nl;
145  }
146  DetailInfo
147  << ')' << nl;
148  }
149  }
150 
151  // if (IOobject::fileModificationChecking == IOobject::timeStampMaster)
152  // {
153  // WarningInFunction
154  // << "Resetting fileModificationChecking to timeStamp" << endl;
155  // }
156  // else if (IOobject::fileModificationChecking == IOobject::inotifyMaster)
157  // {
158  // WarningInFunction
159  // << "Resetting fileModificationChecking to inotify" << endl;
160  // }
161 }
162 
163 
165 (
166  const label proci
167 )
168 const
169 {
170  if (Pstream::parRun())
171  {
172  return Pstream::master(comm_);
173  }
174  else if (ioRanks_.size())
175  {
176  // Found myself in IO rank
177  return ioRanks_.found(proci);
178  }
179  else
180  {
181  // Assume all in single communicator
182  return proci == 0;
183  }
184 }
185 
186 
188 (
189  const regIOobject& io,
190  const fileName& pathName,
191  IOstreamOption streamOpt
192 ) const
193 {
194  // Append to processorsNN/ file
195 
196  const label proci = detectProcessorPath(io.objectPath());
197 
198  if (debug)
199  {
200  Pout<< "collatedFileOperation::writeObject :"
201  << " For local object : " << io.name()
202  << " appending processor " << proci
203  << " data to " << pathName << endl;
204  }
205  if (proci == -1)
206  {
208  << "Invalid processor path: " << pathName
209  << exit(FatalError);
210  }
211 
212  const bool isMaster = isMasterRank(proci);
213 
214  // Update meta-data for current state
215  if (isMaster)
216  {
217  const_cast<regIOobject&>(io).updateMetaData();
218  }
219 
220  // Note: cannot do append + compression. This is a limitation
221  // of ogzstream (or rather most compressed formats)
222 
223  OFstream os
224  (
225  pathName,
226  IOstreamOption(IOstream::BINARY, streamOpt.version()), // UNCOMPRESSED
227  !isMaster // append slaves
228  );
229 
230  if (!os.good())
231  {
233  << "Cannot open for appending"
234  << exit(FatalIOError);
235  }
236 
237  if (isMaster)
238  {
239  decomposedBlockData::writeHeader(os, streamOpt, io);
240  }
241 
242  std::streamoff blockOffset = decomposedBlockData::writeBlockEntry
243  (
244  os,
245  streamOpt,
246  io,
247  proci,
248  // With FoamFile header on master?
249  isMaster
250  );
251 
252  return (blockOffset >= 0) && os.good();
253 }
254 
255 
256 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
257 
259 (
260  bool verbose
261 )
262 :
264  (
265  (
266  ioRanks().size()
267  ? UPstream::allocateCommunicator
268  (
269  UPstream::worldComm,
270  subRanks(Pstream::nProcs())
271  )
272  : UPstream::worldComm
273  ),
274  false
275  ),
276  myComm_(comm_),
277  writer_(mag(maxThreadFileBufferSize), comm_),
278  nProcs_(Pstream::nProcs()),
279  ioRanks_(ioRanks())
280 {
281  if (verbose && Foam::infoDetailLevel > 0)
282  {
283  this->printBanner(ioRanks_.size());
284  }
285 }
286 
287 
289 (
290  const label comm,
291  const labelList& ioRanks,
292  const word& typeName,
293  bool verbose
294 )
295 :
296  masterUncollatedFileOperation(comm, false),
297  myComm_(-1),
298  writer_(mag(maxThreadFileBufferSize), comm),
299  nProcs_(Pstream::nProcs()),
300  ioRanks_(ioRanks)
301 {
302  if (verbose && Foam::infoDetailLevel > 0)
303  {
304  this->printBanner(ioRanks_.size());
305  }
306 }
307 
308 
309 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
310 
312 {
313  // Wait for any outstanding file operations
314  flush();
315 
316  if (myComm_ != -1 && myComm_ != UPstream::worldComm)
317  {
319  }
320 }
321 
322 
323 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
324 
326 (
327  const IOobject& io,
328  const word& typeName
329 ) const
330 {
331  // Replacement for objectPath
332  if (io.time().processorCase())
333  {
335  (
336  io,
338  "dummy", // not used for processorsobject
339  io.instance()
340  );
341  }
342  else
343  {
345  (
346  io,
348  word::null,
349  io.instance()
350  );
351  }
352 }
353 
354 
356 (
357  const regIOobject& io,
358  IOstreamOption streamOpt,
359  const bool valid
360 ) const
361 {
362  const Time& tm = io.time();
363  const fileName& inst = io.instance();
364 
365  // Update meta-data for current state
366  const_cast<regIOobject&>(io).updateMetaData();
367 
368  if (inst.isAbsolute() || !tm.processorCase())
369  {
370  mkDir(io.path());
371  fileName pathName(io.objectPath());
372 
373  if (debug)
374  {
375  Pout<< "collatedFileOperation::writeObject :"
376  << " For object : " << io.name()
377  << " falling back to master-only output to " << io.path()
378  << endl;
379  }
380 
382  (
383  pathName,
384  streamOpt,
385  false, // append=false
386  valid
387  );
388 
389  // If any of these fail, return
390  // (leave error handling to Ostream class)
391 
392  const bool ok =
393  (
394  os.good()
395  && io.writeHeader(os)
396  && io.writeData(os)
397  );
398 
399  if (ok)
400  {
402  }
403 
404  return ok;
405  }
406  else
407  {
408  // Construct the equivalent processors/ directory
409  fileName path(processorsPath(io, inst, processorsDir(io)));
410 
411  mkDir(path);
412  fileName pathName(path/io.name());
413 
414  if (io.global())
415  {
416  if (debug)
417  {
418  Pout<< "collatedFileOperation::writeObject :"
419  << " For global object : " << io.name()
420  << " falling back to master-only output to " << pathName
421  << endl;
422  }
423 
425  (
426  pathName,
427  streamOpt,
428  false, // append=false
429  valid
430  );
431 
432  // If any of these fail, return
433  // (leave error handling to Ostream class)
434 
435  const bool ok =
436  (
437  os.good()
438  && io.writeHeader(os)
439  && io.writeData(os)
440  );
441 
442  if (ok)
443  {
445  }
446 
447  return ok;
448  }
449  else if (!Pstream::parRun())
450  {
451  // Special path for e.g. decomposePar. Append to
452  // processorsDDD/ file
453  if (debug)
454  {
455  Pout<< "collatedFileOperation::writeObject :"
456  << " For object : " << io.name()
457  << " appending to " << pathName << endl;
458  }
459 
460  return appendObject(io, pathName, streamOpt);
461  }
462  else
463  {
464  // Re-check static maxThreadFileBufferSize variable to see
465  // if needs to use threading
466  const bool useThread = (maxThreadFileBufferSize != 0);
467 
468  if (debug)
469  {
470  Pout<< "collatedFileOperation::writeObject :"
471  << " For object : " << io.name()
472  << " starting collating output to " << pathName
473  << " useThread:" << useThread << endl;
474  }
475 
476  if (!useThread)
477  {
478  writer_.waitAll();
479  }
480 
482  (
483  writer_,
484  pathName,
485  streamOpt,
486  useThread
487  );
488 
489  bool ok = os.good();
490 
491  if (Pstream::master(comm_))
492  {
493  // Suppress comment banner
494  const bool old = IOobject::bannerEnabled(false);
495 
496  ok = ok && io.writeHeader(os);
497 
499 
500  // Additional header content
503  (
504  dict,
505  streamOpt,
506  io
507  );
508  os.setHeaderEntries(dict);
509  }
510 
511  ok = ok && io.writeData(os);
512  // No end divider for collated output
513 
514  return ok;
515  }
516  }
517 }
518 
520 {
521  if (debug)
522  {
523  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
524  << endl;
525  }
527  // Wait for thread to finish (note: also removes thread)
528  writer_.waitAll();
529 }
530 
531 
533 (
534  const fileName& fName
535 ) const
536 {
537  if (Pstream::parRun())
538  {
539  const List<int>& procs(UPstream::procID(comm_));
540 
541  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
542 
543  if (procs.size() != Pstream::nProcs())
544  {
545  procDir +=
546  + "_"
547  + Foam::name(procs.first())
548  + "-"
549  + Foam::name(procs.last());
550  }
551  return procDir;
552  }
553  else
554  {
555  word procDir(processorsBaseDir+Foam::name(nProcs_));
556 
557  if (ioRanks_.size())
558  {
559  // Detect current processor number
560  label proci = detectProcessorPath(fName);
561 
562  if (proci != -1)
563  {
564  // Find lowest io rank
565  label minProc = 0;
566  label maxProc = nProcs_-1;
567  for (const label ranki : ioRanks_)
568  {
569  if (ranki >= nProcs_)
570  {
571  break;
572  }
573  else if (ranki <= proci)
574  {
575  minProc = ranki;
576  }
577  else
578  {
579  maxProc = ranki-1;
580  break;
581  }
582  }
583  procDir +=
584  + "_"
585  + Foam::name(minProc)
586  + "-"
587  + Foam::name(maxProc);
588  }
589  }
590 
591  return procDir;
592  }
593 }
594 
595 
597 (
598  const IOobject& io
599 ) const
600 {
601  return processorsDir(io.objectPath());
602 }
603 
604 
606 {
607  nProcs_ = nProcs;
608 
609  if (debug)
610  {
611  Pout<< "collatedFileOperation::setNProcs :"
612  << " Setting number of processors to " << nProcs_ << endl;
613  }
614 }
615 
616 
617 // ************************************************************************* //
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::IOobject
Defines the attributes of an object for which implicit objectRegistry management is supported,...
Definition: IOobject.H:169
collatedFileOperation.H
Foam::fileOperations::collatedFileOperation::maxThreadFileBufferSize
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
Definition: collatedFileOperation.H:117
Foam::Time
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:73
Foam::word
A class for handling words, derived from Foam::string.
Definition: word.H:65
Foam::fileOperations::collatedFileOperation::~collatedFileOperation
virtual ~collatedFileOperation()
Destructor.
Definition: collatedFileOperation.C:311
Foam::fileName
A class for handling file names.
Definition: fileName.H:73
Foam::constant::atomic::group
constexpr const char *const group
Group name for atomic constants.
Definition: atomicConstants.H:52
Foam::IOobject::instance
const fileName & instance() const noexcept
Definition: IOobjectI.H:196
Foam::fileOperations::collatedFileOperation::processorsDir
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
Definition: collatedFileOperation.C:597
Foam::DynamicList< label >
Foam::IOobject::writeEndDivider
static Ostream & writeEndDivider(Ostream &os)
Write the standard end file divider.
Definition: IOobjectWriteHeader.C:142
Foam::regIOobject::writeData
virtual bool writeData(Ostream &) const =0
Pure virtual writeData function.
masterOFstream.H
Foam::TimePaths::processorCase
bool processorCase() const noexcept
Return true if this is a processor case.
Definition: TimePathsI.H:36
Foam::fileOperation::OBJECT
io.objectPath() exists
Definition: fileOperation.H:77
Foam::IOobject::bannerEnabled
static bool bannerEnabled() noexcept
Status of output file banner.
Definition: IOobject.H:315
Foam::UPstream::master
static bool master(const label communicator=worldComm)
Am I the master process.
Definition: UPstream.H:457
Foam::regIOobject::global
virtual bool global() const
Is object global.
Definition: regIOobject.H:354
Foam::fileOperations::addNamedToRunTimeSelectionTable
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
Foam::FatalIOError
IOerror FatalIOError
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:369
Foam::writeHeader
static void writeHeader(Ostream &os, const word &fieldName)
Definition: rawSurfaceWriterImpl.C:66
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Foam::IOobject::time
const Time & time() const
Return Time associated with the objectRegistry.
Definition: IOobject.C:493
Foam::fileOperations::collatedFileOperation::objectPath
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
Definition: collatedFileOperation.C:326
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
OFstream.H
Foam::fileOperations::registerOptSwitch
registerOptSwitch("maxThreadFileBufferSize", float, collatedFileOperation::maxThreadFileBufferSize)
Foam::decomposedBlockData::writeExtraHeaderContent
static void writeExtraHeaderContent(dictionary &dict, IOstreamOption streamOptData, const IOobject &io)
Helper: generate additional entries for FoamFile header.
Definition: decomposedBlockDataHeader.C:179
Foam::fileOperations::collatedFileOperation::appendObject
bool appendObject(const regIOobject &io, const fileName &pathName, IOstreamOption streamOpt) const
Append to processorsNN/ file.
Definition: collatedFileOperation.C:188
Foam::DynamicList::append
DynamicList< T, SizeMin > & append(const T &val)
Append an element to the end of this list.
Definition: DynamicListI.H:511
Foam::fileOperations::addToRunTimeSelectionTable
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
Foam::fileOperations::collatedFileOperation::setNProcs
virtual void setNProcs(const label nProcs)
Definition: collatedFileOperation.C:605
Foam::IOstreamOption
The IOstreamOption is a simple container for options an IOstream can normally have.
Definition: IOstreamOption.H:63
Foam::UPstream::procID
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:474
Foam::UPstream::freeCommunicator
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:174
DetailInfo
#define DetailInfo
Definition: evalEntry.C:37
Foam::fileOperations::collatedFileOperation::isMasterRank
bool isMasterRank(const label proci) const
Definition: collatedFileOperation.C:165
Foam::fileOperations::collatedFileOperation::writeObject
virtual bool writeObject(const regIOobject &, IOstreamOption streamOpt=IOstreamOption(), const bool valid=true) const
Writes a regIOobject (so header, contents and divider).
Definition: collatedFileOperation.C:356
Foam::IOstreamOption::version
versionNumber version() const noexcept
Get the stream version.
Definition: IOstreamOption.H:338
dict
dictionary dict
Definition: searchingEngine.H:14
Foam::threadedCollatedOFstream
Master-only drop-in replacement for OFstream.
Definition: threadedCollatedOFstream.H:55
threadedCollatedOFstream.H
Foam::FatalError
error FatalError
Foam::dictionary
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition: dictionary.H:123
os
OBJstream os(runTime.globalPath()/outputName)
addToRunTimeSelectionTable.H
Macros for easy insertion into run-time selection tables.
stdFoam::end
constexpr auto end(C &c) -> decltype(c.end())
Return iterator to the end of the container c.
Definition: stdFoam.H:121
Pstream.H
Foam
Namespace for OpenFOAM.
Definition: atmBoundaryLayer.C:33
fileOperationInitialise
General fileOperation initialiser. Handles -ioRanks option, using it to set the FOAM_IORANKS environm...
Foam::infoDetailLevel
int infoDetailLevel
Global for selective suppression of Info output.
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Foam::OFstream
Output to file stream, using an OSstream.
Definition: OFstream.H:53
Foam::IOobject::name
const word & name() const noexcept
Return name.
Definition: IOobjectI.H:65
Time.H
Foam::regIOobject
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:73
Foam::fileOperations::masterUncollatedFileOperation::flush
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
Definition: masterUncollatedFileOperation.C:2417
Foam::fileOperations::masterUncollatedFileOperation
fileOperations that performs all file operations on the master processor. Requires the calls to be pa...
Definition: masterUncollatedFileOperation.H:84
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
Foam::debug::floatOptimisationSwitch
float floatOptimisationSwitch(const char *name, const float deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:243
Foam::nl
constexpr char nl
Definition: Ostream.H:404
decomposedBlockData.H
Foam::fileOperations::collatedFileOperation::printBanner
void printBanner(const bool printRanks=false) const
Print banner information, optionally with io ranks.
Definition: collatedFileOperation.C:80
Foam::UPstream::worldComm
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:293
Foam::UPstream::parRun
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:433
Foam::List< string >
Foam::mag
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
path
fileName path(UMean.rootPath()/UMean.caseName()/"graphs"/UMean.instance())
Foam::hostName
string hostName(bool full=false)
Return the system's host name, as per hostname(1)
Definition: MSwindows.C:410
Foam::word::null
static const word null
An empty word.
Definition: word.H:80
Foam::fileOperation::PROCOBJECT
objectPath exists in 'processorsNN_first-last'
Definition: fileOperation.H:86
Foam::roots::type
type
Types of root.
Definition: Roots.H:54
Foam::name
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
Definition: exprTraits.C:59
Foam::fileOperations::masterUncollatedFileOperation::localObjectPath
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
Definition: masterUncollatedFileOperation.C:336
Foam::IOobject::writeHeader
bool writeHeader(Ostream &os) const
Write header with current type()
Definition: IOobjectWriteHeader.C:277
FatalIOErrorInFunction
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:473
registerSwitch.H
Foam::fileOperations::collatedFileOperation::collatedFileOperation
collatedFileOperation(bool verbose)
Default construct.
Definition: collatedFileOperation.C:259
Foam::IOobject::objectPath
fileName objectPath() const
The complete path + object name.
Definition: IOobjectI.H:214
Foam::fileOperations::collatedFileOperation::myComm_
const label myComm_
Any communicator allocated by me.
Definition: collatedFileOperation.H:74
Foam::mkDir
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: MSwindows.C:507
Foam::IOobject::path
fileName path() const
The complete path.
Definition: IOobject.C:511
Foam::fileOperations::collatedFileOperation::flush
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
Definition: collatedFileOperation.C:519
Foam::fileName::isAbsolute
static bool isAbsolute(const std::string &str)
Definition: fileNameI.H:136
Foam::masterOFstream
Master-only drop-in replacement for OFstream.
Definition: masterOFstream.H:51
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:445
Foam::fileOperations::defineTypeNameAndDebug
defineTypeNameAndDebug(collatedFileOperation, 0)
foamVersion.H