OFstreamCollator.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2021 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::OFstreamCollator
29 
30 Description
31  Threaded file writer.
32 
33  Collects all data from all processors and writes as single
34  'decomposedBlockData' file. The operation is determined by the
35  buffer size (maxThreadFileBufferSize setting):
36  - local size of data is larger than buffer: receive and write processor
37  by processor (i.e. 'scheduled'). Does not use a thread, no file size
38  limit.
39  - total size of data is larger than buffer (but local is not):
40  thread does all the collecting and writing of the processors. No file
41  size limit.
42  - total size of data is less than buffer:
43  collecting is done locally; the thread only does the writing
44  (since the data has already been collected)
45 
46 SourceFiles
47  OFstreamCollator.C
48 
49 \*---------------------------------------------------------------------------*/
50 
51 #ifndef OFstreamCollator_H
52 #define OFstreamCollator_H
53 
54 #include <thread>
55 #include <mutex>
56 #include "IOstream.H"
57 #include "labelList.H"
58 #include "FIFOStack.H"
59 #include "SubList.H"
60 #include "dictionary.H"
61 
62 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
63 
64 namespace Foam
65 {
66 
67 /*---------------------------------------------------------------------------*\
68  Class OFstreamCollator Declaration
69 \*---------------------------------------------------------------------------*/
70 
71 class OFstreamCollator
72 {
73  // Private Class
74 
75  struct writeData  // One queued write request: target file, master data and optional slave data
76  {
77  const label comm_;  // Communicator, as passed to the constructor
78  const word objectType_;  // Object type name, as passed to the constructor
79  const fileName pathName_;  // Path name of the file to write
80  const string data_;  // Master data (counted first in size())
81  const labelList sizes_;  // Sizes list from the caller; presumably per-processor data sizes - TODO confirm in OFstreamCollator.C
82  PtrList<List<char>> slaveData_;  // Optional slave data; constructed empty, individual entries may be unset
83  const IOstreamOption streamOpt_;  // Stream options, as passed to the constructor
84  const bool append_;  // Append flag, as passed to the constructor
85  const dictionary headerEntries_;  // Header entries, as passed to the constructor
86 
87  writeData  // Constructor: copies each argument into its member; slaveData_ starts empty
88  (
89  const label comm,
90  const word& objectType,
91  const fileName& pathName,
92  const string& data,
93  const labelList& sizes,
94  IOstreamOption streamOpt,
95  const bool append,
96  const dictionary& headerEntries
97  )
98  :
99  comm_(comm),
100  objectType_(objectType),
101  pathName_(pathName),
102  data_(data),
103  sizes_(sizes),
104  slaveData_(),
105  streamOpt_(streamOpt),
106  append_(append),
107  headerEntries_(headerEntries)
108  {}
109 
110  //- The (approximate) total size: master data plus every slave data entry that is set
111  off_t size() const
112  {
113  off_t totalSize = data_.size();  // start from the master data
114  forAll(slaveData_, i)  // visit every slot of the slave data list
115  {
116  if (slaveData_.set(i))  // unset slots (null pointer) contribute nothing
117  {
118  totalSize += slaveData_[i].size();
119  }
120  }
121  return totalSize;
122  }
123  };
124 
125 
126  // Private Data
127 
128  //- Upper limit on the total amount of storage used by the object stack (objects_) below
129  const off_t maxBufferSize_;
130 
131  mutable std::mutex mutex_;  // Mutex, mutable so const members can use it; presumably guards objects_/threadRunning_ - confirm in OFstreamCollator.C
132 
133  autoPtr<std::thread> thread_;  // Write thread handle; NOTE(review): creation and joining are not visible in this header
134 
135  //- FIFO stack of pending writes (file + contents), stored as raw writeData pointers
136  FIFOStack<writeData*> objects_;
137 
138  //- Whether thread is running (and not exited)
139  bool threadRunning_;
140 
141  //- Communicator to use for all parallel ops (in simulation thread)
142  label localComm_;
143 
144  //- Communicator to use for all parallel ops (in write thread)
145  label threadComm_;
146 
147 
148  // Private Member Functions
149 
150  //- Write the actual file from the master data and the (optional) slave data
151  static bool writeFile
152  (
153  const label comm,
154  const word& objectType,
155  const fileName& fName,
156  const string& masterData,
157  const labelUList& recvSizes,
158  const PtrList<SubList<char>>& slaveData,
159  IOstreamOption streamOpt,
160  const bool append,
161  const dictionary& headerEntries
162  );
163 
164  //- Write all files in stack. NOTE(review): presumably the write-thread entry point, with threadarg the collator - confirm in .C
165  static void* writeAll(void *threadarg);
166 
167  //- Wait until the total size of objects_ (master + optional slave data)
168  // is at least wantedSize below the overall maxBufferSize_.
169  void waitForBufferSpace(const off_t wantedSize) const;
170 
171 
172 public:
173 
174  // Declare name of the class and its debug switch
175  TypeName("OFstreamCollator");
176 
177 
178  // Constructors
179 
180  //- Construct from buffer size (0 = do not use thread)
181  explicit OFstreamCollator(const off_t maxBufferSize);
182 
183  //- Construct from buffer size (0 = do not use thread)
184  //- and specified communicator
185  OFstreamCollator(const off_t maxBufferSize, const label comm);
186 
187 
188  //- Destructor
189  virtual ~OFstreamCollator();
190 
191 
192  // Member Functions
193 
194  //- Write file with contents.
195  // Blocks until the write thread has space available
196  // (total file sizes < maxBufferSize)
197  bool write  // NOTE(review): meaning of the bool result is defined in OFstreamCollator.C
198  (
199  const word& objectType,  // object type name
200  const fileName&,  // file to write (parameter left unnamed in this declaration)
201  const string& data,  // contents to write
202  IOstreamOption streamOpt,  // stream options
203  const bool append,  // append flag; presumably append to an existing file - confirm in .C
204  const bool useThread = true,  // defaults to true; presumably false forces a non-threaded write - confirm in .C
205  const dictionary& headerEntries = dictionary::null  // defaults to the empty dictionary
206  );
207 
208  //- Wait for all thread actions to have finished
209  void waitAll();
210 };
211 
212 
213 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
214 
215 } // End namespace Foam
216 
217 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
218 
219 #endif
220 
221 // ************************************************************************* //
FIFOStack.H
SubList.H
Foam::word
A class for handling words, derived from Foam::string.
Definition: word.H:65
Foam::fileName
A class for handling file names.
Definition: fileName.H:73
Foam::SubList
A List obtained as a section of another List.
Definition: SubList.H:54
Foam::OFstreamCollator::OFstreamCollator
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
Definition: OFstreamCollator.C:286
Foam::PtrList::set
const T * set(const label i) const
Return const pointer to element (can be nullptr).
Definition: PtrList.H:138
append
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
forAll
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:296
Foam::dictionary::null
static const dictionary null
An empty dictionary, which is also the parent for all dictionaries.
Definition: dictionary.H:392
Foam::OFstreamCollator::TypeName
TypeName("OFstreamCollator")
labelList.H
IOstream.H
Foam::IOstreamOption
The IOstreamOption is a simple container for options an IOstream can normally have.
Definition: IOstreamOption.H:63
Foam::PtrList
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers....
Definition: List.H:59
Foam::writeData
static void writeData(Ostream &os, const Type &val)
Definition: rawSurfaceWriterImpl.C:45
Foam::dictionary
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition: dictionary.H:123
Foam
Namespace for OpenFOAM.
Definition: atmBoundaryLayer.C:33
Foam::OFstreamCollator::waitAll
void waitAll()
Wait for all thread actions to have finished.
Definition: OFstreamCollator.C:585
Foam::OFstreamCollator
Threaded file writer.
Definition: OFstreamCollator.H:70
Foam::autoPtr< std::thread >
Foam::List< label >
Foam::UList< label >
dictionary.H
Foam::OFstreamCollator::write
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, const bool append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
Definition: OFstreamCollator.C:346
Foam::FIFOStack< writeData * >
Foam::data
Database for solution data, solver performance and other reduced data.
Definition: data.H:55
Foam::OFstreamCollator::~OFstreamCollator
virtual ~OFstreamCollator()
Destructor.
Definition: OFstreamCollator.C:324