Skip to content

Add Accessor.openText() as context manager for openAddress() #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions tests/test_ftpaccessor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""
Test module of xcp.accessor.FTPAccessor

Expand All @@ -21,18 +22,29 @@

import pytest
import pytest_localftpserver # pylint: disable=unused-import # Ensure that it is installed
from six import ensure_binary, ensure_str

import xcp.accessor

binary_data = b"\x80\x91\xaa\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff"
text_data = "✋➔Hello Accessor from the 🗺, download and verify ✅ me!"


def upload_textfile(ftpserver, accessor):
accessor.writeFile(BytesIO(ensure_binary(text_data)), "textfile")
assert accessor.access("textfile")
assert text_data == ensure_str(ftpserver_content(ftpserver, "testdir/textfile"))

def upload_binary_file(ftpserver, accessor):
"""Upload a binary file and compare the uploaded file content with the local content"""
accessor.writeFile(BytesIO(binary_data), "filename")
assert accessor.access("filename")
ftp_content_generator = ftpserver.get_file_contents("testdir/filename", read_mode="rb")
assert binary_data == next(ftp_content_generator)["content"]
assert binary_data == ftpserver_content(ftpserver, "testdir/filename")


def ftpserver_content(ftpserver, path):
ftp_content_generator = ftpserver.get_file_contents(path, read_mode="rb")
return next(ftp_content_generator)["content"]


@pytest.fixture
Expand All @@ -44,6 +56,7 @@ def ftp_accessor(ftpserver):
accessor = xcp.accessor.FTPAccessor(url + "/testdir", False)
accessor.start()
upload_binary_file(ftpserver, accessor)
upload_textfile(ftpserver, accessor)
# This leaves ftp_accessor.finish() to each test to because disconnecting from the
# ftpserver after the test in the fixture would cause the formatting of the pytest
# live log to be become less readable:
Expand Down Expand Up @@ -73,3 +86,10 @@ def test_download_binary_file(ftp_accessor):
assert remote_ftp_filehandle.read() == binary_data
assert ftp_accessor.access("filename") is True # covers FTPAccessor._cleanup()
ftp_accessor.finish()


def test_download_textfile(ftp_accessor):
"""Download a text file containing UTF-8 and compare the returned decoded string contents"""
with ftp_accessor.openText("textfile") as remote_ftp_filehandle:
assert remote_ftp_filehandle.read() == text_data
ftp_accessor.finish()
18 changes: 17 additions & 1 deletion tests/test_httpaccessor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Test xcp.accessor.HTTPAccessor using a local pure-Python http(s)server fixture"""
# -*- coding: utf-8 -*-
import base64
import sys
import unittest
from contextlib import contextmanager

import pytest

Expand All @@ -17,6 +19,8 @@
pytest.skip(allow_module_level=True)


UTF8TEXT_LITERAL = "✋Hello accessor from the 🗺, download and verify me! ✅"

class HTTPAccessorTestCase(unittest.TestCase):
document_root = "tests/"
httpserver = HTTPServer() # pyright: ignore[reportUnboundVariable]
Expand Down Expand Up @@ -52,14 +56,19 @@ def handle_get(request):

cls.httpserver.expect_request("/" + read_file).respond_with_handler(handle_get)

def assert_http_get_request_data(self, url, read_file, error_handler):
@contextmanager
def http_get_request_data(self, url, read_file, error_handler):
"""Serve a GET request, assert that the accessor returns the content of the GET Request"""
self.serve_a_get_request(self.document_root, read_file, error_handler)

httpaccessor = createAccessor(url, True)
self.assertEqual(type(httpaccessor), HTTPAccessor)

with open(self.document_root + read_file, "rb") as ref:
yield httpaccessor, ref

def assert_http_get_request_data(self, url, read_file, error_handler):
with self.http_get_request_data(url, read_file, error_handler) as (httpaccessor, ref):
http_accessor_filehandle = httpaccessor.openAddress(read_file)
if sys.version_info >= (3, 0):
assert isinstance(http_accessor_filehandle, HTTPResponse)
Expand Down Expand Up @@ -109,3 +118,10 @@ def test_get_binary(self):
+ ".pyc"
)
self.assert_http_get_request_data(self.httpserver.url_for(""), binary, None)

def test_httpaccessor_open_text(self):
"""Get text containing UTF-8 and compare the returned decoded string contents"""
self.httpserver.expect_request("/textfile").respond_with_data(UTF8TEXT_LITERAL)
accessor = createAccessor(self.httpserver.url_for("/"), True)
with accessor.openText("textfile") as textfile:
assert textfile.read() == UTF8TEXT_LITERAL
16 changes: 16 additions & 0 deletions tests/test_mountingaccessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import xcp.accessor

from .test_httpaccessor import UTF8TEXT_LITERAL

binary_data = b"\x00\x1b\x5b\x95\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff"


Expand Down Expand Up @@ -41,6 +43,7 @@ def check_mounting_accessor(accessor, fs):

assert check_binary_read(accessor, location, fs)
assert check_binary_write(accessor, location, fs)
assert open_text(accessor, location, fs, UTF8TEXT_LITERAL) == UTF8TEXT_LITERAL

if sys.version_info.major >= 3:
fs.mount_points.pop(location)
Expand Down Expand Up @@ -82,3 +85,16 @@ def check_binary_write(accessor, location, fs):

with FakeFileOpen(fs, delete_on_close=True)(location + "/" + name, "rb") as written:
return cast(bytes, written.read()) == binary_data


def open_text(accessor, location, fs, text):
# type: (xcp.accessor.MountingAccessor, str, FakeFilesystem, str) -> str
"""Test the openText() method of subclasses of xcp.accessor.MountingAccessor"""
name = "textfile"
path = location + "/" + name
assert fs.create_file(path, contents=text)
assert accessor.access(name)
with accessor.openText(name) as textfile:
assert not isinstance(textfile, bool)
fs.remove(path)
return textfile.read()
21 changes: 21 additions & 0 deletions xcp/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@

"""accessor - provide common interface to access methods"""

# pyre-ignore-all-errors[6,16]
import ftplib
import io
import os
import sys
import tempfile
import errno
from contextlib import contextmanager
from typing import cast, TYPE_CHECKING

from six.moves import urllib # pyright: ignore

from xcp import logger, mount

if TYPE_CHECKING:
from collections.abc import Generator
from typing import IO
from typing_extensions import Literal

Expand Down Expand Up @@ -70,6 +75,22 @@ def access(self, name):

return True

@contextmanager
def openText(self, address):
# type:(str) -> Generator[IO[str] | Literal[False], None, None]
"""Context manager to read text from address using 'with'. Yields IO[str] or False"""
readbuffer = self.openAddress(address)

if readbuffer and sys.version_info >= (3, 0):
textiowrapper = io.TextIOWrapper(readbuffer, encoding="utf-8")
yield textiowrapper
textiowrapper.close()
else:
yield cast(io.TextIOWrapper, readbuffer)

if readbuffer:
readbuffer.close()

def openAddress(self, address):
# type:(str) -> IO[bytes] | Literal[False]
"""must be overloaded"""
Expand Down
17 changes: 4 additions & 13 deletions xcp/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from hashlib import md5
import io
import os.path
import xml.dom.minidom
import configparser
import sys

import six

Expand Down Expand Up @@ -180,18 +178,11 @@ def _getVersion(cls, access, category):

access.start()
try:
rawtreeinfofp = access.openAddress(cls.TREEINFO_FILENAME)
if sys.version_info < (3, 0) or isinstance(rawtreeinfofp, io.TextIOBase):
# e.g. with FileAccessor
treeinfofp = rawtreeinfofp
else:
# e.g. with HTTPAccessor
treeinfofp = io.TextIOWrapper(rawtreeinfofp, encoding='utf-8')
treeinfo = configparser.ConfigParser()
treeinfo.read_file(treeinfofp)
if treeinfofp != rawtreeinfofp:
treeinfofp.close()
rawtreeinfofp.close()

with access.openText(cls.TREEINFO_FILENAME) as fp:
treeinfo.read_file(fp)

if treeinfo.has_section('system-v1'):
ver_str = treeinfo.get('system-v1', category_map[category])
else:
Expand Down