Sync PYPI tests with GH released version diff -ruN tests/__init__.py proxmoxer-2.2.0/tests/__init__.py --- tests/__init__.py 1970-01-01 01:00:00.000000000 +0100 +++ proxmoxer-2.2.0/tests/__init__.py 2024-12-15 02:12:42.000000000 +0000 @@ -0,0 +1,3 @@ +__author__ = "John Hollowell" +__copyright__ = "(c) John Hollowell 2022" +__license__ = "MIT" diff -ruN tests/api_mock.py proxmoxer-2.2.0/tests/api_mock.py --- tests/api_mock.py 1970-01-01 01:00:00.000000000 +0100 +++ proxmoxer-2.2.0/tests/api_mock.py 2024-12-15 02:12:42.000000000 +0000 @@ -0,0 +1,360 @@ +__author__ = "John Hollowell" +__copyright__ = "(c) John Hollowell 2022" +__license__ = "MIT" + +import json +import re +from urllib.parse import parse_qsl, urlparse + +import pytest +import responses +from requests_toolbelt import MultipartEncoder + + +@pytest.fixture() +def mock_pve(): + with responses.RequestsMock(registry=PVERegistry, assert_all_requests_are_fired=False) as rsps: + yield rsps + + +class PVERegistry(responses.registries.FirstMatchRegistry): + base_url = "https://1.2.3.4:1234/api2/json" + + common_headers = { + "Cache-Control": "max-age=0", + "Connection": "close, Keep-Alive", + "Pragma": "no-cache", + "Server": "pve-api-daemon/3.0", + "Content-Type": "application/json;charset=UTF-8", + } + + def __init__(self): + super().__init__() + for resp in self._generate_static_responses(): + self.add(resp) + + for resp in self._generate_dynamic_responses(): + self.add(resp) + + def _generate_static_responses(self): + resps = [] + + # Basic GET requests + resps.append( + responses.Response( + method="GET", + url=self.base_url + "/version", + json={"data": {"version": "7.2-3", "release": "7.2", "repoid": "c743d6c1"}}, + ) + ) + + resps.append( + responses.Response( + method="POST", + url=re.compile(self.base_url + r"/nodes/[^/]+/storage/[^/]+/download-url"), + # "done" added to UPID so polling will terminate (status checking is tested elsewhere) + json={ + "data": "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done", + "success": 1, + }, + ) + ) + + resps.append( + responses.Response( + method="POST", + url=re.compile(self.base_url + r"/nodes/[^/]+/storage/storage1/upload"), + # "done" added to UPID so polling will terminate (status checking is tested elsewhere) + json={"data": "UPID:node:0017C594:0ADB2769:63EC5455:imgcopy::root@pam:done"}, + ) + ) + resps.append( + responses.Response( + method="POST", + url=re.compile(self.base_url + r"/nodes/[^/]+/storage/missing/upload"), + status=500, + body="storage 'missing' does not exist", + ) + ) + + return resps + + def _generate_dynamic_responses(self): + resps = [] + + # Authentication + resps.append( + responses.CallbackResponse( + method="POST", + url=self.base_url + "/access/ticket", + callback=self._cb_password_auth, + ) + ) + + # Session testing + resps.append( + responses.CallbackResponse( + method="GET", + url=self.base_url + "/fake/echo", + callback=self._cb_echo, + ) + ) + + resps.append( + responses.CallbackResponse( + method="GET", + url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/agent/exec"), + callback=self._cb_echo, + ) + ) + + resps.append( + responses.CallbackResponse( + method="GET", + url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/monitor"), + callback=self._cb_qemu_monitor, + ) + ) + + resps.append( + responses.CallbackResponse( + method="GET", + url=re.compile(self.base_url + r"/nodes/[^/]+/tasks/[^/]+/status"), + callback=self._cb_task_status, + ) + ) + + resps.append( + responses.CallbackResponse( + method="GET", + url=re.compile(self.base_url + r"/nodes/[^/]+/query-url-metadata.*"), + callback=self._cb_url_metadata, + ) + ) + + return resps + + ################################### + # Callbacks for Dynamic Responses # + ################################### + + def _cb_echo(self, request): + body = request.body + if body is not None: + if isinstance(body, MultipartEncoder): + body = body.to_string() # really, to byte string + body = body if isinstance(body, str) else str(body, "utf-8") + + resp = { + "method": request.method, + "url": request.url, + "headers": dict(request.headers), + "cookies": request._cookies.get_dict(), + "body": body, + # "body_json": dict(parse_qsl(request.body)), + } + return (200, self.common_headers, json.dumps(resp)) + + def _cb_password_auth(self, request): + form_data_dict = dict(parse_qsl(request.body)) + + # if this user should not be authenticated + if form_data_dict.get("username") == "bad_auth": + return ( + 401, + self.common_headers, + json.dumps({"data": None}), + ) + # if this user requires OTP and it is not included + if form_data_dict.get("username") == "otp" and form_data_dict.get("otp") is None: + return ( + 200, + self.common_headers, + json.dumps( + { + "data": { + "ticket": "otp_ticket", + "CSRFPreventionToken": "CSRFPreventionToken", + "NeedTFA": 1, + } + } + ), + ) + + # if this is the first ticket + if form_data_dict.get("password") != "ticket": + return ( + 200, + self.common_headers, + json.dumps( + {"data": {"ticket": "ticket", "CSRFPreventionToken": "CSRFPreventionToken"}} + ), + ) + # if this is refreshing the ticket, return new ticket + else: + return ( + 200, + self.common_headers, + json.dumps( + { + "data": { + "ticket": "new_ticket", + "CSRFPreventionToken": "CSRFPreventionToken_2", + } + } + ), + ) + + def _cb_task_status(self, request): + resp = {} + if "keep-running" in request.url: + resp = { + "data": { + "id": "110", + "pid": 1044989, + "node": "node1", + "pstart": 284768076, + "status": "running", + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + } + } + + elif "stopped" in request.url: + resp = { + "data": { + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + "pstart": 284768076, + "status": "stopped", + "exitstatus": "interrupted by signal", + "pid": 1044989, + "id": "110", + "node": "node1", + } + } + + elif "done" in request.url: + resp = { + "data": { + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + "pstart": 284768076, + "status": "stopped", + "exitstatus": "OK", + "pid": 1044989, + "id": "110", + "node": "node1", + } + } + + elif "comment" in request.url: + resp = { + "data": { + "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", + "node": "node", + "pid": 0, + "pstart": 0, + "starttime": 0, + "type": "task", + "id": "id", + "user": "root@pam", + "status": "stopped", + "exitstatus": "OK", + } + } + + return (200, self.common_headers, json.dumps(resp)) + + def _cb_url_metadata(self, request): + form_data_dict = dict(parse_qsl((urlparse(request.url)).query)) + + if "file.iso" in form_data_dict.get("url", ""): + return ( + 200, + self.common_headers, + json.dumps( + { + "data": { + "size": 123456, + "filename": "file.iso", + "mimetype": "application/x-iso9660-image", + # "mimetype": "application/octet-stream", + }, + "success": 1, + } + ), + ) + elif "invalid.iso" in form_data_dict.get("url", ""): + return ( + 500, + self.common_headers, + json.dumps( + { + "status": 500, + "message": "invalid server response: '500 Can't connect to sub.domain.tld:443 (certificate verify failed)'\n", + "success": 0, + "data": None, + } + ), + ) + elif "missing.iso" in form_data_dict.get("url", ""): + return ( + 500, + self.common_headers, + json.dumps( + { + "status": 500, + "success": 0, + "message": "invalid server response: '404 Not Found'\n", + "data": None, + } + ), + ) + + elif "index.html" in form_data_dict.get("url", ""): + return ( + 200, + self.common_headers, + json.dumps( + { + "success": 1, + "data": {"filename": "index.html", "mimetype": "text/html", "size": 17664}, + } + ), + ) + + def _cb_qemu_monitor(self, request): + body = request.body + if body is not None: + body = body if isinstance(body, str) else str(body, "utf-8") + + # if the command is an array, throw the type error PVE would throw + if "&" in body: + return ( + 400, + self.common_headers, + json.dumps( + { + "data": None, + "errors": {"command": "type check ('string') failed - got ARRAY"}, + } + ), + ) + else: + resp = { + "method": request.method, + "url": request.url, + "headers": dict(request.headers), + "cookies": request._cookies.get_dict(), + "body": body, + # "body_json": dict(parse_qsl(request.body)), + } + print(resp) + return (200, self.common_headers, json.dumps(resp)) diff -ruN tests/files_mock.py proxmoxer-2.2.0/tests/files_mock.py --- tests/files_mock.py 1970-01-01 01:00:00.000000000 +0100 +++ proxmoxer-2.2.0/tests/files_mock.py 2024-12-15 02:12:42.000000000 +0000 @@ -0,0 +1,127 @@ +__author__ = "John Hollowell" +__copyright__ = "(c) John Hollowell 2022" +__license__ = "MIT" + +import re + +import pytest +import responses +from requests import exceptions + +from .api_mock import PVERegistry + + +@pytest.fixture() +def mock_files(): + with responses.RequestsMock( + registry=FilesRegistry, assert_all_requests_are_fired=False + ) as rsps: + yield rsps + + +class FilesRegistry(responses.registries.FirstMatchRegistry): + base_url = "https://sub.domain.tld" + + common_headers = { + "Cache-Control": "max-age=0", + "Connection": "close, Keep-Alive", + "Pragma": "no-cache", + "Server": "pve-api-daemon/3.0", + "Content-Type": "application/json;charset=UTF-8", + } + + def __init__(self): + super().__init__() + for resp in self._generate_static_responses(): + self.add(resp) + + def _generate_static_responses(self): + resps = [] + + # Basic GET requests + resps.append(responses.Response(method="GET", url=self.base_url, body="hello world")) + resps.append( + responses.Response(method="GET", url=self.base_url + "/file.iso", body="CONTENTS") + ) + + # sibling + resps.append( + responses.Response( + method="GET", url=self.base_url + "/sibling/file.iso", body="CONTENTS\n" + ) + ) + resps.append( + responses.Response( + method="GET", + url=self.base_url + "/sibling/TESTINGSUMS", + body="this_is_the_hash file.iso", + ) + ) + + # extension + resps.append( + responses.Response( + method="GET", url=self.base_url + "/extension/file.iso", body="CONTENTS\n" + ) + ) + resps.append( + responses.Response( + method="GET", + url=self.base_url + "/extension/file.iso.testing", + body="this_is_the_hash file.iso", + ) + ) + resps.append( + responses.Response( + method="GET", + url=self.base_url + "/extension/connectionerror.iso.testing", + body=exceptions.ConnectionError(), + ) + ) + resps.append( + responses.Response( + method="GET", + url=self.base_url + "/extension/readtimeout.iso.testing", + body=exceptions.ReadTimeout(), + ) + ) + + # extension upper + resps.append( + responses.Response( + method="GET", url=self.base_url + "/upper/file.iso", body="CONTENTS\n" + ) + ) + resps.append( + responses.Response( + method="GET", + url=self.base_url + "/upper/file.iso.TESTING", + body="this_is_the_hash file.iso", + ) + ) + + resps.append( + responses.Response( + method="GET", + url=re.compile(self.base_url + r"/checksums/file.iso.\w+"), + body="1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 file.iso", + ) + ) + + return resps + + +@pytest.fixture() +def mock_files_and_pve(): + with responses.RequestsMock(registry=BothRegistry, assert_all_requests_are_fired=False) as rsps: + yield rsps + + +class BothRegistry(responses.registries.FirstMatchRegistry): + def __init__(self): + super().__init__() + registries = [FilesRegistry(), PVERegistry()] + + for reg in registries: + for resp in reg.registered: + self.add(resp) diff -ruN tests/tools/__init__.py proxmoxer-2.2.0/tests/tools/__init__.py --- tests/tools/__init__.py 1970-01-01 01:00:00.000000000 +0100 +++ proxmoxer-2.2.0/tests/tools/__init__.py 2024-12-15 02:12:42.000000000 +0000 @@ -0,0 +1,3 @@ +__author__ = "John Hollowell" +__copyright__ = "(c) John Hollowell 2022" +__license__ = "MIT" diff -ruN tests/tools/test_files.py proxmoxer-2.2.0/tests/tools/test_files.py --- tests/tools/test_files.py 1970-01-01 01:00:00.000000000 +0100 +++ proxmoxer-2.2.0/tests/tools/test_files.py 2024-12-15 02:12:42.000000000 +0000 @@ -0,0 +1,375 @@ +__author__ = "John Hollowell" +__copyright__ = "(c) John Hollowell 2023" +__license__ = "MIT" + +import logging +import tempfile +from unittest import mock + +import pytest + +from proxmoxer import ProxmoxAPI, core +from proxmoxer.tools import ChecksumInfo, Files, SupportedChecksums + +from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401 +from ..files_mock import ( # pylint: disable=unused-import # noqa: F401 + mock_files, + mock_files_and_pve, +) + +MODULE_LOGGER_NAME = "proxmoxer.tools.files" + + +class TestChecksumInfo: + def test_basic(self): + info = ChecksumInfo("name", 123) + + assert info.name == "name" + assert info.hex_size == 123 + + def test_str(self): + info = ChecksumInfo("name", 123) + + assert str(info) == "name" + + def test_repr(self): + info = ChecksumInfo("name", 123) + + assert repr(info) == "name (123 digits)" + + +class TestGetChecksum: + def test_get_checksum_from_sibling_file_success(self, mock_files): + url = "https://sub.domain.tld/sibling/file.iso" + exp_hash = "this_is_the_hash" + info = ChecksumInfo("testing", 16) + res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info) + res2 = Files._get_checksum_from_sibling_file(url, checksum_info=info, filename="file.iso") + + assert res1 == exp_hash + assert res2 == exp_hash + + def test_get_checksum_from_sibling_file_fail(self, mock_files): + url = "https://sub.domain.tld/sibling/missing.iso" + info = ChecksumInfo("testing", 16) + res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info) + res2 = Files._get_checksum_from_sibling_file( + url, checksum_info=info, filename="missing.iso" + ) + + assert res1 is None + assert res2 is None + + def test_get_checksum_from_extension_success(self, mock_files): + url = "https://sub.domain.tld/extension/file.iso" + exp_hash = "this_is_the_hash" + info = ChecksumInfo("testing", 16) + res1 = Files._get_checksum_from_extension(url, checksum_info=info) + res2 = Files._get_checksum_from_extension(url, checksum_info=info, filename="file.iso") + + assert res1 == exp_hash + assert res2 == exp_hash + + def test_get_checksum_from_extension_fail(self, mock_files): + url = "https://sub.domain.tld/extension/missing.iso" + + info = ChecksumInfo("testing", 16) + res1 = Files._get_checksum_from_extension(url, checksum_info=info) + res2 = Files._get_checksum_from_extension( + url, checksum_info=info, filename="connectionerror.iso" + ) + res3 = Files._get_checksum_from_extension( + url, checksum_info=info, filename="readtimeout.iso" + ) + + assert res1 is None + assert res2 is None + assert res3 is None + + def test_get_checksum_from_extension_upper_success(self, mock_files): + url = "https://sub.domain.tld/upper/file.iso" + exp_hash = "this_is_the_hash" + info = ChecksumInfo("testing", 16) + res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info) + res2 = Files._get_checksum_from_extension_upper( + url, checksum_info=info, filename="file.iso" + ) + + assert res1 == exp_hash + assert res2 == exp_hash + + def test_get_checksum_from_extension_upper_fail(self, mock_files): + url = "https://sub.domain.tld/upper/missing.iso" + info = ChecksumInfo("testing", 16) + res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info) + res2 = Files._get_checksum_from_extension_upper( + url, checksum_info=info, filename="missing.iso" + ) + + assert res1 is None + assert res2 is None + + def test_get_checksums_from_file_url_all_checksums(self, mock_files): + base_url = "https://sub.domain.tld/checksums/file.iso" + full_checksum_string = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" + for types_enum in SupportedChecksums: + checksum_info = types_enum.value + + data = Files.get_checksums_from_file_url(base_url, preferred_type=checksum_info) + + assert data[0] == full_checksum_string[0 : checksum_info.hex_size] + assert data[1] == checksum_info + + def test_get_checksums_from_file_url_missing(self, mock_files): + url = "https://sub.domain.tld/missing.iso" + + data = Files.get_checksums_from_file_url(url) + + assert data[0] is None + assert data[1] is None + + +class TestFiles: + prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value") + + def test_init_basic(self): + f = Files(self.prox, "node1", "storage1") + + assert f._prox == self.prox + assert f._node == "node1" + assert f._storage == "storage1" + + def test_repr(self): + f = Files(self.prox, "node1", "storage1") + assert ( + repr(f) + == "Files (node1/storage1 at ProxmoxAPI (https backend for https://1.2.3.4:1234/api2/json))" + ) + + def test_get_file_info_pass(self, mock_pve): + f = Files(self.prox, "node1", "storage1") + info = f.get_file_info("https://sub.domain.tld/file.iso") + + assert info["filename"] == "file.iso" + assert info["mimetype"] == "application/x-iso9660-image" + assert info["size"] == 123456 + + def test_get_file_info_fail(self, mock_pve): + f = Files(self.prox, "node1", "storage1") + info = f.get_file_info("https://sub.domain.tld/invalid.iso") + + assert info is None + + +class TestFilesDownload: + prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value") + f = Files(prox, "node1", "storage1") + + def test_download_discover_checksum(self, mock_files_and_pve, caplog): + status = self.f.download_file_to_storage("https://sub.domain.tld/checksums/file.iso") + + # this is the default "done" task mock information + assert status == { + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + "pstart": 284768076, + "status": "stopped", + "exitstatus": "OK", + "pid": 1044989, + "id": "110", + "node": "node1", + } + assert caplog.record_tuples == [] + + def test_download_no_blocking(self, mock_files_and_pve, caplog): + status = self.f.download_file_to_storage( + "https://sub.domain.tld/checksums/file.iso", blocking_status=False + ) + + # this is the default "done" task mock information + assert status == { + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + "pstart": 284768076, + "status": "stopped", + "exitstatus": "OK", + "pid": 1044989, + "id": "110", + "node": "node1", + } + assert caplog.record_tuples == [] + + def test_download_no_discover_checksum(self, mock_files_and_pve, caplog): + caplog.set_level(logging.WARNING, logger=MODULE_LOGGER_NAME) + + status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso") + + # this is the default "stopped" task mock information + assert status == { + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + "pstart": 284768076, + "status": "stopped", + "exitstatus": "OK", + "pid": 1044989, + "id": "110", + "node": "node1", + } + assert caplog.record_tuples == [ + ( + MODULE_LOGGER_NAME, + logging.WARNING, + "Unable to discover checksum. Will not do checksum validation", + ), + ] + + def test_uneven_checksum(self, caplog, mock_files_and_pve): + caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) + status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso", checksum="asdf") + + assert status is None + + assert caplog.record_tuples == [ + ( + MODULE_LOGGER_NAME, + logging.ERROR, + "Must pass both checksum and checksum_type or leave both None for auto-discovery", + ), + ] + + def test_uneven_checksum_type(self, caplog, mock_files_and_pve): + caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) + status = self.f.download_file_to_storage( + "https://sub.domain.tld/file.iso", checksum_type="asdf" + ) + + assert status is None + + assert caplog.record_tuples == [ + ( + MODULE_LOGGER_NAME, + logging.ERROR, + "Must pass both checksum and checksum_type or leave both None for auto-discovery", + ), + ] + + def test_get_file_info_missing(self, mock_pve): + f = Files(self.prox, "node1", "storage1") + info = f.get_file_info("https://sub.domain.tld/missing.iso") + + assert info is None + + def test_get_file_info_non_iso(self, mock_pve): + f = Files(self.prox, "node1", "storage1") + info = f.get_file_info("https://sub.domain.tld/index.html") + + assert info["filename"] == "index.html" + assert info["mimetype"] == "text/html" + + +class TestFilesUpload: + prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value") + f = Files(prox, "node1", "storage1") + + def test_upload_no_file(self, mock_files_and_pve, caplog): + status = self.f.upload_local_file_to_storage("/does-not-exist.iso") + + assert status is None + assert caplog.record_tuples == [ + ( + MODULE_LOGGER_NAME, + logging.ERROR, + '"/does-not-exist.iso" does not exist or is not a file', + ), + ] + + def test_upload_dir(self, mock_files_and_pve, caplog): + with tempfile.TemporaryDirectory() as tmp_dir: + status = self.f.upload_local_file_to_storage(tmp_dir) + + assert status is None + assert caplog.record_tuples == [ + ( + MODULE_LOGGER_NAME, + logging.ERROR, + f'"{tmp_dir}" does not exist or is not a file', + ), + ] + + def test_upload_empty_file(self, mock_files_and_pve, caplog): + with tempfile.NamedTemporaryFile("rb") as f_obj: + status = self.f.upload_local_file_to_storage(filename=f_obj.name) + + assert status is not None + assert caplog.record_tuples == [] + + def test_upload_non_empty_file(self, mock_files_and_pve, caplog): + with tempfile.NamedTemporaryFile("w+b") as f_obj: + f_obj.write(b"a" * 100) + f_obj.seek(0) + status = self.f.upload_local_file_to_storage(filename=f_obj.name) + + assert status is not None + assert caplog.record_tuples == [] + + def test_upload_no_checksum(self, mock_files_and_pve, caplog): + with tempfile.NamedTemporaryFile("rb") as f_obj: + status = self.f.upload_local_file_to_storage( + filename=f_obj.name, do_checksum_check=False + ) + + assert status is not None + assert caplog.record_tuples == [] + + def test_upload_checksum_unavailable(self, mock_files_and_pve, caplog, apply_no_checksums): + with tempfile.NamedTemporaryFile("rb") as f_obj: + status = self.f.upload_local_file_to_storage(filename=f_obj.name) + + assert status is not None + assert caplog.record_tuples == [ + ( + MODULE_LOGGER_NAME, + logging.WARNING, + "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation", + ) + ] + + def test_upload_non_blocking(self, mock_files_and_pve, caplog): + with tempfile.NamedTemporaryFile("rb") as f_obj: + status = self.f.upload_local_file_to_storage(filename=f_obj.name, blocking_status=False) + + assert status is not None + assert caplog.record_tuples == [] + + def test_upload_proxmox_error(self, mock_files_and_pve, caplog): + with tempfile.NamedTemporaryFile("rb") as f_obj: + f_copy = Files(self.f._prox, self.f._node, "missing") + + with pytest.raises(core.ResourceException) as exc_info: + f_copy.upload_local_file_to_storage(filename=f_obj.name) + + assert exc_info.value.status_code == 500 + assert exc_info.value.status_message == "Internal Server Error" + # assert exc_info.value.content == "storage 'missing' does not exist" + + def test_upload_io_error(self, mock_files_and_pve, caplog): + with tempfile.NamedTemporaryFile("rb") as f_obj: + mo = mock.mock_open() + mo.side_effect = IOError("ERROR MESSAGE") + with mock.patch("builtins.open", mo): + status = self.f.upload_local_file_to_storage(filename=f_obj.name) + + assert status is None + assert caplog.record_tuples == [(MODULE_LOGGER_NAME, logging.ERROR, "ERROR MESSAGE")] + + +@pytest.fixture +def apply_no_checksums(): + with mock.patch("hashlib.algorithms_available", set()): + yield diff -ruN tests/tools/test_tasks.py proxmoxer-2.2.0/tests/tools/test_tasks.py --- tests/tools/test_tasks.py 1970-01-01 01:00:00.000000000 +0100 +++ proxmoxer-2.2.0/tests/tools/test_tasks.py 2024-12-15 02:12:42.000000000 +0000 @@ -0,0 +1,223 @@ +__author__ = "John Hollowell" +__copyright__ = "(c) John Hollowell 2022" +__license__ = "MIT" + +import logging + +import pytest + +from proxmoxer import ProxmoxAPI +from proxmoxer.tools import Tasks + +from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401 + + +class TestBlockingStatus: + def test_basic(self, mocked_prox, caplog): + caplog.set_level(logging.DEBUG, logger="proxmoxer.core") + + status = Tasks.blocking_status( + mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done" + ) + + assert status == { + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + "pstart": 284768076, + "status": "stopped", + "exitstatus": "OK", + "pid": 1044989, + "id": "110", + "node": "node1", + } + assert caplog.record_tuples == [ + ( + "proxmoxer.core", + 20, + "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done/status", + ), + ( + "proxmoxer.core", + 10, + 'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1"}}\'', + ), + ] + + def test_zeroed(self, mocked_prox, caplog): + caplog.set_level(logging.DEBUG, logger="proxmoxer.core") + + status = Tasks.blocking_status( + mocked_prox, "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment" + ) + + assert status == { + "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", + "node": "node", + "pid": 0, + "pstart": 0, + "starttime": 0, + "type": "task", + "id": "id", + "user": "root@pam", + "status": "stopped", + "exitstatus": "OK", + } + assert caplog.record_tuples == [ + ( + "proxmoxer.core", + 20, + "GET https://1.2.3.4:1234/api2/json/nodes/node/tasks/UPID:node:00000000:00000000:00000000:task:id:root@pam:comment/status", + ), + ( + "proxmoxer.core", + 10, + 'Status code: 200, output: b\'{"data": {"upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", "node": "node", "pid": 0, "pstart": 0, "starttime": 0, "type": "task", "id": "id", "user": "root@pam", "status": "stopped", "exitstatus": "OK"}}\'', + ), + ] + + def test_killed(self, mocked_prox, caplog): + caplog.set_level(logging.DEBUG, logger="proxmoxer.core") + + status = Tasks.blocking_status( + mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped" + ) + + assert status == { + "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", + "starttime": 1661825068, + "user": "root@pam", + "type": "vzdump", + "pstart": 284768076, + "status": "stopped", + "exitstatus": "interrupted by signal", + "pid": 1044989, + "id": "110", + "node": "node1", + } + assert caplog.record_tuples == [ + ( + "proxmoxer.core", + 20, + "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped/status", + ), + ( + "proxmoxer.core", + 10, + 'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "interrupted by signal", "pid": 1044989, "id": "110", "node": "node1"}}\'', + ), + ] + + def test_timeout(self, mocked_prox, caplog): + caplog.set_level(logging.DEBUG, logger="proxmoxer.core") + + status = Tasks.blocking_status( + mocked_prox, + "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", + timeout=0.021, + polling_interval=0.01, + ) + + assert status is None + assert caplog.record_tuples == [ + ( + "proxmoxer.core", + 20, + "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status", + ), + ( + "proxmoxer.core", + 10, + 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'', + ), + ( + "proxmoxer.core", + 20, + "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status", + ), + ( + "proxmoxer.core", + 10, + 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'', + ), + ( + "proxmoxer.core", + 20, + "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status", + ), + ( + "proxmoxer.core", + 10, + 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'', + ), + ] + + +class TestDecodeUpid: + def test_basic(self): + upid = "UPID:node:000CFC5C:03E8D0C3:6194806C:aptupdate::root@pam:" + decoded = Tasks.decode_upid(upid) + + assert decoded["upid"] == upid + assert decoded["node"] == "node" + assert decoded["pid"] == 851036 + assert decoded["pstart"] == 65589443 + assert decoded["starttime"] == 1637122156 + assert decoded["type"] == "aptupdate" + assert decoded["id"] == "" + assert decoded["user"] == "root@pam" + assert decoded["comment"] == "" + + def test_all_values(self): + upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:local" + decoded = Tasks.decode_upid(upid) + + assert decoded["upid"] == upid + assert decoded["node"] == "node1" + assert decoded["pid"] == 851962 + assert decoded["pstart"] == 65597267 + assert decoded["starttime"] == 1637122234 + assert decoded["type"] == "vzdump" + assert decoded["id"] == "103" + assert decoded["user"] == "root@pam" + assert decoded["comment"] == "local" + + def test_invalid_length(self): + upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam" + with pytest.raises(AssertionError) as exc_info: + Tasks.decode_upid(upid) + + assert str(exc_info.value) == "UPID is not in the correct format" + + def test_invalid_start(self): + upid = "ASDF:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:" + with pytest.raises(AssertionError) as exc_info: + Tasks.decode_upid(upid) + + assert str(exc_info.value) == "UPID is not in the correct format" + + +class TestDecodeLog: + def test_basic(self): + log_list = [{"n": 1, "t": "client connection: 127.0.0.1:49608"}, {"t": "TASK OK", "n": 2}] + log_str = Tasks.decode_log(log_list) + + assert log_str == "client connection: 127.0.0.1:49608\nTASK OK" + + def test_empty(self): + log_list = [] + log_str = Tasks.decode_log(log_list) + + assert log_str == "" + + def test_unordered(self): + log_list = [{"n": 3, "t": "third"}, {"t": "first", "n": 1}, {"t": "second", "n": 2}] + log_str = Tasks.decode_log(log_list) + + assert log_str == "first\nsecond\nthird" + + +@pytest.fixture +def mocked_prox(mock_pve): + return ProxmoxAPI("1.2.3.4:1234", user="user", password="password")