masterOFstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017 OpenFOAM Foundation
9  Copyright (C) 2020 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "masterOFstream.H"
30 #include "OFstream.H"
31 #include "OSspecific.H"
32 #include "PstreamBuffers.H"
34 #include "boolList.H"
35 #include <algorithm>
36 
37 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
38 
39 void Foam::masterOFstream::checkWrite
40 (
41  const fileName& fName,
42  const char* str,
43  std::streamsize len
44 )
45 {
46  if (!len)
47  {
48  // Can probably skip all of this if there is nothing to write
49  return;
50  }
51 
52  mkDir(fName.path());
53 
54  OFstream os
55  (
56  fName,
57  IOstream::BINARY, //format(),
58  version(),
59  compression_,
60  append_
61  );
62  if (!os.good())
63  {
65  << "Could not open file " << fName << nl
66  << exit(FatalIOError);
67  }
68 
69  // Use writeRaw() instead of writeQuoted(string,false) to output
70  // characters directly.
71 
72  os.writeRaw(str, len);
73 
74  if (!os.good())
75  {
77  << "Failed writing to " << fName << nl
78  << exit(FatalIOError);
79  }
80 }
81 
82 
83 void Foam::masterOFstream::checkWrite
84 (
85  const fileName& fName,
86  const std::string& s
87 )
88 {
89  checkWrite(fName, &s[0], s.length());
90 }
91 
92 
93 void Foam::masterOFstream::commit()
94 {
95  if (Pstream::parRun())
96  {
97  List<fileName> filePaths(Pstream::nProcs());
98  filePaths[Pstream::myProcNo()] = pathName_;
99  Pstream::gatherList(filePaths);
100 
101  bool uniform =
103  (
104  filePaths
105  );
106 
107  Pstream::scatter(uniform);
108 
109  if (uniform)
110  {
111  if (Pstream::master() && valid_)
112  {
113  checkWrite(pathName_, this->str());
114  }
115 
116  this->reset();
117  return;
118  }
119 
120  boolList valid(Pstream::nProcs());
121  valid[Pstream::myProcNo()] = valid_;
122  Pstream::gatherList(valid);
123 
124 
125  // Different files
126  PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
127 
128  // Send my buffer to master
129  if (!Pstream::master())
130  {
131  UOPstream os(Pstream::masterNo(), pBufs);
132  string s(this->str());
133  this->reset();
134 
135  os.write(&s[0], s.length());
136  }
137 
138  labelList recvSizes;
139  pBufs.finishedSends(recvSizes);
140 
141  if (Pstream::master())
142  {
143  // Write master data
144  if (valid[Pstream::masterNo()])
145  {
146  checkWrite(filePaths[Pstream::masterNo()], this->str());
147  }
148  this->reset();
149 
150  // Find the max slave size
151  recvSizes[Pstream::masterNo()] = 0;
152  List<char> buf
153  (
154  *std::max_element(recvSizes.cbegin(), recvSizes.cend())
155  );
156 
157  for (const int proci : Pstream::subProcs())
158  {
159  UIPstream is(proci, pBufs);
160 
161  const std::streamsize count(recvSizes[proci]);
162  is.read(buf.data(), count);
163 
164  if (valid[proci])
165  {
166  checkWrite(filePaths[proci], buf.cdata(), count);
167  }
168  }
169  }
170  }
171  else
172  {
173  checkWrite(pathName_, this->str());
174  this->reset();
175  }
176 
177  // This method is only called once (internally)
178  // so no need to clear/flush old buffered data
179 }
180 
181 
182 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
183 
185 (
186  const fileName& pathName,
187  IOstreamOption streamOpt,
188  const bool append,
189  const bool valid
190 )
191 :
192  OStringStream(streamOpt.format(), streamOpt.version()),
193  pathName_(pathName),
194  compression_(streamOpt.compression()),
195  append_(append),
196  valid_(valid)
197 {}
198 
199 
200 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
201 
203 {
204  commit();
205 }
206 
207 
208 // ************************************************************************* //
Foam::labelList
List< label > labelList
A List of labels.
Definition: List.H:71
OSspecific.H
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
Foam::UPstream::commsTypes::nonBlocking
Foam::UPstream::masterNo
static constexpr int masterNo() noexcept
Process index of the master (always 0)
Definition: UPstream.H:452
boolList.H
Foam::fileName
A class for handling file names.
Definition: fileName.H:69
s
gmvFile<< "tracers "<< particles.size()<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().x()<< " ";}gmvFile<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().y()<< " ";}gmvFile<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().z()<< " ";}gmvFile<< nl;forAll(lagrangianScalarNames, i){ word name=lagrangianScalarNames[i];IOField< scalar > s(IOobject(name, runTime.timeName(), cloud::prefix, mesh, IOobject::MUST_READ, IOobject::NO_WRITE))
Definition: gmvOutputSpray.H:25
Foam::fileOperations::masterUncollatedFileOperation::uniformFile
static bool uniformFile(const fileNameList &)
Same file?
Definition: masterUncollatedFileOperation.C:479
masterOFstream.H
Foam::UPstream::parRun
static bool & parRun()
Test if this a parallel run, or allow modify access.
Definition: UPstream.H:434
Foam::IOstreamOption::format
streamFormat format() const noexcept
Get the current stream format.
Definition: IOstreamOption.H:289
Foam::UPstream::master
static bool master(const label communicator=worldComm)
Am I the master process.
Definition: UPstream.H:458
Foam::Pstream::scatter
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
Definition: gatherScatter.C:150
Foam::boolList
List< bool > boolList
A List of bools.
Definition: List.H:69
Foam::FatalIOError
IOerror FatalIOError
Foam::masterOFstream::~masterOFstream
~masterOFstream()
Destructor - commits buffered information to file.
Definition: masterOFstream.C:202
append
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
OFstream.H
Foam::UPstream::subProcs
static rangeType subProcs(const label communicator=worldComm)
Range of process indices for sub-processes.
Definition: UPstream.H:516
Foam::IOstreamOption
The IOstreamOption is a simple container for options an IOstream can normally have.
Definition: IOstreamOption.H:63
Foam::masterOFstream::masterOFstream
masterOFstream(const fileName &pathname, IOstreamOption streamOpt=IOstreamOption(), const bool append=false, const bool valid=true)
Construct from pathname and set stream status.
Definition: masterOFstream.C:185
Foam::IOstreamOption::version
versionNumber version() const noexcept
Get the stream version.
Definition: IOstreamOption.H:341
Foam::exit
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
Foam::Detail::StringStreamAllocator< std::ostringstream >::str
Foam::string str() const
Get the string - as Foam::string rather than std::string.
Definition: StringStream.H:91
Foam::IOstreamOption::BINARY
"binary"
Definition: IOstreamOption.H:73
masterUncollatedFileOperation.H
Foam::Pstream::gatherList
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
Definition: gatherScatterList.C:52
Foam::UPstream::myProcNo
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:464
Foam::nl
constexpr char nl
Definition: Ostream.H:385
Foam::BitOps::count
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:77
Foam::OStringStream
Output to string buffer, using a OSstream.
Definition: StringStream.H:196
PstreamBuffers.H
FatalIOErrorInFunction
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:401
Foam::IOstreamOption::compression
compressionType compression() const noexcept
Get the stream compression.
Definition: IOstreamOption.H:315
Foam::mkDir
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: MSwindows.C:507
Foam::UPstream::nProcs
static label nProcs(const label communicator=worldComm)
Number of processes in parallel run, and 1 for serial run.
Definition: UPstream.H:446