Commit 7bd4b423 authored by aodongbiao

Add ROM/RAM analysis tools

Signed-off-by: aodongbiao <aodongbiao@huawei.com>
Parent 81e80bc5
[toc]
# rom_analyzer.py
## Overview
Analyzes the ROM size of each subsystem and component, based on BUILD.gn, bundle.json, the build artifact system_module_info.json, and the build output under out/{product_name}/packages/phone.
Results are saved in json and xls formats: json is always produced, while xls output is enabled with the -e option.
## Usage
Prerequisites:
1. Obtain the entire rom_ram_analyzer directory
1. Build the system
1. Linux platform
1. Python 3.8 or later
1. Install the requirements
```txt
xlwt==1.3.0
```
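Before running the tool, the interpreter and dependency requirements above can be verified with a short Python check (a sketch; `check_env` is a hypothetical helper, not part of the tool):

```python
import sys

def check_env(min_version=(3, 8)):
    """Return (python_ok, xlwt_ok) for the prerequisites listed above."""
    python_ok = sys.version_info[:2] >= min_version
    try:
        import xlwt  # noqa: F401 -- only needed for the optional -e/--excel output
        xlwt_ok = True
    except ImportError:
        xlwt_ok = False
    return python_ok, xlwt_ok

print(check_env())
```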
Commands:
1. Use `-h` or `--help` to view the help
```shell
> python3 rom_analyzer.py -h
usage: rom_analyzer.py [-h] [-v] -p PROJECT_PATH -j MODULE_INFO_JSON -n PRODUCT_NAME -d PRODUCT_DIR [-o OUTPUT_FILE] [-e EXCEL]

analyze rom size of component.

optional arguments:
  -h, --help            show this help message and exit
  -v, --version         show program's version number and exit
  -p PROJECT_PATH, --project_path PROJECT_PATH
                        root path of openharmony. eg: -p ~/openharmony
  -j MODULE_INFO_JSON, --module_info_json MODULE_INFO_JSON
                        path of out/{product_name}/packages/phone/system_module_info.json
  -n PRODUCT_NAME, --product_name PRODUCT_NAME
                        product name. eg: -n rk3568
  -d PRODUCT_DIR, --product_dir PRODUCT_DIR
                        subdirectories of out/{product_name}/packages/phone to be counted. eg: -d system -d vendor
  -o OUTPUT_FILE, --output_file OUTPUT_FILE
                        basename of output file, default: rom_analysis_result. eg: demo/rom_analysis_result
  -e EXCEL, --excel EXCEL
                        if output result as excel, default: False. eg: -e True
```
1. Example
```shell
python3 rom_analyzer.py -p ~/nomodify_oh/ -j ../system_module_info.json -n rk3568 -d system -d vendor -d updater -o demo/demo -e True
# ~/nomodify_oh/: root path of openharmony
# rk3568: product_name, same as out/{product_name}
# demo/demo: path of output file, where the second 'demo' is the basename of the output file
# -e True: additionally output the result in excel format
```
## Output format (json)
```json
{
    subsystem_name: {
        "size": total size of all files produced by the subsystem,
        "file_count": number of files produced by the subsystem,
        component_name: {
            "size": total size of all files produced by the component,
            "file_count": number of files produced by the component,
            output_file_name: size of this file,
            ...
        },
        ...
    },
    ...
}
```
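Because the json result is plain nested dicts, it can be post-processed directly; the sketch below ranks subsystems by size (`rank_subsystems` and the sample data are hypothetical, only mirroring the schema above):

```python
def rank_subsystems(result: dict) -> list:
    """Collapse the nested rom result into (subsystem, size, file_count) rows, largest first."""
    rows = [(name, detail["size"], detail["file_count"]) for name, detail in result.items()]
    return sorted(rows, key=lambda row: row[1], reverse=True)

# hypothetical sample mirroring the schema above
sample = {
    "startup": {"size": 3072, "file_count": 2,
                "init": {"size": 3072, "file_count": 2,
                         "system/bin/init": 2048, "system/lib/libinit.so": 1024}},
    "arkui": {"size": 1024, "file_count": 1,
              "ace_engine": {"size": 1024, "file_count": 1,
                             "system/lib/libace.so": 1024}},
}
print(rank_subsystems(sample))
```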
# ram_analyzer.py
## Overview
Analyzes the RAM usage (Pss by default) of each process and its components, based on all cfg files under out/{product_name}/packages/phone and all xml files under out/{product_name}/packages/phone/system/profile.
Results are saved in json and xls formats: json is always produced, while xls output is enabled with the -e option.
## Usage
Prerequisites:
1. Obtain the entire rom_ram_analyzer directory
1. hdc is available
1. The device is connected
1. Python 3.8 or later
1. Install the requirements
```txt
xlwt==1.3.0
```
1. Prepare the input data:
   1. All cfg files under out/{product_name}/packages/phone, collected into a single directory (note: for duplicated file names, keeping one copy is enough)
   1. All xml files under out/{product_name}/packages/phone/system/profile
1. A json result produced by rom_analyzer.py (the file written via the -o option, rom_analysis_result.json by default)
Commands:
1. Use `-h` or `--help` to view the help
```shell
> python .\ram_analyzer.py -h
usage: ram_analyzer.py [-h] [-v] -x XML_PATH -c CFG_PATH [-j ROM_RESULT] -n DEVICE_NUM [-o OUTPUT_FILENAME] [-e EXCEL]

analyze ram size of component

optional arguments:
  -h, --help            show this help message and exit
  -v, --version         show program's version number and exit
  -x XML_PATH, --xml_path XML_PATH
                        path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile
  -c CFG_PATH, --cfg_path CFG_PATH
                        path of cfg files. eg: -c ./cfgs/
  -j ROM_RESULT, --rom_result ROM_RESULT
                        json file produced by rom_analyzer.py, default: ./rom_analysis_result.json. eg: -j ./demo/rom_analysis_result.json
  -n DEVICE_NUM, --device_num DEVICE_NUM
                        device number from which to collect hidumper info. eg: -n 7001005458323933328a01fce16d3800
  -o OUTPUT_FILENAME, --output_filename OUTPUT_FILENAME
                        base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result
  -e EXCEL, --excel EXCEL
                        if output result as excel, default: False. eg: -e True
```
2. Example:
```shell
python .\ram_analyzer.py -x .\profile\ -c .\init\ -n 7001005458323933328a01fce16d3800 -j .\rom_analysis_result.json -o demo/demo -e True
# demo/demo: path of output file, where the second 'demo' is the basename of the output file
# -e True: additionally output the result in excel format
```
## Output format (json)
```json
{
    process_name: {
        "size": memory usage of this process,
        component_name: {
            elf_file_name: elf file size,
            ...
        },
        ...
    },
    ...
}
```
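Likewise, the ram result can be aggregated per component; the sketch below sums the recorded elf sizes under each component across all processes (`elf_size_by_component` and the sample data are hypothetical, only mirroring the schema above):

```python
from collections import defaultdict

def elf_size_by_component(result: dict) -> dict:
    """Sum elf sizes per component across all processes; skips the process-level 'size' key."""
    totals = defaultdict(int)
    for process_detail in result.values():
        for component, elfs in process_detail.items():
            if component == "size":  # process-level Pss entry, not a component
                continue
            totals[component] += sum(elfs.values())
    return dict(totals)

# hypothetical sample mirroring the schema above
sample = {
    "samgr": {"size": 1464,
              "samgr": {"samgr": 150528},
              "c_utils": {"libutils.z.so": 296752}},
}
print(elf_size_by_component(sample))
```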
import sys
import typing
import os
from pathlib import Path
from typing import *
class BasicTool:
    @classmethod
    def find_all_files(cls, folder: str, real_path: bool = True, apply_abs: bool = True, de_duplicate: bool = True,
                       p_filter: typing.Callable = lambda x: True) -> list:
        filepath_list = set()
        for root, _, file_names in os.walk(folder):
            for file_name in file_names:
                path = os.path.join(root, file_name)
                if not p_filter(path):
                    continue
                # resolve symlinks only when real_path is requested
                if real_path:
                    path = os.path.realpath(path)
                path = os.path.abspath(path) if apply_abs else os.path.relpath(path)
                filepath_list.add(path)
        # a set is used for collection, so the paths are always de-duplicated
        return sorted(filepath_list, key=str.lower)
@classmethod
def get_abs_path(cls, path: str) -> str:
return os.path.abspath(os.path.expanduser(path))
if __name__ == '__main__':
# print(BasicTool.get_abs_path("~/git/.."))
for i in BasicTool.find_all_files(".", apply_abs=False):
print(i)
import os
import json
class GnCommonTool:
    """
    Common helpers for processing BUILD.gn files.
    """
@classmethod
def is_gn_variable(cls, target: str, has_quote: bool = True):
        """
        Check whether target is a gn variable.
        Rules: when has_quote is True, an unquoted value is treated as a variable, and a
        quoted value is a variable only if it looks like "$xxx" or "${xxx}"; when has_quote
        is False, anything starting with $ is treated as a variable.
        Examples:
            b = "xxx"
            c = b
            c = "${b}"
            "$p"
        """
target = target.strip()
if not has_quote:
return target.startswith("$")
if target.startswith('"') and target.endswith('"'):
target = target.strip('"')
if target.startswith("${") and target.endswith("}"):
return True
elif target.startswith("$"):
return True
return False
else:
return True
    # cache for find_variables_in_gn, to reduce file io
__var_val_mem_dict = dict()
@classmethod
def find_variables_in_gn(cls, var_name_tuple: tuple, path: str, stop_tail: str = "home") -> tuple:
        """
        Look up the values of several gn variables at once.
        var_name_tuple: tuple of raw (unprocessed) variable names, e.g.:
            xxx
            "${xxx}"
            "$xxx"
        """
if os.path.isfile(path):
path = os.path.split(path)[0]
var_val_dict = dict()
not_found_count = len(var_name_tuple)
for var in var_name_tuple:
val = GnCommonTool.__var_val_mem_dict.get(var)
if val is not None:
not_found_count -= 1
var_val_dict[var] = val
while not path.endswith(stop_tail) and not_found_count != 0:
for v in var_name_tuple:
cmd = r"grep -Ern '^( *){} *= *\".*?\"' --include=*.gn* {}| grep -Ev '\$' | head -n 1 | grep -E '\".*\"' -wo".format(
v.strip('"').lstrip("${").rstrip('}'), path)
output = os.popen(cmd).read().strip().strip('"')
if len(output) != 0:
not_found_count -= 1
var_val_dict[v] = output
GnCommonTool.__var_val_mem_dict[v] = output
path = os.path.split(path)[0]
return tuple(var_val_dict.values())
@classmethod
def __find_part_subsystem_from_bundle(cls, gnpath: str, stop_tail: str = "home") -> tuple:
        """
        Starting from the directory of a BUILD.gn file, walk upwards looking for a
        bundle.json file, then read part_name and subsystem from it.
        """
filename = "bundle.json"
part_name = None
subsystem_name = None
if stop_tail not in gnpath:
return part_name, subsystem_name
if os.path.isfile(gnpath):
gnpath = os.path.split(gnpath)[0]
while not gnpath.endswith(stop_tail):
bundle_path = os.path.join(gnpath, filename)
            if not os.path.isfile(bundle_path):  # bundle.json is not in this directory
gnpath = os.path.split(gnpath)[0]
continue
with open(bundle_path, 'r', encoding='utf-8') as f:
content = json.load(f)
try:
part_name = content["component"]["name"]
subsystem_name = content["component"]["subsystem"]
except KeyError:
...
finally:
break
part_name = None if (part_name is not None and len(
part_name) == 0) else part_name
subsystem_name = None if (subsystem_name is not None and len(
subsystem_name) == 0) else subsystem_name
return part_name, subsystem_name
@classmethod
def find_part_subsystem(cls, gn_file: str, project_path: str) -> tuple:
        """
        Find the part_name and subsystem corresponding to gn_file.
        If they cannot be found in the gn file, fall back to bundle.json.
        """
part_name = None
subsystem_name = None
        part_var_flag = False  # whether the raw value taken from gn is a variable
subsystem_var_flag = False
var_list = list()
part_name_pattern = r"part_name *=\s*\S*"
subsystem_pattern = r"subsystem_name *=\s*\S*"
meta_grep_pattern = "grep -E '{}' {} | head -n 1"
part_cmd = meta_grep_pattern.format(part_name_pattern, gn_file)
subsystem_cmd = meta_grep_pattern.format(subsystem_pattern, gn_file)
part = os.popen(part_cmd).read().strip()
if len(part) != 0:
part = part.split('=')[-1].strip()
if GnCommonTool.is_gn_variable(part):
part_var_flag = True
var_list.append(part)
else:
part_name = part.strip('"')
if len(part_name) == 0:
part_name = None
subsystem = os.popen(subsystem_cmd).read().strip()
        if len(subsystem) != 0:  # only checks whether the keyword was grepped
subsystem = subsystem.split('=')[-1].strip()
if GnCommonTool.is_gn_variable(subsystem):
subsystem_var_flag = True
var_list.append(subsystem)
else:
subsystem_name = subsystem.strip('"')
if len(subsystem_name) == 0:
subsystem_name = None
if part_var_flag and subsystem_var_flag:
part_name, subsystem_name = GnCommonTool.find_variables_in_gn(
tuple(var_list), gn_file, project_path)
elif part_var_flag:
t = GnCommonTool.find_variables_in_gn(
tuple(var_list), gn_file, project_path)[0]
part_name = t if t is not None and len(t) != 0 else part_name
elif subsystem_var_flag:
t = GnCommonTool.find_variables_in_gn(
tuple(var_list), gn_file, project_path)[0]
subsystem_name = t if t is not None and len(
t) != 0 else subsystem_name
if part_name is not None and subsystem_name is not None:
return part_name, subsystem_name
        # if either one is missing, walk up the tree looking for bundle.json
t_part_name, t_subsystem_name = cls.__find_part_subsystem_from_bundle(
gn_file, stop_tail=project_path)
if t_part_name is not None:
part_name = t_part_name
if t_subsystem_name is not None:
subsystem_name = t_subsystem_name
return part_name, subsystem_name
import xlwt
from xlwt import Worksheet
import typing
from typing import Optional
from collections.abc import Iterable
class SimpleExcelWriter:
def __init__(self, default_sheet_name: str = "sheet1"):
self.__book = xlwt.Workbook(encoding='utf-8', style_compression=0)
self.__sheet_dict = {
default_sheet_name: self.__book.add_sheet(
sheetname=default_sheet_name, cell_overwrite_ok=True)
}
self.__sheet_pos = {
            default_sheet_name: (0, 0)  # tracks the next unwritten position of each sheet
}
self.__default_sheet_name = default_sheet_name
        # header style
        self.__head_style = xlwt.XFStyle()
        # content style
        self.__content_style = xlwt.XFStyle()
        # font
        font = xlwt.Font()
        font.bold = True
        # center alignment
        alignment = xlwt.Alignment()
        alignment.horz = xlwt.Alignment.HORZ_CENTER  # horizontal
        alignment.vert = xlwt.Alignment.VERT_CENTER  # vertical
        # background color
        pattern = xlwt.Pattern()
        pattern.pattern = xlwt.Pattern.SOLID_PATTERN
        pattern.pattern_fore_colour = 22  # background color
self.__head_style.font = font
self.__head_style.alignment = alignment
self.__head_style.pattern = pattern
self.__content_style.alignment = alignment
def __increment_y(self, sheet_name: str, value: int = 1) -> int:
if sheet_name in self.__sheet_pos.keys():
x, y = self.__sheet_pos.get(sheet_name)
y = y + value
self.__sheet_pos[sheet_name] = (x, y)
return y
def __increment_x(self, sheet_name: str, value: int = 1) -> int:
if sheet_name in self.__sheet_pos.keys():
x, y = self.__sheet_pos.get(sheet_name)
x = x + value
self.__sheet_pos[sheet_name] = (x, 0)
return x
def append_line(self, content: list, sheet_name: str = None):
sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
if sheet_name not in self.__sheet_dict.keys():
print("error: sheet name '{}' not exist".format(sheet_name))
return
sheet: Worksheet = self.__sheet_dict.get(sheet_name)
x, y = self.__sheet_pos.get(sheet_name)
for ele in content:
sheet.write(x, y, ele, style=self.__content_style)
y = self.__increment_y(sheet_name)
self.__increment_x(sheet_name)
def write_merge(self, x0: int, y0: int, x1: int, y1: int, content: typing.Any,
sheet_name: str = None):
sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
if sheet_name not in self.__sheet_dict.keys():
print("error: sheet name '{}' not exist".format(sheet_name))
return
sheet: Worksheet = self.__sheet_dict.get(sheet_name)
sheet.write_merge(x0, x1, y0, y1, content, style=self.__content_style)
def set_sheet_header(self, headers: Iterable, sheet_name: str = None):
        """
        Set the header row of a sheet.
        """
sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
if sheet_name not in self.__sheet_dict.keys():
print("error: sheet name '{}' not exist".format(sheet_name))
return
x, y = self.__sheet_pos.get(sheet_name)
if x != 0 or y != 0:
print(
"error: pos of sheet '{}' is not (0,0). set_sheet_header must before write".format(sheet_name))
return
sheet: Worksheet = self.__sheet_dict.get(sheet_name)
for h in headers:
sheet.write(x, y, h, self.__head_style)
y = self.__increment_y(sheet_name)
self.__increment_x(sheet_name)
def add_sheet(self, sheet_name: str, cell_overwrite_ok=True) -> Optional[xlwt.Worksheet]:
if sheet_name in self.__sheet_dict.keys():
            print("error: sheet name '{}' already exists".format(sheet_name))
return
self.__sheet_dict[sheet_name] = self.__book.add_sheet(
sheetname=sheet_name, cell_overwrite_ok=cell_overwrite_ok)
self.__sheet_pos[sheet_name] = (0, 0)
return self.__sheet_dict.get(sheet_name)
def save(self, file_name: str):
self.__book.save(file_name)
if __name__ == '__main__':
writer = SimpleExcelWriter(default_sheet_name="first")
writer.add_sheet("second")
writer.add_sheet("third")
writer.set_sheet_header(["h", "m", "n"])
writer.append_line([1, 2, 3])
writer.append_line([2, 3, 4], "second")
writer.append_line([3, 4, 5], "third")
writer.append_line([3, 2, 1])
writer.save("demo.xls")
import argparse
import copy
import glob
import json
import os
import re
import sys
import subprocess
import typing
import xml.dom.minidom as dom
from packages.simple_excel_writer import SimpleExcelWriter
debug = bool(sys.gettrace())
class HDCTool:
@classmethod
def verify_hdc(cls, verify_str: str = "OpenHarmony") -> bool:
        """
        Check whether hdc is usable.
        True: usable
        False: unusable
        """
cp = subprocess.run(["hdc"], capture_output=True)
stdout = str(cp.stdout)
stderr = str(cp.stderr)
return verify_str in stdout or verify_str in stderr
@classmethod
def verify_device(cls, device_num: str) -> bool:
        """
        Check whether the device is connected.
        True: connected
        False: not connected
        """
cp = subprocess.run(["hdc", "list", "targets"], capture_output=True)
stdout = str(cp.stdout)
stderr = str(cp.stderr)
return device_num in stderr or device_num in stdout
__MODE = typing.Literal["stdout", "stderr"]
@classmethod
def exec(cls, args: list, output_from: __MODE = "stdout"):
cp = subprocess.run(args, capture_output=True)
if output_from == "stdout":
return cp.stdout.decode()
elif output_from == "stderr":
return cp.stderr.decode()
else:
            print("error: 'output_from' must be stdout or stderr")
def delete_values_from_dict(target_dict: typing.Dict, key_list: typing.Iterable):
for k in key_list:
if k not in target_dict.keys():
continue
del target_dict[k]
class RamAnalyzer:
@classmethod
def __hidumper_mem_line_process(cls, content: typing.Text) -> typing.List[typing.Text]:
        """
        Split a hidumper data row into a list of the form
        [pid, name, pss, vss, rss, uss].
        """
        # strip the unit "kB" and anything inside parentheses (including the parentheses)
        trival_pattern = re.compile(r"kB|\(.*\)")
        content = re.sub(trival_pattern, "", content)
        # collapse one or more whitespace characters into a single space
        blank_pattern = re.compile(r"\s+")
        return re.sub(blank_pattern, ' ', content.strip()).split()
    __SS_Mode = typing.Literal["Pss", "Vss", "Rss", "Uss"]  # for input hinting
__ss_dict: typing.Dict[str, int] = {
"Pss": 2,
"Vss": 3,
"Rss": 4,
"Uss": 5
}
@classmethod
def __parse_hidumper_mem(cls, content: typing.Text, device_num: str, ss: __SS_Mode = "Pss") -> typing.Dict[
typing.Text, int]:
        """
        Parse the output of `hidumper --mem` into a {process_name: size} dict.
        Sample line:
        '248 samgr 1464(0 in SwapPss) kB 15064 kB 6928 kB 1072 kB\r'
        """
        def find_full_process_name(hname: str) -> str:
            for lname in __process_name_list:
                if lname.startswith(hname):
                    return lname
            # fall back to the (possibly truncated) hidumper name if ps -ef has no match
            return hname
def process_ps_ef(content: str) -> list:
line_list = content.strip().split("\n")[1:]
process_name_list = list()
for line in line_list:
process_name = line.split()[7]
if process_name.startswith('['):
                    # kernel process
continue
process_name_list.append(process_name)
return process_name_list
if ss not in cls.__ss_dict.keys():
print("error: {} is not a valid parameter".format(ss))
return dict()
output = content.split('\n')
process_pss_dict = dict()
__process_name_list: typing.List[str] = process_ps_ef(
HDCTool.exec(["hdc", "-t", device_num, "shell", "ps", "-ef"]))
for line in output:
if "Total Memory Usage by Size" in line:
break
if line.isspace():
continue
processed: typing.List[typing.Text] = cls.__hidumper_mem_line_process(line)
            if not processed[0].isnumeric():  # skip lines whose first column is not a numeric pid
                continue
            name = processed[1]  # otherwise take the process name and the requested size column
            size = int(processed[cls.__ss_dict.get(ss)])
process_pss_dict[find_full_process_name(name)] = size
return process_pss_dict
@classmethod
def process_hidumper_info(cls, device_num: str, ss: __SS_Mode) -> typing.Dict[str, int]:
        """
        Collect {process_name: size} via hidumper on the device.
        """
def exec_once() -> typing.Dict[str, int]:
stdout = HDCTool.exec(["hdc", "-t", device_num, "shell", "hidumper", "--mem"])
name_size_dict = cls.__parse_hidumper_mem(stdout, device_num, ss)
return name_size_dict
if not HDCTool.verify_hdc():
print("error: Command 'hdc' not found")
return dict()
if not HDCTool.verify_device(device_num):
print("error: {} is inaccessible or not found".format(device_num))
return dict()
return exec_once()
@classmethod
def __parse_process_xml(cls, file_path: str, result_dict: typing.Dict[str, typing.List[str]]):
        """
        Parse an xml file and store the result into result_dict in the form
        {process_name: so_list}, where so_list holds the base names of the so files.
        """
if not (os.path.isfile(file_path) and file_path.endswith(".xml")):
            print("warning: {} does not exist or is not an xml file".format(file_path))
return
doc = dom.parse(file_path)
info = doc.getElementsByTagName("info")[0]
process = info.getElementsByTagName("process")[0]
process_name = process.childNodes[0].data
result_dict[process_name] = list()
libs = info.getElementsByTagName("loadlibs")[0].getElementsByTagName("libpath")
for lib in libs:
so = lib.childNodes[0].data
result_dict.get(process_name).append(os.path.split(so)[-1])
if debug:
print(process_name, " ", so)
@classmethod
def get_elf_info_from_rom_result(cls, rom_result_json: str) -> typing.Dict[str, typing.Dict[str, str]]:
        """
        Reorganize the analysis result of rom_analyzer.py into the form
        {file_base_name: {"subsystem_name": subsystem_name, "component_name": component_name, "size": size}}
        """
with open(rom_result_json, 'r', encoding='utf-8') as f:
rom_info_dict = json.load(f)
elf_info_dict: typing.Dict[str, typing.Dict[str, str]] = dict()
for subsystem_name in rom_info_dict.keys():
sub_val_dict: typing.Dict[str, typing.Any] = rom_info_dict.get(subsystem_name)
delete_values_from_dict(sub_val_dict, ["size", "file_count"])
for component_name in sub_val_dict.keys():
component_val_dict: typing.Dict[str, str] = sub_val_dict.get(component_name)
delete_values_from_dict(component_val_dict, ["size", "file_count"])
for file_name, size in component_val_dict.items():
file_basename: str = os.path.split(file_name)[-1]
elf_info_dict[file_basename] = {
"subsystem_name": subsystem_name,
"component_name": component_name,
"size": size
}
return elf_info_dict
@classmethod
def __parse_process_cfg(cls, cfg_path: str, profile_path: str, result_dict: dict):
        """
        Parse a cfg file; because some cfg files start processes declared in xml,
        this may also parse the corresponding xml files.
        """
with open(cfg_path, 'r', encoding='utf-8') as f:
cfg_dict = json.loads(f.read())
services = cfg_dict.get("services")
if services is None:
print("warning: 'services' not in {}".format(cfg_path))
return
for service in services:
process_name = service.get("name")
first, *path_list = service.get("path")
if first.endswith("sa_main"):
                # the process is started by sa_main
xml_base_name = os.path.split(path_list[0])[-1]
cls.__parse_process_xml(os.path.join(profile_path, xml_base_name), result_dict)
else:
                # executed directly
                # process_name = os.path.split(first)[-1]
if result_dict.get(process_name) is None:
result_dict[process_name] = list()
result_dict.get(process_name).append(os.path.split(first)[-1])
@classmethod
def get_process_so_relationship(cls, xml_path: str, cfg_path: str, profile_path: str) -> typing.Dict[
str, typing.List[str]]:
        """
        Find xml files under out/{product_name}/packages/phone/sa_profile/merged_sa
        and derive the process-to-so relationship from them.
        """
        # xml_path = os.path.join(product_path, "packages", "phone", "sa_profile", "merged_sa")
        # collect from merged_sa
xml_list = glob.glob(xml_path + os.sep + "*[.]xml", recursive=True)
process_elf_dict: typing.Dict[str, typing.List[str]] = dict()
for xml in xml_list:
if debug:
print("parsing: ", xml)
            try:
                cls.__parse_process_xml(xml, process_elf_dict)
            except Exception:
                print("parse '{}' failed".format(xml))
        # collect from system/etc/init/*.cfg; processes started by sa_main are resolved via system/profile/*.xml
cfg_list = glob.glob(cfg_path + os.sep + "*[.]cfg", recursive=True)
for cfg in cfg_list:
if debug:
print("parsing: ", cfg)
            try:
                cls.__parse_process_cfg(cfg, profile_path, process_elf_dict)
            except Exception:
                print("parse '{}' failed".format(cfg))
return process_elf_dict
@classmethod
def __save_result_as_excel(cls, data_dict: dict, filename: str, ss: __SS_Mode):
        """
        Save the result as an excel file.
        """
tmp_dict = copy.deepcopy(data_dict)
writer = SimpleExcelWriter("ram_info")
writer.set_sheet_header(
["process_name", "process_size({}, KB)".format(ss), "component_name", "elf_name", "elf_size(KB)"])
process_start_r = 1
process_end_r = 0
process_c = 0
process_size_c = 1
component_start_r = 1
component_end_r = 0
component_c = 2
for process_name in tmp_dict.keys():
process_val_dict: typing.Dict[str, typing.Dict[str, int]] = tmp_dict.get(process_name)
process_size = process_val_dict.get("size")
delete_values_from_dict(process_val_dict, ["size"])
for component_name, component_val_dict in process_val_dict.items():
elf_count_of_component = len(component_val_dict)
for elf_name, size in component_val_dict.items():
writer.append_line([process_name, process_size, component_name, elf_name, "%.2f" % (size / 1024)])
component_end_r += elf_count_of_component
                # merge the component cells
writer.write_merge(component_start_r, component_c, component_end_r,
component_c, component_name)
component_start_r = component_end_r + 1
process_end_r += elf_count_of_component
writer.write_merge(process_start_r, process_c, process_end_r, process_c, process_name)
writer.write_merge(process_start_r, process_size_c, process_end_r, process_size_c, process_size)
process_start_r = process_end_r + 1
writer.save(filename)
@classmethod
def find_elf_size_from_rom_result(cls, service_name: str, subsystem_name: str, component_name: str,
evaluator: typing.Callable, rom_result_dict: typing.Dict[str, typing.Dict]) -> \
typing.Tuple[
bool, str, str, int]:
        """
        Globally search the rom result for elf files related to a process.
        subsystem_name and component_name can be given explicitly, or as * to traverse the whole dict.
        evaluator: judges whether an elf path (relative to the phone directory) matches service_name.
        returns: (found, elf file name, component name, size)
        """
subsystem_name_list = [subsystem_name] if subsystem_name != "*" else rom_result_dict.keys()
for sn in subsystem_name_list:
sub_val_dict = rom_result_dict.get(sn)
component_name_list = [component_name] if component_name != '*' else sub_val_dict.keys()
for cn in component_name_list:
if cn == "size" or cn == "file_count":
continue
component_val_dict: typing.Dict[str, int] = sub_val_dict.get(cn)
for k, v in component_val_dict.items():
if k == "size" or k == "file_count":
continue
if not evaluator(service_name, k):
continue
return True, os.path.split(k)[-1], cn, v
return False, str(), str(), int()
@classmethod
def analysis(cls, cfg_path: str, xml_path: str, rom_result_json: str, device_num: str,
output_file: str, ss: __SS_Mode, output_excel: bool):
        """
        result columns: process size subsystem/component so so_size
        """
if not HDCTool.verify_hdc():
print("error: Command 'hdc' not found")
return
if not HDCTool.verify_device(device_num):
print("error: {} is inaccessible or not found".format(device_num))
return
with open(rom_result_json, 'r', encoding='utf-8') as f:
rom_result_dict: typing.Dict = json.loads(f.read())
        # reorganize the needed elf info (subsystem_name/component_name/size) from the rom analysis result
        so_info_dict: typing.Dict[str, typing.Dict[str, str]] = cls.get_elf_info_from_rom_result(rom_result_json)
        # the profile directory and the xml directory are the same path in this tool
        process_elf_dict: typing.Dict[str, typing.List[str]] = cls.get_process_so_relationship(xml_path, cfg_path,
                                                                                               xml_path)
process_size_dict: typing.Dict[str, int] = cls.process_hidumper_info(device_num, ss)
result_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict()
        def get(key: typing.Any, dt: typing.Dict[str, typing.Any]):
            for k, v in dt.items():
                if k.startswith(key) or key == v[0]:
                    # e.g. the key for uinput_inject is mmi_uinput_inject; as a special case,
                    # if service_name is not found but the directly executed bin equals the
                    # name, it is also treated as found
                    return v
        for process_name, process_size in process_size_dict.items():  # iterate over processes
if process_name == "init":
_, bin, _, size = cls.find_elf_size_from_rom_result(process_name, "startup", "init",
lambda x, y: os.path.split(y)[
-1].lower() == x.lower(),
rom_result_dict)
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
result_dict[process_name]["init"] = dict()
                result_dict[process_name]["init"][bin if len(bin) != 0 else "UNKNOWN"] = size
continue
            # haps need special handling
if (process_name.startswith("com.") or process_name.startswith("ohos.")):
_, hap_name, component_name, size = cls.find_elf_size_from_rom_result(process_name, "*", "*",
lambda x, y: len(
y.split(
'/')) >= 3 and x.lower().startswith(
y.split('/')[2].lower()),
rom_result_dict)
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
result_dict[process_name][component_name] = dict()
                result_dict[process_name][component_name][hap_name if len(hap_name) != 0 else "UNKNOWN"] = size
continue
            so_list: list = get(process_name, process_elf_dict)  # elf files related to the process
if so_list is None:
print("warning: process '{}' not found in .xml or .cfg".format(process_name))
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
                result_dict[process_name]["UNKNOWN"] = dict()
                result_dict[process_name]["UNKNOWN"]["UNKNOWN"] = int()
continue
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
for so in so_list:
unit = so_info_dict.get(so)
if unit is None:
print("warning: '{}' in {} not found in json from rom_analysis".format(so, process_name))
continue
component_name = unit.get("component_name")
so_size = unit.get("size")
if result_dict.get(process_name).get(component_name) is None:
result_dict[process_name][component_name] = dict()
result_dict[process_name][component_name][so] = so_size
base_dir, _ = os.path.split(output_file)
if len(base_dir) != 0 and not os.path.isdir(base_dir):
os.makedirs(base_dir, exist_ok=True)
with open(output_file + ".json", 'w', encoding='utf-8') as f:
f.write(json.dumps(result_dict, indent=4))
if output_excel:
cls.__save_result_as_excel(result_dict, output_file + ".xls", ss)
def get_args():
VERSION = 1.0
parser = argparse.ArgumentParser(
description="analyze ram size of component"
)
    parser.add_argument("-v", "--version", action="version",
                        version=f"version {VERSION}")
parser.add_argument("-x", "--xml_path", type=str, required=True,
help="path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile")
parser.add_argument("-c", "--cfg_path", type=str, required=True,
help="path of cfg files. eg: -c ./cfgs/")
    parser.add_argument("-j", "--rom_result", type=str, default="./rom_analysis_result.json",
                        help="json file produced by rom_analyzer.py, default: ./rom_analysis_result.json. "
                             "eg: -j ./demo/rom_analysis_result.json")
    parser.add_argument("-n", "--device_num", type=str, required=True,
                        help="device number from which to collect hidumper info. eg: -n 7001005458323933328a01fce16d3800")
    parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str,
                        help="base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result")
    # argparse's type=bool would treat any non-empty string (including "False") as True,
    # so parse the flag value explicitly
    parser.add_argument("-e", "--excel", type=lambda s: s.lower() == "true", default=False,
                        help="if output result as excel, default: False. eg: -e True")
args = parser.parse_args()
return args
if __name__ == '__main__':
args = get_args()
cfg_path = args.cfg_path
profile_path = args.xml_path
rom_result = args.rom_result
device_num = args.device_num
output_filename = args.output_filename
output_excel = args.excel
RamAnalyzer.analysis(cfg_path, profile_path, rom_result,
device_num=device_num, output_file=output_filename, ss="Pss", output_excel=output_excel)
import argparse
import json
import os
import sys
import typing
from copy import deepcopy
from typing import *
from packages.basic_tool import BasicTool
from packages.gn_common_tool import GnCommonTool
from packages.simple_excel_writer import SimpleExcelWriter
debug = bool(sys.gettrace())
class RomAnalyzer:
@classmethod
def __collect_product_info(cls, system_module_info_json: Text,
project_path: Text) -> Dict[Text, Dict[Text, Text]]:
        """
        Build the product (target) dict from system_module_info.json.
        """
with open(system_module_info_json, 'r', encoding='utf-8') as f:
product_list = json.loads(f.read())
project_path = BasicTool.get_abs_path(project_path)
product_info_dict: Dict[Text, Dict[Text, Text]] = dict()
for unit in product_list:
dest: List = unit.get("dest")
if dest is None:
print("warning: keyword 'dest' not found in {}".format(system_module_info_json))
continue
label: Text = unit.get("label")
gn_path = component_name = subsystem_name = None
if label:
gn_path = os.path.join(project_path, label.split(':')[0].lstrip('/'), "BUILD.gn")
component_name, subsystem_name = GnCommonTool.find_part_subsystem(gn_path, project_path)
else:
print("warning: keyword 'label' not found in {}".format(unit))
for target in dest:
product_info_dict[target] = {
"component_name": component_name,
"subsystem_name": subsystem_name,
"gn_path": gn_path,
}
return product_info_dict
@classmethod
def __save_result_as_excel(cls, result_dict: dict, output_name: str):
header = ["subsystem_name", "component_name", "output_file", "size(Byte)"]
tmp_dict = deepcopy(result_dict)
excel_writer = SimpleExcelWriter("rom")
excel_writer.set_sheet_header(headers=header)
subsystem_start_row = 1
subsystem_end_row = 0
subsystem_col = 0
component_start_row = 1
component_end_row = 0
component_col = 1
for subsystem_name in tmp_dict.keys():
subsystem_dict = tmp_dict.get(subsystem_name)
subsystem_size = subsystem_dict.get("size")
subsystem_file_count = subsystem_dict.get("file_count")
del subsystem_dict["file_count"]
del subsystem_dict["size"]
subsystem_end_row += subsystem_file_count
for component_name in subsystem_dict.keys():
component_dict: Dict[str, int] = subsystem_dict.get(component_name)
component_size = component_dict.get("size")
component_file_count = component_dict.get("file_count")
del component_dict["file_count"]
del component_dict["size"]
component_end_row += component_file_count
for file_name, size in component_dict.items():
excel_writer.append_line(
[subsystem_name, component_name, file_name, size])
excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
component_name)
component_start_row = component_end_row + 1
excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
subsystem_name)
subsystem_start_row = subsystem_end_row + 1
excel_writer.save(output_name + ".xls")
@classmethod
def __put(cls, unit: typing.Dict[Text, Any], result_dict: typing.Dict[Text, Dict]):
"""
{
subsystem_name:{
component_name: {
file_name: file_size
}
}
}
"""
component_name = "others" if unit.get("component_name") is None else unit.get("component_name")
subsystem_name = "others" if unit.get("subsystem_name") is None else unit.get("subsystem_name")
size = unit.get("size")
relative_filepath = unit.get("relative_filepath")
if result_dict.get(subsystem_name) is None:
result_dict[subsystem_name] = dict()
result_dict[subsystem_name]["size"] = 0
result_dict[subsystem_name]["file_count"] = 0
if result_dict.get(subsystem_name).get(component_name) is None:
result_dict[subsystem_name][component_name] = dict()
result_dict[subsystem_name][component_name]["size"] = 0
result_dict[subsystem_name][component_name]["file_count"] = 0
result_dict[subsystem_name]["size"] += size
result_dict[subsystem_name]["file_count"] += 1
result_dict[subsystem_name][component_name]["size"] += size
result_dict[subsystem_name][component_name]["file_count"] += 1
result_dict[subsystem_name][component_name][relative_filepath] = size
@classmethod
    def analysis(cls, system_module_info_json: Text, product_dirs: List[str],
                 project_path: Text, product_name: Text, output_file: Text, output_excel: bool):
        """
        system_module_info_json: path of the json file
        product_dirs: list of artifact directories to process, e.g. ["vendor", "system"]
        project_path: root path of the project
        product_name: e.g. rk3568
        output_file: basename of the output file
        """
        project_path = BasicTool.get_abs_path(project_path)
        phone_dir = os.path.join(project_path, "out", product_name, "packages", "phone")
        product_dirs = [os.path.join(phone_dir, d) for d in product_dirs]
        product_info_dict = cls.__collect_product_info(system_module_info_json, project_path)  # info of all artifacts
        result_dict: Dict[Text, Dict] = dict()
        for d in product_dirs:
            file_list: List[Text] = BasicTool.find_all_files(d)
            for f in file_list:
                size = os.path.getsize(f)
                relative_filepath = f.replace(phone_dir, "").lstrip(os.sep)
                unit: Dict[Text, Any] = product_info_dict.get(relative_filepath)
                if unit is None:
                    unit = {
                        "relative_filepath": relative_filepath,
                    }
                unit["size"] = size
                unit["relative_filepath"] = relative_filepath
                cls.__put(unit, result_dict)
        output_dir, _ = os.path.split(output_file)
        if len(output_dir) != 0:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_file + ".json", 'w', encoding='utf-8') as f:
            f.write(json.dumps(result_dict, indent=4))
        if output_excel:
            cls.__save_result_as_excel(result_dict, output_file)
def get_args():
VERSION = 2.0
    parser = argparse.ArgumentParser(
        description="analyze rom size of component.")
    parser.add_argument("-v", "--version", action="version",
                        version=f"version {VERSION}")
parser.add_argument("-p", "--project_path", type=str, required=True,
help="root path of openharmony. eg: -p ~/openharmony")
parser.add_argument("-j", "--module_info_json", required=True, type=str,
help="path of out/{product_name}/packages/phone/system_module_info.json")
parser.add_argument("-n", "--product_name", required=True, type=str, help="product name. eg: -n rk3568")
    parser.add_argument("-d", "--product_dir", required=True, action="append",
                        help="subdirectories of out/{product_name}/packages/phone to be counted. "
                             "eg: -d system -d vendor")
parser.add_argument("-o", "--output_file", type=str, default="rom_analysis_result",
help="basename of output file, default: rom_analysis_result. eg: demo/rom_analysis_result")
    # argparse's type=bool would treat any non-empty string (including "False") as True,
    # so parse the flag value explicitly
    parser.add_argument("-e", "--excel", type=lambda s: s.lower() == "true", default=False,
                        help="if output result as excel, default: False. eg: -e True")
args = parser.parse_args()
return args
if __name__ == '__main__':
args = get_args()
module_info_json = args.module_info_json
project_path = args.project_path
product_name = args.product_name
product_dirs = args.product_dir
output_file = args.output_file
output_excel = args.excel
RomAnalyzer.analysis(module_info_json, product_dirs, project_path, product_name, output_file, output_excel)