mirror of
https://github.com/panda3d/panda3d.git
synced 2025-09-29 08:15:18 -04:00
Give istream/ostream a friendlier file-like interface for Python
This commit is contained in:
parent
15cdd1da0a
commit
60922fabc1
317
dtool/src/dtoolutil/iostream_ext.cxx
Normal file
317
dtool/src/dtoolutil/iostream_ext.cxx
Normal file
@ -0,0 +1,317 @@
|
|||||||
|
/**
|
||||||
|
* PANDA 3D SOFTWARE
|
||||||
|
* Copyright (c) Carnegie Mellon University. All rights reserved.
|
||||||
|
*
|
||||||
|
* All use of this software is subject to the terms of the revised BSD
|
||||||
|
* license. You should have received a copy of this license along
|
||||||
|
* with this source code in a file named "LICENSE."
|
||||||
|
*
|
||||||
|
* @file iostream_ext.cxx
|
||||||
|
* @author rdb
|
||||||
|
* @date 2017-07-24
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "iostream_ext.h"
|
||||||
|
|
||||||
|
#ifdef HAVE_PYTHON
|
||||||
|
|
||||||
|
#ifndef CPPPARSER
|
||||||
|
extern struct Dtool_PyTypedObject Dtool_std_istream;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the given number of bytes from the stream, returned as bytes object.
|
||||||
|
* If the given size is -1, all bytes are read from the stream.
|
||||||
|
*/
|
||||||
|
PyObject *Extension<istream>::
|
||||||
|
read(int size) {
|
||||||
|
if (size < 0) {
|
||||||
|
return readall();
|
||||||
|
}
|
||||||
|
|
||||||
|
char *buffer;
|
||||||
|
std::streamsize read_bytes = 0;
|
||||||
|
|
||||||
|
if (size > 0) {
|
||||||
|
std::streambuf *buf = _this->rdbuf();
|
||||||
|
nassertr(buf != nullptr, nullptr);
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
PyThreadState *_save;
|
||||||
|
Py_UNBLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
buffer = (char *)alloca((size_t)size);
|
||||||
|
read_bytes = buf->sgetn(buffer, (size_t)size);
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
Py_BLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
#if PY_MAJOR_VERSION >= 3
|
||||||
|
return PyBytes_FromStringAndSize(buffer, read_bytes);
|
||||||
|
#else
|
||||||
|
return PyString_FromStringAndSize(buffer, read_bytes);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads from the underlying stream, but using at most one call. The number
|
||||||
|
* of returned bytes may therefore be less than what was requested, but it
|
||||||
|
* will always be greater than 0 until EOF is reached.
|
||||||
|
*/
|
||||||
|
PyObject *Extension<istream>::
|
||||||
|
read1(int size) {
|
||||||
|
std::streambuf *buf = _this->rdbuf();
|
||||||
|
nassertr(buf != nullptr, nullptr);
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
PyThreadState *_save;
|
||||||
|
Py_UNBLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
std::streamsize avail = buf->in_avail();
|
||||||
|
if (avail == 0) {
|
||||||
|
avail = 4096;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (size >= 0 && (std::streamsize)size < avail) {
|
||||||
|
avail = (std::streamsize)size;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't read more than 4K at a time
|
||||||
|
if (avail > 4096) {
|
||||||
|
avail = 4096;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *buffer = (char *)alloca(avail);
|
||||||
|
std::streamsize read_bytes = buf->sgetn(buffer, avail);
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
Py_BLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if PY_MAJOR_VERSION >= 3
|
||||||
|
return PyBytes_FromStringAndSize(buffer, read_bytes);
|
||||||
|
#else
|
||||||
|
return PyString_FromStringAndSize(buffer, read_bytes);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads all of the bytes in the stream.
|
||||||
|
*/
|
||||||
|
PyObject *Extension<istream>::
|
||||||
|
readall() {
|
||||||
|
std::streambuf *buf = _this->rdbuf();
|
||||||
|
nassertr(buf != nullptr, nullptr);
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
PyThreadState *_save;
|
||||||
|
Py_UNBLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
std::vector<unsigned char> result;
|
||||||
|
|
||||||
|
static const size_t buffer_size = 4096;
|
||||||
|
char buffer[buffer_size];
|
||||||
|
|
||||||
|
std::streamsize count = buf->sgetn(buffer, buffer_size);
|
||||||
|
while (count != 0) {
|
||||||
|
thread_consider_yield();
|
||||||
|
result.insert(result.end(), buffer, buffer + count);
|
||||||
|
count = buf->sgetn(buffer, buffer_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
Py_BLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if PY_MAJOR_VERSION >= 3
|
||||||
|
return PyBytes_FromStringAndSize((char *)result.data(), result.size());
|
||||||
|
#else
|
||||||
|
return PyString_FromStringAndSize((char *)result.data(), result.size());
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads bytes into a preallocated, writable, bytes-like object, returning the
|
||||||
|
* number of bytes read.
|
||||||
|
*/
|
||||||
|
std::streamsize Extension<istream>::
|
||||||
|
readinto(PyObject *b) {
|
||||||
|
std::streambuf *buf = _this->rdbuf();
|
||||||
|
nassertr(buf != nullptr, 0);
|
||||||
|
|
||||||
|
Py_buffer view;
|
||||||
|
if (PyObject_GetBuffer(b, &view, PyBUF_CONTIG) == -1) {
|
||||||
|
PyErr_SetString(PyExc_TypeError,
|
||||||
|
"write() requires a contiguous, read-write bytes-like object");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
PyThreadState *_save;
|
||||||
|
Py_UNBLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
std::streamsize count = buf->sgetn((char *)view.buf, view.len);
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
Py_BLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
PyBuffer_Release(&view);
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts one line up to and including the trailing newline character.
|
||||||
|
* Returns empty string when the end of file is reached.
|
||||||
|
*/
|
||||||
|
PyObject *Extension<istream>::
|
||||||
|
readline(int size) {
|
||||||
|
std::streambuf *buf = _this->rdbuf();
|
||||||
|
nassertr(buf != nullptr, nullptr);
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
PyThreadState *_save;
|
||||||
|
Py_UNBLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
std::string line;
|
||||||
|
int ch = buf->sbumpc();
|
||||||
|
while (ch != EOF && (--size) != 0) {
|
||||||
|
line.push_back(ch);
|
||||||
|
if (ch == '\n') {
|
||||||
|
// Here's the newline character.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ch = buf->sbumpc();
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
Py_BLOCK_THREADS
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if PY_MAJOR_VERSION >= 3
|
||||||
|
return PyBytes_FromStringAndSize(line.data(), line.size());
|
||||||
|
#else
|
||||||
|
return PyString_FromStringAndSize(line.data(), line.size());
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads all the lines at once and returns a list. Also see the documentation
|
||||||
|
* for readline().
|
||||||
|
*/
|
||||||
|
PyObject *Extension<istream>::
|
||||||
|
readlines(int hint) {
|
||||||
|
PyObject *lst = PyList_New(0);
|
||||||
|
if (lst == nullptr) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
PyObject *py_line = readline(-1);
|
||||||
|
|
||||||
|
if (hint < 0) {
|
||||||
|
while (Py_SIZE(py_line) > 0) {
|
||||||
|
PyList_Append(lst, py_line);
|
||||||
|
Py_DECREF(py_line);
|
||||||
|
|
||||||
|
py_line = readline(-1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
size_t totchars = 0;
|
||||||
|
while (Py_SIZE(py_line) > 0) {
|
||||||
|
totchars += Py_SIZE(py_line);
|
||||||
|
PyList_Append(lst, py_line);
|
||||||
|
Py_DECREF(py_line);
|
||||||
|
|
||||||
|
if (totchars > hint) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
py_line = readline(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return lst;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Yields continuously to read all the lines from the istream.
|
||||||
|
*/
|
||||||
|
static PyObject *gen_next(PyObject *self) {
|
||||||
|
istream *stream = nullptr;
|
||||||
|
if (!Dtool_Call_ExtractThisPointer(self, Dtool_std_istream, (void **)&stream)) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
PyObject *line = invoke_extension(stream).readline();
|
||||||
|
if (Py_SIZE(line) > 0) {
|
||||||
|
return line;
|
||||||
|
} else {
|
||||||
|
PyErr_SetObject(PyExc_StopIteration, nullptr);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Iterates over the lines of the file.
|
||||||
|
*/
|
||||||
|
PyObject *Extension<istream>::
|
||||||
|
__iter__(PyObject *self) {
|
||||||
|
return Dtool_NewGenerator(self, &gen_next);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes the bytes object to the stream.
|
||||||
|
*/
|
||||||
|
void Extension<ostream>::
|
||||||
|
write(PyObject *b) {
|
||||||
|
std::streambuf *buf = _this->rdbuf();
|
||||||
|
nassertv(buf != nullptr);
|
||||||
|
|
||||||
|
Py_buffer view;
|
||||||
|
if (PyObject_GetBuffer(b, &view, PyBUF_CONTIG_RO) == -1) {
|
||||||
|
PyErr_SetString(PyExc_TypeError, "write() requires a contiguous buffer");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
|
||||||
|
PyThreadState *_save;
|
||||||
|
Py_UNBLOCK_THREADS
|
||||||
|
buf->sputn((const char *)view.buf, view.len);
|
||||||
|
Py_BLOCK_THREADS
|
||||||
|
#else
|
||||||
|
buf->sputn((const char *)view.buf, view.len);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
PyBuffer_Release(&view);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write a list of lines to the stream. Line separators are not added, so it
|
||||||
|
* is usual for each of the lines provided to have a line separator at the
|
||||||
|
* end.
|
||||||
|
*/
|
||||||
|
void Extension<ostream>::
|
||||||
|
writelines(PyObject *lines) {
|
||||||
|
PyObject *seq = PySequence_Fast(lines, "writelines() expects a sequence");
|
||||||
|
if (seq == nullptr) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
PyObject **items = PySequence_Fast_ITEMS(seq);
|
||||||
|
Py_ssize_t len = PySequence_Fast_GET_SIZE(seq);
|
||||||
|
|
||||||
|
for (Py_ssize_t i = 0; i < len; ++i) {
|
||||||
|
write(items[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Py_DECREF(seq);
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // HAVE_PYTHON
|
53
dtool/src/dtoolutil/iostream_ext.h
Normal file
53
dtool/src/dtoolutil/iostream_ext.h
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
/**
|
||||||
|
* PANDA 3D SOFTWARE
|
||||||
|
* Copyright (c) Carnegie Mellon University. All rights reserved.
|
||||||
|
*
|
||||||
|
* All use of this software is subject to the terms of the revised BSD
|
||||||
|
* license. You should have received a copy of this license along
|
||||||
|
* with this source code in a file named "LICENSE."
|
||||||
|
*
|
||||||
|
* @file iostream_ext.h
|
||||||
|
* @author rdb
|
||||||
|
* @date 2017-07-24
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef IOSTREAM_EXT_H
|
||||||
|
#define IOSTREAM_EXT_H
|
||||||
|
|
||||||
|
#include "dtoolbase.h"
|
||||||
|
|
||||||
|
#ifdef HAVE_PYTHON
|
||||||
|
|
||||||
|
#include "extension.h"
|
||||||
|
#include <iostream>
|
||||||
|
#include "py_panda.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* These classes define the extension methods for istream and ostream, which
|
||||||
|
* are called instead of any C++ methods with the same prototype.
|
||||||
|
*
|
||||||
|
* These are designed to allow streams to be treated as file-like objects.
|
||||||
|
*/
|
||||||
|
template<>
|
||||||
|
class Extension<istream> : public ExtensionBase<istream> {
|
||||||
|
public:
|
||||||
|
PyObject *read(int size=-1);
|
||||||
|
PyObject *read1(int size=-1);
|
||||||
|
PyObject *readall();
|
||||||
|
std::streamsize readinto(PyObject *b);
|
||||||
|
|
||||||
|
PyObject *readline(int size=-1);
|
||||||
|
PyObject *readlines(int hint=-1);
|
||||||
|
PyObject *__iter__(PyObject *self);
|
||||||
|
};
|
||||||
|
|
||||||
|
template<>
|
||||||
|
class Extension<ostream> : public ExtensionBase<ostream> {
|
||||||
|
public:
|
||||||
|
void write(PyObject *b);
|
||||||
|
void writelines(PyObject *lines);
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif // HAVE_PYTHON
|
||||||
|
|
||||||
|
#endif // IOSTREAM_EXT_H
|
@ -1,3 +1,4 @@
|
|||||||
#include "filename_ext.cxx"
|
#include "filename_ext.cxx"
|
||||||
#include "globPattern_ext.cxx"
|
#include "globPattern_ext.cxx"
|
||||||
|
#include "iostream_ext.cxx"
|
||||||
#include "textEncoder_ext.cxx"
|
#include "textEncoder_ext.cxx"
|
||||||
|
@ -1,16 +1,15 @@
|
|||||||
// Filename: iostream
|
/**
|
||||||
// Created by: drose (12May00)
|
* PANDA 3D SOFTWARE
|
||||||
//
|
* Copyright (c) Carnegie Mellon University. All rights reserved.
|
||||||
////////////////////////////////////////////////////////////////////
|
*
|
||||||
//
|
* All use of this software is subject to the terms of the revised BSD
|
||||||
// PANDA 3D SOFTWARE
|
* license. You should have received a copy of this license along
|
||||||
// Copyright (c) Carnegie Mellon University. All rights reserved.
|
* with this source code in a file named "LICENSE."
|
||||||
//
|
*
|
||||||
// All use of this software is subject to the terms of the revised BSD
|
* @file iostream
|
||||||
// license. You should have received a copy of this license along
|
* @author drose
|
||||||
// with this source code in a file named "LICENSE."
|
* @date 2000-05-12
|
||||||
//
|
*/
|
||||||
////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
// This file, and all the other files in this directory, aren't
|
// This file, and all the other files in this directory, aren't
|
||||||
// intended to be compiled--they're just parsed by CPPParser (and
|
// intended to be compiled--they're just parsed by CPPParser (and
|
||||||
@ -34,6 +33,9 @@ namespace std {
|
|||||||
__published:
|
__published:
|
||||||
ostream(const ostream&) = delete;
|
ostream(const ostream&) = delete;
|
||||||
|
|
||||||
|
__extension void write(PyObject *b);
|
||||||
|
__extension void writelines(PyObject *lines);
|
||||||
|
|
||||||
void put(char c);
|
void put(char c);
|
||||||
void flush();
|
void flush();
|
||||||
streampos tellp();
|
streampos tellp();
|
||||||
@ -43,10 +45,20 @@ namespace std {
|
|||||||
protected:
|
protected:
|
||||||
ostream(ostream &&);
|
ostream(ostream &&);
|
||||||
};
|
};
|
||||||
|
|
||||||
class istream : virtual public ios {
|
class istream : virtual public ios {
|
||||||
__published:
|
__published:
|
||||||
istream(const istream&) = delete;
|
istream(const istream&) = delete;
|
||||||
|
|
||||||
|
__extension PyObject *read(int size=-1);
|
||||||
|
__extension PyObject *read1(int size=-1);
|
||||||
|
__extension PyObject *readall();
|
||||||
|
__extension std::streamsize readinto(PyObject *b);
|
||||||
|
|
||||||
|
__extension PyObject *readline(int size=-1);
|
||||||
|
__extension PyObject *readlines(int hint=-1);
|
||||||
|
__extension PyObject *__iter__(PyObject *self);
|
||||||
|
|
||||||
int get();
|
int get();
|
||||||
streampos tellg();
|
streampos tellg();
|
||||||
void seekg(streampos pos);
|
void seekg(streampos pos);
|
||||||
@ -55,6 +67,7 @@ namespace std {
|
|||||||
protected:
|
protected:
|
||||||
istream(istream &&);
|
istream(istream &&);
|
||||||
};
|
};
|
||||||
|
|
||||||
class iostream : public istream, public ostream {
|
class iostream : public istream, public ostream {
|
||||||
__published:
|
__published:
|
||||||
iostream(const iostream&) = delete;
|
iostream(const iostream&) = delete;
|
||||||
|
@ -3685,6 +3685,7 @@ IGATEFILES += [
|
|||||||
"globPattern_ext.h",
|
"globPattern_ext.h",
|
||||||
"pandaFileStream.h",
|
"pandaFileStream.h",
|
||||||
"lineStream.h",
|
"lineStream.h",
|
||||||
|
"iostream_ext.h",
|
||||||
]
|
]
|
||||||
TargetAdd('libp3dtoolutil.in', opts=OPTS, input=IGATEFILES)
|
TargetAdd('libp3dtoolutil.in', opts=OPTS, input=IGATEFILES)
|
||||||
TargetAdd('libp3dtoolutil.in', opts=['IMOD:panda3d.core', 'ILIB:libp3dtoolutil', 'SRCDIR:dtool/src/dtoolutil'])
|
TargetAdd('libp3dtoolutil.in', opts=['IMOD:panda3d.core', 'ILIB:libp3dtoolutil', 'SRCDIR:dtool/src/dtoolutil'])
|
||||||
|
128
tests/dtoolutil/test_iostream.py
Normal file
128
tests/dtoolutil/test_iostream.py
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
from panda3d.core import StringStream
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
ISTREAM_DATA = b'abcdefghijklmnopqrstuvwxyz' * 500
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def istream():
|
||||||
|
return StringStream(ISTREAM_DATA)
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_readall(istream):
|
||||||
|
assert istream.readall() == ISTREAM_DATA
|
||||||
|
assert istream.readall() == b''
|
||||||
|
assert istream.readall() == b''
|
||||||
|
assert istream.tellg() == len(ISTREAM_DATA)
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_read(istream):
|
||||||
|
assert istream.read() == ISTREAM_DATA
|
||||||
|
assert istream.read() == b''
|
||||||
|
assert istream.read() == b''
|
||||||
|
assert istream.tellg() == len(ISTREAM_DATA)
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_read_size(istream):
|
||||||
|
assert istream.read(100) == ISTREAM_DATA[:100]
|
||||||
|
assert istream.read(5000) == ISTREAM_DATA[100:5100]
|
||||||
|
assert istream.read(5000) == ISTREAM_DATA[5100:10100]
|
||||||
|
assert istream.read(5000) == ISTREAM_DATA[10100:15100]
|
||||||
|
assert istream.read() == b''
|
||||||
|
assert istream.tellg() == len(ISTREAM_DATA)
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_read1(istream):
|
||||||
|
accumulated = b''
|
||||||
|
data = istream.read1()
|
||||||
|
while data:
|
||||||
|
accumulated += data
|
||||||
|
data = istream.read1()
|
||||||
|
|
||||||
|
assert accumulated == ISTREAM_DATA
|
||||||
|
assert istream.tellg() == len(ISTREAM_DATA)
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_read1_size(istream):
|
||||||
|
accumulated = b''
|
||||||
|
data = istream.read1(4000)
|
||||||
|
while data:
|
||||||
|
accumulated += data
|
||||||
|
data = istream.read1(4000)
|
||||||
|
|
||||||
|
assert accumulated == ISTREAM_DATA
|
||||||
|
assert istream.tellg() == len(ISTREAM_DATA)
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_readinto(istream):
|
||||||
|
ba = bytearray()
|
||||||
|
assert istream.readinto(ba) == 0
|
||||||
|
assert istream.tellg() == 0
|
||||||
|
|
||||||
|
ba = bytearray(10)
|
||||||
|
assert istream.readinto(ba) == 10
|
||||||
|
assert ba == ISTREAM_DATA[:10]
|
||||||
|
assert istream.tellg() == 10
|
||||||
|
|
||||||
|
ba = bytearray(len(ISTREAM_DATA))
|
||||||
|
assert istream.readinto(ba) == len(ISTREAM_DATA) - 10
|
||||||
|
assert ba[:len(ISTREAM_DATA)-10] == ISTREAM_DATA[10:]
|
||||||
|
assert istream.tellg() == len(ISTREAM_DATA)
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_readline():
|
||||||
|
# Empty stream
|
||||||
|
stream = StringStream(b'')
|
||||||
|
assert stream.readline() == b''
|
||||||
|
assert stream.readline() == b''
|
||||||
|
|
||||||
|
# Single line without newline
|
||||||
|
stream = StringStream(b'A')
|
||||||
|
assert stream.readline() == b'A'
|
||||||
|
assert stream.readline() == b''
|
||||||
|
|
||||||
|
# Single newline
|
||||||
|
stream = StringStream(b'\n')
|
||||||
|
assert stream.readline() == b'\n'
|
||||||
|
assert stream.readline() == b''
|
||||||
|
|
||||||
|
# Line with text followed by empty line
|
||||||
|
stream = StringStream(b'A\n\n')
|
||||||
|
assert stream.readline() == b'A\n'
|
||||||
|
assert stream.readline() == b'\n'
|
||||||
|
assert stream.readline() == b''
|
||||||
|
|
||||||
|
# Preserve null byte
|
||||||
|
stream = StringStream(b'\x00\x00')
|
||||||
|
assert stream.readline() == b'\x00\x00'
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_readlines():
|
||||||
|
istream = StringStream(b'a')
|
||||||
|
assert istream.readlines() == [b'a']
|
||||||
|
assert istream.readlines() == []
|
||||||
|
|
||||||
|
istream = StringStream(b'a\nb\nc\n')
|
||||||
|
assert istream.readlines() == [b'a\n', b'b\n', b'c\n']
|
||||||
|
|
||||||
|
istream = StringStream(b'\na\nb\nc')
|
||||||
|
assert istream.readlines() == [b'\n', b'a\n', b'b\n', b'c']
|
||||||
|
|
||||||
|
istream = StringStream(b'\n\n\n')
|
||||||
|
assert istream.readlines() == [b'\n', b'\n', b'\n']
|
||||||
|
|
||||||
|
|
||||||
|
def test_istream_iter():
|
||||||
|
istream = StringStream(b'a')
|
||||||
|
assert tuple(istream) == (b'a',)
|
||||||
|
assert tuple(istream) == ()
|
||||||
|
|
||||||
|
istream = StringStream(b'a\nb\nc\n')
|
||||||
|
assert tuple(istream) == (b'a\n', b'b\n', b'c\n')
|
||||||
|
|
||||||
|
istream = StringStream(b'\na\nb\nc')
|
||||||
|
assert tuple(istream) == (b'\n', b'a\n', b'b\n', b'c')
|
||||||
|
|
||||||
|
istream = StringStream(b'\n\n\n')
|
||||||
|
assert tuple(istream) == (b'\n', b'\n', b'\n')
|
Loading…
x
Reference in New Issue
Block a user