OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2019-2021 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
#include "OFstreamCollator.H"
#include "OFstream.H"
#include "decomposedBlockData.H"
#include "dictionary.H"
#include "masterUncollatedFileOperation.H"
34 
35 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36 
37 namespace Foam
38 {
// NOTE(review): presumably this macro defines the run-time type name of
// OFstreamCollator and its static 'debug' switch (initial value 0), i.e. the
// 'debug' tested by the 'if (debug)' blocks in this file - confirm against
// the macro definition.
39  defineTypeNameAndDebug(OFstreamCollator, 0);
40 }
41 
42 
43 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
44 
45 bool Foam::OFstreamCollator::writeFile
46 (
47  const label comm,
48  const word& objectType,
49  const fileName& fName,
50  const string& masterData,
51  const labelUList& recvSizes,
52  const PtrList<SubList<char>>& slaveData, // optional slave data
53  IOstreamOption streamOpt,
54  const bool append,
55  const dictionary& headerEntries
56 )
57 {
58  if (debug)
59  {
60  Pout<< "OFstreamCollator : Writing master " << masterData.size()
61  << " bytes to " << fName
62  << " using comm " << comm << endl;
63 
64  if (slaveData.size())
65  {
66  Pout<< "OFstreamCollator : Slave data" << endl;
67  forAll(slaveData, proci)
68  {
69  if (slaveData.set(proci))
70  {
71  Pout<< " " << proci
72  << " size:" << slaveData[proci].size()
73  << endl;
74  }
75  }
76  }
77  }
78 
79  autoPtr<OSstream> osPtr;
80  if (UPstream::master(comm))
81  {
82  Foam::mkDir(fName.path());
83  osPtr.reset(new OFstream(fName, streamOpt, append));
84  auto& os = *osPtr;
85 
86  if (!append)
87  {
88  // No IOobject so cannot use IOobject::writeHeader
89 
90  // FoamFile
92  (
93  os,
94  streamOpt, // streamOpt for container
95  objectType,
96  "", // note
97  "", // location (leave empty instead inaccurate)
98  fName.name(), // object name
99  headerEntries
100  );
101  }
102  }
103 
104 
105  UList<char> slice
106  (
107  const_cast<char*>(masterData.data()),
108  label(masterData.size())
109  );
110 
111  // Assuming threaded writing hides any slowness so we
112  // can use scheduled communication to send the data to
113  // the master processor in order. However can be unstable
114  // for some mpi so default is non-blocking.
115 
116  List<std::streamoff> blockOffset;
118  (
119  comm,
120  osPtr,
121  blockOffset,
122  slice,
123  recvSizes,
124  slaveData,
125  (
126  fileOperations::masterUncollatedFileOperation::
127  maxMasterFileBufferSize == 0
130  ),
131  false // do not reduce return state
132  );
133 
134  if (osPtr && !osPtr->good())
135  {
136  FatalIOErrorInFunction(*osPtr)
137  << "Failed writing to " << fName << exit(FatalIOError);
138  }
139 
140  if (debug)
141  {
142  Pout<< "OFstreamCollator : Finished writing " << masterData.size()
143  << " bytes";
144  if (UPstream::master(comm))
145  {
146  off_t sum = 0;
147  for (const label recv : recvSizes)
148  {
149  sum += recv;
150  }
151  // Use std::to_string to display long int
152  Pout<< " (overall " << std::to_string(sum) << ')';
153  }
154  Pout<< " to " << fName
155  << " using comm " << comm << endl;
156  }
157 
158  return true;
159 }
160 
161 
162 void* Foam::OFstreamCollator::writeAll(void *threadarg)
163 {
164  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
165 
166  // Consume stack
167  while (true)
168  {
169  writeData* ptr = nullptr;
170 
171  {
172  std::lock_guard<std::mutex> guard(handler.mutex_);
173  if (handler.objects_.size())
174  {
175  ptr = handler.objects_.pop();
176  }
177  }
178 
179  if (!ptr)
180  {
181  break;
182  }
183  else
184  {
185  // Convert storage to pointers
186  PtrList<SubList<char>> slaveData;
187  if (ptr->slaveData_.size())
188  {
189  slaveData.resize(ptr->slaveData_.size());
190  forAll(slaveData, proci)
191  {
192  if (ptr->slaveData_.set(proci))
193  {
194  slaveData.set
195  (
196  proci,
197  new SubList<char>
198  (
199  ptr->slaveData_[proci],
200  ptr->sizes_[proci]
201  )
202  );
203  }
204  }
205  }
206 
207  bool ok = writeFile
208  (
209  ptr->comm_,
210  ptr->objectType_,
211  ptr->pathName_,
212  ptr->data_,
213  ptr->sizes_,
214  slaveData,
215  ptr->streamOpt_,
216  ptr->append_,
217  ptr->headerEntries_
218  );
219  if (!ok)
220  {
221  FatalIOErrorInFunction(ptr->pathName_)
222  << "Failed writing " << ptr->pathName_
223  << exit(FatalIOError);
224  }
225 
226  delete ptr;
227  }
228  //sleep(1);
229  }
230 
231  if (debug)
232  {
233  Pout<< "OFstreamCollator : Exiting write thread " << endl;
234  }
235 
236  {
237  std::lock_guard<std::mutex> guard(handler.mutex_);
238  handler.threadRunning_ = false;
239  }
240 
241  return nullptr;
242 }
243 
244 
245 void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
246 {
247  while (true)
248  {
249  // Count files to be written
250  off_t totalSize = 0;
251 
252  {
253  std::lock_guard<std::mutex> guard(mutex_);
254  forAllConstIters(objects_, iter)
255  {
256  totalSize += iter()->size();
257  }
258  }
259 
260  if
261  (
262  totalSize == 0
263  || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
264  )
265  {
266  break;
267  }
268 
269  if (debug)
270  {
271  std::lock_guard<std::mutex> guard(mutex_);
272  Pout<< "OFstreamCollator : Waiting for buffer space."
273  << " Currently in use:" << totalSize
274  << " limit:" << maxBufferSize_
275  << " files:" << objects_.size()
276  << endl;
277  }
278 
279  sleep(5);
280  }
281 }
282 
283 
284 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
285 
287 :
288  maxBufferSize_(maxBufferSize),
289  threadRunning_(false),
290  localComm_(UPstream::worldComm),
291  threadComm_
292  (
293  UPstream::allocateCommunicator
294  (
295  localComm_,
296  identity(UPstream::nProcs(localComm_))
297  )
298  )
299 {}
300 
301 
303 (
304  const off_t maxBufferSize,
305  const label comm
306 )
307 :
308  maxBufferSize_(maxBufferSize),
309  threadRunning_(false),
310  localComm_(comm),
311  threadComm_
312  (
314  (
315  localComm_,
316  identity(UPstream::nProcs(localComm_))
317  )
318  )
319 {}
320 
321 
322 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
323 
325 {
326  if (thread_)
327  {
328  if (debug)
329  {
330  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
331  }
332  thread_->join();
333  thread_.clear();
334  }
335 
336  if (threadComm_ != -1)
337  {
338  UPstream::freeCommunicator(threadComm_);
339  }
340 }
341 
342 
343 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
344 
346 (
347  const word& objectType,
348  const fileName& fName,
349  const string& data,
350  IOstreamOption streamOpt,
351  const bool append,
352  const bool useThread,
353  const dictionary& headerEntries
354 )
355 {
356  // Determine (on master) sizes to receive. Note: do NOT use thread
357  // communicator
358  labelList recvSizes;
359  decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
360 
361  off_t totalSize = 0;
362  label maxLocalSize = 0;
363  {
364  for (const label recvSize : recvSizes)
365  {
366  totalSize += recvSize;
367  maxLocalSize = max(maxLocalSize, recvSize);
368  }
369  Pstream::scatter(totalSize, Pstream::msgType(), localComm_);
370  Pstream::scatter(maxLocalSize, Pstream::msgType(), localComm_);
371  }
372 
373  if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
374  {
375  if (debug)
376  {
377  Pout<< "OFstreamCollator : non-thread gather and write of " << fName
378  << " using local comm " << localComm_ << endl;
379  }
380  // Direct collating and writing (so master blocks until all written!)
381  const PtrList<SubList<char>> dummySlaveData;
382  return writeFile
383  (
384  localComm_,
385  objectType,
386  fName,
387  data,
388  recvSizes,
389  dummySlaveData,
390  streamOpt,
391  append,
392  headerEntries
393  );
394  }
395  else if (totalSize <= maxBufferSize_)
396  {
397  // Total size can be stored locally so receive all data now and only
398  // do the writing in the thread
399 
400  if (debug)
401  {
402  Pout<< "OFstreamCollator : non-thread gather; thread write of "
403  << fName << endl;
404  }
405 
406  if (Pstream::master(localComm_))
407  {
408  waitForBufferSpace(totalSize);
409  }
410 
411 
412  // Receive in chunks of labelMax (2^31-1) since this is the maximum
413  // size that a List can be
414 
415  autoPtr<writeData> fileAndDataPtr
416  (
417  new writeData
418  (
419  threadComm_, // Note: comm not actually used anymore
420  objectType,
421  fName,
422  (
423  Pstream::master(localComm_)
424  ? data // Only used on master
425  : string::null
426  ),
427  recvSizes,
428  streamOpt,
429  append,
430  headerEntries
431  )
432  );
433  writeData& fileAndData = fileAndDataPtr();
434 
435  PtrList<List<char>>& slaveData = fileAndData.slaveData_;
436 
437  UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
438 
439  slaveData.setSize(recvSizes.size());
440 
441  // Gather all data onto master. Is done in local communicator since
442  // not in write thread. Note that we do not store in contiguous
443  // buffer since that would limit to 2G chars.
444  const label startOfRequests = Pstream::nRequests();
445  if (Pstream::master(localComm_))
446  {
447  for (label proci = 1; proci < slaveData.size(); proci++)
448  {
449  slaveData.set(proci, new List<char>(recvSizes[proci]));
451  (
453  proci,
454  slaveData[proci].data(),
455  slaveData[proci].size_bytes(),
457  localComm_
458  );
459  }
460  }
461  else
462  {
463  if
464  (
466  (
468  0,
469  slice.cdata(),
470  slice.size_bytes(),
472  localComm_
473  )
474  )
475  {
477  << "Cannot send outgoing message. "
478  << "to:" << 0 << " nBytes:"
479  << label(slice.size_bytes())
481  }
482  }
483  Pstream::waitRequests(startOfRequests);
484 
485  {
486  std::lock_guard<std::mutex> guard(mutex_);
487 
488  // Append to thread buffer
489  objects_.push(fileAndDataPtr.ptr());
490 
491  // Start thread if not running
492  if (!threadRunning_)
493  {
494  if (thread_)
495  {
496  if (debug)
497  {
498  Pout<< "OFstreamCollator : Waiting for write thread"
499  << endl;
500  }
501  thread_->join();
502  }
503 
504  if (debug)
505  {
506  Pout<< "OFstreamCollator : Starting write thread"
507  << endl;
508  }
509  thread_.reset(new std::thread(writeAll, this));
510  threadRunning_ = true;
511  }
512  }
513 
514  return true;
515  }
516  else
517  {
518  if (debug)
519  {
520  Pout<< "OFstreamCollator : thread gather and write of " << fName
521  << " using communicator " << threadComm_ << endl;
522  }
523 
524  if (!UPstream::haveThreads())
525  {
527  << "mpi does not seem to have thread support."
528  << " Make sure to set buffer size 'maxThreadFileBufferSize'"
529  << " to at least " << totalSize
530  << " to be able to do the collating before threading."
531  << exit(FatalError);
532  }
533 
534  if (Pstream::master(localComm_))
535  {
536  waitForBufferSpace(data.size());
537  }
538 
539  {
540  std::lock_guard<std::mutex> guard(mutex_);
541 
542  // Push all file info on buffer. Note that no slave data provided
543  // so it will trigger communication inside the thread
544  objects_.push
545  (
546  new writeData
547  (
548  threadComm_,
549  objectType,
550  fName,
551  data,
552  recvSizes,
553  streamOpt,
554  append,
555  headerEntries
556  )
557  );
558 
559  if (!threadRunning_)
560  {
561  if (thread_)
562  {
563  if (debug)
564  {
565  Pout<< "OFstreamCollator : Waiting for write thread"
566  << endl;
567  }
568  thread_->join();
569  }
570 
571  if (debug)
572  {
573  Pout<< "OFstreamCollator : Starting write thread" << endl;
574  }
575  thread_.reset(new std::thread(writeAll, this));
576  threadRunning_ = true;
577  }
578  }
579 
580  return true;
581  }
582 }
583 
584 
586 {
587  // Wait for all buffer space to be available i.e. wait for all jobs
588  // to finish
589  if (Pstream::master(localComm_))
590  {
591  if (debug)
592  {
593  Pout<< "OFstreamCollator : waiting for thread to have consumed all"
594  << endl;
595  }
596  waitForBufferSpace(-1);
597  }
598 }
599 
600 
601 // ************************************************************************* //
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::decomposedBlockData::writeHeader
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
Definition: decomposedBlockDataHeader.C:137
Foam::UList::cdata
const T * cdata() const noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:230
Foam::word
A class for handling words, derived from Foam::string.
Definition: word.H:65
Foam::UOPstream::write
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=UPstream::worldComm)
Write given buffer to given processor.
Definition: UOPwrite.C:36
Foam::fileName
A class for handling file names.
Definition: fileName.H:73
Foam::UPstream::waitRequests
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:262
Foam::UPstream::master
static bool master(const label communicator=worldComm)
Am I the master process.
Definition: UPstream.H:457
Foam::Pstream::scatter
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
Definition: gatherScatter.C:150
Foam::FatalIOError
IOerror FatalIOError
Foam::OFstreamCollator::OFstreamCollator
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
Definition: OFstreamCollator.C:286
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:369
Foam::PtrList::set
const T * set(const label i) const
Return const pointer to element (can be nullptr),.
Definition: PtrList.H:138
append
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Foam::UIPstream::read
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=UPstream::worldComm)
Read into given buffer from given processor.
Definition: UIPread.C:81
Foam::UPstream::haveThreads
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:439
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
OFstream.H
Foam::decomposedBlockData::writeBlocks
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
Definition: decomposedBlockData.C:718
Foam::UPstream::allocateCommunicator
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:108
Foam::UPstream
Inter-processor communications stream.
Definition: UPstream.H:61
Foam::decomposedBlockData::gather
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
Definition: decomposedBlockData.C:579
Foam::IOstreamOption
The IOstreamOption is a simple container for options an IOstream can normally have.
Definition: IOstreamOption.H:63
Foam::PtrList::setSize
void setSize(const label newLen)
Same as resize()
Definition: PtrList.H:151
Foam::PtrList
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers....
Definition: List.H:59
Foam::UPstream::freeCommunicator
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:174
Foam::autoPtr::ptr
T * ptr() noexcept
Same as release().
Definition: autoPtr.H:172
Foam::max
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:47
Foam::writeData
static void writeData(Ostream &os, const Type &val)
Definition: rawSurfaceWriterImpl.C:45
Foam::FatalError
error FatalError
Foam::dictionary
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition: dictionary.H:123
Foam::string::null
static const string null
An empty string.
Definition: string.H:169
os
OBJstream os(runTime.globalPath()/outputName)
Foam
Namespace for OpenFOAM.
Definition: atmBoundaryLayer.C:33
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:144
Foam::OFstreamCollator::waitAll
void waitAll()
Wait for all thread actions to have finished.
Definition: OFstreamCollator.C:585
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Foam::autoPtr
Pointer management similar to std::unique_ptr, with some additional methods and type checking.
Definition: HashPtrTable.H:53
masterUncollatedFileOperation.H
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
Foam::UPstream::commsTypes::nonBlocking
Foam::UPstream::msgType
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:540
Foam::UPstream::nRequests
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:252
forAllConstIters
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
Foam::UPstream::commsTypes::scheduled
decomposedBlockData.H
Foam::List< label >
Foam::UList
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:103
dictionary.H
Foam::identity
labelList identity(const label len, label start=0)
Create identity map of the given length with (map[i] == i)
Definition: labelList.C:38
Foam::OFstreamCollator::write
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, const bool append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
Definition: OFstreamCollator.C:346
Foam::sum
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
Definition: DimensionedFieldFunctions.C:327
FatalIOErrorInFunction
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:473
Foam::labelUList
UList< label > labelUList
A UList of labels.
Definition: UList.H:85
Foam::mkDir
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: MSwindows.C:507
Foam::defineTypeNameAndDebug
defineTypeNameAndDebug(combustionModel, 0)
Foam::sleep
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Definition: MSwindows.C:1106
Foam::data
Database for solution data, solver performance and other reduced data.
Definition: data.H:55
Foam::UList::size_bytes
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition: UListI.H:258
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:445
OFstreamCollator.H
Foam::OFstreamCollator::~OFstreamCollator
virtual ~OFstreamCollator()
Destructor.
Definition: OFstreamCollator.C:324