mirror of
https://github.com/KolibriOS/kolibrios.git
synced 2025-09-11 21:15:30 -04:00
135 lines
4.8 KiB
Python
135 lines
4.8 KiB
Python
# Copyright 2021 Magomed Kostoev
|
|
# Published under MIT License
|
|
|
|
import io
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import timeit
|
|
import time
|
|
import shlex
|
|
import signal
|
|
import shutil
|
|
from . makeflop import Floppy
|
|
|
|
def is_win32():
    """Return True when ``sys.platform`` reports Windows ("win32")."""
    # The comparison already yields a bool; no conditional expression needed.
    return sys.platform == "win32"
|
|
|
|
def is_linux():
    """Return True when ``sys.platform`` is "linux" or "linux2"."""
    # Membership test replaces the chained ``==``/``or`` and the redundant
    # ``True if ... else False`` wrapper; the accepted values are unchanged.
    return sys.platform in ("linux", "linux2")
|
|
|
|
def is_osx():
    """Return True when ``sys.platform`` reports macOS ("darwin")."""
    # The comparison already yields a bool; no conditional expression needed.
    return sys.platform == "darwin"
|
|
|
|
class TestException(Exception):
    """Base error for a failed test run.

    Besides the usual exception message it stores a short kind tag and the
    Qemu command string that was passed to the constructor.
    """

    def __init__(self, error_kind, message, qemu_cmd):
        """Store the kind tag and command; *message* becomes the exception text."""
        super().__init__(message)
        self.qemu_cmd = qemu_cmd
        self.error_kind = error_kind

    def cmd(self):
        """Return the Qemu command given at construction time."""
        return self.qemu_cmd

    def kind(self):
        """Return the kind tag given at construction time."""
        return self.error_kind
|
|
|
|
class TestException_Timeout(TestException):
    """TestException whose kind tag is fixed to "TIMEOUT"."""

    def __init__(self, message, qemu_cmd):
        """Forward *message* and *qemu_cmd* to the base class with the TIMEOUT tag."""
        kind_tag = "TIMEOUT"
        super().__init__(kind_tag, message, qemu_cmd)
|
|
|
|
class TestException_Failure(TestException):
    """TestException whose kind tag is fixed to "FAILURE"."""

    def __init__(self, message, qemu_cmd):
        """Forward *message* and *qemu_cmd* to the base class with the FAILURE tag."""
        kind_tag = "FAILURE"
        super().__init__(kind_tag, message, qemu_cmd)
|
|
|
|
class Qemu:
    """Handle to a running Qemu process and its debug log file.

    Holds the ``subprocess.Popen`` object of the emulator (``self.popen``)
    and a binary read handle on the debug log (``self.debug``).
    """

    # Bytes requested per read while scanning the debug log.
    _READ_CHUNK = 4096
    # Seconds to sleep when the log has no new data, to avoid a busy loop.
    _IDLE_SLEEP = 0.01

    def __init__(self, popen, debug_log):
        """Attach to *popen* and open *debug_log* for binary reading.

        Blocks until the log file exists or Qemu exits.

        Raises:
            TestException_Failure: Qemu is no longer running once the wait
                for the log file ends.
        """
        self.popen = popen
        # Qemu needs time to create debug.log file
        while not os.path.exists(debug_log) and self.qemu_is_alive():
            self.wait(0.250)
        self.assert_qemu_not_died("waiting for the debug log file creation")
        self.debug = open(debug_log, "rb")

    def qemu_is_alive(self):
        """Return True while the Qemu process has not exited."""
        return self.popen.poll() is None

    def _command_line(self):
        """Return the Qemu command line as one printable string."""
        args = self.popen.args
        # Popen.args is a plain string when the process was started from a
        # command string (run_qemu does that on Windows). Joining a string
        # would insert a space between every character.
        if isinstance(args, str):
            return args
        return ' '.join(args)

    def assert_qemu_not_died(self, while_):
        """Raise TestException_Failure if Qemu has exited.

        *while_* describes the activity in progress and is put into the
        error message.
        """
        if not self.qemu_is_alive():
            raise TestException_Failure(f"Qemu has finished while {while_}.", self._command_line())

    def wait_for_debug_log(self, needle, timeout = 1):
        """Read the debug log until *needle* shows up.

        Args:
            needle: Text to look for; it is encoded as UTF-8 before matching.
            timeout: Maximum number of seconds to keep looking.

        Returns:
            None as soon as the needle has been found.

        Raises:
            TestException_Failure: Qemu exited and the rest of the log does
                not contain the needle.
            TestException_Timeout: The timeout elapsed first; Qemu is killed
                before this is raised.
        """
        needle = bytes(needle, "utf-8")
        start = timeit.default_timer()

        # We don't have to keep the whole log to find the needle: a match
        # that straddles two reads can reach back at most len(needle) - 1
        # bytes, so only that much of the scanned data is carried over.
        # This uses less memory and avoids re-searching old log data.
        keep = max(len(needle) - 1, 0)
        tail = b""

        # While no timeout, read and search logs
        while timeit.default_timer() - start < timeout:
            # Sample liveness before reading, so that everything Qemu wrote
            # before exiting is still scanned before a failure is reported.
            alive = self.qemu_is_alive()

            # TODO: Non-blocking read.
            chunk = self.debug.read(self._READ_CHUNK)
            window = tail + chunk
            if needle in window:
                return
            tail = window[max(len(window) - keep, 0):]

            if chunk:
                # More data may be pending; keep draining.
                continue
            if not alive:
                self.assert_qemu_not_died("waiting for the debug log")
            # Nothing new in the log yet: yield the CPU instead of spinning.
            self.wait(self._IDLE_SLEEP)

        self.timeout("waiting for the debug log")

    def kill(self):
        """Terminate Qemu: TASKKILL on Windows, SIGTERM to its process group elsewhere."""
        if is_win32():
            # FIXME: Crude; is there a better way to kill the process tree?
            subprocess.Popen(f"TASKKILL /F /PID {self.popen.pid} /T", stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL, stdin = subprocess.DEVNULL)
        else:
            try:
                os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
            except ProcessLookupError:
                # Qemu is already gone, so there is nothing left to kill.
                # Letting this escape would mask the timeout error that
                # timeout() is about to raise.
                pass

    def timeout(self, while_):
        """Kill Qemu and raise TestException_Timeout with *while_* as its message."""
        self.kill()
        raise TestException_Timeout(while_, self._command_line())

    def wait(self, seconds):
        """Sleep for *seconds* seconds."""
        time.sleep(seconds)
|
|
|
|
def run_qemu(root_dir, test_dir, debug_log):
    """Start Qemu on a private copy of the test floppy image.

    Args:
        root_dir: Directory containing the source ``kolibri_test.img``.
        test_dir: Directory that receives the image copy and the
            ``qemu_stdout.log`` / ``qemu_stderr.log`` files.
        debug_log: Path of the file Qemu's debug console is written to.

    Returns:
        The ``subprocess.Popen`` object of the started Qemu process.

    Raises:
        FileNotFoundError: On Windows, when Qemu cannot be located on PATH.
    """
    test_img = f"{test_dir}/kolibri_test.img"

    # Make local copy of IMG, so we will be able to run the test in parallel
    if os.path.exists(test_img):  # If previous test run interrupted the file may be busy
        os.remove(test_img)
    shutil.copyfile(f"{root_dir}/kolibri_test.img", test_img)

    qemu_command = "qemu-system-i386"
    # Build the command as an argument list so that paths containing spaces
    # survive intact instead of being split apart again.
    args = [
        qemu_command,
        "-nographic",  # Makes it faster
        "-debugcon", f"file:{debug_log}",  # 0xe9 port output
        "-L", ".",  # IDK why it does not work without this
        "-m", "128",
        "-drive", f"format=raw,file={test_img},index=0,if=floppy",
        "-boot", "a",
        "-vga", "vmware",
        "-net", "nic,model=rtl8139",
        "-net", "user",
    ]

    on_windows = is_win32()
    if on_windows:
        qemu_full_path = shutil.which(qemu_command)
        if qemu_full_path is None:
            # shutil.which returns None when nothing is found; fail with a
            # readable error instead of a TypeError from os.path.dirname.
            raise FileNotFoundError(f"{qemu_command} was not found in PATH")
        args += ["-L", os.path.dirname(qemu_full_path)]

    # The child process receives its own copies of the log handles, so the
    # parent's copies are closed right after the spawn instead of leaking.
    with open(f"{test_dir}/qemu_stdout.log", "w") as qemu_stdout, \
            open(f"{test_dir}/qemu_stderr.log", "w") as qemu_stderr:
        # Windows keeps going through the shell as before; Popen converts
        # the list to a properly quoted command string there.
        return subprocess.Popen(
            args,
            bufsize = 0,
            stdout = qemu_stdout,
            stderr = qemu_stderr,
            stdin = subprocess.DEVNULL,
            shell = on_windows,
            start_new_session = True,
        )
|
|
|
|
def run(root_dir, test_dir):
    """Start Qemu for the test in *test_dir* and return a ``Qemu`` wrapper.

    A ``debug.log`` left in *test_dir* is deleted before Qemu is started.
    """
    log_path = f"{test_dir}/debug.log"

    stale_log_present = os.path.exists(log_path)
    if stale_log_present:
        os.remove(log_path)

    return Qemu(run_qemu(root_dir, test_dir, log_path), log_path)
|
|
|