UIPstreamBase.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2015 OpenFOAM Foundation
9 Copyright (C) 2017-2021 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "error.H"
30#include "UIPstream.H"
31#include "int.H"
32#include "token.H"
33#include <cctype>
34
35// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
36
37namespace Foam
38{
39
40// Convert a single character to a word with length 1
41inline static Foam::word charToWord(char c)
42{
43 return Foam::word(std::string(1, c), false);
44}
45
46
47// Adjust stream format based on the flagMask
48inline static void processFlags(Istream& is, int flagMask)
49{
50 if ((flagMask & token::ASCII))
51 {
53 }
54 else if ((flagMask & token::BINARY))
55 {
57 }
58}
59
60
61// Return the position with word boundary alignment
62inline static label byteAlign(const label pos, const size_t align)
63{
64 return
65 (
66 (align > 1)
67 ? (align + ((pos - 1) & ~(align - 1)))
68 : pos
69 );
70}
71
72} // End namespace Foam
73
74
75// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
76
77inline void Foam::UIPstreamBase::checkEof()
78{
80 {
81 setEof();
82 }
83}
84
85
86inline void Foam::UIPstreamBase::prepareBuffer(const size_t align)
87{
88 recvBufPos_ = byteAlign(recvBufPos_, align);
89}
90
91
92template<class T>
93inline void Foam::UIPstreamBase::readFromBuffer(T& val)
94{
95 prepareBuffer(sizeof(T));
96
97 val = reinterpret_cast<T&>(recvBuf_[recvBufPos_]);
98 recvBufPos_ += sizeof(T);
99 checkEof();
100}
101
102
103inline void Foam::UIPstreamBase::readFromBuffer
104(
105 void* data,
106 const size_t count
107)
108{
109 const char* const __restrict__ buf = &recvBuf_[recvBufPos_];
110 char* const __restrict__ output = reinterpret_cast<char*>(data);
111
112 for (size_t i = 0; i < count; ++i)
113 {
114 output[i] = buf[i];
115 }
116
117 recvBufPos_ += count;
118 checkEof();
119}
120
121
122inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str)
123{
124 // Use std::string::assign() to copy content, including '\0'.
125 // Stripping (when desired) is the responsibility of the sending side.
126
127 size_t len;
128 readFromBuffer(len);
129
130 if (len)
131 {
132 str.assign(&recvBuf_[recvBufPos_], len);
133 recvBufPos_ += len;
134 checkEof();
135 }
136 else
137 {
138 str.clear();
139 }
140
141 return *this;
142}
143
145// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
146
148(
149 const commsTypes commsType,
150 const int fromProcNo,
151 DynamicList<char>& receiveBuf,
152 label& receiveBufPosition,
153 const int tag,
154 const label comm,
155 const bool clearAtEnd,
157)
158:
159 UPstream(commsType),
160 Istream(fmt, IOstreamOption::currentVersion),
161 fromProcNo_(fromProcNo),
162 recvBuf_(receiveBuf),
163 recvBufPos_(receiveBufPosition),
164 tag_(tag),
165 comm_(comm),
166 clearAtEnd_(clearAtEnd),
167 messageSize_(0)
168{
169 setOpened();
170 setGood();
171}
172
173
175(
176 const int fromProcNo,
177 PstreamBuffers& buffers
178)
179:
180 UPstream(buffers.commsType()),
181 Istream(buffers.format(), IOstreamOption::currentVersion),
182 fromProcNo_(fromProcNo),
183 recvBuf_(buffers.recvBuf_[fromProcNo]),
184 recvBufPos_(buffers.recvBufPos_[fromProcNo]),
185 tag_(buffers.tag()),
186 comm_(buffers.comm()),
187 clearAtEnd_(buffers.allowClearRecv()),
188 messageSize_(0)
189{
190 if
191 (
193 && !buffers.finished()
194 )
195 {
197 << "PstreamBuffers::finishedSends() never called." << endl
198 << "Please call PstreamBuffers::finishedSends() after doing"
199 << " all your sends (using UOPstream) and before doing any"
200 << " receives (using UIPstream)" << Foam::exit(FatalError);
201 }
202
203 setOpened();
204 setGood();
205}
206
207
208// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
209
211{
212 if (clearAtEnd_ && eof())
213 {
214 if (debug)
215 {
216 Pout<< "UIPstreamBase Destructor : tag:" << tag_
217 << " fromProcNo:" << fromProcNo_
218 << " clearing receive buffer of size "
219 << recvBuf_.size()
220 << " messageSize_:" << messageSize_ << endl;
221 }
222 recvBuf_.clearStorage();
223 }
224}
225
226
227// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
228
230{
231 // Return the put back token if it exists
232 // - with additional handling for special stream flags
233 if (Istream::getBack(t))
234 {
235 if (t.isFlag())
236 {
237 processFlags(*this, t.flagToken());
238 }
239 else
240 {
241 return *this;
242 }
243 }
244
245 // Read character, return on error
246 // - with additional handling for special stream flags
247
248 char c;
249 do
250 {
251 if (!read(c))
252 {
253 t.setBad(); // Error
254 return *this;
255 }
256
257 if (c == token::FLAG)
258 {
259 char flagVal;
260
261 if (read(flagVal))
262 {
263 processFlags(*this, flagVal);
264 }
265 else
266 {
267 t.setBad(); // Error
268 return *this;
269 }
270 }
271 }
272 while (c == token::FLAG);
273
274
275 // Set the line number of this token to the current stream line number
276 t.lineNumber(this->lineNumber());
277
278 // Analyse input starting with this character.
279 switch (c)
280 {
281 // Punctuation
283 case token::BEGIN_LIST :
284 case token::END_LIST :
285 case token::BEGIN_SQR :
286 case token::END_SQR :
287 case token::BEGIN_BLOCK :
288 case token::END_BLOCK :
289 case token::COLON :
290 case token::COMMA :
291 case token::ASSIGN :
292 case token::PLUS :
293 case token::MINUS :
294 case token::MULTIPLY :
295 case token::DIVIDE :
296 {
298 return *this;
299 }
300
301 // The word-variants
304 {
305 word val;
306 if (readString(val))
307 {
309 {
310 t = token::compound::New(val, *this).ptr();
311 }
312 else
313 {
314 t = std::move(val);
316 }
317 }
318 else
319 {
320 t.setBad();
321 }
322 return *this;
323 }
324
325 // The string-variants
330 {
331 string val;
332 if (readString(val))
333 {
334 t = std::move(val);
336 }
337 else
338 {
339 t.setBad();
340 }
341 return *this;
342 }
343
344 // Label
346 {
347 label val;
348 if (read(val))
349 {
350 t = val;
351 }
352 else
353 {
354 t.setBad();
355 }
356 return *this;
357 }
358
359 // Float
361 {
362 floatScalar val;
363 if (read(val))
364 {
365 t = val;
366 }
367 else
368 {
369 t.setBad();
370 }
371 return *this;
372 }
373
374 // Double
376 {
377 doubleScalar val;
378 if (read(val))
379 {
380 t = val;
381 }
382 else
383 {
384 t.setBad();
385 }
386 return *this;
387 }
388
389 // Character (returned as a single character word) or error
390 default:
391 {
392 if (isalpha(c))
393 {
394 t = charToWord(c);
395 return *this;
396 }
397
398 setBad();
399 t.setBad();
400
401 return *this;
402 }
403 }
404}
405
406
408{
409 c = recvBuf_[recvBufPos_];
410 ++recvBufPos_;
411 checkEof();
412 return *this;
413}
414
415
417{
418 return readString(str);
419}
420
421
423{
424 return readString(str);
425}
426
427
429{
430 readFromBuffer(val);
431 return *this;
432}
433
434
436{
437 readFromBuffer(val);
438 return *this;
439}
440
441
443{
444 readFromBuffer(val);
445 return *this;
446}
447
448
449Foam::Istream& Foam::UIPstreamBase::read(char* data, std::streamsize count)
450{
451 if (count)
452 {
453 // For count == 0, a no-op
454 // - see UOPstream::write(const char*, streamsize)
455 beginRawRead();
456 readRaw(data, count);
457 endRawRead();
458 }
459
460 return *this;
461}
462
463
464Foam::Istream& Foam::UIPstreamBase::readRaw(char* data, std::streamsize count)
465{
466 // No check for format() == BINARY since this is either done in the
467 // beginRawRead() method, or the caller knows what they are doing.
468
469 // Any alignment must have been done prior to this call
470 readFromBuffer(data, count);
471 return *this;
472}
473
474
476{
477 if (format() != BINARY)
478 {
480 << "stream format not binary"
482 }
483
484 // Align on word boundary (64-bit)
485 // - as per read(const char*, streamsize)
486 // The check for zero-size will have been done by the caller
487 prepareBuffer(8);
488
489 return true;
490}
491
492
493// Not needed yet
502
503
505{
506 recvBufPos_ = 0; // Assume the entire buffer is for us to read from
507 setOpened();
508 setGood();
509 if (recvBuf_.empty() || !messageSize_)
510 {
511 setEof();
512 }
513}
514
515
517{
518 os << "Reading from processor " << fromProcNo_
519 << " using communicator " << comm_
520 << " and tag " << tag_
521 << Foam::endl;
522}
523
524
525// ************************************************************************* //
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition: DynamicList.H:72
The IOstreamOption is a simple container for options an IOstream can normally have.
streamFormat format() const noexcept
Get the current stream format.
streamFormat
Data format (ascii | binary)
void setEof() noexcept
Set stream state as reached 'eof'.
Definition: IOstream.H:357
void setGood() noexcept
Set stream state to be good.
Definition: IOstream.H:147
void setOpened() noexcept
Set stream opened.
Definition: IOstream.H:129
An Istream is an abstract base class for all input systems (streams, files, token lists etc)....
Definition: Istream.H:64
bool getBack(token &tok)
Get the put-back token if there is one.
Definition: Istream.C:92
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition: Ostream.H:62
Buffers for inter-processor communications streams (UOPstream, UIPstream).
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
virtual bool read()
Re-read model coefficients if they have changed.
static autoPtr< Time > New()
Construct (dummy) Time - no functionObjects or libraries.
Definition: Time.C:717
Base class for input inter-processor communications stream (ie, parallel streams)....
Definition: UIPstream.H:62
virtual void rewind()
Rewind the receive stream position so that it may be read again.
virtual ~UIPstreamBase()
Destructor. Optionally clears external receive buffer.
bool beginRawRead()
Start of low-level raw binary read.
Istream & readRaw(char *data, std::streamsize count)
Low-level raw binary read.
label & recvBufPos_
Definition: UIPstream.H:91
Inter-processor communications stream.
Definition: UPstream.H:59
commsTypes
Types of communications.
Definition: UPstream.H:67
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:562
Database for solution data, solver performance and other reduced data.
Definition: data.H:58
scalar print()
Print to screen.
A token holds an item read from Istream.
Definition: token.H:69
tokenType
Enumeration defining the types of token.
Definition: token.H:77
@ LABEL
label (integer) type
Definition: token.H:85
@ DOUBLE
double (double-precision) type
Definition: token.H:87
@ VARIABLE
Definition: token.H:98
@ FLAG
stream flag (1-byte bitmask)
Definition: token.H:82
@ WORD
Foam::word.
Definition: token.H:90
@ EXPRESSION
Definition: token.H:96
@ FLOAT
float (single-precision) type
Definition: token.H:86
@ VERBATIM
Definition: token.H:100
@ DIRECTIVE
Definition: token.H:94
@ STRING
Foam::string (usually double-quoted)
Definition: token.H:91
label lineNumber() const noexcept
The line number for the token.
Definition: tokenI.H:391
punctuationToken
Standard punctuation tokens (a character)
Definition: token.H:121
@ DIVIDE
Divide [isseparator].
Definition: token.H:141
@ BEGIN_BLOCK
Begin block [isseparator].
Definition: token.H:159
@ BEGIN_SQR
Begin dimensions [isseparator].
Definition: token.H:157
@ COLON
Colon [isseparator].
Definition: token.H:127
@ END_BLOCK
End block [isseparator].
Definition: token.H:160
@ ASSIGN
Assignment/equals [isseparator].
Definition: token.H:137
@ END_STATEMENT
End entry [isseparator].
Definition: token.H:154
@ BEGIN_LIST
Begin list [isseparator].
Definition: token.H:155
@ PLUS
Addition [isseparator].
Definition: token.H:138
@ END_LIST
End list [isseparator].
Definition: token.H:156
@ END_SQR
End dimensions [isseparator].
Definition: token.H:158
@ MULTIPLY
Multiply [isseparator].
Definition: token.H:140
@ MINUS
Subtract or start of negative number.
Definition: token.H:139
@ COMMA
Comma [isseparator].
Definition: token.H:129
void setBad()
Clear token and set to be ERROR.
Definition: tokenI.H:734
@ BINARY
BINARY-mode stream.
Definition: token.H:115
@ ASCII
ASCII-mode stream.
Definition: token.H:114
bool isFlag() const noexcept
Token is FLAG.
Definition: tokenI.H:441
bool setType(const tokenType tokType) noexcept
Change the token type, for similar types.
Definition: tokenI.H:317
int flagToken() const
Return flag bitmask value.
Definition: tokenI.H:447
bool isCompound() const noexcept
Token is COMPOUND.
Definition: tokenI.H:716
A class for handling words, derived from Foam::string.
Definition: word.H:68
const volScalarField & T
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:453
OBJstream os(runTime.globalPath()/outputName)
System signed integer.
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:78
Namespace for OpenFOAM.
dimensionedScalar pos(const dimensionedScalar &ds)
static Foam::word charToWord(char c)
Definition: UIPstreamBase.C:41
bool read(const char *buf, int32_t &val)
Same as readInt32.
Definition: int32.H:108
static void processFlags(Istream &is, int flagMask)
Definition: UIPstreamBase.C:48
double doubleScalar
A typedef for double.
Definition: scalarFwd.H:48
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:372
static label byteAlign(const label pos, const size_t align)
Definition: UIPstreamBase.C:62
float floatScalar
A typedef for float.
Definition: scalarFwd.H:45
errorManip< error > abort(error &err)
Definition: errorManip.H:144
static Ostream & output(Ostream &os, const IntRange< T > &range)
Definition: IntRanges.C:66
error FatalError
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:130
word format(conversionProperties.get< word >("format"))