Commit be8a5129 authored by Cleber Rosa

Python 3 port: implement consistent open() usage style

This is an attempt to catch most (if not all) usages of `open()`
that do not follow the context manager pattern.

Sometimes, for better readability, our own `genio` library is
used.
Signed-off-by: Cleber Rosa <crosa@redhat.com>
Parent eb499d54
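For reference, the pattern this commit enforces looks roughly like the sketch below. It is a minimal illustration, not a hunk from this diff; the log path is hypothetical, while `genio.read_file` and `genio.read_all_lines` are the `avocado.utils.genio` helpers that several hunks switch to.

from avocado.utils import genio

log_path = "/tmp/example.log"  # hypothetical path, used only for illustration

# Old style: the file object is closed only when the garbage collector
# reclaims it, which is unreliable outside CPython and can emit a
# ResourceWarning on Python 3.
open(log_path, "a").write("message\n")

# New style: the context manager closes the file even if the write raises.
with open(log_path, "a") as log_file:
    log_file.write("message\n")

# For simple whole-file reads, the genio helpers avoid the with-block entirely.
content = genio.read_file(log_path)       # whole file as a string
lines = genio.read_all_lines(log_path)    # list of lines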
......@@ -66,7 +66,8 @@ def add_runner_failure(test_state, new_status, message):
else:
test_state["text_output"] = message + "\n"
if test_log:
open(test_log, "a").write('\n' + message + '\n')
with open(test_log, "a") as log_file:
log_file.write('\n' + message + '\n')
# Update the results
if test_state.get("fail_reason"):
test_state["fail_reason"] = "%s\n%s" % (test_state["fail_reason"],
......
......@@ -349,19 +349,15 @@ class LogWatcher(Collectible):
self.inode = current_inode
self.size = current_size
in_messages = open(self.path)
out_messages = gzip.GzipFile(dstpath, "w")
try:
in_messages.seek(bytes_to_skip)
while True:
# Read data in manageable chunks rather than all at once.
in_data = in_messages.read(200000)
if not in_data:
break
out_messages.write(in_data)
finally:
out_messages.close()
in_messages.close()
with open(self.path) as in_messages:
with gzip.GzipFile(dstpath, "w") as out_messages:
in_messages.seek(bytes_to_skip)
while True:
# Read data in manageable chunks rather than all at once.
in_data = in_messages.read(200000)
if not in_data:
break
out_messages.write(in_data)
except ValueError as e:
log.info(e)
except (IOError, OSError):
......
......@@ -1144,11 +1144,12 @@ class SimpleTest(Test):
except process.CmdError as details:
self._log_detailed_cmd_info(details.result)
raise exceptions.TestFail(details)
for line in open(self.logfile):
if self.re_avocado_log.match(line):
raise exceptions.TestWarn("Test passed but there were warnings"
" on stdout during execution. Check "
"the log for details.")
with open(self.logfile) as logfile:
for line in logfile:
if self.re_avocado_log.match(line):
raise exceptions.TestWarn("Test passed but there were warnings"
" on stdout during execution. Check "
"the log for details.")
def test(self):
"""
......
......@@ -245,7 +245,9 @@ def load_distro(path):
:return: a dict with the distro definition data
:rtype: dict
"""
return json.loads(bz2.decompress(open(path).read()))
with open(path, 'rb') as distro_file:
json_data = json.loads(bz2.decompress(distro_file.read()))
return json_data
def load_from_tree(name, version, release, arch, package_type, path):
......
......@@ -103,27 +103,28 @@ class Replay(CLI):
max_index = 0
no_tests = 0
_tests = {}
for line in open(path):
line = line.strip()
if line.startswith("#"):
continue
result = re_result.match(line)
if result:
if result.group(1) is None:
res = result.group(5)
if res is None:
res = "PASS"
else:
res = "ERROR"
index = int(result.group(2))
_tests[index] = {"status": res,
"test": result.group(3).rstrip()}
max_index = max(max_index, index)
continue
_no_tests = re_no_tests.match(line)
if _no_tests:
no_tests = int(_no_tests.group(1))
continue
with open(path) as tapfile:
for line in tapfile:
line = line.strip()
if line.startswith("#"):
continue
result = re_result.match(line)
if result:
if result.group(1) is None:
res = result.group(5)
if res is None:
res = "PASS"
else:
res = "ERROR"
index = int(result.group(2))
_tests[index] = {"status": res,
"test": result.group(3).rstrip()}
max_index = max(max_index, index)
continue
_no_tests = re_no_tests.match(line)
if _no_tests:
no_tests = int(_no_tests.group(1))
continue
if not (no_tests or max_index):
return None
......
......@@ -173,8 +173,8 @@ class Asset(object):
if not os.path.isfile(self.hashfile):
self._compute_hash()
with open(self.hashfile, 'r') as f:
for line in f.readlines():
with open(self.hashfile, 'r') as asset_file:
for line in asset_file:
# md5 is 32 chars big and sha512 is 128 chars big.
# others supported algorithms are between those.
pattern = '%s [a-f0-9]{32,128}' % self.algorithm
......
......@@ -44,10 +44,11 @@ def _get_cpu_info():
:rtype: `list`
"""
cpuinfo = []
for line in open('/proc/cpuinfo').readlines():
if line == '\n':
break
cpuinfo.append(line)
with open('/proc/cpuinfo') as proc_cpuinfo:
for line in proc_cpuinfo:
if line == '\n':
break
cpuinfo.append(line)
return cpuinfo
......@@ -60,8 +61,9 @@ def _get_cpu_status(cpu):
:returns: `bool` True if online or False if not
:rtype: 'bool'
"""
if '1' in open('/sys/devices/system/cpu/cpu%s/online' % cpu).read():
return True
with open('/sys/devices/system/cpu/cpu%s/online' % cpu) as online:
if '1' in online.read():
return True
return False
......@@ -134,9 +136,10 @@ def cpu_online_list():
Reports a list of indexes of the online cpus
"""
cpus = []
for line in open('/proc/cpuinfo', 'r'):
if line.startswith('processor'):
cpus.append(int(line.split()[2])) # grab cpu number
with open('/proc/cpuinfo', 'r') as proc_cpuinfo:
for line in proc_cpuinfo:
if line.startswith('processor'):
cpus.append(int(line.split()[2])) # grab cpu number
return cpus
......
......@@ -56,21 +56,21 @@ def hash_file(filename, size=None, algorithm="md5"):
if not size or size > fsize:
size = fsize
f = open(filename, 'rb')
try:
hash_obj = hash_wrapper(algorithm=algorithm)
except ValueError:
logging.error("Unknown hash algorithm %s, returning None", algorithm)
while size > 0:
if chunksize > size:
chunksize = size
data = f.read(chunksize)
if len(data) == 0:
logging.debug("Nothing left to read but size=%d", size)
break
hash_obj.update(data)
size -= len(data)
f.close()
with open(filename, 'rb') as file_to_hash:
while size > 0:
if chunksize > size:
chunksize = size
data = file_to_hash.read(chunksize)
if len(data) == 0:
logging.debug("Nothing left to read but size=%d", size)
break
hash_obj.update(data)
size -= len(data)
return hash_obj.hexdigest()
......@@ -164,9 +164,10 @@ class Probe(object):
"""
if self.check_name_for_file_contains():
if os.path.exists(self.CHECK_FILE):
for line in open(self.CHECK_FILE).readlines():
if self.CHECK_FILE_CONTAINS in line:
return self.CHECK_FILE_DISTRO_NAME
with open(self.CHECK_FILE) as check_file:
for line in check_file:
if self.CHECK_FILE_CONTAINS in line:
return self.CHECK_FILE_DISTRO_NAME
def check_version(self):
"""
......@@ -185,12 +186,12 @@ class Probe(object):
Returns the match result for the version regex on the file content
"""
if self.check_version():
if os.path.exists(self.CHECK_FILE):
version_file_content = open(self.CHECK_FILE).read()
else:
if not os.path.exists(self.CHECK_FILE):
return None
return self.CHECK_VERSION_REGEX.match(version_file_content)
with open(self.CHECK_FILE) as version_file:
version_file_content = version_file.read()
return self.CHECK_VERSION_REGEX.match(version_file_content)
def version(self):
"""
......@@ -366,10 +367,11 @@ class SUSEProbe(Probe):
version_id_re = re.compile(r'VERSION_ID="([\d\.]*)"')
version_id = None
for line in open(self.CHECK_FILE).readlines():
match = version_id_re.match(line)
if match:
version_id = match.group(1)
with open(self.CHECK_FILE) as check_file:
for line in check_file:
match = version_id_re.match(line)
if match:
version_id = match.group(1)
if version_id:
version_parts = version_id.split('.')
......
......@@ -67,11 +67,8 @@ def url_download(url, filename, data=None, timeout=300):
src_file = url_open(url, data=data, timeout=timeout)
try:
dest_file = open(filename, 'wb')
try:
with open(filename, 'wb') as dest_file:
shutil.copyfileobj(src_file, dest_file)
finally:
dest_file.close()
finally:
src_file.close()
......@@ -90,31 +87,29 @@ def url_download_interactive(url, output_file, title='', chunk_size=102400):
:param chunk_size: amount of data to read at a time
"""
output_dir = os.path.dirname(output_file)
output_file = open(output_file, 'w+b')
input_file = urlopen(url)
with open(output_file, 'w+b') as open_output_file:
input_file = urlopen(url)
try:
file_size = int(input_file.headers['Content-Length'])
except KeyError:
raise ValueError('Could not find file size in HTTP headers')
logging.info('Downloading %s, %s to %s', os.path.basename(url),
output.display_data_size(file_size), output_dir)
progress_bar = output.ProgressBar(maximum=file_size, title=title)
# Download the file, while interactively updating the progress
progress_bar.draw()
while True:
data = input_file.read(chunk_size)
if data:
progress_bar.append_amount(len(data))
output_file.write(data)
else:
progress_bar.update_amount(file_size)
break
output_file.close()
try:
file_size = int(input_file.headers['Content-Length'])
except KeyError:
raise ValueError('Could not find file size in HTTP headers')
logging.info('Downloading %s, %s to %s', os.path.basename(url),
output.display_data_size(file_size), output_dir)
progress_bar = output.ProgressBar(maximum=file_size, title=title)
# Download the file, while interactively updating the progress
progress_bar.draw()
while True:
data = input_file.read(chunk_size)
if data:
progress_bar.append_amount(len(data))
open_output_file.write(data)
else:
progress_bar.update_amount(file_size)
break
def _get_file(src, dst, permissions=None):
......
......@@ -91,9 +91,11 @@ def can_mount():
logging.debug('Can not use mount: missing "mount" tool')
return False
if 'iso9660' not in open('/proc/filesystems').read():
process.system("modprobe iso9660", ignore_status=True, sudo=True)
if 'iso9660' not in open('/proc/filesystems').read():
with open('/proc/filesystems') as proc_filesystems:
if 'iso9660' not in proc_filesystems.read():
process.system("modprobe iso9660", ignore_status=True, sudo=True)
with open('/proc/filesystems') as proc_filesystems:
if 'iso9660' not in proc_filesystems.read():
logging.debug('Can not use mount: lack of iso9660 kernel support')
return False
......@@ -150,9 +152,8 @@ class BaseIso9660(object):
:rtype: None
"""
content = self.read(src)
output = open(dst, 'w+b')
output.write(content)
output.close()
with open(dst, 'w+b') as output:
output.write(content)
def mnt_dir(self):
"""
......@@ -283,10 +284,11 @@ class Iso9660IsoRead(MixInMntDirMount, BaseIso9660):
self.temp_dir = tempfile.mkdtemp(prefix='avocado_' + __name__)
def read(self, path):
temp_file = os.path.join(self.temp_dir, path)
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, path, temp_file)
temp_path = os.path.join(self.temp_dir, path)
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, path, temp_path)
process.run(cmd)
return open(temp_file).read()
with open(temp_path) as temp_file:
return temp_file.read()
def copy(self, src, dst):
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, src, dst)
......@@ -329,7 +331,8 @@ class Iso9660Mount(BaseIso9660):
:rtype: str
"""
full_path = os.path.join(self.mnt_dir, path)
return open(full_path).read()
with open(full_path) as file_to_read:
return file_to_read.read()
def copy(self, src, dst):
"""
......
......@@ -222,17 +222,18 @@ def check_kernel_config(config_name):
kernel_version = platform.uname()[2]
config_file = '/boot/config-' + kernel_version
for line in open(config_file, 'r'):
line = line.split('=')
if len(line) != 2:
continue
config = line[0].strip()
if config == config_name:
option = line[1].strip()
if option == "m":
return MODULE
else:
return BUILTIN
with open(config_file, 'r') as kernel_config:
for line in kernel_config:
line = line.split('=')
if len(line) != 2:
continue
config = line[0].strip()
if config == config_name:
option = line[1].strip()
if option == "m":
return MODULE
else:
return BUILTIN
return NOT_SET
......@@ -46,10 +46,10 @@ def _check_memory_state(block):
:rtype: bool
"""
def _is_online():
with open('/sys/devices/system/memory/memory%s/state' % block, 'r') as state_file:
if state_file.read() == 'online\n':
return True
return False
path = '/sys/devices/system/memory/memory%s/state' % block
if genio.read_file(path) == 'online\n':
return True
return False
return wait.wait_for(_is_online, timeout=120, step=1) or False
......@@ -75,8 +75,8 @@ def is_hot_pluggable(block):
:return: True if hotpluggable, else False
:rtype: 'bool'
"""
with open('/sys/devices/system/memory/memory%s/removable' % block, 'r') as file_obj:
return bool(int(file_obj.read()))
path = '/sys/devices/system/memory/memory%s/removable' % block
return bool(int(genio.read_file(path)))
def hotplug(block):
......@@ -136,11 +136,11 @@ def memtotal_sys():
no_memblocks = 0
for directory in os.listdir(sys_mempath):
if directory.startswith('memory'):
if open(os.path.join(sys_mempath, directory, 'online'), "r").read().strip() == '1':
path = os.path.join(sys_mempath, directory, 'online')
if genio.read_file(path).strip() == '1':
no_memblocks += 1
block_size = int(open(os.path.join(sys_mempath,
'block_size_bytes'),
"r").read().strip(), 16)
path = os.path.join(sys_mempath, 'block_size_bytes')
block_size = int(genio.read_file(path).strip(), 16)
return (no_memblocks * block_size)/1024.0
......@@ -266,10 +266,9 @@ def read_from_vmstat(key):
:return: The value of the item
:rtype: int
"""
vmstat = open("/proc/vmstat")
vmstat_info = vmstat.read()
vmstat.close()
return int(re.findall("%s\s+(\d+)" % key, vmstat_info)[0])
with open("/proc/vmstat") as vmstat:
vmstat_info = vmstat.read()
return int(re.findall("%s\s+(\d+)" % key, vmstat_info)[0])
def read_from_smaps(pid, key):
......@@ -283,15 +282,14 @@ def read_from_smaps(pid, key):
:return: The value of the item in kb
:rtype: int
"""
smaps = open("/proc/%s/smaps" % pid)
smaps_info = smaps.read()
smaps.close()
with open("/proc/%s/smaps" % pid) as smaps:
smaps_info = smaps.read()
memory_size = 0
for each_number in re.findall("%s:\s+(\d+)" % key, smaps_info):
memory_size += int(each_number)
memory_size = 0
for each_number in re.findall("%s:\s+(\d+)" % key, smaps_info):
memory_size += int(each_number)
return memory_size
return memory_size
def read_from_numa_maps(pid, key):
......@@ -306,16 +304,15 @@ def read_from_numa_maps(pid, key):
:return: A dict using the address as the keys
:rtype: dict
"""
numa_maps = open("/proc/%s/numa_maps" % pid)
numa_map_info = numa_maps.read()
numa_maps.close()
with open("/proc/%s/numa_maps" % pid) as numa_maps:
numa_map_info = numa_maps.read()
numa_maps_dict = {}
numa_pattern = r"(^[\dabcdfe]+)\s+.*%s[=:](\d+)" % key
for address, number in re.findall(numa_pattern, numa_map_info, re.M):
numa_maps_dict[address] = number
numa_maps_dict = {}
numa_pattern = r"(^[\dabcdfe]+)\s+.*%s[=:](\d+)" % key
for address, number in re.findall(numa_pattern, numa_map_info, re.M):
numa_maps_dict[address] = number
return numa_maps_dict
return numa_maps_dict
def get_buddy_info(chunk_sizes, nodes="all", zones="all"):
......@@ -346,46 +343,45 @@ def get_buddy_info(chunk_sizes, nodes="all", zones="all"):
:return: A dict using the chunk_size as the keys
:rtype: dict
"""
buddy_info = open("/proc/buddyinfo")
buddy_info_content = buddy_info.read()
buddy_info.close()
re_buddyinfo = "Node\s+"
if nodes == "all":
re_buddyinfo += "(\d+)"
else:
re_buddyinfo += "(%s)" % "|".join(nodes.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find Nodes %s" % nodes)
return None
re_buddyinfo += ".*?zone\s+"
if zones == "all":
re_buddyinfo += "(\w+)"
else:
re_buddyinfo += "(%s)" % "|".join(zones.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find zones %s" % zones)
return None
re_buddyinfo += "\s+([\s\d]+)"
buddy_list = re.findall(re_buddyinfo, buddy_info_content)
if re.findall("[<>=]", chunk_sizes) and buddy_list:
size_list = range(len(buddy_list[-1][-1].strip().split()))
chunk_sizes = [str(_) for _ in size_list if eval("%s %s" % (_,
chunk_sizes))]
chunk_sizes = ' '.join(chunk_sizes)
buddyinfo_dict = {}
for chunk_size in chunk_sizes.split():
buddyinfo_dict[chunk_size] = 0
for _, _, chunk_info in buddy_list:
chunk_info = chunk_info.strip().split()[int(chunk_size)]
buddyinfo_dict[chunk_size] += int(chunk_info)
return buddyinfo_dict
with open("/proc/buddyinfo") as buddy_info:
buddy_info_content = buddy_info.read()
re_buddyinfo = "Node\s+"
if nodes == "all":
re_buddyinfo += "(\d+)"
else:
re_buddyinfo += "(%s)" % "|".join(nodes.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find Nodes %s" % nodes)
return None
re_buddyinfo += ".*?zone\s+"
if zones == "all":
re_buddyinfo += "(\w+)"
else:
re_buddyinfo += "(%s)" % "|".join(zones.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find zones %s" % zones)
return None
re_buddyinfo += "\s+([\s\d]+)"
buddy_list = re.findall(re_buddyinfo, buddy_info_content)
if re.findall("[<>=]", chunk_sizes) and buddy_list:
size_list = range(len(buddy_list[-1][-1].strip().split()))
chunk_sizes = [str(_) for _ in size_list if eval("%s %s" % (_,
chunk_sizes))]
chunk_sizes = ' '.join(chunk_sizes)
buddyinfo_dict = {}
for chunk_size in chunk_sizes.split():
buddyinfo_dict[chunk_size] = 0
for _, _, chunk_info in buddy_list:
chunk_info = chunk_info.strip().split()[int(chunk_size)]
buddyinfo_dict[chunk_size] += int(chunk_info)
return buddyinfo_dict
def set_thp_value(feature, value):
......
......@@ -137,11 +137,12 @@ class Partition(object):
"""
# Try to match this device/mountpoint
if filename:
for line in open(filename):
parts = line.split()
if parts[0] == self.device or parts[1] == self.mountpoint:
return parts[1] # The mountpoint where it's mounted
return None
with open(filename) as open_file:
for line in open_file:
parts = line.split()
if parts[0] == self.device or parts[1] == self.mountpoint:
return parts[1] # The mountpoint where it's mounted
return None
# no specific file given, look in /proc/mounts
res = self.get_mountpoint(filename='/proc/mounts')
......
......@@ -111,9 +111,8 @@ class PathInspector(object):
def get_first_line(self):
first_line = ""
if os.path.isfile(self.path):
checked_file = open(self.path, "r")
first_line = checked_file.readline()
checked_file.close()
with open(self.path, 'r') as open_file:
first_line = open_file.readline()
return first_line
def has_exec_permission(self):
......
......@@ -38,6 +38,7 @@ except ImportError:
from . import gdb
from . import runtime
from . import path
from . import genio
log = logging.getLogger('avocado.test')
stdout_log = logging.getLogger('avocado.test.stdout')
......@@ -853,9 +854,8 @@ class GDBSubProcess(object):
:rtype: str
"""
os.mkfifo(path)
f = open(path, 'r')
c = f.read(1)
f.close()
with open(path, 'r') as fifo_file:
c = fifo_file.read(1)
os.unlink(path)
return c
......@@ -865,10 +865,9 @@ class GDBSubProcess(object):
binary_name = os.path.basename(self.binary)
script_name = '%s.gdb.connect_commands' % binary_name
path = os.path.join(current_test.outputdir, script_name)
cmds = open(path, 'w')
cmds.write('file %s\n' % os.path.abspath(self.binary))
cmds.write('target extended-remote :%s\n' % self.gdb_server.port)
cmds.close()
with open(path, 'w') as cmds_file:
cmds_file.write('file %s\n' % os.path.abspath(self.binary))
cmds_file.write('target extended-remote :%s\n' % self.gdb_server.port)
return path
def generate_gdb_connect_sh(self):
......@@ -886,11 +885,10 @@ class GDBSubProcess(object):
script_name = '%s.gdb.sh' % binary_name
script_path = os.path.join(current_test.outputdir, script_name)
script = open(script_path, 'w')
script.write("#!/bin/sh\n")
script.write("%s -x %s\n" % (gdb.GDB_PATH, cmds))
script.write("echo -n 'C' > %s\n" % fifo_path)
script.close()
with open(script_path, 'w') as script_file:
script_file.write("#!/bin/sh\n")
script_file.write("%s -x %s\n" % (gdb.GDB_PATH, cmds))
script_file.write("echo -n 'C' > %s\n" % fifo_path)
os.chmod(script_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return (script_path, fifo_path)
......@@ -1043,8 +1041,7 @@ class GDBSubProcess(object):
gdb.GDB_PRERUN_COMMANDS.get('', None))
if prerun_commands_path is not None:
prerun_commands = open(prerun_commands_path).readlines()
for command in prerun_commands:
for command in genio.read_all_lines(prerun_commands_path):
self.gdb.cmd(command)
def run(self, timeout=None):
......@@ -1073,10 +1070,10 @@ class GDBSubProcess(object):
if current_test is not None:
if os.path.exists(self.gdb_server.stdout_path):
shutil.copy(self.gdb_server.stdout_path, stdout_path)
self.result.stdout = open(stdout_path, 'r').read()
self.result.stdout = genio.read_file(stdout_path)
if os.path.exists(self.gdb_server.stderr_path):
shutil.copy(self.gdb_server.stderr_path, stderr_path)
self.result.stderr = open(stderr_path, 'r').read()
self.result.stderr = genio.read_file(stderr_path)
self.gdb_server.exit()
return self.result
......
......@@ -13,6 +13,7 @@ sys.path.insert(0, root_path)
from avocado.utils import path
from avocado.utils import process
from avocado.utils import genio
# Flag that tells if the docs are being built on readthedocs.org
ON_RTD = os.environ.get('READTHEDOCS', None) == 'True'
......@@ -40,7 +41,7 @@ API_SECTIONS = {"Test APIs": (None,
("modules.rst", )),
"Utilities APIs": ("utils",
open("api_utils_heading", "r").read(),
genio.read_file("api_utils_heading"),
"utils",
("core", "plugins"),
("avocado.rst", "modules.rst")),
......@@ -93,14 +94,14 @@ for (section, params) in API_SECTIONS.iteritems():
if not details.errno == errno.EEXIST:
raise
else:
main_rst_content = open(main_rst).readlines()
with open(main_rst) as main_rst_file:
main_rst_content = main_rst_file.readlines()
new_main_rst_content = [section, "=" * len(section), "",
params[1], ""]
new_main_rst = open(main_rst, "w")
new_main_rst.write("\n".join(new_main_rst_content))
new_main_rst.write("".join(main_rst_content[2:]))
new_main_rst.close()
with open(main_rst, "w") as new_main_rst:
new_main_rst.write("\n".join(new_main_rst_content))
new_main_rst.write("".join(main_rst_content[2:]))
# Generate optional-plugins
optional_plugins_path = os.path.join(root_path, "optional_plugins")
......@@ -150,7 +151,7 @@ project = u'Avocado'
copyright = u'2014-2015, Red Hat'
version_file = os.path.join(root_path, 'VERSION')
VERSION = open(version_file, 'r').read().strip()
VERSION = genio.read_file(version_file).strip()
version = VERSION
release = VERSION
......
......@@ -7,6 +7,7 @@ from six.moves import xrange as range
from avocado import Test
from avocado import main
from avocado.utils import gdb
from avocado.utils import genio
from avocado.utils import process
......@@ -358,7 +359,7 @@ class GdbTest(Test):
s.exit()
self.assertTrue(os.path.exists(s.stderr_path))
stderr_lines = open(s.stderr_path, 'r').readlines()
stderr_lines = genio.read_all_lines(s.stderr_path)
listening_line = "Listening on port %s\n" % s.port
self.assertIn(listening_line, stderr_lines)
......@@ -374,7 +375,7 @@ class GdbTest(Test):
self.assertTrue(os.path.exists(s.stdout_path))
self.assertTrue(os.path.exists(s.stderr_path))
stdout_lines = open(s.stdout_path, 'r').readlines()
stdout_lines = genio.read_all_lines(s.stdout_path)
self.assertIn("return 99\n", stdout_lines)
def test_interactive_stdout(self):
......
......@@ -4,6 +4,7 @@ import os
from avocado import Test
from avocado import main
from avocado.utils import genio
class GenDataTest(Test):
......@@ -23,8 +24,7 @@ class GenDataTest(Test):
dmesg_path = os.path.join(self.job.logdir, "sysinfo", "pre", "dmesg_-c")
self.log.info("dmesg_path: %s", dmesg_path)
if os.path.exists(dmesg_path):
dmesg = open(dmesg_path)
text = dmesg.readlines()[0:50]
text = genio.read_all_lines(dmesg_path)[0:50]
bsod = Image.new("RGB", (640, 480), "blue")
draw = ImageDraw.Draw(bsod)
......@@ -39,7 +39,8 @@ class GenDataTest(Test):
output_path = os.path.join(self.outputdir, "test.json")
output = {"basedir": self.basedir,
"outputdir": self.outputdir}
json.dump(output, open(output_path, "w"))
with open(output_path, "w") as output_file:
json.dump(output, output_file)
if __name__ == "__main__":
......
......@@ -2,6 +2,7 @@ import os
import sys
from avocado import Test
from avocado.utils import genio
class Env(Test):
......@@ -15,7 +16,7 @@ class Env(Test):
def get_proc_content(rel_path):
try:
return open(os.path.join(p_dir, rel_path)).read().strip()
return genio.read_file(os.path.join(p_dir, rel_path)).strip()
except:
return "<NOT AVAILABLE>"
......
......@@ -25,9 +25,9 @@ class WhiteBoard(Test):
if data_file:
self.log.info('Writing data to whiteboard from file: %s',
data_file)
whiteboard_file = open(data_file, 'r')
size = int(data_size)
data = whiteboard_file.read(size)
with open(data_file, 'r') as whiteboard_file:
size = int(data_size)
data = whiteboard_file.read(size)
else:
offset = int(data_size) - 1
data = self.params.get('whiteboard_data_text',
......
......@@ -23,6 +23,7 @@ from six.moves import xrange as range
from avocado.core import exit_codes
from avocado.utils import astring
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
from avocado.utils import path as utils_path
......@@ -526,7 +527,7 @@ class RunnerOperationTest(unittest.TestCase):
" foo:bar:b foo:baz:c bar:bar:bar --dry-run" % AVOCADO)
result = json.loads(process.run(cmd).stdout)
debuglog = result['debuglog']
log = open(debuglog, 'r').read()
log = genio.read_file(debuglog)
# Remove the result dir
shutil.rmtree(os.path.dirname(os.path.dirname(debuglog)))
self.assertIn(tempfile.gettempdir(), debuglog) # Use tmp dir, not default location
......@@ -839,9 +840,10 @@ class RunnerSimpleTest(unittest.TestCase):
"1.")
sleep_dir = astring.string_to_safe_path("1-60")
debug_log = os.path.join(self.tmpdir, "latest", "test-results",
sleep_dir, "debug.log")
debug_log = open(debug_log).read()
debug_log_path = os.path.join(self.tmpdir, "latest", "test-results",
sleep_dir, "debug.log")
debug_log = genio.read_file(debug_log_path)
self.assertIn("Runner error occurred: Timeout reached", debug_log,
"Runner error occurred: Timeout reached message not "
"in the test's debug.log:\n%s" % debug_log)
......
......@@ -6,6 +6,7 @@ import xml.dom.minidom
import unittest
from avocado.core import exit_codes
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
......@@ -98,8 +99,8 @@ class JobTimeOutTest(unittest.TestCase):
def _check_timeout_msg(self, idx):
res_dir = os.path.join(self.tmpdir, "latest", "test-results")
debug_log = glob.glob(os.path.join(res_dir, "%s-*" % idx, "debug.log"))
debug_log = open(debug_log[0]).read()
debug_log_paths = glob.glob(os.path.join(res_dir, "%s-*" % idx, "debug.log"))
debug_log = genio.read_file(debug_log_paths[0])
self.assertIn("Runner error occurred: Timeout reached", debug_log,
"Runner error occurred: Timeout reached message not "
"in the %sst test's debug.log:\n%s"
......
......@@ -10,6 +10,7 @@ import pkg_resources
from avocado.core import exit_codes
from avocado.core.output import TermSupport
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
from avocado.utils import path as utils_path
......@@ -165,15 +166,18 @@ class OutputTest(unittest.TestCase):
def _check_output(path, exps, name):
i = 0
end = len(exps)
for line in open(path):
if exps[i] in line:
i += 1
if i == end:
break
self.assertEqual(i, end, "Failed to find %sth message from\n%s\n"
"\nin the %s. Either it's missing or in wrong "
"order.\n%s" % (i, "\n".join(exps), name,
open(path).read()))
with open(path) as output_file:
output_file_content = output_file.read()
output_file.seek(0)
for line in output_file:
if exps[i] in line:
i += 1
if i == end:
break
self.assertEqual(i, end, "Failed to find %sth message from\n%s\n"
"\nin the %s. Either it's missing or in wrong "
"order.\n%s" % (i, "\n".join(exps), name,
output_file_content))
test = script.Script(os.path.join(self.tmpdir, "output_test.py"),
OUTPUT_TEST_CONTENT)
test.save()
......@@ -189,10 +193,12 @@ class OutputTest(unittest.TestCase):
"[stderr] test_stderr", "[stdout] test_process"]
_check_output(joblog, exps, "job.log")
testdir = res["tests"][0]["logdir"]
self.assertEqual("test_print\ntest_stdout\ntest_process__test_stdout__",
open(os.path.join(testdir, "stdout")).read())
self.assertEqual("test_stderr\n__test_stderr__",
open(os.path.join(testdir, "stderr")).read())
with open(os.path.join(testdir, "stdout")) as stdout_file:
self.assertEqual("test_print\ntest_stdout\ntest_process__test_stdout__",
stdout_file.read())
with open(os.path.join(testdir, "stderr")) as stderr_file:
self.assertEqual("test_stderr\n__test_stderr__",
stderr_file.read())
# Now run the same test, but with combined output
# combined output can not keep track of sys.stdout and sys.stdout
......@@ -205,8 +211,9 @@ class OutputTest(unittest.TestCase):
"--json - -- %s" % (AVOCADO, self.tmpdir, test))
res = json.loads(result.stdout)
testdir = res["tests"][0]["logdir"]
self.assertEqual("test_process__test_stderr____test_stdout__",
open(os.path.join(testdir, "output")).read())
with open(os.path.join(testdir, "output")) as output_file:
self.assertEqual("test_process__test_stderr____test_stdout__",
output_file.read())
def test_check_record_no_module_default(self):
"""
......@@ -276,20 +283,21 @@ class OutputPluginTest(unittest.TestCase):
def check_output_files(self, debug_log):
base_dir = os.path.dirname(debug_log)
json_output = os.path.join(base_dir, 'results.json')
self.assertTrue(os.path.isfile(json_output))
with open(json_output, 'r') as fp:
json_output_path = os.path.join(base_dir, 'results.json')
self.assertTrue(os.path.isfile(json_output_path))
with open(json_output_path, 'r') as fp:
json.load(fp)
xunit_output = os.path.join(base_dir, 'results.xml')
self.assertTrue(os.path.isfile(json_output))
xunit_output_path = os.path.join(base_dir, 'results.xml')
self.assertTrue(os.path.isfile(json_output_path))
try:
minidom.parse(xunit_output)
minidom.parse(xunit_output_path)
except Exception as details:
xunit_output_content = genio.read_file(xunit_output_path)
raise AssertionError("Unable to parse xunit output: %s\n\n%s"
% (details, open(xunit_output).read()))
% (details, xunit_output_content))
tap_output = os.path.join(base_dir, "results.tap")
self.assertTrue(os.path.isfile(tap_output))
tap = open(tap_output).read()
tap = genio.read_file(tap_output)
self.assertIn("..", tap)
self.assertIn("\n# debug.log of ", tap)
......
......@@ -5,6 +5,7 @@ import tempfile
import unittest
from avocado.core import exit_codes
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
......@@ -119,9 +120,10 @@ class TestSkipDecorators(unittest.TestCase):
self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
self.assertEqual(json_results['skip'], 3)
self.assertFalse('setup executed' in open(debuglog, 'r').read())
self.assertFalse('test executed' in open(debuglog, 'r').read())
self.assertFalse('teardown executed' in open(debuglog, 'r').read())
debuglog_contents = genio.read_file(debuglog)
self.assertFalse('setup executed' in debuglog_contents)
self.assertFalse('test executed' in debuglog_contents)
self.assertFalse('teardown executed' in debuglog_contents)
def test_skip_setup(self):
os.chdir(basedir)
......
......@@ -4,6 +4,7 @@ import shutil
import tempfile
import unittest
from avocado.utils import genio
from avocado.utils import process
......@@ -183,7 +184,7 @@ class TestStatuses(unittest.TestCase):
self.assertEqual(expected[0], test['status'],
"Status error: '%s' != '%s' (%s)" %
(expected[0], test['status'], variant))
debug_log = open(test['logfile'], 'r').read()
debug_log = genio.read_file(test['logfile'])
for msg in expected[1]:
self.assertIn(msg, debug_log,
"Message '%s' should be in the log (%s)."
......
......@@ -110,8 +110,9 @@ class JobTest(unittest.TestCase):
myjob.run_tests()
finally:
myjob.post_tests()
self.assertEqual(myjob.unique_id[::-1],
open(os.path.join(myjob.logdir, "reversed_id")).read())
with open(os.path.join(myjob.logdir, "reversed_id")) as reverse_id_file:
self.assertEqual(myjob.unique_id[::-1],
reverse_id_file.read())
def test_job_run(self):
class JobFilterLog(job.Job):
......@@ -135,8 +136,9 @@ class JobTest(unittest.TestCase):
self.assertEqual(myjob.run(),
exit_codes.AVOCADO_ALL_OK)
self.assertLessEqual(len(myjob.test_suite), 1)
self.assertEqual(myjob.unique_id[::-1],
open(os.path.join(myjob.logdir, "reversed_id")).read())
with open(os.path.join(myjob.logdir, "reversed_id")) as reverse_id_file:
self.assertEqual(myjob.unique_id[::-1],
reverse_id_file.read())
def test_job_run_account_time(self):
args = argparse.Namespace(base_logdir=self.tmpdir)
......
......@@ -50,10 +50,14 @@ class TestPartition(unittest.TestCase):
self.assertEqual(None, self.disk.get_mountpoint())
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
self.assertEqual(self.mountpoint, self.disk.get_mountpoint())
self.disk.unmount()
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.assertEqual(self.mountpoint, self.disk.get_mountpoint())
self.disk.unmount()
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
@unittest.skipIf(not process.can_sudo('kill -l'),
"requires running kill as a privileged user")
......@@ -61,37 +65,51 @@ class TestPartition(unittest.TestCase):
""" Test force-unmount feature """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
proc = process.SubProcess("cd %s; while :; do echo a > a; rm a; done"
% self.mountpoint, shell=True)
proc.start()
self.assertTrue(self.disk.unmount())
self.assertEqual(proc.poll(), -9) # Process should be killed -9
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
proc = process.SubProcess("cd %s; while :; do echo a > a; rm a; done"
% self.mountpoint, shell=True)
proc.start()
self.assertTrue(self.disk.unmount())
self.assertEqual(proc.poll(), -9) # Process should be killed -9
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
def test_double_mount(self):
""" Check the attempt for second mount fails """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
self.assertRaises(partition.PartitionError, self.disk.mount)
self.assertIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.assertRaises(partition.PartitionError, self.disk.mount)
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
def test_double_umount(self):
""" Check double unmount works well """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.disk.unmount()
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
self.disk.unmount()
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
def test_format_mounted(self):
""" Check format on mounted device fails """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.assertRaises(partition.PartitionError, self.disk.mkfs)
def tearDown(self):
......
......@@ -21,7 +21,8 @@ import sys
from setuptools import setup, find_packages
BASE_PATH = os.path.dirname(__file__)
VERSION = open(os.path.join(BASE_PATH, 'VERSION'), 'r').read().strip()
with open(os.path.join(BASE_PATH, 'VERSION'), 'r') as version_file:
VERSION = version_file.read().strip()
VIRTUAL_ENV = (hasattr(sys, 'real_prefix') or 'VIRTUAL_ENV' in os.environ)
......