未验证 提交 22788bae 编写于 作者: A Amador Pahim

Merge branch 'clebergnu-python3_open_v2'

Signed-off-by: NAmador Pahim <apahim@redhat.com>
......@@ -66,7 +66,8 @@ def add_runner_failure(test_state, new_status, message):
else:
test_state["text_output"] = message + "\n"
if test_log:
open(test_log, "a").write('\n' + message + '\n')
with open(test_log, "a") as log_file:
log_file.write('\n' + message + '\n')
# Update the results
if test_state.get("fail_reason"):
test_state["fail_reason"] = "%s\n%s" % (test_state["fail_reason"],
......
......@@ -349,19 +349,15 @@ class LogWatcher(Collectible):
self.inode = current_inode
self.size = current_size
in_messages = open(self.path)
out_messages = gzip.GzipFile(dstpath, "w")
try:
in_messages.seek(bytes_to_skip)
while True:
# Read data in manageable chunks rather than all at once.
in_data = in_messages.read(200000)
if not in_data:
break
out_messages.write(in_data)
finally:
out_messages.close()
in_messages.close()
with open(self.path) as in_messages:
with gzip.GzipFile(dstpath, "w") as out_messages:
in_messages.seek(bytes_to_skip)
while True:
# Read data in manageable chunks rather than all at once.
in_data = in_messages.read(200000)
if not in_data:
break
out_messages.write(in_data)
except ValueError as e:
log.info(e)
except (IOError, OSError):
......
......@@ -1144,11 +1144,12 @@ class SimpleTest(Test):
except process.CmdError as details:
self._log_detailed_cmd_info(details.result)
raise exceptions.TestFail(details)
for line in open(self.logfile):
if self.re_avocado_log.match(line):
raise exceptions.TestWarn("Test passed but there were warnings"
" on stdout during execution. Check "
"the log for details.")
with open(self.logfile) as logfile:
for line in logfile:
if self.re_avocado_log.match(line):
raise exceptions.TestWarn("Test passed but there were warnings"
" on stdout during execution. Check "
"the log for details.")
def test(self):
"""
......
......@@ -245,7 +245,9 @@ def load_distro(path):
:return: a dict with the distro definition data
:rtype: dict
"""
return json.loads(bz2.decompress(open(path).read()))
with open(path, 'rb') as distro_file:
json_data = json.loads(bz2.decompress(distro_file.read()))
return json_data
def load_from_tree(name, version, release, arch, package_type, path):
......
......@@ -103,27 +103,28 @@ class Replay(CLI):
max_index = 0
no_tests = 0
_tests = {}
for line in open(path):
line = line.strip()
if line.startswith("#"):
continue
result = re_result.match(line)
if result:
if result.group(1) is None:
res = result.group(5)
if res is None:
res = "PASS"
else:
res = "ERROR"
index = int(result.group(2))
_tests[index] = {"status": res,
"test": result.group(3).rstrip()}
max_index = max(max_index, index)
continue
_no_tests = re_no_tests.match(line)
if _no_tests:
no_tests = int(_no_tests.group(1))
continue
with open(path) as tapfile:
for line in tapfile:
line = line.strip()
if line.startswith("#"):
continue
result = re_result.match(line)
if result:
if result.group(1) is None:
res = result.group(5)
if res is None:
res = "PASS"
else:
res = "ERROR"
index = int(result.group(2))
_tests[index] = {"status": res,
"test": result.group(3).rstrip()}
max_index = max(max_index, index)
continue
_no_tests = re_no_tests.match(line)
if _no_tests:
no_tests = int(_no_tests.group(1))
continue
if not (no_tests or max_index):
return None
......
......@@ -173,8 +173,8 @@ class Asset(object):
if not os.path.isfile(self.hashfile):
self._compute_hash()
with open(self.hashfile, 'r') as f:
for line in f.readlines():
with open(self.hashfile, 'r') as asset_file:
for line in asset_file:
# md5 is 32 chars big and sha512 is 128 chars big.
# others supported algorithms are between those.
pattern = '%s [a-f0-9]{32,128}' % self.algorithm
......
......@@ -44,10 +44,11 @@ def _get_cpu_info():
:rtype: `list`
"""
cpuinfo = []
for line in open('/proc/cpuinfo').readlines():
if line == '\n':
break
cpuinfo.append(line)
with open('/proc/cpuinfo') as proc_cpuinfo:
for line in proc_cpuinfo:
if line == '\n':
break
cpuinfo.append(line)
return cpuinfo
......@@ -60,8 +61,9 @@ def _get_cpu_status(cpu):
:returns: `bool` True if online or False if not
:rtype: 'bool'
"""
if '1' in open('/sys/devices/system/cpu/cpu%s/online' % cpu).read():
return True
with open('/sys/devices/system/cpu/cpu%s/online' % cpu) as online:
if '1' in online.read():
return True
return False
......@@ -134,9 +136,10 @@ def cpu_online_list():
Reports a list of indexes of the online cpus
"""
cpus = []
for line in open('/proc/cpuinfo', 'r'):
if line.startswith('processor'):
cpus.append(int(line.split()[2])) # grab cpu number
with open('/proc/cpuinfo', 'r') as proc_cpuinfo:
for line in proc_cpuinfo:
if line.startswith('processor'):
cpus.append(int(line.split()[2])) # grab cpu number
return cpus
......
......@@ -56,21 +56,21 @@ def hash_file(filename, size=None, algorithm="md5"):
if not size or size > fsize:
size = fsize
f = open(filename, 'rb')
try:
hash_obj = hash_wrapper(algorithm=algorithm)
except ValueError:
logging.error("Unknown hash algorithm %s, returning None", algorithm)
while size > 0:
if chunksize > size:
chunksize = size
data = f.read(chunksize)
if len(data) == 0:
logging.debug("Nothing left to read but size=%d", size)
break
hash_obj.update(data)
size -= len(data)
f.close()
with open(filename, 'rb') as file_to_hash:
while size > 0:
if chunksize > size:
chunksize = size
data = file_to_hash.read(chunksize)
if len(data) == 0:
logging.debug("Nothing left to read but size=%d", size)
break
hash_obj.update(data)
size -= len(data)
return hash_obj.hexdigest()
......@@ -164,9 +164,10 @@ class Probe(object):
"""
if self.check_name_for_file_contains():
if os.path.exists(self.CHECK_FILE):
for line in open(self.CHECK_FILE).readlines():
if self.CHECK_FILE_CONTAINS in line:
return self.CHECK_FILE_DISTRO_NAME
with open(self.CHECK_FILE) as check_file:
for line in check_file:
if self.CHECK_FILE_CONTAINS in line:
return self.CHECK_FILE_DISTRO_NAME
def check_version(self):
"""
......@@ -185,12 +186,12 @@ class Probe(object):
Returns the match result for the version regex on the file content
"""
if self.check_version():
if os.path.exists(self.CHECK_FILE):
version_file_content = open(self.CHECK_FILE).read()
else:
if not os.path.exists(self.CHECK_FILE):
return None
return self.CHECK_VERSION_REGEX.match(version_file_content)
with open(self.CHECK_FILE) as version_file:
version_file_content = version_file.read()
return self.CHECK_VERSION_REGEX.match(version_file_content)
def version(self):
"""
......@@ -366,10 +367,11 @@ class SUSEProbe(Probe):
version_id_re = re.compile(r'VERSION_ID="([\d\.]*)"')
version_id = None
for line in open(self.CHECK_FILE).readlines():
match = version_id_re.match(line)
if match:
version_id = match.group(1)
with open(self.check_file) as check_file:
for line in check_file:
match = version_id_re.match(line)
if match:
version_id = match.group(1)
if version_id:
version_parts = version_id.split('.')
......
......@@ -67,11 +67,8 @@ def url_download(url, filename, data=None, timeout=300):
src_file = url_open(url, data=data, timeout=timeout)
try:
dest_file = open(filename, 'wb')
try:
with open(filename, 'wb') as dest_file:
shutil.copyfileobj(src_file, dest_file)
finally:
dest_file.close()
finally:
src_file.close()
......@@ -90,31 +87,29 @@ def url_download_interactive(url, output_file, title='', chunk_size=102400):
:param chunk_size: amount of data to read at a time
"""
output_dir = os.path.dirname(output_file)
output_file = open(output_file, 'w+b')
input_file = urlopen(url)
with open(output_file, 'w+b') as open_output_file:
input_file = urlopen(url)
try:
file_size = int(input_file.headers['Content-Length'])
except KeyError:
raise ValueError('Could not find file size in HTTP headers')
logging.info('Downloading %s, %s to %s', os.path.basename(url),
output.display_data_size(file_size), output_dir)
progress_bar = output.ProgressBar(maximum=file_size, title=title)
# Download the file, while interactively updating the progress
progress_bar.draw()
while True:
data = input_file.read(chunk_size)
if data:
progress_bar.append_amount(len(data))
output_file.write(data)
else:
progress_bar.update_amount(file_size)
break
output_file.close()
try:
file_size = int(input_file.headers['Content-Length'])
except KeyError:
raise ValueError('Could not find file size in HTTP headers')
logging.info('Downloading %s, %s to %s', os.path.basename(url),
output.display_data_size(file_size), output_dir)
progress_bar = output.ProgressBar(maximum=file_size, title=title)
# Download the file, while interactively updating the progress
progress_bar.draw()
while True:
data = input_file.read(chunk_size)
if data:
progress_bar.append_amount(len(data))
open_output_file.write(data)
else:
progress_bar.update_amount(file_size)
break
def _get_file(src, dst, permissions=None):
......
......@@ -91,9 +91,11 @@ def can_mount():
logging.debug('Can not use mount: missing "mount" tool')
return False
if 'iso9660' not in open('/proc/filesystems').read():
process.system("modprobe iso9660", ignore_status=True, sudo=True)
if 'iso9660' not in open('/proc/filesystems').read():
with open('/proc/filesystems') as proc_filesystems:
if 'iso9660' not in proc_filesystems.read():
process.system("modprobe iso9660", ignore_status=True, sudo=True)
with open('/proc/filesystems') as proc_filesystems:
if 'iso9660' not in proc_filesystems.read():
logging.debug('Can not use mount: lack of iso9660 kernel support')
return False
......@@ -150,9 +152,8 @@ class BaseIso9660(object):
:rtype: None
"""
content = self.read(src)
output = open(dst, 'w+b')
output.write(content)
output.close()
with open(dst, 'w+b') as output:
output.write(content)
def mnt_dir(self):
"""
......@@ -283,10 +284,11 @@ class Iso9660IsoRead(MixInMntDirMount, BaseIso9660):
self.temp_dir = tempfile.mkdtemp(prefix='avocado_' + __name__)
def read(self, path):
temp_file = os.path.join(self.temp_dir, path)
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, path, temp_file)
temp_path = os.path.join(self.temp_dir, path)
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, path, temp_path)
process.run(cmd)
return open(temp_file).read()
with open(temp_path) as temp_file:
return temp_file.read()
def copy(self, src, dst):
cmd = 'iso-read -i %s -e %s -o %s' % (self.path, src, dst)
......@@ -329,7 +331,8 @@ class Iso9660Mount(BaseIso9660):
:rtype: str
"""
full_path = os.path.join(self.mnt_dir, path)
return open(full_path).read()
with open(full_path) as file_to_read:
return file_to_read.read()
def copy(self, src, dst):
"""
......
......@@ -222,17 +222,18 @@ def check_kernel_config(config_name):
kernel_version = platform.uname()[2]
config_file = '/boot/config-' + kernel_version
for line in open(config_file, 'r'):
line = line.split('=')
if len(line) != 2:
continue
config = line[0].strip()
if config == config_name:
option = line[1].strip()
if option == "m":
return MODULE
else:
return BUILTIN
with open(config_file, 'r') as kernel_config:
for line in kernel_config:
line = line.split('=')
if len(line) != 2:
continue
config = line[0].strip()
if config == config_name:
option = line[1].strip()
if option == "m":
return MODULE
else:
return BUILTIN
return NOT_SET
......@@ -46,10 +46,10 @@ def _check_memory_state(block):
:rtype: bool
"""
def _is_online():
with open('/sys/devices/system/memory/memory%s/state' % block, 'r') as state_file:
if state_file.read() == 'online\n':
return True
return False
path = '/sys/devices/system/memory/memory%s/state' % block
if genio.read_file(path) == 'online\n':
return True
return False
return wait.wait_for(_is_online, timeout=120, step=1) or False
......@@ -75,8 +75,8 @@ def is_hot_pluggable(block):
:return: True if hotpluggable, else False
:rtype: 'bool'
"""
with open('/sys/devices/system/memory/memory%s/removable' % block, 'r') as file_obj:
return bool(int(file_obj.read()))
path = '/sys/devices/system/memory/memory%s/removable' % block
return bool(int(genio.read_file(path)))
def hotplug(block):
......@@ -136,11 +136,11 @@ def memtotal_sys():
no_memblocks = 0
for directory in os.listdir(sys_mempath):
if directory.startswith('memory'):
if open(os.path.join(sys_mempath, directory, 'online'), "r").read().strip() == '1':
path = os.path.join(sys_mempath, directory, 'online')
if genio.read_file(path).strip() == '1':
no_memblocks += 1
block_size = int(open(os.path.join(sys_mempath,
'block_size_bytes'),
"r").read().strip(), 16)
path = os.path.join(sys_mempath, 'block_size_bytes')
block_size = int(genio.read_file(path).strip(), 16)
return (no_memblocks * block_size)/1024.0
......@@ -266,10 +266,9 @@ def read_from_vmstat(key):
:return: The value of the item
:rtype: int
"""
vmstat = open("/proc/vmstat")
vmstat_info = vmstat.read()
vmstat.close()
return int(re.findall("%s\s+(\d+)" % key, vmstat_info)[0])
with open("/proc/vmstat") as vmstat:
vmstat_info = vmstat.read()
return int(re.findall("%s\s+(\d+)" % key, vmstat_info)[0])
def read_from_smaps(pid, key):
......@@ -283,15 +282,14 @@ def read_from_smaps(pid, key):
:return: The value of the item in kb
:rtype: int
"""
smaps = open("/proc/%s/smaps" % pid)
smaps_info = smaps.read()
smaps.close()
with open("/proc/%s/smaps" % pid) as smaps:
smaps_info = smaps.read()
memory_size = 0
for each_number in re.findall("%s:\s+(\d+)" % key, smaps_info):
memory_size += int(each_number)
memory_size = 0
for each_number in re.findall("%s:\s+(\d+)" % key, smaps_info):
memory_size += int(each_number)
return memory_size
return memory_size
def read_from_numa_maps(pid, key):
......@@ -306,16 +304,15 @@ def read_from_numa_maps(pid, key):
:return: A dict using the address as the keys
:rtype: dict
"""
numa_maps = open("/proc/%s/numa_maps" % pid)
numa_map_info = numa_maps.read()
numa_maps.close()
with open("/proc/%s/numa_maps" % pid) as numa_maps:
numa_map_info = numa_maps.read()
numa_maps_dict = {}
numa_pattern = r"(^[\dabcdfe]+)\s+.*%s[=:](\d+)" % key
for address, number in re.findall(numa_pattern, numa_map_info, re.M):
numa_maps_dict[address] = number
numa_maps_dict = {}
numa_pattern = r"(^[\dabcdfe]+)\s+.*%s[=:](\d+)" % key
for address, number in re.findall(numa_pattern, numa_map_info, re.M):
numa_maps_dict[address] = number
return numa_maps_dict
return numa_maps_dict
def get_buddy_info(chunk_sizes, nodes="all", zones="all"):
......@@ -346,46 +343,45 @@ def get_buddy_info(chunk_sizes, nodes="all", zones="all"):
:return: A dict using the chunk_size as the keys
:rtype: dict
"""
buddy_info = open("/proc/buddyinfo")
buddy_info_content = buddy_info.read()
buddy_info.close()
re_buddyinfo = "Node\s+"
if nodes == "all":
re_buddyinfo += "(\d+)"
else:
re_buddyinfo += "(%s)" % "|".join(nodes.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find Nodes %s" % nodes)
return None
re_buddyinfo += ".*?zone\s+"
if zones == "all":
re_buddyinfo += "(\w+)"
else:
re_buddyinfo += "(%s)" % "|".join(zones.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find zones %s" % zones)
return None
re_buddyinfo += "\s+([\s\d]+)"
buddy_list = re.findall(re_buddyinfo, buddy_info_content)
if re.findall("[<>=]", chunk_sizes) and buddy_list:
size_list = range(len(buddy_list[-1][-1].strip().split()))
chunk_sizes = [str(_) for _ in size_list if eval("%s %s" % (_,
chunk_sizes))]
chunk_sizes = ' '.join(chunk_sizes)
buddyinfo_dict = {}
for chunk_size in chunk_sizes.split():
buddyinfo_dict[chunk_size] = 0
for _, _, chunk_info in buddy_list:
chunk_info = chunk_info.strip().split()[int(chunk_size)]
buddyinfo_dict[chunk_size] += int(chunk_info)
return buddyinfo_dict
with open("/proc/buddyinfo") as buddy_info:
buddy_info_content = buddy_info.read()
re_buddyinfo = "Node\s+"
if nodes == "all":
re_buddyinfo += "(\d+)"
else:
re_buddyinfo += "(%s)" % "|".join(nodes.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find Nodes %s" % nodes)
return None
re_buddyinfo += ".*?zone\s+"
if zones == "all":
re_buddyinfo += "(\w+)"
else:
re_buddyinfo += "(%s)" % "|".join(zones.split())
if not re.findall(re_buddyinfo, buddy_info_content):
logging.warn("Can not find zones %s" % zones)
return None
re_buddyinfo += "\s+([\s\d]+)"
buddy_list = re.findall(re_buddyinfo, buddy_info_content)
if re.findall("[<>=]", chunk_sizes) and buddy_list:
size_list = range(len(buddy_list[-1][-1].strip().split()))
chunk_sizes = [str(_) for _ in size_list if eval("%s %s" % (_,
chunk_sizes))]
chunk_sizes = ' '.join(chunk_sizes)
buddyinfo_dict = {}
for chunk_size in chunk_sizes.split():
buddyinfo_dict[chunk_size] = 0
for _, _, chunk_info in buddy_list:
chunk_info = chunk_info.strip().split()[int(chunk_size)]
buddyinfo_dict[chunk_size] += int(chunk_info)
return buddyinfo_dict
def set_thp_value(feature, value):
......
......@@ -137,11 +137,12 @@ class Partition(object):
"""
# Try to match this device/mountpoint
if filename:
for line in open(filename):
parts = line.split()
if parts[0] == self.device or parts[1] == self.mountpoint:
return parts[1] # The mountpoint where it's mounted
return None
with open(filename) as open_file:
for line in open_file:
parts = line.split()
if parts[0] == self.device or parts[1] == self.mountpoint:
return parts[1] # The mountpoint where it's mounted
return None
# no specific file given, look in /proc/mounts
res = self.get_mountpoint(filename='/proc/mounts')
......
......@@ -111,9 +111,8 @@ class PathInspector(object):
def get_first_line(self):
first_line = ""
if os.path.isfile(self.path):
checked_file = open(self.path, "r")
first_line = checked_file.readline()
checked_file.close()
with open(self.path, 'r') as open_file:
first_line = open_file.readline()
return first_line
def has_exec_permission(self):
......
......@@ -38,6 +38,7 @@ except ImportError:
from . import gdb
from . import runtime
from . import path
from . import genio
log = logging.getLogger('avocado.test')
stdout_log = logging.getLogger('avocado.test.stdout')
......@@ -853,9 +854,8 @@ class GDBSubProcess(object):
:rtype: str
"""
os.mkfifo(path)
f = open(path, 'r')
c = f.read(1)
f.close()
with open(path, 'r') as fifo_file:
c = fifo_file.read(1)
os.unlink(path)
return c
......@@ -865,10 +865,9 @@ class GDBSubProcess(object):
binary_name = os.path.basename(self.binary)
script_name = '%s.gdb.connect_commands' % binary_name
path = os.path.join(current_test.outputdir, script_name)
cmds = open(path, 'w')
cmds.write('file %s\n' % os.path.abspath(self.binary))
cmds.write('target extended-remote :%s\n' % self.gdb_server.port)
cmds.close()
with open(path, 'w') as cmds_file:
cmds_file('file %s\n' % os.path.abspath(self.binary))
cmds_file('target extended-remote :%s\n' % self.gdb_server.port)
return path
def generate_gdb_connect_sh(self):
......@@ -886,11 +885,10 @@ class GDBSubProcess(object):
script_name = '%s.gdb.sh' % binary_name
script_path = os.path.join(current_test.outputdir, script_name)
script = open(script_path, 'w')
script.write("#!/bin/sh\n")
script.write("%s -x %s\n" % (gdb.GDB_PATH, cmds))
script.write("echo -n 'C' > %s\n" % fifo_path)
script.close()
with open(script_path, 'w') as script_file:
script_file.write("#!/bin/sh\n")
script_file.write("%s -x %s\n" % (gdb.GDB_PATH, cmds))
script_file.write("echo -n 'C' > %s\n" % fifo_path)
os.chmod(script_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return (script_path, fifo_path)
......@@ -1043,8 +1041,7 @@ class GDBSubProcess(object):
gdb.GDB_PRERUN_COMMANDS.get('', None))
if prerun_commands_path is not None:
prerun_commands = open(prerun_commands_path).readlines()
for command in prerun_commands:
for command in genio.read_all_lines(prerun_commands_path):
self.gdb.cmd(command)
def run(self, timeout=None):
......@@ -1073,10 +1070,10 @@ class GDBSubProcess(object):
if current_test is not None:
if os.path.exists(self.gdb_server.stdout_path):
shutil.copy(self.gdb_server.stdout_path, stdout_path)
self.result.stdout = open(stdout_path, 'r').read()
self.result.stdout = genio.read_file(stdout_path)
if os.path.exists(self.gdb_server.stderr_path):
shutil.copy(self.gdb_server.stderr_path, stderr_path)
self.result.stderr = open(stderr_path, 'r').read()
self.result.stderr = genio.read_file(stderr_path)
self.gdb_server.exit()
return self.result
......
......@@ -13,6 +13,7 @@ sys.path.insert(0, root_path)
from avocado.utils import path
from avocado.utils import process
from avocado.utils import genio
# Flag that tells if the docs are being built on readthedocs.org
ON_RTD = os.environ.get('READTHEDOCS', None) == 'True'
......@@ -40,7 +41,7 @@ API_SECTIONS = {"Test APIs": (None,
("modules.rst", )),
"Utilities APIs": ("utils",
open("api_utils_heading", "r").read(),
genio.read_file("api_utils_heading"),
"utils",
("core", "plugins"),
("avocado.rst", "modules.rst")),
......@@ -93,14 +94,14 @@ for (section, params) in API_SECTIONS.iteritems():
if not details.errno == errno.EEXIST:
raise
else:
main_rst_content = open(main_rst).readlines()
with open(main_rst) as main_rst_file:
main_rst_content = main_rst_file.readlines()
new_main_rst_content = [section, "=" * len(section), "",
params[1], ""]
new_main_rst = open(main_rst, "w")
new_main_rst.write("\n".join(new_main_rst_content))
new_main_rst.write("".join(main_rst_content[2:]))
new_main_rst.close()
with open(main_rst, "w") as new_main_rst:
new_main_rst.write("\n".join(new_main_rst_content))
new_main_rst.write("".join(main_rst_content[2:]))
# Generate optional-plugins
optional_plugins_path = os.path.join(root_path, "optional_plugins")
......@@ -150,7 +151,7 @@ project = u'Avocado'
copyright = u'2014-2015, Red Hat'
version_file = os.path.join(root_path, 'VERSION')
VERSION = open(version_file, 'r').read().strip()
VERSION = genio.read_file(version_file).strip()
version = VERSION
release = VERSION
......
......@@ -7,6 +7,7 @@ from six.moves import xrange as range
from avocado import Test
from avocado import main
from avocado.utils import gdb
from avocado.utils import genio
from avocado.utils import process
......@@ -358,7 +359,7 @@ class GdbTest(Test):
s.exit()
self.assertTrue(os.path.exists(s.stderr_path))
stderr_lines = open(s.stderr_path, 'r').readlines()
stderr_lines = genio.read_all_lines(s.stderr_path)
listening_line = "Listening on port %s\n" % s.port
self.assertIn(listening_line, stderr_lines)
......@@ -374,7 +375,7 @@ class GdbTest(Test):
self.assertTrue(os.path.exists(s.stdout_path))
self.assertTrue(os.path.exists(s.stderr_path))
stdout_lines = open(s.stdout_path, 'r').readlines()
stdout_lines = genio.read_all_lines(s.stdout_path)
self.assertIn("return 99\n", stdout_lines)
def test_interactive_stdout(self):
......
......@@ -4,6 +4,7 @@ import os
from avocado import Test
from avocado import main
from avocado.utils import genio
class GenDataTest(Test):
......@@ -23,8 +24,7 @@ class GenDataTest(Test):
dmesg_path = os.path.join(self.job.logdir, "sysinfo", "pre", "dmesg_-c")
self.log.info("dmesg_path: %s", dmesg_path)
if os.path.exists(dmesg_path):
dmesg = open(dmesg_path)
text = dmesg.readlines()[0:50]
text = genio.read_all_lines(dmesg_path)[0:50]
bsod = Image.new("RGB", (640, 480), "blue")
draw = ImageDraw.Draw(bsod)
......@@ -39,7 +39,8 @@ class GenDataTest(Test):
output_path = os.path.join(self.outputdir, "test.json")
output = {"basedir": self.basedir,
"outputdir": self.outputdir}
json.dump(output, open(output_path, "w"))
with open(output_path, "w") as output_file:
json.dump(output, output_file)
if __name__ == "__main__":
......
......@@ -2,6 +2,7 @@ import os
import sys
from avocado import Test
from avocado.utils import genio
class Env(Test):
......@@ -15,7 +16,7 @@ class Env(Test):
def get_proc_content(rel_path):
try:
return open(os.path.join(p_dir, rel_path)).read().strip()
return genio.read_file(os.path.join(p_dir, rel_path)).strip()
except:
return "<NOT AVAILABLE>"
......
......@@ -25,9 +25,9 @@ class WhiteBoard(Test):
if data_file:
self.log.info('Writing data to whiteboard from file: %s',
data_file)
whiteboard_file = open(data_file, 'r')
size = int(data_size)
data = whiteboard_file.read(size)
with open(data_file, 'r') as whiteboard_file:
size = int(data_size)
data = whiteboard_file.read(size)
else:
offset = int(data_size) - 1
data = self.params.get('whiteboard_data_text',
......
......@@ -23,6 +23,7 @@ from six.moves import xrange as range
from avocado.core import exit_codes
from avocado.utils import astring
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
from avocado.utils import path as utils_path
......@@ -526,7 +527,7 @@ class RunnerOperationTest(unittest.TestCase):
" foo:bar:b foo:baz:c bar:bar:bar --dry-run" % AVOCADO)
result = json.loads(process.run(cmd).stdout)
debuglog = result['debuglog']
log = open(debuglog, 'r').read()
log = genio.read_file(debuglog)
# Remove the result dir
shutil.rmtree(os.path.dirname(os.path.dirname(debuglog)))
self.assertIn(tempfile.gettempdir(), debuglog) # Use tmp dir, not default location
......@@ -839,9 +840,10 @@ class RunnerSimpleTest(unittest.TestCase):
"1.")
sleep_dir = astring.string_to_safe_path("1-60")
debug_log = os.path.join(self.tmpdir, "latest", "test-results",
sleep_dir, "debug.log")
debug_log = open(debug_log).read()
debug_log_path = os.path.join(self.tmpdir, "latest", "test-results",
sleep_dir, "debug.log")
debug_log = genio.read_file(debug_log_path)
self.assertIn("Runner error occurred: Timeout reached", debug_log,
"Runner error occurred: Timeout reached message not "
"in the test's debug.log:\n%s" % debug_log)
......
......@@ -6,6 +6,7 @@ import xml.dom.minidom
import unittest
from avocado.core import exit_codes
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
......@@ -98,8 +99,8 @@ class JobTimeOutTest(unittest.TestCase):
def _check_timeout_msg(self, idx):
res_dir = os.path.join(self.tmpdir, "latest", "test-results")
debug_log = glob.glob(os.path.join(res_dir, "%s-*" % idx, "debug.log"))
debug_log = open(debug_log[0]).read()
debug_log_paths = glob.glob(os.path.join(res_dir, "%s-*" % idx, "debug.log"))
debug_log = genio.read_file(debug_log_paths[0])
self.assertIn("Runner error occurred: Timeout reached", debug_log,
"Runner error occurred: Timeout reached message not "
"in the %sst test's debug.log:\n%s"
......
......@@ -10,6 +10,7 @@ import pkg_resources
from avocado.core import exit_codes
from avocado.core.output import TermSupport
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
from avocado.utils import path as utils_path
......@@ -165,15 +166,18 @@ class OutputTest(unittest.TestCase):
def _check_output(path, exps, name):
i = 0
end = len(exps)
for line in open(path):
if exps[i] in line:
i += 1
if i == end:
break
self.assertEqual(i, end, "Failed to find %sth message from\n%s\n"
"\nin the %s. Either it's missing or in wrong "
"order.\n%s" % (i, "\n".join(exps), name,
open(path).read()))
with open(path) as output_file:
output_file_content = output_file.read()
output_file.seek(0)
for line in output_file:
if exps[i] in line:
i += 1
if i == end:
break
self.assertEqual(i, end, "Failed to find %sth message from\n%s\n"
"\nin the %s. Either it's missing or in wrong "
"order.\n%s" % (i, "\n".join(exps), name,
output_file_content))
test = script.Script(os.path.join(self.tmpdir, "output_test.py"),
OUTPUT_TEST_CONTENT)
test.save()
......@@ -189,10 +193,12 @@ class OutputTest(unittest.TestCase):
"[stderr] test_stderr", "[stdout] test_process"]
_check_output(joblog, exps, "job.log")
testdir = res["tests"][0]["logdir"]
self.assertEqual("test_print\ntest_stdout\ntest_process__test_stdout__",
open(os.path.join(testdir, "stdout")).read())
self.assertEqual("test_stderr\n__test_stderr__",
open(os.path.join(testdir, "stderr")).read())
with open(os.path.join(testdir, "stdout")) as stdout_file:
self.assertEqual("test_print\ntest_stdout\ntest_process__test_stdout__",
stdout_file.read())
with open(os.path.join(testdir, "stderr")) as stderr_file:
self.assertEqual("test_stderr\n__test_stderr__",
stderr_file.read())
# Now run the same test, but with combined output
# combined output can not keep track of sys.stdout and sys.stdout
......@@ -205,8 +211,9 @@ class OutputTest(unittest.TestCase):
"--json - -- %s" % (AVOCADO, self.tmpdir, test))
res = json.loads(result.stdout)
testdir = res["tests"][0]["logdir"]
self.assertEqual("test_process__test_stderr____test_stdout__",
open(os.path.join(testdir, "output")).read())
with open(os.path.join(testdir, "output")) as output_file:
self.assertEqual("test_process__test_stderr____test_stdout__",
output_file.read())
def test_check_record_no_module_default(self):
"""
......@@ -276,20 +283,21 @@ class OutputPluginTest(unittest.TestCase):
def check_output_files(self, debug_log):
base_dir = os.path.dirname(debug_log)
json_output = os.path.join(base_dir, 'results.json')
self.assertTrue(os.path.isfile(json_output))
with open(json_output, 'r') as fp:
json_output_path = os.path.join(base_dir, 'results.json')
self.assertTrue(os.path.isfile(json_output_path))
with open(json_output_path, 'r') as fp:
json.load(fp)
xunit_output = os.path.join(base_dir, 'results.xml')
self.assertTrue(os.path.isfile(json_output))
xunit_output_path = os.path.join(base_dir, 'results.xml')
self.assertTrue(os.path.isfile(json_output_path))
try:
minidom.parse(xunit_output)
minidom.parse(xunit_output_path)
except Exception as details:
xunit_output_content = genio.read_file(xunit_output_path)
raise AssertionError("Unable to parse xunit output: %s\n\n%s"
% (details, open(xunit_output).read()))
% (details, xunit_output_content))
tap_output = os.path.join(base_dir, "results.tap")
self.assertTrue(os.path.isfile(tap_output))
tap = open(tap_output).read()
tap = genio.read_file(tap_output)
self.assertIn("..", tap)
self.assertIn("\n# debug.log of ", tap)
......
......@@ -5,6 +5,7 @@ import tempfile
import unittest
from avocado.core import exit_codes
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
......@@ -119,9 +120,10 @@ class TestSkipDecorators(unittest.TestCase):
self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
self.assertEqual(json_results['skip'], 3)
self.assertFalse('setup executed' in open(debuglog, 'r').read())
self.assertFalse('test executed' in open(debuglog, 'r').read())
self.assertFalse('teardown executed' in open(debuglog, 'r').read())
debuglog_contents = genio.read_file(debuglog)
self.assertFalse('setup executed' in debuglog_contents)
self.assertFalse('test executed' in debuglog_contents)
self.assertFalse('teardown executed' in debuglog_contents)
def test_skip_setup(self):
os.chdir(basedir)
......
......@@ -4,6 +4,7 @@ import shutil
import tempfile
import unittest
from avocado.utils import genio
from avocado.utils import process
......@@ -183,7 +184,7 @@ class TestStatuses(unittest.TestCase):
self.assertEqual(expected[0], test['status'],
"Status error: '%s' != '%s' (%s)" %
(expected[0], test['status'], variant))
debug_log = open(test['logfile'], 'r').read()
debug_log = genio.read_file(test['logfile'])
for msg in expected[1]:
self.assertIn(msg, debug_log,
"Message '%s' should be in the log (%s)."
......
......@@ -110,8 +110,9 @@ class JobTest(unittest.TestCase):
myjob.run_tests()
finally:
myjob.post_tests()
self.assertEqual(myjob.unique_id[::-1],
open(os.path.join(myjob.logdir, "reversed_id")).read())
with open(os.path.join(myjob.logdir, "reversed_id")) as reverse_id_file:
self.assertEqual(myjob.unique_id[::-1],
reverse_id_file.read())
def test_job_run(self):
class JobFilterLog(job.Job):
......@@ -135,8 +136,9 @@ class JobTest(unittest.TestCase):
self.assertEqual(myjob.run(),
exit_codes.AVOCADO_ALL_OK)
self.assertLessEqual(len(myjob.test_suite), 1)
self.assertEqual(myjob.unique_id[::-1],
open(os.path.join(myjob.logdir, "reversed_id")).read())
with open(os.path.join(myjob.logdir, "reversed_id")) as reverse_id_file:
self.assertEqual(myjob.unique_id[::-1],
reverse_id_file.read())
def test_job_run_account_time(self):
args = argparse.Namespace(base_logdir=self.tmpdir)
......
......@@ -50,10 +50,14 @@ class TestPartition(unittest.TestCase):
self.assertEqual(None, self.disk.get_mountpoint())
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
self.assertEqual(self.mountpoint, self.disk.get_mountpoint())
self.disk.unmount()
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.assertEqual(self.mountpoint, self.disk.get_mountpoint())
self.disk.unmount()
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
@unittest.skipIf(not process.can_sudo('kill -l'),
"requires running kill as a privileged user")
......@@ -61,37 +65,51 @@ class TestPartition(unittest.TestCase):
""" Test force-unmount feature """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
proc = process.SubProcess("cd %s; while :; do echo a > a; rm a; done"
% self.mountpoint, shell=True)
proc.start()
self.assertTrue(self.disk.unmount())
self.assertEqual(proc.poll(), -9) # Process should be killed -9
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
proc = process.SubProcess("cd %s; while :; do echo a > a; rm a; done"
% self.mountpoint, shell=True)
proc.start()
self.assertTrue(self.disk.unmount())
self.assertEqual(proc.poll(), -9) # Process should be killed -9
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
def test_double_mount(self):
""" Check the attempt for second mount fails """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
self.assertRaises(partition.PartitionError, self.disk.mount)
self.assertIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.assertRaises(partition.PartitionError, self.disk.mount)
self.assertIn(self.mountpoint, proc_mounts)
def test_double_umount(self):
""" Check double unmount works well """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.disk.unmount()
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
self.disk.unmount()
self.assertNotIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertNotIn(self.mountpoint, proc_mounts)
def test_format_mounted(self):
""" Check format on mounted device fails """
self.disk.mkfs()
self.disk.mount()
self.assertIn(self.mountpoint, open("/proc/mounts").read())
with open("/proc/mounts") as proc_mounts_file:
proc_mounts = proc_mounts_file.read()
self.assertIn(self.mountpoint, proc_mounts)
self.assertRaises(partition.PartitionError, self.disk.mkfs)
def tearDown(self):
......
......@@ -21,7 +21,8 @@ import sys
from setuptools import setup, find_packages
BASE_PATH = os.path.dirname(__file__)
VERSION = open(os.path.join(BASE_PATH, 'VERSION'), 'r').read().strip()
with open(os.path.join(BASE_PATH, 'VERSION'), 'r') as version_file:
VERSION = version_file.read().strip()
VIRTUAL_ENV = (hasattr(sys, 'real_prefix') or 'VIRTUAL_ENV' in os.environ)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册