未验证 提交 3b0f36a7 编写于 作者: C Cleber Rosa

Merge remote-tracking branch 'clebergnu/utils_cpu_py3'

Signed-off-by: NCleber Rosa <crosa@redhat.com>
......@@ -27,16 +27,15 @@ import glob
import logging
def _list_matches(lst, pattern):
def _list_matches(content_list, pattern):
"""
True if any item in list matches the specified pattern.
Checks if any item in list matches the specified pattern
"""
compiled = re.compile(pattern)
for element in lst:
match = compiled.search(element)
for content in content_list:
match = re.search(pattern, content)
if match:
return 1
return 0
return True
return False
def _get_cpu_info():
......@@ -47,9 +46,9 @@ def _get_cpu_info():
:rtype: `list`
"""
cpuinfo = []
with open('/proc/cpuinfo') as proc_cpuinfo:
with open('/proc/cpuinfo', 'rb') as proc_cpuinfo:
for line in proc_cpuinfo:
if line == '\n':
if line == b'\n':
break
cpuinfo.append(line)
return cpuinfo
......@@ -64,8 +63,8 @@ def _get_cpu_status(cpu):
:returns: `bool` True if online or False if not
:rtype: 'bool'
"""
with open('/sys/devices/system/cpu/cpu%s/online' % cpu) as online:
if '1' in online.read():
with open('/sys/devices/system/cpu/cpu%s/online' % cpu, 'rb') as online:
if b'1' in online.read():
return True
return False
......@@ -117,22 +116,22 @@ def get_cpu_arch():
"""
Work out which CPU architecture we're running on
"""
cpu_table = [('^cpu.*(RS64|POWER3|Broadband Engine)', 'power'),
('^cpu.*POWER4', 'power4'),
('^cpu.*POWER5', 'power5'),
('^cpu.*POWER6', 'power6'),
('^cpu.*POWER7', 'power7'),
('^cpu.*POWER8', 'power8'),
('^cpu.*POWER9', 'power9'),
('^cpu.*PPC970', 'power970'),
('(ARM|^CPU implementer|^CPU part|^CPU variant'
'|^Features|^BogoMIPS|^CPU revision)', 'arm'),
('(^cpu MHz dynamic|^cpu MHz static|^features'
'|^bogomips per cpu|^max thread id)', 's390'),
('^type', 'sparc64'),
('^flags.*:.* lm .*', 'x86_64'),
('^flags', 'i386'),
('^hart\\s*: 1$', 'riscv')]
cpu_table = [(b'^cpu.*(RS64|POWER3|Broadband Engine)', 'power'),
(b'^cpu.*POWER4', 'power4'),
(b'^cpu.*POWER5', 'power5'),
(b'^cpu.*POWER6', 'power6'),
(b'^cpu.*POWER7', 'power7'),
(b'^cpu.*POWER8', 'power8'),
(b'^cpu.*POWER9', 'power9'),
(b'^cpu.*PPC970', 'power970'),
(b'(ARM|^CPU implementer|^CPU part|^CPU variant'
b'|^Features|^BogoMIPS|^CPU revision)', 'arm'),
(b'(^cpu MHz dynamic|^cpu MHz static|^features'
b'|^bogomips per cpu|^max thread id)', 's390'),
(b'^type', 'sparc64'),
(b'^flags.*:.* lm .*', 'x86_64'),
(b'^flags', 'i386'),
(b'^hart\\s*: 1$', 'riscv')]
cpuinfo = _get_cpu_info()
for (pattern, arch) in cpu_table:
if _list_matches(cpuinfo, pattern):
......@@ -151,12 +150,12 @@ def cpu_online_list():
Reports a list of indexes of the online cpus
"""
cpus = []
search_str = 'processor'
search_str = b'processor'
index = 2
if platform.machine() == 's390x':
search_str = 'cpu number'
search_str = b'cpu number'
index = 3
with open('/proc/cpuinfo', 'r') as proc_cpuinfo:
with open('/proc/cpuinfo', 'rb') as proc_cpuinfo:
for line in proc_cpuinfo:
if line.startswith(search_str):
cpus.append(int(line.split()[index])) # grab cpu number
......@@ -181,8 +180,8 @@ def online(cpu):
"""
Online given CPU
"""
with open("/sys/devices/system/cpu/cpu%s/online" % cpu, "w") as fd:
fd.write('1')
with open("/sys/devices/system/cpu/cpu%s/online" % cpu, "wb") as fd:
fd.write(b'1')
if _get_cpu_status(cpu):
return 0
return 1
......@@ -192,8 +191,8 @@ def offline(cpu):
"""
Offline given CPU
"""
with open("/sys/devices/system/cpu/cpu%s/online" % cpu, "w") as fd:
fd.write('0')
with open("/sys/devices/system/cpu/cpu%s/online" % cpu, "wb") as fd:
fd.write(b'0')
if _get_cpu_status(cpu):
return 1
return 0
......@@ -214,19 +213,46 @@ def get_cpuidle_state():
for state_no in states:
state_file = "/sys/devices/system/cpu/cpu%s/cpuidle/state%s/disable" % (cpu, state_no)
try:
cpu_idlestate[cpu][state_no] = int(open(state_file).read())
cpu_idlestate[cpu][state_no] = int(open(state_file, 'rb').read())
except IOError as err:
logging.warning("Failed to read idle state on cpu %s "
"for state %s:\n%s", cpu, state_no, err)
return cpu_idlestate
def set_cpuidle_state(state_number="all", disable=1, setstate=None):
def _bool_to_binary(value):
'''
Turns a boolean value (True or False) into data suitable for writing to
/proc/* and /sys/* files.
'''
if value is True:
return b'1'
if value is False:
return b'0'
raise TypeError('Value is not a boolean: %s', value)
def _legacy_disable(value):
'''
Support for the original acceptable disable parameter values
TODO: this should be removed in the near future
Reference: https://trello.com/c/aJzNUeA5/
'''
if value is 0:
return b'0'
if value is 1:
return b'1'
return _bool_to_binary(value)
def set_cpuidle_state(state_number="all", disable=True, setstate=None):
"""
Set/Reset cpu idle states for all cpus
:param state_number: cpuidle state number, default: `all` all states
:param disable: whether to disable/enable given cpu idle state, default: 1
:param disable: whether to disable/enable given cpu idle state,
default is to disable (True)
:param setstate: cpuidle state value, output of `get_cpuidle_state()`
"""
cpus_list = cpu_online_list()
......@@ -239,8 +265,9 @@ def set_cpuidle_state(state_number="all", disable=1, setstate=None):
for cpu in cpus_list:
for state_no in states:
state_file = "/sys/devices/system/cpu/cpu%s/cpuidle/state%s/disable" % (cpu, state_no)
disable = _legacy_disable(disable)
try:
open(state_file, "w").write(str(disable))
open(state_file, "wb").write(disable)
except IOError as err:
logging.warning("Failed to set idle state on cpu %s "
"for state %s:\n%s", cpu, state_no, err)
......@@ -248,8 +275,9 @@ def set_cpuidle_state(state_number="all", disable=1, setstate=None):
for cpu, stateval in setstate.items():
for state_no, value in stateval.items():
state_file = "/sys/devices/system/cpu/cpu%s/cpuidle/state%s/disable" % (cpu, state_no)
disable = _legacy_disable(value)
try:
open(state_file, "w").write(str(value))
open(state_file, "wb").write(disable)
except IOError as err:
logging.warning("Failed to set idle state on cpu %s "
"for state %s:\n%s", cpu, state_no, err)
import io
import sys
import unittest
import StringIO
try:
from unittest import mock
......@@ -12,8 +12,22 @@ from avocado.utils import cpu
def recent_mock():
major = int(mock.__version__.split('.')[0])
return major >= 2
'''
Checks if a recent and capable enough mock library is available
On Python 2.7, it requires at least mock version 2.0. On Python 3,
mock from the standard library is used, but Python 3.6 or later is
required.
Also, it assumes that on a future Python major version, functionality
won't regress.
'''
if sys.version_info[0] < 3:
major = int(mock.__version__.split('.')[0])
return major >= 2
elif sys.version_info[0] == 3:
return sys.version_info[1] >= 6
return True
class Cpu(unittest.TestCase):
......@@ -21,14 +35,14 @@ class Cpu(unittest.TestCase):
@staticmethod
def _get_file_mock(content):
file_mock = mock.Mock()
file_mock.__enter__ = mock.Mock(return_value=io.StringIO(content))
file_mock.__enter__ = mock.Mock(return_value=io.BytesIO(content))
file_mock.__exit__ = mock.Mock()
return file_mock
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_s390x_cpu_online(self):
s390x = u"""vendor_id : IBM/S390
s390x = b"""vendor_id : IBM/S390
# processors : 2
bogomips per cpu: 2913.00
max thread id : 0
......@@ -53,7 +67,7 @@ cpu MHz static : 5504
"""
s390x_2 = u"""vendor_id : IBM/S390
s390x_2 = b"""vendor_id : IBM/S390
# processors : 4
bogomips per cpu: 2913.00
max thread id : 0
......@@ -98,7 +112,7 @@ cpu MHz static : 5504
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_x86_64_cpu_online(self):
x86_64 = u"""processor : 0
x86_64 = b"""processor : 0
vendor_id : GenuineIntel
cpu family : 6
model : 60
......@@ -323,7 +337,7 @@ power management:
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_i386(self):
cpu_output = u"""processor : 0
cpu_output = b"""processor : 0
vendor_id : GenuineIntel
cpu family : 6
model : 13
......@@ -360,7 +374,7 @@ power management:
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_x86_64(self):
cpu_output = u"""processor : 0
cpu_output = b"""processor : 0
vendor_id : GenuineIntel
cpu family : 6
model : 60
......@@ -394,7 +408,7 @@ power management:
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_ppc64_power8(self):
cpu_output = u"""processor : 88
cpu_output = b"""processor : 88
cpu : POWER8E (raw), altivec supported
clock : 3325.000000MHz
revision : 2.1 (pvr 004b 0201)
......@@ -412,7 +426,7 @@ firmware : OPAL v3
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_ppc64_le_power8(self):
cpu_output = u"""processor : 88
cpu_output = b"""processor : 88
cpu : POWER8E (raw), altivec supported
clock : 3325.000000MHz
revision : 2.1 (pvr 004b 0201)
......@@ -430,7 +444,7 @@ firmware : OPAL v3
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_ppc64_le_power9(self):
cpu_output = u"""processor : 20
cpu_output = b"""processor : 20
cpu : POWER9 (raw), altivec supported
clock : 2050.000000MHz
revision : 1.0 (pvr 004e 0100)
......@@ -448,7 +462,7 @@ firmware : OPAL
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_s390(self):
cpu_output = u"""vendor_id : IBM/S390
cpu_output = b"""vendor_id : IBM/S390
# processors : 2
bogomips per cpu: 2913.00
max thread id : 0
......@@ -478,7 +492,7 @@ cpu MHz static : 5504
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_arm_v7(self):
cpu_output = u"""Processor : ARMv7 Processor rev 2 (v7l)
cpu_output = b"""Processor : ARMv7 Processor rev 2 (v7l)
BogoMIPS : 994.65
Features : swp half thumb fastmult vfp edsp thumbee neon vfpv3
CPU implementer : 0x41
......@@ -498,7 +512,7 @@ Serial : 3534268a5e0700ec
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_arm_v8(self):
cpu_output = u"""processor : 0
cpu_output = b"""processor : 0
BogoMIPS : 200.00
Features : fp asimd evtstrm aes pmull sha1 sha2 crc32 cpuid
CPU implementer : 0x43
......@@ -515,7 +529,7 @@ CPU revision : 1
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_cpu_arch_risc_v(self):
cpu_output = u"""hart : 1
cpu_output = b"""hart : 1
isa : rv64imafdc
mmu : sv39
uarch : sifive,rocket0
......@@ -530,7 +544,7 @@ uarch : sifive,rocket0
retval = {0: {0: 0}}
with mock.patch('avocado.utils.cpu.cpu_online_list', return_value=[0]):
with mock.patch('glob.glob', return_value=['/sys/devices/system/cpu/cpu0/cpuidle/state1']):
with mock.patch('avocado.utils.cpu.open', return_value=StringIO.StringIO('0')):
with mock.patch('avocado.utils.cpu.open', return_value=io.BytesIO(b'0')):
self.assertEqual(cpu.get_cpuidle_state(), retval)
@unittest.skipUnless(recent_mock(),
......@@ -539,38 +553,38 @@ uarch : sifive,rocket0
retval = {0: {0: 1}}
with mock.patch('avocado.utils.cpu.cpu_online_list', return_value=[0]):
with mock.patch('glob.glob', return_value=['/sys/devices/system/cpu/cpu0/cpuidle/state1']):
with mock.patch('avocado.utils.cpu.open', return_value=StringIO.StringIO('1')):
with mock.patch('avocado.utils.cpu.open', return_value=io.BytesIO(b'1')):
self.assertEqual(cpu.get_cpuidle_state(), retval)
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_set_cpuidle_state_default(self):
output = StringIO.StringIO()
output = io.BytesIO()
with mock.patch('avocado.utils.cpu.cpu_online_list', return_value=[0]):
with mock.patch('glob.glob', return_value=['/sys/devices/system/cpu/cpu0/cpuidle/state1']):
with mock.patch('avocado.utils.cpu.open', return_value=output):
cpu.set_cpuidle_state()
self.assertEqual(output.getvalue(), "1")
self.assertEqual(output.getvalue(), b'1')
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_set_cpuidle_state_withstateno(self):
output = StringIO.StringIO()
output = io.BytesIO()
with mock.patch('avocado.utils.cpu.cpu_online_list', return_value=[0]):
with mock.patch('glob.glob', return_value=['/sys/devices/system/cpu/cpu0/cpuidle/state2']):
with mock.patch('avocado.utils.cpu.open', return_value=output):
cpu.set_cpuidle_state(disable='0', state_number='2')
self.assertEqual(output.getvalue(), "0")
cpu.set_cpuidle_state(disable=False, state_number='2')
self.assertEqual(output.getvalue(), b'0')
@unittest.skipUnless(recent_mock(),
"mock library version cannot (easily) patch open()")
def test_set_cpuidle_state_withsetstate(self):
output = StringIO.StringIO()
output = io.BytesIO()
with mock.patch('avocado.utils.cpu.cpu_online_list', return_value=[0, 2]):
with mock.patch('glob.glob', return_value=['/sys/devices/system/cpu/cpu0/cpuidle/state1']):
with mock.patch('avocado.utils.cpu.open', return_value=output):
cpu.set_cpuidle_state(setstate={0: {0: 1}, 2: {0: 0}})
self.assertEqual(output.getvalue(), "10")
self.assertEqual(output.getvalue(), b'10')
if __name__ == "__main__":
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册