OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2019-2020 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "OFstreamCollator.H"
30 #include "OFstream.H"
31 #include "decomposedBlockData.H"
33 
34 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
35 
36 namespace Foam
37 {
38  defineTypeNameAndDebug(OFstreamCollator, 0);
39 }
40 
41 
42 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
43 
44 bool Foam::OFstreamCollator::writeFile
45 (
46  const label comm,
47  const word& typeName,
48  const fileName& fName,
49  const string& masterData,
50  const labelUList& recvSizes,
51  const PtrList<SubList<char>>& slaveData, // optional slave data
52  IOstream::streamFormat fmt,
53  IOstream::versionNumber ver,
54  IOstream::compressionType cmp,
55  const bool append
56 )
57 {
58  if (debug)
59  {
60  Pout<< "OFstreamCollator : Writing master " << masterData.size()
61  << " bytes to " << fName
62  << " using comm " << comm << endl;
63  if (slaveData.size())
64  {
65  Pout<< "OFstreamCollator : Slave data" << endl;
66  forAll(slaveData, proci)
67  {
68  if (slaveData.set(proci))
69  {
70  Pout<< " " << proci
71  << " size:" << slaveData[proci].size()
72  << endl;
73  }
74  }
75  }
76  }
77 
78  autoPtr<OSstream> osPtr;
79  if (UPstream::master(comm))
80  {
81  Foam::mkDir(fName.path());
82  osPtr.reset
83  (
84  new OFstream
85  (
86  fName,
87  fmt,
88  ver,
89  cmp,
90  append
91  )
92  );
93 
94  // We don't have IOobject so cannot use IOobject::writeHeader
95  if (!append)
96  {
97  OSstream& os = osPtr();
99  (
100  os,
101  ver,
102  fmt,
103  typeName,
104  "",
105  fName,
106  fName.name()
107  );
108  }
109  }
110 
111 
112  UList<char> slice
113  (
114  const_cast<char*>(masterData.data()),
115  label(masterData.size())
116  );
117 
118  // Assuming threaded writing hides any slowness so we
119  // can use scheduled communication to send the data to
120  // the master processor in order. However can be unstable
121  // for some mpi so default is non-blocking.
122 
123  List<std::streamoff> start;
125  (
126  comm,
127  osPtr,
128  start,
129  slice,
130  recvSizes,
131  slaveData,
132  (
133  fileOperations::masterUncollatedFileOperation::
134  maxMasterFileBufferSize == 0
137  ),
138  false // do not reduce return state
139  );
140 
141  if (osPtr && !osPtr->good())
142  {
143  FatalIOErrorInFunction(*osPtr)
144  << "Failed writing to " << fName << exit(FatalIOError);
145  }
146 
147  if (debug)
148  {
149  Pout<< "OFstreamCollator : Finished writing " << masterData.size()
150  << " bytes";
151  if (UPstream::master(comm))
152  {
153  off_t sum = 0;
154  for (const label recv : recvSizes)
155  {
156  sum += recv;
157  }
158  // Use ostringstream to display long int (until writing these is
159  // supported)
160  std::ostringstream os;
161  os << sum;
162  Pout<< " (overall " << os.str() << ")";
163  }
164  Pout<< " to " << fName
165  << " using comm " << comm << endl;
166  }
167 
168  return true;
169 }
170 
171 
172 void* Foam::OFstreamCollator::writeAll(void *threadarg)
173 {
174  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
175 
176  // Consume stack
177  while (true)
178  {
179  writeData* ptr = nullptr;
180 
181  {
182  std::lock_guard<std::mutex> guard(handler.mutex_);
183  if (handler.objects_.size())
184  {
185  ptr = handler.objects_.pop();
186  }
187  }
188 
189  if (!ptr)
190  {
191  break;
192  }
193  else
194  {
195  // Convert storage to pointers
196  PtrList<SubList<char>> slaveData;
197  if (ptr->slaveData_.size())
198  {
199  slaveData.setSize(ptr->slaveData_.size());
200  forAll(slaveData, proci)
201  {
202  if (ptr->slaveData_.set(proci))
203  {
204  slaveData.set
205  (
206  proci,
207  new SubList<char>
208  (
209  ptr->slaveData_[proci],
210  ptr->sizes_[proci]
211  )
212  );
213  }
214  }
215  }
216 
217  bool ok = writeFile
218  (
219  ptr->comm_,
220  ptr->typeName_,
221  ptr->pathName_,
222  ptr->data_,
223  ptr->sizes_,
224  slaveData,
225  ptr->format_,
226  ptr->version_,
227  ptr->compression_,
228  ptr->append_
229  );
230  if (!ok)
231  {
232  FatalIOErrorInFunction(ptr->pathName_)
233  << "Failed writing " << ptr->pathName_
234  << exit(FatalIOError);
235  }
236 
237  delete ptr;
238  }
239  //sleep(1);
240  }
241 
242  if (debug)
243  {
244  Pout<< "OFstreamCollator : Exiting write thread " << endl;
245  }
246 
247  {
248  std::lock_guard<std::mutex> guard(handler.mutex_);
249  handler.threadRunning_ = false;
250  }
251 
252  return nullptr;
253 }
254 
255 
256 void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
257 {
258  while (true)
259  {
260  // Count files to be written
261  off_t totalSize = 0;
262 
263  {
264  std::lock_guard<std::mutex> guard(mutex_);
265  forAllConstIters(objects_, iter)
266  {
267  totalSize += iter()->size();
268  }
269  }
270 
271  if
272  (
273  totalSize == 0
274  || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
275  )
276  {
277  break;
278  }
279 
280  if (debug)
281  {
282  std::lock_guard<std::mutex> guard(mutex_);
283  Pout<< "OFstreamCollator : Waiting for buffer space."
284  << " Currently in use:" << totalSize
285  << " limit:" << maxBufferSize_
286  << " files:" << objects_.size()
287  << endl;
288  }
289 
290  sleep(5);
291  }
292 }
293 
294 
295 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
296 
298 :
299  maxBufferSize_(maxBufferSize),
300  threadRunning_(false),
301  localComm_(UPstream::worldComm),
302  threadComm_
303  (
304  UPstream::allocateCommunicator
305  (
306  localComm_,
307  identity(UPstream::nProcs(localComm_))
308  )
309  )
310 {}
311 
312 
314 (
315  const off_t maxBufferSize,
316  const label comm
317 )
318 :
319  maxBufferSize_(maxBufferSize),
320  threadRunning_(false),
321  localComm_(comm),
322  threadComm_
323  (
325  (
326  localComm_,
327  identity(UPstream::nProcs(localComm_))
328  )
329  )
330 {}
331 
332 
333 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
334 
336 {
337  if (thread_)
338  {
339  if (debug)
340  {
341  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
342  }
343  thread_->join();
344  thread_.clear();
345  }
346 
347  if (threadComm_ != -1)
348  {
349  UPstream::freeCommunicator(threadComm_);
350  }
351 }
352 
353 
354 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
355 
357 (
358  const word& typeName,
359  const fileName& fName,
360  const string& data,
364  const bool append,
365  const bool useThread
366 )
367 {
368  // Determine (on master) sizes to receive. Note: do NOT use thread
369  // communicator
370  labelList recvSizes;
371  decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
372 
373  off_t totalSize = 0;
374  label maxLocalSize = 0;
375  {
376  for (label proci = 0; proci < recvSizes.size(); proci++)
377  {
378  totalSize += recvSizes[proci];
379  maxLocalSize = max(maxLocalSize, recvSizes[proci]);
380  }
381  Pstream::scatter(totalSize, Pstream::msgType(), localComm_);
382  Pstream::scatter(maxLocalSize, Pstream::msgType(), localComm_);
383  }
384 
385  if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
386  {
387  if (debug)
388  {
389  Pout<< "OFstreamCollator : non-thread gather and write of " << fName
390  << " using local comm " << localComm_ << endl;
391  }
392  // Direct collating and writing (so master blocks until all written!)
393  const PtrList<SubList<char>> dummySlaveData;
394  return writeFile
395  (
396  localComm_,
397  typeName,
398  fName,
399  data,
400  recvSizes,
401  dummySlaveData,
402  fmt,
403  ver,
404  cmp,
405  append
406  );
407  }
408  else if (totalSize <= maxBufferSize_)
409  {
410  // Total size can be stored locally so receive all data now and only
411  // do the writing in the thread
412 
413  if (debug)
414  {
415  Pout<< "OFstreamCollator : non-thread gather; thread write of "
416  << fName << endl;
417  }
418 
419  if (Pstream::master(localComm_))
420  {
421  waitForBufferSpace(totalSize);
422  }
423 
424 
425  // Receive in chunks of labelMax (2^31-1) since this is the maximum
426  // size that a List can be
427 
428  autoPtr<writeData> fileAndDataPtr
429  (
430  new writeData
431  (
432  threadComm_, // Note: comm not actually used anymore
433  typeName,
434  fName,
435  (
436  Pstream::master(localComm_)
437  ? data // Only used on master
438  : string::null
439  ),
440  recvSizes,
441  fmt,
442  ver,
443  cmp,
444  append
445  )
446  );
447  writeData& fileAndData = fileAndDataPtr();
448 
449  PtrList<List<char>>& slaveData = fileAndData.slaveData_;
450 
451  UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
452 
453  slaveData.setSize(recvSizes.size());
454 
455  // Gather all data onto master. Is done in local communicator since
456  // not in write thread. Note that we do not store in contiguous
457  // buffer since that would limit to 2G chars.
458  const label startOfRequests = Pstream::nRequests();
459  if (Pstream::master(localComm_))
460  {
461  for (label proci = 1; proci < slaveData.size(); proci++)
462  {
463  slaveData.set(proci, new List<char>(recvSizes[proci]));
465  (
467  proci,
468  reinterpret_cast<char*>(slaveData[proci].data()),
469  slaveData[proci].byteSize(),
471  localComm_
472  );
473  }
474  }
475  else
476  {
477  if
478  (
480  (
482  0,
483  reinterpret_cast<const char*>(slice.cdata()),
484  slice.byteSize(),
486  localComm_
487  )
488  )
489  {
491  << "Cannot send outgoing message. "
492  << "to:" << 0 << " nBytes:"
493  << label(slice.byteSize())
495  }
496  }
497  Pstream::waitRequests(startOfRequests);
498 
499  {
500  std::lock_guard<std::mutex> guard(mutex_);
501 
502  // Append to thread buffer
503  objects_.push(fileAndDataPtr.ptr());
504 
505  // Start thread if not running
506  if (!threadRunning_)
507  {
508  if (thread_)
509  {
510  if (debug)
511  {
512  Pout<< "OFstreamCollator : Waiting for write thread"
513  << endl;
514  }
515  thread_->join();
516  }
517 
518  if (debug)
519  {
520  Pout<< "OFstreamCollator : Starting write thread"
521  << endl;
522  }
523  thread_.reset(new std::thread(writeAll, this));
524  threadRunning_ = true;
525  }
526  }
527 
528  return true;
529  }
530  else
531  {
532  if (debug)
533  {
534  Pout<< "OFstreamCollator : thread gather and write of " << fName
535  << " using communicator " << threadComm_ << endl;
536  }
537 
538  if (!UPstream::haveThreads())
539  {
541  << "mpi does not seem to have thread support."
542  << " Make sure to set buffer size 'maxThreadFileBufferSize'"
543  << " to at least " << totalSize
544  << " to be able to do the collating before threading."
545  << exit(FatalError);
546  }
547 
548  if (Pstream::master(localComm_))
549  {
550  waitForBufferSpace(data.size());
551  }
552 
553  {
554  std::lock_guard<std::mutex> guard(mutex_);
555 
556  // Push all file info on buffer. Note that no slave data provided
557  // so it will trigger communication inside the thread
558  objects_.push
559  (
560  new writeData
561  (
562  threadComm_,
563  typeName,
564  fName,
565  data,
566  recvSizes,
567  fmt,
568  ver,
569  cmp,
570  append
571  )
572  );
573 
574  if (!threadRunning_)
575  {
576  if (thread_)
577  {
578  if (debug)
579  {
580  Pout<< "OFstreamCollator : Waiting for write thread"
581  << endl;
582  }
583  thread_->join();
584  }
585 
586  if (debug)
587  {
588  Pout<< "OFstreamCollator : Starting write thread" << endl;
589  }
590  thread_.reset(new std::thread(writeAll, this));
591  threadRunning_ = true;
592  }
593  }
594 
595  return true;
596  }
597 }
598 
599 
601 {
602  // Wait for all buffer space to be available i.e. wait for all jobs
603  // to finish
604  if (Pstream::master(localComm_))
605  {
606  if (debug)
607  {
608  Pout<< "OFstreamCollator : waiting for thread to have consumed all"
609  << endl;
610  }
611  waitForBufferSpace(-1);
612  }
613 }
614 
615 
616 // ************************************************************************* //
Foam::expressions::patchExpr::debug
int debug
Static debugging option.
Foam::UPstream::commsTypes::nonBlocking
Foam::word
A class for handling words, derived from Foam::string.
Definition: word.H:62
Foam::fileName
A class for handling file names.
Definition: fileName.H:69
Foam::UPstream::haveThreads
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:440
Foam::UPstream::waitRequests
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:234
Foam::UPstream::master
static bool master(const label communicator=worldComm)
Am I the master process.
Definition: UPstream.H:458
Foam::Pstream::scatter
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
Definition: gatherScatter.C:150
Foam::FatalIOError
IOerror FatalIOError
Foam::OFstreamCollator::OFstreamCollator
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
Definition: OFstreamCollator.C:297
Foam::decomposedBlockData::writeBlocks
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns starts of.
Definition: decomposedBlockData.C:736
Foam::endl
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:350
Foam::PtrList::set
const T * set(const label i) const
Return const pointer to element (can be nullptr),.
Definition: PtrList.H:138
append
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
Foam::Pout
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Foam::OFstreamCollator::write
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until writethread has space.
Definition: OFstreamCollator.C:357
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
OFstream.H
Foam::UIPstream::read
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the.
Definition: UIPread.C:81
Foam::UOPstream::write
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Write given buffer to given processor.
Definition: UOPwrite.C:36
Foam::UPstream::allocateCommunicator
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:100
Foam::UPstream
Inter-processor communications stream.
Definition: UPstream.H:61
Foam::IOstreamOption::versionNumber
Representation of a major/minor version number.
Definition: IOstreamOption.H:85
Foam::decomposedBlockData::writeHeader
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
Definition: decomposedBlockData.C:187
Foam::decomposedBlockData::gather
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
Definition: decomposedBlockData.C:597
Foam::PtrList::setSize
void setSize(const label newLen)
Same as resize()
Definition: PtrListI.H:105
Foam::PtrList
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers....
Definition: List.H:62
Foam::UPstream::freeCommunicator
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:166
Foam::autoPtr::ptr
T * ptr() noexcept
Same as release().
Definition: autoPtr.H:177
Foam::max
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:47
Foam::UPstream::commsTypes::scheduled
Foam::writeData
static void writeData(Ostream &os, const Type &val)
Definition: rawSurfaceWriterImpl.C:39
Foam::IOstreamOption::streamFormat
streamFormat
Data format (ascii | binary)
Definition: IOstreamOption.H:70
Foam::FatalError
error FatalError
Foam::string::null
static const string null
An empty string.
Definition: string.H:147
Foam
Namespace for OpenFOAM.
Definition: atmBoundaryLayer.C:33
Foam::abort
errorManip< error > abort(error &err)
Definition: errorManip.H:144
Foam::OFstreamCollator::waitAll
void waitAll()
Wait for all thread actions to have finished.
Definition: OFstreamCollator.C:600
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Foam::autoPtr
Pointer management similar to std::unique_ptr, with some additional methods and type checking.
Definition: HashPtrTable.H:53
masterUncollatedFileOperation.H
Foam::UPstream::msgType
static int & msgType()
Message tag of standard messages.
Definition: UPstream.H:541
FatalErrorInFunction
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:381
Foam::UPstream::nRequests
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:224
forAllConstIters
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
decomposedBlockData.H
Foam::List< label >
Foam::UList
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:103
Foam::identity
labelList identity(const label len, label start=0)
Create identity map of the given length with (map[i] == i)
Definition: labelList.C:38
Foam::sum
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
Definition: DimensionedFieldFunctions.C:327
Foam::UList::byteSize
std::streamsize byteSize() const
Definition: UList.C:191
Foam::UList::cdata
const T * cdata() const
Return a const pointer to the first data element.
Definition: UListI.H:198
Foam::IOstreamOption::compressionType
compressionType
Compression treatment (UNCOMPRESSED | COMPRESSED)
Definition: IOstreamOption.H:77
FatalIOErrorInFunction
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:401
Foam::labelUList
UList< label > labelUList
A UList of labels.
Definition: UList.H:80
Foam::mkDir
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: MSwindows.C:507
Foam::defineTypeNameAndDebug
defineTypeNameAndDebug(combustionModel, 0)
Foam::sleep
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Definition: MSwindows.C:1096
Foam::data
Database for solution data, solver performance and other reduced data.
Definition: data.H:55
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:446
OFstreamCollator.H
Foam::OFstreamCollator::~OFstreamCollator
virtual ~OFstreamCollator()
Destructor.
Definition: OFstreamCollator.C:335