Commit fcf9e33b authored by Yiru Wang Mac
......@@ -19,3 +19,94 @@ The goal of openEuler-Advisor is to provide automation for the routine work of the openEuler artifact repositories.
2. Further optimize simple-update-robot.py to improve its ability to handle upgrades automatically.
3. Improve upstream-info to cover all software in the openEuler artifact repositories, and consolidate the YAML files currently scattered across the openEuler community into upstream-info for easier unified management.
4. Extend the upstream code-management protocols supported by oa_upgradable.py; support for fossil is currently known to be missing.
#### YAML file specification
###### version_control:
Valid values: svn, git, hg, github, gnome, metacpan, pypi
###### src_repo:
1. If version_control is svn, src_repo must be the full SVN repository URL. Example: https://gitee.com/shinwell_hu/openEuler-Advisor/tree/next/upstream-info/amanda.yaml
2. If version_control is git, src_repo must be the full Git repository URL. Example: https://gitee.com/shinwell_hu/openEuler-Advisor/tree/next/upstream-info/mdadm.yaml
3. If version_control is hg, src_repo must be the full HG repository URL. Example: https://gitee.com/shinwell_hu/openEuler-Advisor/tree/next/upstream-info/nginx.yaml
4. If version_control is github, src_repo only needs proj/repo, not the full URL. Example: https://gitee.com/shinwell_hu/openEuler-Advisor/tree/next/upstream-info/asciidoc.yaml
5. If version_control is gnome, src_repo only needs $proj, not the full URL. Example: https://gitee.com/shinwell_hu/openEuler-Advisor/tree/next/upstream-info/gnome-terminal.yaml. Note that many projects on gitlab.gnome.org require access permissions and cannot be used as upstream repositories.
6. If version_control is metacpan, src_repo only needs $proj, not the full URL. Example: https://gitee.com/shinwell_hu/openEuler-Advisor/tree/next/upstream-info/perl-Authen-SASL.yaml. Pay attention to the naming conventions on metacpan.
7. If version_control is pypi, src_repo only needs $proj, not the full URL. Example: https://gitee.com/shinwell_hu/openEuler-Advisor/tree/next/upstream-info/python-apipkg. Pay attention to the naming conventions on pypi.
###### tag_prefix:
Tag conventions differ between projects. For example, if tags look like v1.1, set tag_prefix to ^v. Some projects require a more complex tag_prefix.
###### seperator:
Projects separate the fields of a tag differently: some use "-", some use "_"; the default is ".". Wrapping the value in double quotes is recommended.
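Putting these fields together, a minimal upstream-info entry might look like the following (the project name and values are illustrative, not taken from a real package):

```yaml
# Hypothetical upstream-info entry for a project hosted on GitHub
version_control: github        # one of: svn, git, hg, github, gnome, metacpan, pypi
src_repo: example-org/example  # proj/repo only, since version_control is github
tag_prefix: "^v"               # upstream tags look like v1.2.3
seperator: "."                 # fields in the tag are separated by "."
```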
###### Verifying upstream repository information for open-source software
1) Common source-control tools
git, svn, and hg can all query repository information without downloading the full repository:
- git: `git ls-remote --tags $repo_url`
- svn: `svn ls -v $repo_url/tags`
- hg: `curl $repo_url/json-tags`
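The output of `git ls-remote --tags` can then be reduced to plain tag names without cloning anything. A minimal Python sketch (the helper name and sample output below are illustrative):

```python
def parse_ls_remote_tags(output):
    """Extract tag names from `git ls-remote --tags` output.

    Each line looks like "<sha>\trefs/tags/<tag>"; lines ending in
    "^{}" are dereferenced annotated tags and are skipped here.
    """
    tags = []
    for line in output.splitlines():
        parts = line.split("\t")
        if len(parts) != 2:
            continue
        ref = parts[1]
        if ref.endswith("^{}"):
            continue
        if ref.startswith("refs/tags/"):
            tags.append(ref[len("refs/tags/"):])
    return tags

# Made-up sample output for demonstration
sample = (
    "1111aaa\trefs/tags/v1.0\n"
    "2222bbb\trefs/tags/v1.1\n"
    "3333ccc\trefs/tags/v1.1^{}\n"
)
print(parse_ls_remote_tags(sample))  # ['v1.0', 'v1.1']
```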
2) Common code-hosting sites
- github
`curl https://api.github.com/repos/$user/$repo/releases` returns the complete release list in JSON format, though not every project publishes releases.
`curl https://api.github.com/repos/$user/$repo/tags` returns the complete tag list in JSON format; not every project supports this either, and for some projects this information has been found to be wrong.
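The JSON returned by the tags endpoint is a list of objects each carrying a `name` field. A sketch of extracting tag names from an already-fetched response body (the payload below is made up):

```python
import json

def tags_from_github_json(payload):
    """Return the tag names from a GitHub /tags API response body."""
    return [entry["name"] for entry in json.loads(payload)]

# Made-up response body for demonstration
sample = '[{"name": "v2.0", "commit": {"sha": "abc"}}, {"name": "v1.9", "commit": {"sha": "def"}}]'
print(tags_from_github_json(sample))  # ['v2.0', 'v1.9']
```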
- metacpan
`curl https://fastapi.metacpan.org/release/$repo` returns the latest release information in JSON format.
- pypi
`curl https://pypi.org/pypi/$repo/json` returns information about the project's latest release.
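The PyPI JSON document carries the latest version under `info.version` and every released version as a key of `releases`. A sketch over a made-up payload:

```python
import json

def pypi_versions(payload):
    """Return (latest_version, all_versions) from a PyPI /pypi/<proj>/json body."""
    data = json.loads(payload)
    latest = data["info"]["version"]
    all_versions = sorted(data["releases"].keys())
    return latest, all_versions

# Made-up response body for demonstration
sample = '{"info": {"version": "1.2.0"}, "releases": {"1.0.0": [], "1.1.0": [], "1.2.0": []}}'
print(pypi_versions(sample))  # ('1.2.0', ['1.0.0', '1.1.0', '1.2.0'])
```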
- Using tag_prefix and tag_pattern
Many projects prefix their tags, for example release-1.2.3 or v1.2.3.
When tag_prefix is set, the matching prefix is removed from every tag string.
For example, if a project has both tags 1.2.3 and release-1.2.2, setting tag_prefix to release- yields 1.2.3 and 1.2.2 after processing.
tag_pattern exists for more complex cases and is not recommended.
- Using seperator
Setting seperator simply replaces that character with ".".
Some projects use a character other than "." between tag fields; setting seperator normalizes the version tag.
If a project's tags already use ".", setting seperator does not affect the result.
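The tag_prefix and seperator handling described above amounts to two substitutions. A standalone sketch (mirroring, not reusing, the repository's clean_tags logic; the function name is illustrative):

```python
import re

def normalize_tag(tag, tag_prefix=None, seperator=None):
    """Strip a tag prefix regex, then normalize the field separator to ".".

    Mirrors the documented behaviour: tag_prefix is removed from the tag
    string, and seperator (if not already ".") is replaced with ".".
    """
    if tag_prefix:
        tag = re.sub(tag_prefix, "", tag)
    if seperator and seperator != ".":
        tag = tag.replace(seperator, ".")
    return tag

print(normalize_tag("release-1.2.2", tag_prefix="release-"))  # 1.2.2
print(normalize_tag("v1_1", tag_prefix="^v", seperator="_"))  # 1.1
```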
#!/usr/bin/python3
#******************************************************************************
# Copyright (c) Huawei Technologies Co., Ltd. 2020-2020. All rights reserved.
# licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# Author: wangchuangGG
# Create: 2020-07-20
# ******************************************************************************/
"""
(1) This script is used to check the ABI changes between the old
and new versions of dynamic libraries.
The merged result on difference is saved in the xxx_all_abidiff.out file in the working directory
default path: /var/tmp/xxx_all_abidiff.out
(2) This script depends on abidiff from libabigail package.
(3) Command parameters
This script accepts two subcommands: compare_rpm or compare_so.
Running it without any parameter prints a help message.
"""
import argparse
import subprocess
import sys
import os
import logging
import io
import shutil
import tempfile
def parse_command_line():
"""Parse the command line arguments."""
parser = argparse.ArgumentParser(prog="check_abi")
parser.add_argument("-d", "--work_path", default="/var/tmp", nargs="?",
help="The work path to put rpm2cpio files and results"
" (e.g. /home/tmp_abidiff default: /var/tmp/)")
parser.add_argument("-a", "--show_all_info", action="store_true", default=False,
help="show all info, including changes in member names")
parser.add_argument("-v", "--verbose", action="store_true", default=False,
help="Show additional information")
subparser = parser.add_subparsers(dest='command_name',
help="Compare between two RPMs or two .so files")
rpm_parser = subparser.add_parser('compare_rpm', help="Compare between two RPMs")
rpm_parser.add_argument("-r", "--rpms", required=True, nargs=2,
metavar=('old_rpm', 'new_rpm'),
help="Path or URL of both the old and new RPMs")
rpm_parser.add_argument("-d", "--debuginfo_rpm", nargs=2,
metavar=('old_debuginfo_rpm', 'new_debuginfo_rpm'), required=False,
help = "Path or URL of both the old and new debuginfo RPMs, corresponding to compared RPMs.")
rpm_parser.set_defaults(func=process_with_rpm)
so_parser = subparser.add_parser('compare_so', help="Compare between two .so files")
so_parser.add_argument("-s", "--sos", required=True, nargs=2,
metavar=('old_so', 'new_so'),
help="Path or URL of both the old and new .so files")
so_parser.add_argument("-f", "--debuginfo_path", nargs=2, required=False,
metavar=('old_debuginfo_path', 'new_debuginfo_path'),
help = "Path or URL of both the old and new debuginfo files, corresponding to compared .so files.")
so_parser.set_defaults(func=process_with_so)
config = parser.parse_args()
if config.command_name is None:
parser.print_help()
sys.exit(0)
else:
return config
def list_so_files(path):
"""
Generate a list of all .so files in the directory.
"""
# known suffixes to skip
# we cannot rely on a numeric suffix because some .so files use complex version schemes.
exception_list = ["hmac"]
so_files = set()
for dirpath, dirnames, files in os.walk(path):
for filename in files:
fp = os.path.join(dirpath, filename)
if os.path.islink(fp):
continue
if filename.split(".")[-1] in exception_list:
continue
if ".so" in filename:
logging.debug(".so file found:%s", fp)
so_files.add(fp)
logging.debug("")
return so_files
def find_all_so_file(path1, path2):
"""
Generate a map between previous and current so files
"""
all_so_pair = {}
previous_sos = list_so_files(path1)
current_sos = list_so_files(path2)
logging.debug("previous_so:%s", previous_sos)
logging.debug("current_so:%s", current_sos)
prev_matched = set()
curr_matched = set()
if previous_sos and current_sos:
for so_file1 in previous_sos:
for so_file2 in current_sos:
base_name1 = (os.path.basename(so_file1)).split('.')[0]
base_name2 = (os.path.basename(so_file2)).split('.')[0]
if base_name1 == base_name2:
all_so_pair[so_file1] = so_file2
prev_matched.add(so_file1)
curr_matched.add(so_file2)
else:
logging.info("No .so files found")
sys.exit(0)
prev_left = previous_sos - prev_matched
curr_left = current_sos - curr_matched
if len(prev_left) != 0:
logging.info("Unmatched .so file in previous version")
logging.info("Usually means deleted .so in current version")
logging.info("%s\n", prev_left)
if len(curr_left) != 0:
logging.info("Unmatched .so file in current version")
logging.info("Usually means newly added .so in current version")
logging.info("%s\n", curr_left)
logging.debug("mapping of .so files:%s\n", all_so_pair)
return all_so_pair
def make_abi_path(work_path, abipath):
"""
Get the path to put so file from rpm
return the path.
"""
fp = os.path.join(work_path, abipath)
if os.path.isdir(fp):
shutil.rmtree(fp)
os.makedirs(fp)
return fp
def get_rpm_path(rpm_url, dest):
"""Get the path of rpm package"""
if os.path.isfile(rpm_url):
abs_rpmpath = os.path.abspath(rpm_url)
logging.debug("rpm exists:%s", abs_rpmpath)
return abs_rpmpath
else:
rpm_name = os.path.basename(rpm_url)
rpm_path = os.path.join(dest, rpm_name)
logging.debug("downloading %s......", rpm_name)
subprocess.call(["curl", rpm_url, "-L",
"--connect-timeout", "10",
"--max-time", "600",
"-sS", "-o", rpm_path])
return rpm_path
def do_rpm2cpio(rpm2cpio_path, rpm_file):
"""
Exec the rpm2cpio at rpm2cpio_path.
"""
cur_dir = os.getcwd()
os.chdir(rpm2cpio_path)
logging.debug("\n----working in path:%s----", os.getcwd())
logging.debug("rpm2cpio %s", rpm_file)
subprocess.run("rpm2cpio {} | cpio -id > /dev/null 2>&1".format(rpm_file), shell=True)
os.chdir(cur_dir)
def merge_all_abidiff_files(all_abidiff_files, work_path, rpm_base_name):
"""
Merge the all diff files to merged_file.
return the merged_file.
"""
merged_file = os.path.join(work_path, "{}_all_abidiff.out".format(rpm_base_name))
if os.path.exists(merged_file):
subprocess.run("rm -rf {}".format(merged_file), shell=True)
ofile = open(merged_file, "a+")
for diff_file in all_abidiff_files:
diff_file_name = os.path.basename(diff_file)
ofile.write("---------------diffs in {}:----------------\n".format(diff_file_name))
for txt in open(diff_file, "r"):
ofile.write(txt)
ofile.close()
return merged_file
def do_abidiff(config, all_so_pair, work_path, base_name, debuginfo_path):
"""
Exec the abidiff and write result to files.
return the abidiff returncode.
"""
if len(all_so_pair) == 0:
logging.info("There are no .so files to compare")
sys.exit(0)
if debuginfo_path:
logging.debug("old_debuginfo_path:%s\nnew_debuginfo_path:%s",
debuginfo_path[0], debuginfo_path[1])
with_debuginfo = True
else:
with_debuginfo = False
return_code = 0
all_abidiff_files = []
for old_so_file in all_so_pair:
new_so_file = all_so_pair[old_so_file]
logging.debug("begin abidiff between %s and %s", old_so_file, new_so_file)
abidiff_file = os.path.join(work_path,
"{}_{}_abidiff.out".format(base_name, os.path.basename(new_so_file)))
so_options = "{} {}".format(old_so_file, new_so_file)
if config.show_all_info:
additional_options = "--harmless"
else:
additional_options = "--changed-fns --deleted-fns --added-fns"
if with_debuginfo:
debug_options = "--d1 {} --d2 {}".format(debuginfo_path[0], debuginfo_path[1])
else:
debug_options = ""
abidiff_cmd = "abidiff {so_options} {debug_options} {additional_options} > {difffile}".format(
so_options=so_options,
debug_options=debug_options,
additional_options=additional_options,
difffile=abidiff_file)
ret = subprocess.run(abidiff_cmd, shell=True)
all_abidiff_files.append(abidiff_file)
logging.info("result write in: %s", abidiff_file)
return_code |= ret.returncode
merged_file = merge_all_abidiff_files(all_abidiff_files, work_path, base_name)
logging.info("all results written in: %s", merged_file)
return return_code
def validate_sos(config):
"""
Validate the command arguments
"""
for so in config.sos:
if not os.path.isfile(so) or ".so" not in so:
logging.error(f"{so} does not exist or is not a .so file")
sys.exit(0)
if config.debuginfo_path:
for d in config.debuginfo_path:
if not os.path.exists(d):
logging.error(f"{d} does not exist")
sys.exit(0)
def check_result(returncode):
"""
Check the result of abidiff
"""
ABIDIFF_ERROR_BIT = 1
if returncode == 0:
logging.info("No ABI differences found.")
elif returncode & ABIDIFF_ERROR_BIT:
logging.info("An unexpected error occurred in abidiff")
else:
logging.info("ABI differences found.")
def process_with_rpm(config):
"""
Process the file with type of rpm.
"""
work_path = config.work_path
temp_path = os.path.abspath(tempfile.mkdtemp(dir=work_path))
abi_paths = [make_abi_path(temp_path, name) for name in ["previous_package", "current_package"]]
logging.debug("abi_paths:%s\n", abi_paths)
rpm_path = [get_rpm_path(x[0], x[1]) for x in zip(config.rpms, abi_paths)]
logging.debug("rpm_path:%s\n", rpm_path)
[do_rpm2cpio(x[0], x[1]) for x in zip(abi_paths, rpm_path)]
if config.debuginfo_rpm:
debuginfo_rpm_path = [get_rpm_path(x[0], x[1]) for x in zip(config.debuginfo_rpm, abi_paths)]
logging.debug("debuginfo_rpm_path:%s\n", debuginfo_rpm_path)
[do_rpm2cpio(x[0], x[1]) for x in zip(abi_paths, debuginfo_rpm_path)]
os.chdir(temp_path)
logging.debug("\n----begin abidiff working in path:%s----", os.getcwd())
so_paths = [ os.path.join(x, "usr/lib64") for x in abi_paths ]
all_so_pairs = find_all_so_file(so_paths[0], so_paths[1])
debuginfo_paths = [ os.path.join(x, "usr/lib/debug") for x in abi_paths ]
rpm_base_name = os.path.basename(rpm_path[0]).split('.')[0]
returncode = do_abidiff(config, all_so_pairs, work_path, rpm_base_name, debuginfo_paths)
logging.debug("\n--- delete temp directory:%s ---", temp_path)
shutil.rmtree(temp_path)
check_result(returncode)
return returncode
def process_with_so(config):
"""
Process the file with type of .so.
"""
validate_sos(config)
work_path = config.work_path
all_so_pair = {}
so_path = list(map(os.path.abspath, config.sos))
all_so_pair[so_path[0]] = so_path[1]
os.chdir(work_path)
logging.debug("\n----begin abidiff with .so working in path:%s----", os.getcwd())
so_base_name = os.path.basename(so_path[0]).split('.')[0]
if config.debuginfo_path:
debuginfo_path = list(map(os.path.abspath, config.debuginfo_path))
else:
debuginfo_path = None
returncode = do_abidiff(config, all_so_pair, work_path, so_base_name, debuginfo_path)
check_result(returncode)
return returncode
def main():
"""Entry point for check_abi"""
config = parse_command_line()
if config.verbose:
logging.basicConfig(format='%(message)s', level=logging.DEBUG)
else:
logging.basicConfig(format='%(message)s', level=logging.INFO)
ret = config.func(config)
sys.exit(ret)
if __name__ == "__main__":
main()
......@@ -36,8 +36,8 @@ def load_last_query_result(info, force_reload=False):
else:
return ""
def clean_tags(tags, info):
if info.get("tag_pattern", "") != "" and info.get("tag_pattern", "") is not None:
pattern_regex = re.compile(info["tag_pattern"])
result_list = [pattern_regex.sub("\\1", x) for x in tags]
......@@ -50,7 +50,7 @@ def clean_tags(tags, info):
if info.get("separator", ".") != "." and info.get("separator", ".") is not None:
separator_regex = re.compile(info["separator"])
result_list = [separator_regex.sub(".", x) for x in result_list]
# Xinwei used to mis-spell 'separator'.
# The following is kept for compatibility until all yaml files are fixed.
if info.get("seperator", ".") != "." and info.get("seperator", ".") is not None:
......@@ -58,7 +58,7 @@ def clean_tags(tags, info):
result_list = [separator_regex.sub(".", x) for x in result_list]
result_list = [x for x in result_list if x[0].isdigit()]
return result_list
......
......@@ -26,12 +26,13 @@ class Gitee(object):
self.headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW 64; rv:50.0) Gecko/20100101 Firefox/50.0'}
self.gitee_url = "https://gitee.com/"
self.src_openeuler_url = self.gitee_url + "src-openeuler/{package}/raw/{branch}/"
self.advisor_url = self.gitee_url + "openeuler/openEuler-Advisor/raw/master/"
self.specfile_url_template = self.src_openeuler_url + "{specfile}"
self.yamlfile_url_template = self.src_openeuler_url + "{package}.yaml"
#self.advisor_url_template = "https://gitee.com/openeuler/openEuler-Advisor/raw/master/upstream-info/{package}.yaml"
self.advisor_url_template = self.advisor_url + "upstream-info/{package}.yaml"
self.community_url_template = self.gitee_url + "openeuler/community/raw/master/repository/{repository}.yaml"
#self.specfile_exception_url = "https://gitee.com/openeuler/openEuler-Advisor/raw/master/helper/specfile_exceptions.yaml"
self.specfile_exception_url = self.advisor_url + "advisors/helper/specfile_exceptions.yaml"
self.time_format = "%Y-%m-%dT%H:%M:%S%z"
......@@ -96,7 +97,7 @@ Yours openEuler-Advisor.
def get_gitee_json(self, url):
"""
Get and load gitee json response
"""
headers = self.headers.copy()
#headers = {}
......@@ -106,33 +107,31 @@ Yours openEuler-Advisor.
def get_spec_exception(self):
"""
Get well known spec file exceptions
"""
resp = self.get_gitee(self.specfile_exception_url)
exps = yaml.load(resp, Loader=yaml.Loader)
return exps
def get_spec(self, pkg, br="master"):
"""
Get openeuler spec file for specific package
"""
specurl = self.specfile_url_template.format(branch=br, package=pkg, specfile=pkg + ".spec")
exp = self.get_spec_exception()
if pkg in exp:
dir_name = exp[pkg]["dir"]
file_name = exp[pkg]["file"]
specurl = urllib.parse.urljoin(specurl, os.path.join(dir_name, file_name))
try:
resp = self.get_gitee(specurl)
except urllib.error.HTTPError:
resp = ""
return resp
def get_yaml(self, pkg, br="master"):
"""
Get upstream yaml metadata for specific package
"""
yamlurl = self.advisor_url_template.format(package=pkg)
try:
......@@ -140,19 +139,30 @@ Yours openEuler-Advisor.
except urllib.error.HTTPError:
resp = "Not found"
if re.match("Not found", resp):
yamlurl = self.yamlfile_url_template.format(branch=br, package=pkg)
try:
resp = self.get_gitee(yamlurl)
except urllib.error.HTTPError:
resp = "Not found"
if re.match("Not found", resp):
print("Can't find yaml metadata for {package} from upstream-info.".format(package=pkg))
return False
else:
return resp
else:
return resp
def get_community(self, repo):
"""
Get yaml data from community repo
"""
yamlurl = self.community_url_template.format(repository=repo)
try:
resp = self.get_gitee(yamlurl)
except urllib.error.HTTPError:
resp = ""
return resp
def get_issues(self, pkg, prj="src-openeuler"):
"""
List all open issues of pkg
......
---
# version recommend exception list
gegl04:
- '20001120.v002'
gimp:
- '19990910'
nss:
- '334.20030307'
glibc:
- '9000'
......@@ -12,10 +12,72 @@ import os
import argparse
import urllib.error
import gitee
import check_upstream
import version_recommend
def _get_rec_excpt():
"""
Get except case of version recommend
"""
y_file = open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "helper/ver_rec_excpt.yaml"))
excpt = yaml.load(y_file, Loader=yaml.Loader)
return excpt
def _filter_except(excpts, sources):
"""
Filter except case in sources
"""
for e in excpts:
sources = [s for s in sources if e not in s]
return sources
def get_ver_tags(gt, repo, cwd_path=None):
"""
Get version tags of given package
"""
if cwd_path:
try:
repo_yaml = open(os.path.join(cwd_path, repo + ".yaml")).read()
except FileNotFoundError:
print("Can't find yaml metadata for {pkg} in the current working directory.".format(pkg=repo))
repo_yaml = gt.get_yaml(repo)
else:
repo_yaml = gt.get_yaml(repo)
if repo_yaml:
pkg_info = yaml.load(repo_yaml, Loader=yaml.Loader)
else:
return None
vc_type = pkg_info["version_control"]
if vc_type == "hg":
tags = check_upstream.check_hg(pkg_info)
elif vc_type == "github":
tags = check_upstream.check_github(pkg_info)
elif vc_type == "git":
tags = check_upstream.check_git(pkg_info)
elif vc_type == "gitlab.gnome":
tags = check_upstream.check_gnome(pkg_info)
elif vc_type == "svn":
tags = check_upstream.check_svn(pkg_info)
elif vc_type == "metacpan":
tags = check_upstream.check_metacpan(pkg_info)
elif vc_type == "pypi":
tags = check_upstream.check_pypi(pkg_info)
else:
print("Unsupported version control method {vc}".format(vc=vc_type))
return None
excpt_list = _get_rec_excpt()
if repo in excpt_list:
tags = _filter_except(excpt_list[repo], tags)
return tags
if __name__ == "__main__":
parameters = argparse.ArgumentParser()
parameters.add_argument("-p", "--push", action="store_true",
......@@ -27,56 +89,24 @@ if __name__ == "__main__":
args = parameters.parse_args()
user_gitee = gitee.Gitee()
spec_string = user_gitee.get_spec(args.repo)
if not spec_string:
print("{pkg}.spec can't be found on the master branch".format(pkg=args.repo))
sys.exit(1)
spec_file = Spec.from_string(spec_string)
cur_version = replace_macros(spec_file.version, spec_file)
print("Checking ", args.repo)
print("current version is ", cur_version)
pkg_tags = get_ver_tags(user_gitee, args.repo, args.default)
print("known release tags:", pkg_tags)
if pkg_tags is None:
sys.exit(1)
ver_rec = version_recommend.VersionRecommend(pkg_tags, cur_version, 0)
print("Latest version is", ver_rec.latest_version)
print("Maintain version is", ver_rec.maintain_version)
......@@ -21,8 +21,11 @@ import subprocess
import os.path
import re
import datetime
import oa_upgradable
import version_recommend
def download_source_url(spec, o_ver, n_ver):
"""
Download source file from Source or Source0 URL
......@@ -59,10 +62,61 @@ def download_upstream_url(gt, repo, n_ver):
return False
def update_check(spec, o_ver, n_ver):
"""
Requirements check for upgraded package
"""
if len(spec.patches) >= 1:
print("I'm too naive to handle complicated package.")
print("This package has multiple in-house patches.")
return False
ver_type = version_recommend.VersionType()
if ver_type.compare(n_ver, o_ver) == 1:
return True
else:
print("Update failed >> [{pkg}: current_ver:{cur_ver}, upgraded_ver:{upd_ver}]".format(
pkg=spec.name, cur_ver=o_ver, upd_ver=n_ver))
return False
def fork_clone_repo(gt, repo):
"""
Fork repo from src-openEuler to private repository, then clone it to local
"""
if not gt.fork_repo(repo):
print("The repo of {pkg} seems to have been forked.".format(pkg=repo))
name = gt.token["user"]
subprocess.call(["git", "clone", "git@gitee.com:{user}/{pkg}".format(user=name, pkg=repo)])
os.chdir(repo)
def download_src(gt, spec, o_ver, n_ver):
"""
Download source code for upgraded package
"""
source_file = download_source_url(spec, o_ver, n_ver)
if source_file:
print(source_file)
return True
else:
source_file = download_upstream_url(gt, spec.name, n_ver)
if source_file:
print(source_file)
return True
else:
print("Failed to download the latest source code.")
return False
def create_spec(repo, spec_str, o_ver, n_ver, src_fn=None):
"""
Create new spec file for upgraded package
"""
fn = open(repo + "_old.spec", "w")
fn.write(spec_str)
fn.close()
fn = open(repo + ".spec", "w")
in_changelog = False
for l in spec_str.splitlines():
......@@ -89,55 +143,133 @@ def create_spec(repo, spec_str, o_ver, n_ver, src_fn=None):
fn.write("\n")
fn.close()
def auto_update_pkg(gt, u_branch, u_pkg):
"""
Auto upgrade based on given branch for single package
"""
spec_str = gt.get_spec(u_pkg, u_branch)
if not spec_str:
print("{pkg}.spec can't be found on the {br} branch. ".format(
pkg=u_pkg, br=u_branch))
sys.exit(1)
pkg_spec = Spec.from_string(spec_str)
pkg_ver = replace_macros(pkg_spec.version, pkg_spec)
pkg_tags = oa_upgradable.get_ver_tags(gt, u_pkg)
if pkg_tags is None:
sys.exit(1)
ver_rec = version_recommend.VersionRecommend(pkg_tags, pkg_ver, 0)
rec_up_ver = pkg_ver
if re.search("master", u_branch):
rec_up_ver = ver_rec.latest_version
elif re.search("LTS", u_branch):
rec_up_ver = ver_rec.maintain_version
else:
print("Only support master and LTS version upgrade.")
sys.exit(1)
fork_clone_repo(gt, u_pkg)
if not update_check(pkg_spec, pkg_ver, rec_up_ver):
sys.exit(1)
if not download_src(gt, pkg_spec, pkg_ver, rec_up_ver):
sys.exit(1)
create_spec(u_pkg, spec_str, pkg_ver, rec_up_ver)
def auto_update_repo(gt, u_branch, u_repo):
"""
Auto upgrade based on given branch for packages in given repository
"""
repo_yaml = gt.get_community(u_repo)
if not repo_yaml:
print("{repo}.yaml in community is empty.".format(repo=u_repo))
sys.exit(1)
pkg_info = yaml.load(repo_yaml, Loader=yaml.Loader)
pkg_list = pkg_info.get("repositories")
for pkg in pkg_list:
pkg_name = pkg.get("name")
spec_str = gt.get_spec(pkg_name, u_branch)
if not spec_str:
print("{pkg}.spec can't be found on the {br} branch. ".format(
pkg=pkg_name, br=u_branch))
continue
pkg_spec = Spec.from_string(spec_str)
pkg_ver = replace_macros(pkg_spec.version, pkg_spec)
pkg_tags = oa_upgradable.get_ver_tags(gt, pkg_name)
if pkg_tags is None:
continue
ver_rec = version_recommend.VersionRecommend(pkg_tags, pkg_ver, 0)
rec_up_ver = pkg_ver
if re.search("master", u_branch):
rec_up_ver = ver_rec.latest_version
elif re.search("LTS", u_branch):
rec_up_ver = ver_rec.maintain_version
else:
print("Only support master and LTS version upgrade.")
sys.exit(1)
fork_clone_repo(gt, pkg_name)
if not update_check(pkg_spec, pkg_ver, rec_up_ver):
continue
if not download_src(gt, pkg_spec, pkg_ver, rec_up_ver):
continue
create_spec(pkg_name, spec_str, pkg_ver, rec_up_ver)
if __name__ == "__main__":
pars = argparse.ArgumentParser()
pars.add_argument("repo_pkg", type=str, help="The repository or package to be upgraded")
pars.add_argument("branch", type=str, help="The branch that upgrade based")
pars.add_argument("-u", "--update", type=str, help="Auto upgrade for packages in repository or single package",
choices=["repo", "pkg"])
pars.add_argument("-n", "--new_version", type=str, help="New upstream version of package will be upgrade to")
pars.add_argument("-s", "--create_spec", help="Create spec file", action="store_true")
pars.add_argument("-d", "--download", help="Download upstream source code", action="store_true")
pars.add_argument("-fc", "--fork_then_clone", help="Fork src-openeuler repo into users, then clone to local",
action="store_true")
pars.add_argument("-p", "--PR", help="Create upgrade PR", action="store_true")
args = pars.parse_args()
user_gitee = gitee.Gitee()
if args.update:
if args.update == "repo":
auto_update_repo(user_gitee, args.branch, args.repo_pkg)
else:
auto_update_pkg(user_gitee, args.branch, args.repo_pkg)
else:
spec_string = user_gitee.get_spec(args.repo_pkg, args.branch)
if not spec_string:
print("{pkg}.spec can't be found on the {br} branch.".format(pkg=args.repo_pkg, br=args.branch))
sys.exit(1)
spec_file = Spec.from_string(spec_string)
cur_version = replace_macros(spec_file.version, spec_file)
if args.fork_then_clone:
fork_clone_repo(user_gitee, args.repo_pkg)
if args.download or args.create_spec:
if not args.new_version:
print("Please specify the upgraded version of the {repo}".format(repo=args.repo_pkg))
sys.exit(1)
elif not update_check(spec_file, cur_version, args.new_version):
sys.exit(1)
if args.download:
if not download_src(user_gitee, spec_file, cur_version, args.new_version):
sys.exit(1)
if args.create_spec:
create_spec(args.repo_pkg, spec_string, cur_version, args.new_version)
if args.PR:
user_gitee.create_pr(user_gitee.token["user"], args.repo_pkg)
......@@ -72,7 +72,7 @@ class VersionType(object):
:return 0: z1 equal then z2
:raises: None
"""
return self._compare(z1, z2)
def _compare(self, z1, z2):
"""
......@@ -91,11 +91,15 @@ class VersionType(object):
len2 = len(d2)
length = min(len1, len2)
for index in range(length):
if d1[index].isdigit() and d2[index].isdigit():
if int(d1[index]) > int(d2[index]):
return 1
elif int(d1[index]) < int(d2[index]):
return -1
elif d1[index].isdigit():
return 1
elif d2[index].isdigit():
return -1
else:
if d1[index] > d2[index]:
return 1
......@@ -126,7 +130,7 @@ class VersionType(object):
:returns: The split result
:raises: None
"""
for f, s in re.findall(r'([\d]+)|([^\d.-]+)', x):
if f:
float(f)
yield f
......@@ -1085,6 +1089,10 @@ class VersionRecommend(object):
if m is None:  # the version number should start with a digit
return False
m = re.search(r'[ab]\d', version)
if m is not None:
return False
if 'rc' in version \
or 'RC' in version \
or 'dev' in version \
......
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Flask stuff:
instance/
.webassets-cache
# pyenv
.python-version
# dotenv
.env
# virtualenv
venv/
ENV/
# Editors
.idea/
# log file
*.log
[MASTER]
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-whitelist=
# Specify a score threshold to be exceeded before program exits with error.
fail-under=10
# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=CVS
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
ignore-patterns=issue_test,tracking_test
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))"
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use.
jobs=1
# Control the amount of potential inferred values when inferring a single
# object. This can help the performance when dealing with large functions or
# complex, nested conditions.
limit-inference-results=100
# List of plugins (as comma separated values of python module names) to load,
# usually to register additional checkers.
load-plugins=
# Pickle collected data for later comparisons.
persistent=yes
# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
suggestion-mode=yes
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED.
confidence=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once). You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use "--disable=all --enable=classes
# --disable=W".
disable=print-statement,
parameter-unpacking,
unpacking-in-except,
old-raise-syntax,
backtick,
long-suffix,
old-ne-operator,
old-octal-literal,
import-star-module-level,
non-ascii-bytes-literal,
raw-checker-failed,
bad-inline-option,
locally-disabled,
file-ignored,
suppressed-message,
useless-suppression,
deprecated-pragma,
use-symbolic-message-instead,
apply-builtin,
basestring-builtin,
buffer-builtin,
cmp-builtin,
coerce-builtin,
execfile-builtin,
file-builtin,
long-builtin,
raw_input-builtin,
reduce-builtin,
standarderror-builtin,
unicode-builtin,
xrange-builtin,
coerce-method,
delslice-method,
getslice-method,
setslice-method,
no-absolute-import,
old-division,
dict-iter-method,
dict-view-method,
next-method-called,
metaclass-assignment,
indexing-exception,
raising-string,
reload-builtin,
oct-method,
hex-method,
nonzero-method,
cmp-method,
input-builtin,
round-builtin,
intern-builtin,
unichr-builtin,
map-builtin-not-iterating,
zip-builtin-not-iterating,
range-builtin-not-iterating,
filter-builtin-not-iterating,
using-cmp-argument,
eq-without-hash,
div-method,
idiv-method,
rdiv-method,
exception-message-attribute,
invalid-str-codec,
sys-max-int,
bad-python3-import,
deprecated-string-function,
deprecated-str-translate-call,
deprecated-itertools-function,
deprecated-types-field,
next-method-defined,
dict-items-not-iterating,
dict-keys-not-iterating,
dict-values-not-iterating,
deprecated-operator-function,
deprecated-urllib-function,
xreadlines-attribute,
deprecated-sys-function,
exception-escape,
comprehension-escape
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=c-extension-no-member
[REPORTS]
# Python expression which should return a score less than or equal to 10. You
# have access to the variables 'error', 'warning', 'refactor', and 'convention'
# which contain the number of messages in each category, as well as 'statement'
# which is the total number of statements analyzed. This score is used by the
# global evaluation report (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details.
#msg-template=
# Set the output format. Available formats are text, parseable, colorized, json
# and msvs (visual studio). You can also give a reporter class, e.g.
# mypackage.mymodule.MyReporterClass.
output-format=text
# Tells whether to display a full report or only the messages.
reports=no
# Activate the evaluation score.
score=yes
[REFACTORING]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
# Complete name of functions that never returns. When checking for
# inconsistent-return-statements if a never returning function is called then
# it will be considered as an explicit return statement and no message will be
# printed.
never-returning-functions=sys.exit
[LOGGING]
# The type of string formatting that logging methods do. `old` means using %
# formatting, `new` is for `{}` formatting.
logging-format-style=old
# Logging modules to check that the string format arguments are in logging
# function parameter format.
logging-modules=logging
[SPELLING]
# Limits count of emitted suggestions for spelling mistakes.
max-spelling-suggestions=4
# Spelling dictionary name. Available dictionaries: none. To make it work,
# install the python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains the private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to the private dictionary (see the
# --spelling-private-dict-file option) instead of raising a message.
spelling-store-unknown-words=no
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,
XXX,
TODO
# Regular expression of note tags to take in consideration.
#notes-rgx=
[TYPECHECK]
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# Tells whether to warn about missing members when the owner of the attribute
# is inferred to be None.
ignore-none=yes
# This flag controls whether pylint should warn about no-member and similar
# checks whenever an opaque object is returned when inferring. The inference
# can return multiple potential results while evaluating a Python object, but
# some branches might not be evaluated, which results in partial inference. In
# that case, it might be useful to still emit no-member and other checks for
# the rest of the inferred objects.
ignore-on-opaque-inference=yes
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# Show a hint with possible names when a member name was not found. The aspect
# of finding the hint is based on edit distance.
missing-member-hint=yes
# The minimum edit distance a name should have in order to be considered a
# similar match for a missing member name.
missing-member-hint-distance=1
# The total number of similar names that should be taken in consideration when
# showing a hint for a missing member.
missing-member-max-choices=1
# List of decorators that change the signature of a decorated function.
signature-mutators=
[VARIABLES]
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid defining new builtins when possible.
additional-builtins=
# Tells whether unused global variables should be treated as a violation.
allow-global-unused-variables=yes
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,
_cb
# A regular expression matching the name of dummy variables (i.e. expected to
# not be used).
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
# Argument names that match this expression will be ignored. Default to name
# with leading underscore.
ignored-argument-names=_.*|^ignored_|^unused_
# Tells whether we should check for unused import in __init__ files.
init-import=no
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[FORMAT]
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
# Maximum number of characters on a single line.
max-line-length=120
# Maximum number of lines in a module.
max-module-lines=1000
# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
# `trailing-comma` allows a space between comma and closing bracket: (a, ).
# `empty-line` allows space-only lines.
no-space-check=trailing-comma,
dict-separator
# Allow the body of a class to be on the same line as the declaration if body
# contains single statement.
single-line-class-stmt=no
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=no
[SIMILARITIES]
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
# Minimum lines number of a similarity.
min-similarity-lines=4
[BASIC]
# Naming style matching correct argument names.
argument-naming-style=snake_case
# Regular expression matching correct argument names. Overrides argument-
# naming-style.
#argument-rgx=
# Naming style matching correct attribute names.
attr-naming-style=snake_case
# Regular expression matching correct attribute names. Overrides attr-naming-
# style.
#attr-rgx=
# Bad variable names which should always be refused, separated by a comma.
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
# Bad variable names regexes, separated by a comma. If names match any regex,
# they will always be refused
bad-names-rgxs=
# Naming style matching correct class attribute names.
class-attribute-naming-style=any
# Regular expression matching correct class attribute names. Overrides class-
# attribute-naming-style.
#class-attribute-rgx=
# Naming style matching correct class names.
class-naming-style=PascalCase
# Regular expression matching correct class names. Overrides class-naming-
# style.
#class-rgx=
# Naming style matching correct constant names.
const-naming-style=UPPER_CASE
# Regular expression matching correct constant names. Overrides const-naming-
# style.
#const-rgx=
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=-1
# Naming style matching correct function names.
function-naming-style=snake_case
# Regular expression matching correct function names. Overrides function-
# naming-style.
#function-rgx=
# Good variable names which should always be accepted, separated by a comma.
good-names=i,
j,
k,
ex,
Run,
_
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs=
# Include a hint for the correct naming format with invalid-name.
include-naming-hint=no
# Naming style matching correct inline iteration names.
inlinevar-naming-style=any
# Regular expression matching correct inline iteration names. Overrides
# inlinevar-naming-style.
#inlinevar-rgx=
# Naming style matching correct method names.
method-naming-style=snake_case
# Regular expression matching correct method names. Overrides method-naming-
# style.
#method-rgx=
# Naming style matching correct module names.
module-naming-style=snake_case
# Regular expression matching correct module names. Overrides module-naming-
# style.
#module-rgx=
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=^_
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
# These decorators are taken in consideration only for invalid-name.
property-classes=abc.abstractproperty
# Naming style matching correct variable names.
variable-naming-style=snake_case
# Regular expression matching correct variable names. Overrides variable-
# naming-style.
#variable-rgx=
[STRING]
# This flag controls whether inconsistent-quotes generates a warning when the
# character used as a quote delimiter is used inconsistently within a module.
check-quote-consistency=no
# This flag controls whether the implicit-str-concat should generate a warning
# on implicit string concatenation in sequences defined over several lines.
check-str-concat-over-line-jumps=no
[IMPORTS]
# List of modules that can be imported at any level, not just the top level
# one.
allow-any-import-level=
# Allow wildcard imports from modules that define __all__.
allow-wildcard-with-all=no
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
# Deprecated modules which should not be used, separated by a comma.
deprecated-modules=optparse,tkinter.tix
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled).
ext-import-graph=
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled).
import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled).
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Couples of modules and preferred modules, separated by a comma.
preferred-modules=
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=cls
[DESIGN]
# Maximum number of arguments for function / method.
max-args=5
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
# Maximum number of branch for function / method body.
max-branches=12
# Maximum number of locals for function / method body.
max-locals=15
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
# Maximum number of return / yield for function / method body.
max-returns=6
# Maximum number of statements in function / method body.
max-statements=50
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "BaseException, Exception".
overgeneral-exceptions=BaseException,
Exception
[style]
based_on_style = pep8
column_limit = 120
dedent_closing_brackets = True
[[source]]
name = "pypi"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
verify_ssl = true
[dev-packages]
pylint = "*"
yapf = "*"
pyopenssl = "*"
[packages]
flask = "*"
flask-sqlalchemy = "*"
flask-apscheduler = "*"
requests = "*"
werkzeug = "*"
flask-httpauth = "*"
sqlalchemy = "*"
pandas = "*"
[requires]
python_version = "3.7"
Patch Tracking
===
# 1 Introduction
During development of an openEuler release, the latest upstream code of each software package needs to be pulled in promptly to fix functional bugs and security issues, so that the released openEuler distribution has as few defects and vulnerabilities as possible.
This tool manages patches for software packages: it actively monitors upstream community commits, generates patches automatically, submits issues to the corresponding maintainers, and automatically verifies basic patch functionality, reducing verification effort and helping maintainers decide quickly.
# 2 Architecture
### 2.1 Client/Server Architecture
Patch Tracking uses a client/server architecture. The server (patch-tracking) runs the patch tracking tasks: maintaining tracking items, detecting code changes on upstream repository branches and turning them into patch files, and submitting issues and PRs to Gitee. patch-tracking also provides RESTful interfaces for adding, deleting, modifying, and querying tracking items. The client, i.e. the command-line tool (patch-tracking-cli), calls these RESTful interfaces to add, delete, modify, and query tracking items.
### 2.2 Core Workflow
* Patch tracking service workflow
**Main steps:**
1. The command-line tool writes a tracking item.
2. Patch files are fetched automatically from the upstream repository (e.g. GitHub) configured in the tracking item.
3. A temporary branch is created and the fetched patch files are committed to it.
4. An issue is automatically submitted to the corresponding project, together with a PR associated with the issue.
![PatchTracking](images/PatchTracking.jpg)
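The service-side steps above can be sketched as a small driver loop; every helper name here is a hypothetical stand-in, not the real patch-tracking API:

```python
# Illustrative sketch of one patch tracking cycle. The helper callables
# (fetch_patches, commit_to_temp_branch, submit_issue_and_pr) are
# hypothetical stand-ins for the service's internal steps.

def run_tracking_cycle(tracking_item, fetch_patches, commit_to_temp_branch,
                       submit_issue_and_pr):
    """Process one tracking item and return the actions performed, in order."""
    actions = []
    patches = fetch_patches(tracking_item)            # step 2: fetch upstream patches
    actions.append("fetch")
    if patches:
        commit_to_temp_branch(tracking_item, patches)  # step 3: commit to temp branch
        actions.append("commit")
        submit_issue_and_pr(tracking_item, patches)    # step 4: submit issue + PR
        actions.append("submit")
    return actions
```

When no new upstream commits produce patches, only the fetch step runs and no issue or PR is created.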
* Maintainer workflow for handling submitted patches
**Main steps:**
1. The maintainer analyzes the patch files on the temporary branch and decides whether they should be merged.
2. A build is executed; once it succeeds, the maintainer decides whether to merge the PR.
![Maintainer](images/Maintainer.jpg)
### 2.3 Data Structure
* Tracking table
| No. | Name | Description | Type | Key | Nullable |
|:----:| ----| ----| ----| ----| ----|
| 1 | id | Auto-incremented tracking item ID | int | - | NO |
| 2 | version_control | Version control system type of the upstream SCM | String | - | NO |
| 3 | scm_repo | Upstream SCM repository address | String | - | NO |
| 4 | scm_branch | Tracked branch of the upstream SCM | String | - | NO |
| 5 | scm_commit | Latest processed commit ID of the upstream code | String | - | YES |
| 6 | repo | Gitee repository address of the package source | String | Primary | NO |
| 7 | branch | Gitee repository branch of the package source | String | Primary | NO |
| 8 | enabled | Whether tracking is enabled | Boolean | - | NO |
* Issue table
| No. | Name | Description | Type | Key | Nullable |
|:----:| ----| ----| ----| ----| ----|
| 1 | issue | Issue number | String | Primary | NO |
| 2 | repo | Gitee repository address of the package source | String | - | NO |
| 3 | branch | Gitee repository branch of the package source | String | - | NO |
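The two tables can be illustrated with a plain-sqlite sketch (the actual service defines them as SQLAlchemy models; this only shows the keys and nullability):

```python
import sqlite3

# Schema sketch of the Tracking and Issue tables; in the real service these
# are SQLAlchemy models, so this is illustrative only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tracking (
    id INTEGER,
    version_control TEXT NOT NULL,
    scm_repo TEXT NOT NULL,
    scm_branch TEXT NOT NULL,
    scm_commit TEXT,                    -- the only nullable column
    repo TEXT NOT NULL,
    branch TEXT NOT NULL,
    enabled BOOLEAN NOT NULL,
    PRIMARY KEY (repo, branch)          -- repo + branch identify a tracking item
);
CREATE TABLE issue (
    issue TEXT PRIMARY KEY,             -- issue number
    repo TEXT NOT NULL,
    branch TEXT NOT NULL
);
""")
conn.execute("INSERT INTO tracking VALUES (1,'github','u/r','master',NULL,'org/pkg','master',1)")
```

The composite primary key means one package repo/branch pair can carry at most one tracking item.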
# 3 Deployment
>Python >= 3.7 and pip3 must already be installed.
### 3.1 Install Dependencies
```shell script
yum install -y gcc gcc-c++ python3-devel openssl-devel
pip3 install flask flask-sqlalchemy flask-apscheduler requests flask_httpauth pandas
pip3 install -I uwsgi
```
### 3.2 Install
Take `patch-tracking-1.0.0-1.oe1.noarch.rpm` as an example:
```shell script
rpm -ivh patch-tracking-1.0.0-1.oe1.noarch.rpm
```
### 3.3 Generate a Certificate
```shell script
openssl req -x509 -days 3650 -subj "/CN=self-signed" \
-nodes -newkey rsa:4096 -keyout self-signed.key -out self-signed.crt
```
Copy `self-signed.key` and `self-signed.crt` to the __/etc/patch-tracking__ directory.
### 3.4 Configuration
Configure the corresponding parameters in the configuration file.
Configuration file path: `/etc/patch-tracking/settings.conf`
- Service listen address
```python
LISTEN = "127.0.0.1:5001"
```
- GitHub token, used to access repository information of upstream software hosted on GitHub
For how to generate a GitHub token, see [Creating a personal access token](https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token)
```python
GITHUB_ACCESS_TOKEN = ""
```
- For tracked repositories hosted on Gitee, configure a Gitee token that has permission on the repository; it is used to submit patch files, issues, and PRs.
```python
GITEE_ACCESS_TOKEN = ""
```
- The database is scanned periodically for new or modified tracking items, and the fetch-upstream-patches task runs for each tracking item found. Configure the scan interval here, in seconds.
```python
SCAN_DB_INTERVAL = 3600
```
- Username and password used by the command-line tool to authenticate against the POST interfaces
```python
USER = "admin"
PASSWORD = ""
```
The default value of `USER` is `admin`.
>`PASSWORD` complexity requirements:
>* at least 6 characters long
>* at least one digit
>* at least one uppercase letter
>* at least one lowercase letter
>* at least one special character (~!@#%^*_+=-)
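A minimal sketch of these complexity checks (the shipped `generate_password` tool performs equivalent checks; this version is illustrative only):

```python
import re

def password_ok(password):
    """Return True if the password meets the complexity rules above."""
    return (
        len(password) >= 6
        and re.search(r"\d", password) is not None              # at least one digit
        and re.search(r"[A-Z]", password) is not None           # uppercase letter
        and re.search(r"[a-z]", password) is not None           # lowercase letter
        and re.search(r"[~!@#%^*_+=\-]", password) is not None  # special character
    )
```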
Configure the hash of the password here; generate it with the command-line tool by running `generate_password <password>`, for example:
    [root]# generate_password Test@123
    pbkdf2:sha256:150000$w38eLeRm$ebb5069ba3b4dda39a698bd1d9d7f5f848af3bd93b11e0cde2b28e9e34bfbbae
Put `pbkdf2:sha256:150000$w38eLeRm$ebb5069ba3b4dda39a698bd1d9d7f5f848af3bd93b11e0cde2b28e9e34bfbbae` between the quotes of `PASSWORD = ""`.
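The `pbkdf2:sha256:<iterations>$<salt>$<hash>` string is the format produced by Werkzeug's `generate_password_hash`, which `generate_password` relies on; assuming that format, the derivation and verification can be sketched with the standard library:

```python
import binascii
import hashlib
import os

def pbkdf2_hash(password, salt=None, iterations=150000):
    """Sketch of a werkzeug-style pbkdf2:sha256 hash string (illustrative only)."""
    salt = salt or binascii.hexlify(os.urandom(8)).decode()
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), iterations)
    return "pbkdf2:sha256:%d$%s$%s" % (iterations, salt, digest.hex())

def pbkdf2_verify(stored, password):
    """Re-derive the digest with the stored salt/iterations and compare."""
    method, salt, _ = stored.split("$")
    iterations = int(method.split(":")[2])
    return pbkdf2_hash(password, salt, iterations) == stored
```

Because the salt and iteration count are embedded in the stored string, the server can verify a login password without ever storing it in plain text.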
### 3.5 Start the Patch Tracking Service
The service can be started in either of two ways:
1. Via systemd
```shell script
systemctl start patch-tracking
```
2. Run the executable directly
```shell script
/usr/bin/patch-tracking
```
# 4 Usage
### 4.1 Add a Tracking Item
Associate the software repository and branch to be tracked with its upstream repository and branch. There are three ways to do this.
#### 4.1.1 Add Directly on the Command Line
Parameters:
>--user: username for POST interface authentication, same as the USER parameter in settings.conf \
--password: password for POST interface authentication, i.e. the plain-text password corresponding to the PASSWORD hash in settings.conf \
--server: URL of the running Patch Tracking service, e.g. 127.0.0.1:5001 \
--version_control: version control tool of the upstream repository; only github is supported \
--repo: name of the repository to track, in the format org/repo \
--branch: branch name of the repository to track \
--scm_repo: name of the tracked upstream repository, GitHub format: org/repo \
--scm_branch: branch of the tracked upstream repository \
--enable: whether to track the repository automatically
For example:
```shell script
patch-tracking-cli add --server 127.0.0.1:5001 --user admin --password Test@123 --version_control github --repo testPatchTrack/testPatch1 --branch master --scm_repo BJMX/testPatch01 --scm_branch test --enable true
```
#### 4.1.2 Add via a Specified File
Parameters:
>--server: URL of the running Patch Tracking service, e.g. 127.0.0.1:5001 \
--user: username for POST interface authentication, same as the USER parameter in settings.conf \
--password: password for POST interface authentication, i.e. the plain-text password corresponding to the PASSWORD hash in settings.conf \
--file: path to the yaml file
The file contains the repository, branch, version control tool, whether monitoring is enabled, and other information. Write this into a file named xxx.yaml, e.g. tracking.yaml, and pass the file path as the `--file` argument.
For example:
```shell script
patch-tracking-cli add --server 127.0.0.1:5001 --user admin --password Test@123 --file tracking.yaml
```
The yaml content has the following format. The text to the left of each colon must not be modified; fill in the right side according to your situation.
```yaml
version_control: github
scm_repo: xxx/xxx
scm_branch: master
repo: xxx/xxx
branch: master
enabled: true
```
>version_control: version control tool of the upstream repository; only github is supported \
scm_repo: name of the tracked upstream repository, GitHub format: org/repo \
scm_branch: branch of the tracked upstream repository \
repo: name of the repository to track, in the format org/repo \
branch: branch name of the repository to track \
enabled: whether to track the repository automatically
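Assuming the server applies the same rules as its POST interface (all fields required except `scm_commit`, and only `github` accepted for `version_control`), validating such a yaml dict can be sketched as:

```python
# Fields accepted by the tracking POST interface; scm_commit is the only
# optional one. This validator is a simplified sketch of the server checks.
REQUIRED = ['version_control', 'scm_repo', 'scm_branch', 'scm_commit',
            'repo', 'branch', 'enabled']

def validate_tracking(data):
    """Return True if the tracking dict would be accepted by the POST interface."""
    missing = [key for key in REQUIRED if key not in data]
    unknown = [key for key in data if key not in REQUIRED]
    if unknown or missing not in ([], ['scm_commit']):  # scm_commit may be omitted
        return False
    return data['version_control'] == 'github'
```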
#### 4.1.3 Add via a Specified Directory
Place multiple `xxx.yaml` files in a specified directory, e.g. `test_yaml`, then run the command to record the tracking items of all yaml files in that directory. The yaml files must be placed directly in the directory; files in subdirectories are not read.
Parameters:
>--user: username for POST interface authentication, same as the USER parameter in settings.conf \
--password: password for POST interface authentication, i.e. the plain-text password corresponding to the PASSWORD hash in settings.conf \
--server: URL of the running Patch Tracking service, e.g. 127.0.0.1:5001 \
--dir: path to the directory containing the yaml files
```shell script
patch-tracking-cli add --server 127.0.0.1:5001 --user admin --password Test@123 --dir /home/Work/test_yaml/
```
### 4.2 Query Tracking Items
Parameters:
>--server: required, URL of the running Patch Tracking service, e.g. 127.0.0.1:5001 \
--table: required, the table to query \
--repo: optional, the repo to query; if omitted, all entries in the table are returned \
--branch: optional, the branch to query; it must be combined with --repo and cannot be queried on its own
#### 4.2.1 Query the tracking table
```shell script
patch-tracking-cli query --server <LISTEN> --table tracking
```
For example:
```shell script
patch-tracking-cli query --server 127.0.0.1:5001 --table tracking
```
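Behind the scenes, a query is a GET request to the service's RESTful interface; the URL the CLI needs can be sketched with a hypothetical helper (`build_query_url` is illustrative, not part of the tool):

```python
from urllib.parse import urlencode

def build_query_url(server, table, repo=None, branch=None):
    """Build the query URL for the GET /<table> interface (illustrative helper).

    The https scheme is assumed because the service runs with TLS enabled.
    """
    params = {}
    if repo:
        params['repo'] = repo
        if branch:                      # --branch is only valid together with --repo
            params['branch'] = branch
    url = "https://%s/%s" % (server, table)
    return url + ("?" + urlencode(params) if params else "")
```

Without `--repo`, no filter parameters are sent and the service returns every row in the table.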
### 4.3 Query the Generated Issue List
```shell script
patch-tracking-cli query --server <LISTEN> --table issue
```
For example:
```shell script
patch-tracking-cli query --server 127.0.0.1:5001 --table issue
```
### 4.4 View Issues and PRs on Gitee
Log in to Gitee and open the tracked software project. Under the project's Issues and Pull Requests tabs you can find entries named `[patch tracking] TIME`, e.g. `[patch tracking] 20200713101548`.
These are the issue and the corresponding PR for the newly generated patch files.
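Judging from the example, `TIME` is a `YYYYmmddHHMMSS` timestamp; a title in that shape can be generated as (the exact format is assumed from the example above):

```python
from datetime import datetime

def issue_title(now=None):
    """Build a '[patch tracking] YYYYmmddHHMMSS' style title."""
    now = now or datetime.now()
    return "[patch tracking] " + now.strftime("%Y%m%d%H%M%S")
```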
# 5 Common Problems and Solutions
%define name patch-tracking
%define version 1.0.0
%define release 1
Summary: This is a tool for automatically tracking upstream repository code patches
Name: %{name}
Version: %{version}
Release: %{release}
Source0: %{name}-%{version}.tar
License: Mulan PSL v2
Group: Development/Libraries
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-buildroot
Prefix: %{_prefix}
BuildArch: noarch
Vendor: ChenYanpan <chenyanpan@huawei.com>
Url: https://openeuler.org/zh/
BuildRequires: python3-setuptools
# Requires: python3.7 python3-flask python3-sqlalchemy python3-requests
%description
This is a tool for automatically tracking upstream repository code patches
%prep
%setup -n %{name}-%{version}
%build
%py3_build
%install
%py3_install
%post
sed -i "s|\blogging.conf\b|/etc/patch-tracking/logging.conf|" %{python3_sitelib}/patch_tracking/app.py
sed -i "s|\bsqlite:///db.sqlite\b|sqlite:////var/patch-tracking/db.sqlite|" %{python3_sitelib}/patch_tracking/app.py
sed -i "s|\bsettings.conf\b|/etc/patch-tracking/settings.conf|" %{python3_sitelib}/patch_tracking/app.py
chmod +x /usr/bin/patch-tracking-cli
chmod +x /usr/bin/patch-tracking
chmod +x /usr/bin/generate_password
sed -i "s|\bpatch-tracking.log\b|/var/log/patch-tracking.log|" /etc/patch-tracking/logging.conf
%preun
%systemd_preun patch-tracking.service
%clean
rm -rf $RPM_BUILD_ROOT
%files
%{python3_sitelib}/*
/etc/patch-tracking/logging.conf
/etc/patch-tracking/settings.conf
/usr/bin/patch-tracking
/usr/bin/patch-tracking-cli
/var/patch-tracking/db.sqlite
/usr/bin/generate_password
/usr/lib/systemd/system/patch-tracking.service
""" module of patch_tracking """
"""
api action method
"""
from sqlalchemy import and_
from patch_tracking.database import db
from patch_tracking.database.models import Tracking, Issue
def create_tracking(data):
"""
create tracking
"""
version_control = data.get("version_control")
scm_repo = data.get('scm_repo')
scm_branch = data.get('scm_branch')
scm_commit = data.get('scm_commit')
repo = data.get('repo')
branch = data.get('branch')
enabled = data.get('enabled')
tracking = Tracking(version_control, scm_repo, scm_branch, scm_commit, repo, branch, enabled)
db.session.add(tracking)
db.session.commit()
def update_tracking(data):
"""
update tracking
"""
repo = data.get('repo')
branch = data.get('branch')
tracking = Tracking.query.filter(and_(Tracking.repo == repo, Tracking.branch == branch)).one()
tracking.version_control = data.get("version_control")
tracking.scm_repo = data.get('scm_repo')
tracking.scm_branch = data.get('scm_branch')
tracking.scm_commit = data.get('scm_commit')
tracking.enabled = data.get('enabled')
db.session.commit()
def delete_tracking(repo_, branch_=None):
"""
delete tracking
"""
if branch_:
Tracking.query.filter(Tracking.repo == repo_, Tracking.branch == branch_).delete()
else:
Tracking.query.filter(Tracking.repo == repo_).delete()
db.session.commit()
def create_issue(data):
"""
create issue
"""
issue = data.get('issue')
repo = data.get('repo')
branch = data.get('branch')
issue_ = Issue(issue, repo, branch)
db.session.add(issue_)
db.session.commit()
def update_issue(data):
"""
update issue
"""
issue = data.get('issue')
issue_ = Issue.query.filter(Issue.issue == issue).one()
issue_.issue = data.get('issue')
db.session.add(issue_)
db.session.commit()
def delete_issue(issue):
"""
delete issue
"""
issue_ = Issue.query.filter(Issue.issue == issue).one()
db.session.delete(issue_)
db.session.commit()
'''
Response content and code ID
'''
import json
class ResponseCode:
"""
Description: response code to web
changeLog:
"""
SUCCESS = "2001"
INPUT_PARAMETERS_ERROR = "4001"
TRACKING_NOT_FOUND = "4002"
ISSUE_NOT_FOUND = "4003"
GITHUB_ADDRESS_ERROR = "5001"
GITEE_ADDRESS_ERROR = "5002"
GITHUB_CONNECTION_ERROR = "5003"
GITEE_CONNECTION_ERROR = "5004"
INSERT_DATA_ERROR = "6004"
DELETE_DB_ERROR = "6001"
CONFIGFILE_PATH_EMPTY = "6002"
DIS_CONNECTION_DB = "6003"
CODE_MSG_MAP = {
SUCCESS: "Successful Operation!",
INPUT_PARAMETERS_ERROR: "Please enter the correct parameters",
TRACKING_NOT_FOUND: "The tracking you are looking for does not exist",
ISSUE_NOT_FOUND: "The issue you are looking for does not exist",
GITHUB_ADDRESS_ERROR: "The Github address is wrong",
GITEE_ADDRESS_ERROR: "The Gitee address is wrong",
GITHUB_CONNECTION_ERROR: "Unable to connect to the github",
GITEE_CONNECTION_ERROR: "Unable to connect to the gitee",
DELETE_DB_ERROR: "Failed to delete database",
CONFIGFILE_PATH_EMPTY: "Initialization profile does not exist or cannot be found",
DIS_CONNECTION_DB: "Unable to connect to the database, check the database configuration"
}
@classmethod
def ret_message(cls, code, data=None):
"""
generate response dictionary
"""
return json.dumps({"code": code, "msg": cls.CODE_MSG_MAP[code], "data": data})
def __str__(self):
return 'ResponseCode'
"""
module of issue API
"""
import logging
from flask import request
from flask import Blueprint
from patch_tracking.database.models import Issue
from patch_tracking.api.constant import ResponseCode
log = logging.getLogger(__name__)
issue = Blueprint('issue', __name__)
@issue.route('', methods=["GET"])
def get():
"""
Returns the list of issues.
"""
if not request.args:
issues = Issue.query.all()
else:
allowed_key = ['repo', 'branch']
input_params = request.args
data = dict()
for k, param in input_params.items():
if k in allowed_key:
data[k] = param
else:
return ResponseCode.ret_message(ResponseCode.INPUT_PARAMETERS_ERROR)
issues = Issue.query.filter_by(**data).all()
resp_data = list()
for item in issues:
resp_data.append(item.to_json())
return ResponseCode.ret_message(code=ResponseCode.SUCCESS, data=resp_data)
"""
module of tracking API
"""
import logging
from flask import request, Blueprint
from sqlalchemy.exc import SQLAlchemyError
from patch_tracking.database.models import Tracking
from patch_tracking.api.business import create_tracking, update_tracking, delete_tracking
from patch_tracking.api.constant import ResponseCode
from patch_tracking.util.auth import auth
logger = logging.getLogger(__name__)
tracking = Blueprint('tracking', __name__)
@tracking.route('', methods=["DELETE"])
@auth.login_required
def delete():
"""
Delete tracking(s).
"""
input_params = request.args
keys = list(input_params.keys())
if not keys or "repo" not in keys:
return ResponseCode.ret_message(ResponseCode.INPUT_PARAMETERS_ERROR)
if len(set(keys) - {"repo", "branch"}) != 0:
return ResponseCode.ret_message(ResponseCode.INPUT_PARAMETERS_ERROR)
try:
if "branch" in keys:
if Tracking.query.filter(Tracking.repo == input_params['repo'], Tracking.branch == input_params['branch']).first():
delete_tracking(input_params['repo'], input_params['branch'])
logger.info('Delete tracking repo: %s, branch: %s', input_params['repo'], input_params['branch'])
else:
if Tracking.query.filter(Tracking.repo == input_params['repo']).first():
delete_tracking(input_params['repo'])
logger.info('Delete tracking repo: %s', input_params['repo'])
return ResponseCode.ret_message(code=ResponseCode.SUCCESS)
except SQLAlchemyError as err:
return ResponseCode.ret_message(code=ResponseCode.DELETE_DB_ERROR, data=err)
@tracking.route('', methods=["GET"])
def get():
"""
Returns the list of tracking items
"""
if not request.args:
trackings = Tracking.query.all()
else:
allowed_key = ['repo', 'branch', 'enabled']
input_params = request.args
data = dict()
for k, param in input_params.items():
if k in allowed_key:
if k == 'enabled':
param = bool(param == 'true')
data[k] = param
else:
return ResponseCode.ret_message(ResponseCode.INPUT_PARAMETERS_ERROR)
trackings = Tracking.query.filter_by(**data).all()
resp_data = list()
for item in trackings:
resp_data.append(item.to_json())
return ResponseCode.ret_message(code=ResponseCode.SUCCESS, data=resp_data)
@tracking.route('', methods=["POST"])
@auth.login_required
def post():
"""
Create or update a tracking item.
"""
required_params = ['version_control', 'scm_repo', 'scm_branch', 'scm_commit', 'repo', 'branch', 'enabled']
input_params = request.json
data = dict()
for item in input_params:
if item in required_params:
data[item] = input_params[item]
required_params.remove(item)
else:
return ResponseCode.ret_message(ResponseCode.INPUT_PARAMETERS_ERROR)
if len(required_params) > 1 or (len(required_params) == 1 and required_params[0] != 'scm_commit'):
return ResponseCode.ret_message(ResponseCode.INPUT_PARAMETERS_ERROR)
if data['version_control'] != 'github':
return ResponseCode.ret_message(ResponseCode.INPUT_PARAMETERS_ERROR)
track = Tracking.query.filter_by(repo=data['repo'], branch=data['branch']).first()
if track:
try:
update_tracking(data)
logger.info('Update tracking. Data: %s.', data)
except SQLAlchemyError as err:
return ResponseCode.ret_message(code=ResponseCode.INSERT_DATA_ERROR, data=err)
else:
try:
create_tracking(data)
logger.info('Create tracking. Data: %s.', data)
except SQLAlchemyError as err:
return ResponseCode.ret_message(code=ResponseCode.INSERT_DATA_ERROR, data=err)
return ResponseCode.ret_message(code=ResponseCode.SUCCESS, data=request.json)
"""
flask app
"""
import logging.config
import sys
from flask import Flask
from patch_tracking.api.issue import issue
from patch_tracking.api.tracking import tracking
from patch_tracking.database import db
from patch_tracking.task import task
logging.config.fileConfig('logging.conf', disable_existing_loggers=False)
app = Flask(__name__)
logger = logging.getLogger(__name__)
def check_settings_conf():
"""
check settings.conf
"""
setting_error = False
required_settings = ['LISTEN', 'GITHUB_ACCESS_TOKEN', 'GITEE_ACCESS_TOKEN', 'SCAN_DB_INTERVAL', 'USER', 'PASSWORD']
for setting in required_settings:
if setting in app.config:
if not app.config[setting]:
logger.error('%s is empty in settings.conf.', setting)
setting_error = True
else:
logger.error('%s not configured in settings.conf.', setting)
setting_error = True
if setting_error:
sys.exit()
app.config.from_pyfile("settings.conf")
check_settings_conf()
GITHUB_ACCESS_TOKEN = app.config['GITHUB_ACCESS_TOKEN']
GITEE_ACCESS_TOKEN = app.config['GITEE_ACCESS_TOKEN']
SCAN_DB_INTERVAL = app.config['SCAN_DB_INTERVAL']
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite?check_same_thread=False'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SWAGGER_UI_DOC_EXPANSION'] = 'list'
app.config['ERROR_404_HELP'] = False
app.config['RESTX_MASK_SWAGGER'] = False
app.config['SCHEDULER_EXECUTORS'] = {'default': {'type': 'threadpool', 'max_workers': 100}}
app.register_blueprint(issue, url_prefix="/issue")
app.register_blueprint(tracking, url_prefix="/tracking")
db.init_app(app)
task.init(app)
if __name__ == "__main__":
app.run(ssl_context="adhoc")
#!/usr/bin/env python3
"""
command line tool to generate a password hash using pbkdf2
"""
import sys
import re
from werkzeug.security import generate_password_hash
def password_strength_check(password):
"""
Verify the strength of 'password'
Returns a dict indicating which criteria failed
"""
# calculating the length
length_error = len(password) < 6
# searching for digits
digit_error = re.search(r"\d", password) is None
# searching for uppercase
uppercase_error = re.search(r"[A-Z]", password) is None
# searching for lowercase
lowercase_error = re.search(r"[a-z]", password) is None
# searching for symbols
symbol_error = re.search(r"[~!@#%^*_+=-]", password) is None
# overall result
password_ok = not (length_error or digit_error or uppercase_error or lowercase_error or symbol_error)
return {
'ok': password_ok,
'error': {
'length': length_error,
'digit': digit_error,
'uppercase': uppercase_error,
'lowercase': lowercase_error,
'symbol': symbol_error,
}
}
if len(sys.argv) < 2:
    print("Usage: {} <password>".format(sys.argv[0]))
    sys.exit(1)
ret = password_strength_check(sys.argv[1])
if not ret['ok']:
print("Password strength is not satisfied.")
for item in ret['error']:
if ret['error'][item]:
print("{} not satisfied.".format(item))
print(
"""
    password strength requirements:
    6 characters or more
    at least 1 digit [0-9]
    at least 1 lowercase letter [a-z]
    at least 1 uppercase letter [A-Z]
    at least 1 special character from [~!@#%^*_+=-]
"""
)
else:
print(generate_password_hash(sys.argv[1]))
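The strength rules enforced above can be condensed into a single predicate; `is_strong` is a hypothetical name for illustration and mirrors the same length, digit, case, and symbol checks:

```python
import re


def is_strong(password):
    """True if the password satisfies all criteria enforced by the script above."""
    return (
        len(password) >= 6
        and re.search(r"\d", password) is not None       # at least one digit
        and re.search(r"[A-Z]", password) is not None    # at least one uppercase letter
        and re.search(r"[a-z]", password) is not None    # at least one lowercase letter
        and re.search(r"[~!@#%^*_+=-]", password) is not None  # at least one symbol
    )
```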
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import sys
from patch_tracking.cli.patch_tracking_cli import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
sys.exit(main())
#!/usr/bin/env python3
"""
command line of creating tracking item
"""
import argparse
import os
import sys
import pandas
import requests
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def query_table(args):
"""
query table
"""
server = args.server
if args.table == "tracking":
url = f"https://{server}/tracking"
if args.branch and args.repo:
params = {'repo': args.repo, 'branch': args.branch}
else:
params = {'repo': args.repo}
try:
ret = requests.get(url, params=params, verify=False)
if ret.status_code == 200 and ret.json()['code'] == '2001':
return 'success', ret
return 'error', ret
except Exception as exception:
return 'error', 'Connect server error: ' + str(exception)
elif args.table == "issue":
url = f"https://{server}/issue"
params = {'repo': args.repo, 'branch': args.branch}
try:
ret = requests.get(url, params=params, verify=False)
if ret.status_code == 200 and ret.json()['code'] == '2001':
return 'success', ret
return 'error', ret
except Exception as exception:
return 'error', 'Connect server error: ' + str(exception)
return 'error', 'table ' + args.table + ' not found'
def add_param_check_url(params, file_path=None):
"""
check url
"""
scm_url = f"https://github.com/{params['scm_repo']}/tree/{params['scm_branch']}"
url = f"https://gitee.com/{params['repo']}/tree/{params['branch']}"
patch_tracking_url = f"https://{params['server']}"
server_ret = server_check(patch_tracking_url)
if server_ret[0] != 'success':
return 'error'
scm_ret = repo_branch_check(scm_url)
if scm_ret[0] != 'success':
if file_path:
print(
f"scm_repo: {params['scm_repo']} and scm_branch: {params['scm_branch']} check failed. \n"
f"Error in {file_path}. {scm_ret[1]}"
)
else:
print(f"scm_repo: {params['scm_repo']} and scm_branch: {params['scm_branch']} check failed. {scm_ret[1]}")
return 'error'
ret = repo_branch_check(url)
if ret[0] != 'success':
if file_path:
print(f"repo: {params['repo']} and branch: {params['branch']} check failed. {ret[1]}. Error in {file_path}")
else:
print(f"repo: {params['repo']} and branch: {params['branch']} check failed. {ret[1]}.")
return 'error'
return None
def server_check(url):
"""
check if patch_tracking server start
"""
try:
ret = requests.head(url=url, verify=False)
except Exception as exception:
print(f"Error: Cannot connect to {url}, please make sure patch-tracking service is running.")
return 'error', exception
if ret.status_code == 200 or ret.status_code == 404:
return 'success', ret
print(f"Unexpected Error: {ret.text}")
return 'error', ret.text
def repo_branch_check(url):
"""
check if repo/branch exist
"""
headers = {
"User-Agent":
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " +
"Ubuntu Chromium/83.0.4103.61 Chrome/83.0.4103.61 Safari/537.36"
}
try:
ret = requests.get(url=url, headers=headers)
except Exception as exception:
return 'error', exception
if ret.status_code == 404:
return 'error', f'{url} does not exist.'
if ret.status_code == 200:
return 'success', ret
return 'error', ret.text
def params_input_track(params, file_path=None):
"""
load tracking from command line arguments
"""
if add_param_check_url(params, file_path) == 'error':
return 'error', 'Check input params error.'
repo = params['repo']
branch = params['branch']
scm_repo = params['scm_repo']
scm_branch = params['scm_branch']
version_control = params['version_control'].lower()
enabled = params['enabled'].lower()
server = params['server']
user = params['user']
password = params['password']
enabled = bool(enabled == 'true')
url = f"https://{server}/tracking"
data = {
'version_control': version_control,
'scm_repo': scm_repo,
'scm_branch': scm_branch,
'repo': repo,
'branch': branch,
'enabled': enabled
}
try:
ret = requests.post(url, json=data, verify=False, auth=HTTPBasicAuth(user, password))
except Exception as exception:
return 'error', 'Connect server error: ' + str(exception)
if ret.status_code == 401 or ret.status_code == 403:
return 'error', 'Authentication error. Please make sure the user and password are correct.'
if ret.status_code == 200 and ret.json()['code'] == '2001':
return 'success', 'created'
print("status_code: {}, return text: {}".format(ret.status_code, ret.text))
return 'error', 'Unexpected Error.'
def add(args):
"""
add tracking
"""
style1 = bool(args.version_control) or bool(args.repo) or bool(args.branch) or bool(args.scm_repo) or bool(
args.scm_branch
) or bool(args.enabled)
style2 = bool(args.file)
style3 = bool(args.dir)
if [style1, style2, style3].count(True) >= 2:
print("Cannot mix different usage styles.")
print(add_usage)
return
if style2:
file_input_track(args.file, args)
elif style3:
dir_input_track(args.dir, args)
else:
params = {
'repo': args.repo,
'branch': args.branch,
'scm_repo': args.scm_repo,
'scm_branch': args.scm_branch,
'version_control': args.version_control,
'enabled': args.enabled,
'server': args.server,
'user': args.user,
'password': args.password
}
ret = params_input_track(params)
if ret[0] == 'success':
print('Tracking added successfully.')
else:
print(ret[1])
def delete(args):
"""
delete tracking
"""
server = args.server
user = args.user
password = args.password
url = f"https://{server}/tracking"
if args.branch:
params = {'repo': args.repo, 'branch': args.branch}
else:
params = {'repo': args.repo}
try:
ret = requests.delete(url, params=params, verify=False, auth=HTTPBasicAuth(user, password))
if ret.status_code == 200 and ret.json()['code'] == '2001':
print('Tracking delete successfully.')
return
print("Tracking delete failed. Error: {}".format(ret))
except Exception as exception:
print('Error: Connect server error: {}'.format(exception))
def query(args):
"""
query table data
"""
if args.branch and not args.repo:
print(query_usage)
return
status, ret = query_table(args)
if status == "success":
df = pandas.DataFrame.from_dict(ret.json()["data"], orient="columns")
df.index = range(1, len(df) + 1)
print(df)
else:
print(ret)
def file_input_track(file_path, args):
"""
load tracking from file
"""
if os.path.exists(file_path) and os.path.isfile(file_path):
if os.path.splitext(file_path)[-1] != ".yaml":
print('Please input yaml file. Error in {}'.format(file_path))
return
with open(file_path) as file:
content = file.readlines()
params = dict()
for item in content:
if ":" in item:
# split on the first ':' only so values containing ':' stay intact
k = item.split(':', 1)[0]
value = item.split(':', 1)[1].strip(' ').strip('\n')
params.update({k: value})
params.update({'server': args.server, 'user': args.user, 'password': args.password})
ret = params_input_track(params, file_path)
if ret[0] == 'success':
print('Tracking successfully {} for {}'.format(ret[1], file_path))
else:
print('Tracking failed for {}: {}'.format(file_path, ret[1]))
else:
print('Invalid yaml file path: {}'.format(file_path))
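The line-based "key: value" parsing used by `file_input_track` above can be sketched on its own; `parse_simple_yaml` is a hypothetical name, and it splits on the first `:` only so values containing colons survive. A real YAML parser (`yaml.safe_load`) would be more robust for nested files:

```python
def parse_simple_yaml(lines):
    """Parse flat 'key: value' lines into a dict, mirroring the loop above."""
    params = {}
    for line in lines:
        if ":" in line:
            key, _, value = line.partition(':')  # split on the first ':' only
            params[key] = value.strip(' ').strip('\n')
    return params
```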
def dir_input_track(dir_path, args):
"""
load tracking from dir
"""
if os.path.exists(dir_path) and os.path.isdir(dir_path):
for root, _, files in os.walk(dir_path):
if not files:
print('error: dir path empty')
return
for file in files:
if os.path.splitext(file)[-1] == ".yaml":
file_path = os.path.join(root, file)
file_input_track(file_path, args)
else:
print('Please input yaml file. Error in {}'.format(file))
else:
print('error: dir path error. Params error in {}'.format(dir_path))
parser = argparse.ArgumentParser(
prog='patch_tracking_cli',
allow_abbrev=False,
description="command line tool for manipulating patch tracking information"
)
subparsers = parser.add_subparsers(description=None, dest='subparser_name', help='additional help')
# common argument
common_parser = argparse.ArgumentParser(add_help=False)
common_parser.add_argument("--server", required=True, help="patch tracking daemon server")
# authentication argument
authentication_parser = argparse.ArgumentParser(add_help=False)
authentication_parser.add_argument('--user', required=True, help='authentication username')
authentication_parser.add_argument('--password', required=True, help='authentication password')
# add
add_usage = """
%(prog)s --server SERVER --user USER --password PASSWORD
--version_control github --scm_repo SCM_REPO --scm_branch SCM_BRANCH
--repo REPO --branch BRANCH --enabled True
%(prog)s --server SERVER --user USER --password PASSWORD --file FILE
%(prog)s --server SERVER --user USER --password PASSWORD --dir DIR"""
parser_add = subparsers.add_parser(
'add', parents=[common_parser, authentication_parser], help="add tracking", usage=add_usage
)
parser_add.set_defaults(func=add)
parser_add.add_argument("--version_control", choices=['github'], help="upstream version control system")
parser_add.add_argument("--scm_repo", help="upstream scm repository")
parser_add.add_argument("--scm_branch", help="upstream scm branch")
parser_add.add_argument("--repo", help="source package repository")
parser_add.add_argument("--branch", help="source package branch")
parser_add.add_argument("--enabled", choices=["True", "true", "False", "false"], help="whether tracking is enabled")
parser_add.add_argument('--file', help='import patch tracking from file')
parser_add.add_argument('--dir', help='import patch tracking from files in directory')
# delete
del_usage = """
    %(prog)s --server SERVER --user USER --password PASSWORD --repo REPO [--branch BRANCH]"""
parser_delete = subparsers.add_parser('delete', parents=[common_parser, authentication_parser], help="delete tracking", usage=del_usage)
parser_delete.set_defaults(func=delete)
parser_delete.add_argument("--repo", required=True, help="source package repository")
parser_delete.add_argument("--branch", help="source package branch")
# query
query_usage = """
%(prog)s --server SERVER --table {tracking,issue} [--repo REPO] [--branch BRANCH]"""
parser_query = subparsers.add_parser('query', parents=[common_parser], help="query tracking/issue", usage=query_usage)
parser_query.set_defaults(func=query)
parser_query.add_argument("--table", required=True, choices=["tracking", "issue"], help="query tracking or issue")
parser_query.add_argument("--repo", help="source package repository")
parser_query.add_argument("--branch", help="source package branch")
def main():
args_ = parser.parse_args()
if not args_.subparser_name:
parser.print_help()
sys.exit(1)
# the subcommand handlers print their own result and return None,
# so reaching this point means the command was dispatched successfully
args_.func(args_)
sys.exit(0)
if __name__ == "__main__":
main()
"""
database init
"""
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def reset_database():
"""
reset database
"""
db.drop_all()
db.create_all()
"""
module of database model
"""
from patch_tracking.database import db
class Tracking(db.Model):
"""
database model of tracking
"""
id = db.Column(db.Integer, autoincrement=True)
version_control = db.Column(db.String(80))
scm_repo = db.Column(db.String(80))
scm_branch = db.Column(db.String(80))
scm_commit = db.Column(db.String(80))
repo = db.Column(db.String(80), primary_key=True)
branch = db.Column(db.String(80), primary_key=True)
enabled = db.Column(db.Boolean)
def __init__(self, version_control, scm_repo, scm_branch, scm_commit, repo, branch, enabled=True):
self.version_control = version_control
self.scm_repo = scm_repo
self.scm_branch = scm_branch
self.scm_commit = scm_commit
self.repo = repo
self.branch = branch
self.enabled = enabled
def __repr__(self):
return '<Tracking %r %r>' % (self.repo, self.branch)
def to_json(self):
"""
convert to json
"""
return {
'version_control': self.version_control,
'scm_repo': self.scm_repo,
'scm_branch': self.scm_branch,
'scm_commit': self.scm_commit,
'repo': self.repo,
'branch': self.branch,
'enabled': self.enabled
}
class Issue(db.Model):
"""
database model of issue
"""
issue = db.Column(db.String(80), primary_key=True)
repo = db.Column(db.String(80))
branch = db.Column(db.String(80))
def __init__(self, issue, repo, branch):
self.issue = issue
self.repo = repo
self.branch = branch
def __repr__(self):
return '<Issue %r %r %r>' % (self.issue, self.repo, self.branch)
def to_json(self):
"""
convert to json
"""
return {'issue': self.issue, 'repo': self.repo, 'branch': self.branch}
"""
reset database
"""
from patch_tracking.app import app
from patch_tracking.database import reset_database
def reset():
"""
reset database
"""
with app.app_context():
reset_database()
if __name__ == "__main__":
reset()
[loggers]
keys=root
[handlers]
keys=console,logfile
[formatters]
keys=simple
[logger_root]
level=DEBUG
handlers=console,logfile
[handler_console]
class=StreamHandler
level=DEBUG
formatter=simple
args=(sys.stdout,)
[formatter_simple]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=
[handler_logfile]
class=handlers.RotatingFileHandler
level=DEBUG
args=('patch-tracking.log', 'a', 1024*1024*100, 10)
formatter=simple
#!/bin/bash
app_file=`rpm -ql patch-tracking | grep app.py`
app_path=${app_file%/app.py}
chdir_path=${app_file%/patch_tracking/app.py}
settings_file='/etc/patch-tracking/settings.conf'
server=$(grep 'LISTEN' $settings_file | awk -F'=' '{print $2}' | sed -e 's/^[ ]*"//' -e 's/"$//')
/usr/local/bin/uwsgi --master --https ${server},/etc/patch-tracking/self-signed.crt,/etc/patch-tracking/self-signed.key --wsgi-file ${app_file} --callable app --chdir ${chdir_path} --threads 100 --lazy
[Unit]
Description=uWSGI Emperor
After=syslog.target
[Service]
ExecStart=/usr/bin/patch-tracking
RuntimeDirectory=patch-tracking
Restart=always
RestartSec=10
KillSignal=SIGQUIT
Type=notify
StandardError=syslog
NotifyAccess=all
[Install]
WantedBy=multi-user.target
# server settings
LISTEN = "127.0.0.1:5001"
# GitHub API settings
GITHUB_ACCESS_TOKEN = ""
# Gitee API settings
GITEE_ACCESS_TOKEN = ""
# Time interval
SCAN_DB_INTERVAL = 3600
# username
USER = "admin"
# password
PASSWORD = ""
"""
apscheduler init
"""
from flask_apscheduler import APScheduler
scheduler = APScheduler()
"""
load job/task of tracking
"""
import datetime
import logging
from patch_tracking.task import scheduler
from patch_tracking.database.models import Tracking
from patch_tracking.util.github_api import GitHubApi
from patch_tracking.api.business import update_tracking
logger = logging.getLogger(__name__)
def init(app):
"""
scheduler jobs init
"""
scan_db_interval = app.config['SCAN_DB_INTERVAL']
scheduler.init_app(app)
scheduler.add_job(
id='Add Tracking job - Update DB',
func=patch_tracking_task,
trigger='interval',
args=(app, ),
seconds=int(scan_db_interval),
next_run_time=datetime.datetime.now()
)
scheduler.add_job(
id="Check empty commit ID",
func=check_empty_commit_id,
trigger='interval',
args=(app, ),
seconds=600,
next_run_time=datetime.datetime.now(),
misfire_grace_time=300,
)
scheduler.start()
def add_job(job_id, func, args):
"""
add job
"""
logger.info("Add Tracking job - %s", job_id)
scheduler.add_job(
id=job_id, func=func, args=args, trigger='date', run_date=datetime.datetime.now(), misfire_grace_time=600
)
def check_empty_commit_id(flask_app):
"""
fill in the commit ID for trackings whose commit ID is empty
"""
with flask_app.app_context():
new_track = get_track_from_db()
github_api = GitHubApi()
for item in new_track:
if item.scm_commit:
continue
status, result = github_api.get_latest_commit(item.scm_repo, item.scm_branch)
if status == 'success':
commit_id = result['latest_commit']
data = {
'version_control': item.version_control,
'repo': item.repo,
'branch': item.branch,
'enabled': item.enabled,
'scm_commit': commit_id,
'scm_branch': item.scm_branch,
'scm_repo': item.scm_repo
}
update_tracking(data)
else:
logger.error(
'Check empty commit ID: Failed to get the latest commit ID of scm_repo: %s scm_branch: %s. Return val: %s',
item.scm_repo, item.scm_branch, result
)
def get_track_from_db():
"""
query all trackings from database
"""
all_track = Tracking.query.filter_by(enabled=True)
return all_track
def patch_tracking_task(flask_app):
"""
add patch trackings to jobs
"""
with flask_app.app_context():
all_track = get_track_from_db()
all_job_id = list()
for item in scheduler.get_jobs():
all_job_id.append(item.id)
for track in all_track:
if track.branch.split('/')[0] != 'patch-tracking':
job_id = str(track.repo + ":" + track.branch)
if job_id not in all_job_id:
add_job(
job_id=job_id,
func='patch_tracking.task.task_apscheduler:upload_patch_to_gitee',
args=(track, )
)
"""
tracking job
"""
import logging
import base64
import time
from patch_tracking.util.gitee_api import create_branch, upload_patch, create_gitee_issue
from patch_tracking.util.gitee_api import create_pull_request, get_path_content, upload_spec, create_spec
from patch_tracking.util.github_api import GitHubApi
from patch_tracking.database.models import Tracking
from patch_tracking.api.business import update_tracking, create_issue
from patch_tracking.task import scheduler
from patch_tracking.util.spec import Spec
logger = logging.getLogger(__name__)
def upload_patch_to_gitee(track):
"""
upload a patch file to Gitee
"""
cur_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
with scheduler.app.app_context():
logger.info('[Patch Tracking %s] track.scm_commit_id: %s.', cur_time, track.scm_commit)
patch = get_scm_patch(track)
if patch:
issue = create_patch_issue_pr(patch, cur_time)
if issue:
create_issue_db(issue)
else:
logger.info('[Patch Tracking %s] No issue needs to be created.', cur_time)
else:
logger.debug('[Patch Tracking %s] No new commit.', cur_time)
def get_all_commit_info(scm_repo, db_commit, latest_commit):
"""
get all commit information between two commits
"""
commit_list = list()
github_api = GitHubApi()
while db_commit != latest_commit:
status, result = github_api.get_commit_info(scm_repo, latest_commit)
logger.debug('get_commit_info: %s %s', status, result)
if status == 'success':
if 'parent' in result:
ret = github_api.get_patch(scm_repo, latest_commit, latest_commit)
logger.debug('get patch api ret: %s', ret)
if ret['status'] == 'success':
result['patch_content'] = ret['api_ret']
# insert at the front so commit_list ends up oldest-first
commit_list.insert(0, result)
else:
logger.error('Get scm: %s commit: %s patch failed. Result: %s', scm_repo, latest_commit, result)
latest_commit = result['parent']
else:
logger.info(
'[Patch Tracking] Successfully got scm commit ID/message/time/patch from %s to %s.', db_commit,
latest_commit
)
break
else:
logger.error(
'[Patch Tracking] Get scm: %s commit: %s ID/message/time failed. Result: %s', scm_repo, latest_commit,
result
)
return commit_list
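The parent-walking loop in `get_all_commit_info` above can be exercised independently of the GitHub API against a fake commit graph; `walk_parents` and the graph layout are illustrative assumptions only:

```python
def walk_parents(graph, db_commit, latest_commit):
    """Collect commit IDs newer than db_commit, oldest first, like the loop above."""
    commits = []
    while db_commit != latest_commit:
        info = graph.get(latest_commit)
        if info is None or 'parent' not in info:
            break  # reached the root commit (or an unknown one)
        commits.insert(0, latest_commit)  # front-insert so the list is oldest-first
        latest_commit = info['parent']
    return commits
```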
def get_scm_patch(track):
"""
Traverse the Tracking table to get the patch files of enabled repositories.
Different repositories have different acquisition methods.
:return:
"""
github_api = GitHubApi()
scm_dict = dict(
scm_repo=track.scm_repo,
scm_branch=track.scm_branch,
scm_commit=track.scm_commit,
enabled=track.enabled,
repo=track.repo,
branch=track.branch,
version_control=track.version_control
)
status, result = github_api.get_latest_commit(scm_dict['scm_repo'], scm_dict['scm_branch'])
logger.debug(
'repo: %s branch: %s. get_latest_commit: %s %s', scm_dict['scm_repo'], scm_dict['scm_branch'], status, result
)
if status == 'success':
commit_id = result['latest_commit']
if not scm_dict['scm_commit']:
data = {
'version_control': scm_dict['version_control'],
'repo': scm_dict['repo'],
'branch': scm_dict['branch'],
'enabled': scm_dict['enabled'],
'scm_commit': commit_id,
'scm_branch': scm_dict['scm_branch'],
'scm_repo': scm_dict['scm_repo']
}
update_tracking(data)
logger.info(
'[Patch Tracking] Scm_repo: %s Scm_branch: %s. Get latest commit ID: %s From commit ID: None.',
scm_dict['scm_repo'], scm_dict['scm_branch'], result['latest_commit']
)
else:
if commit_id != scm_dict['scm_commit']:
commit_list = get_all_commit_info(scm_dict['scm_repo'], scm_dict['scm_commit'], commit_id)
scm_dict['commit_list'] = commit_list
return scm_dict
logger.info(
'[Patch Tracking] Scm_repo: %s Scm_branch: %s. Get latest commit ID: %s From commit ID: %s. Nothing needs to be done.',
scm_dict['scm_repo'], scm_dict['scm_branch'], commit_id, scm_dict['scm_commit']
)
else:
logger.error(
'[Patch Tracking] Failed to get the latest commit ID of scm_repo: %s scm_branch: %s. Return val: %s',
scm_dict['scm_repo'], scm_dict['scm_branch'], result
)
return None
def create_patch_issue_pr(patch, cur_time):
"""
Create a temporary branch, commit files, and create the PR and issue
:return:
"""
issue_dict = dict()
if not patch:
return None
issue_dict['repo'] = patch['repo']
issue_dict['branch'] = patch['branch']
new_branch = 'patch-tracking/' + cur_time
result = create_branch(patch['repo'], patch['branch'], new_branch)
if result == 'success':
logger.info('[Patch Tracking %s] Successfully created branch: %s', cur_time, new_branch)
else:
logger.error('[Patch Tracking %s] Failed to create branch: %s', cur_time, new_branch)
patch_lst = list()
# the table format triggers Gitee's sensitive-word filter, so it is removed for now
issue_table = ""
for latest_commit in patch['commit_list']:
scm_commit_url = '/'.join(['https://github.com', patch['scm_repo'], 'commit', latest_commit['commit_id']])
issue_table += '[{}]({}) | {} | {}'.format(
latest_commit['commit_id'][0:7], scm_commit_url, latest_commit['time'], latest_commit['message']
) + '\n'
patch_file_content = latest_commit['patch_content']
post_data = {
'repo': patch['repo'],
'branch': new_branch,
'latest_commit_id': latest_commit['commit_id'],
'patch_file_content': str(patch_file_content),
'cur_time': cur_time,
'commit_url': scm_commit_url
}
result = upload_patch(post_data)
if result == 'success':
logger.info(
'[Patch Tracking %s] Successfully uploaded patch file of commit: %s', cur_time, latest_commit['commit_id']
)
else:
logger.error(
'[Patch Tracking %s] Failed to upload patch file of commit: %s', cur_time, latest_commit['commit_id']
)
patch_lst.append(str(latest_commit['commit_id']))
logger.debug(issue_table)
result = create_gitee_issue(patch['repo'], issue_table, cur_time)
if result[0] == 'success':
issue_num = result[1]
logger.info('[Patch Tracking %s] Successfully created issue: %s', cur_time, issue_num)
ret = create_pull_request(patch['repo'], patch['branch'], new_branch, issue_num, cur_time)
if ret == 'success':
logger.info('[Patch Tracking %s] Successfully created PR for issue: %s.', cur_time, issue_num)
else:
logger.error('[Patch Tracking %s] Failed to create PR for issue: %s. Result: %s', cur_time, issue_num, ret)
issue_dict['issue'] = issue_num
upload_spec_to_repo(patch, patch_lst, cur_time)
data = {
'version_control': patch['version_control'],
'repo': patch['repo'],
'branch': patch['branch'],
'enabled': patch['enabled'],
'scm_commit': patch['commit_list'][-1]['commit_id'],
'scm_branch': patch['scm_branch'],
'scm_repo': patch['scm_repo']
}
update_tracking(data)
else:
logger.error('[Patch Tracking %s] Failed to create issue: %s. Result: %s', cur_time, issue_table, result[1])
return issue_dict
def upload_spec_to_repo(patch, patch_lst, cur_time):
"""
update and upload spec file
"""
new_branch = 'patch-tracking/' + cur_time
_, repo_name = patch['repo'].split('/')
spec_file = repo_name + '.spec'
patch_file_lst = [patch + '.patch' for patch in patch_lst]
log_title = "{} patch-tracking".format(cur_time)
log_content = "append patch file of upstream repository from <{}> to <{}>".format(patch_lst[0], patch_lst[-1])
ret = get_path_content(patch['repo'], patch['branch'], spec_file)
if 'content' in ret:
spec_content = str(base64.b64decode(ret['content']), encoding='utf-8')
spec_sha = ret['sha']
new_spec = modify_spec(log_title, log_content, patch_file_lst, spec_content)
update_spec_to_repo(patch['repo'], new_branch, cur_time, new_spec, spec_sha)
else:
if 'message' in ret and 'File Not Found' in ret['message']:
spec_content = ''
new_spec = modify_spec(log_title, log_content, patch_file_lst, spec_content)
create_spec_to_repo(patch['repo'], new_branch, cur_time, new_spec)
else:
logger.error('[Patch Tracking %s] Failed to update spec: %s. Result: %s', cur_time, spec_file, ret)
def modify_spec(log_title, log_content, patch_file_lst, spec_content):
"""
modify spec file
"""
spec = Spec(spec_content)
return spec.update(log_title, log_content, patch_file_lst)
def update_spec_to_repo(repo, branch, cur_time, spec_content, spec_sha):
"""
update spec file
"""
ret = upload_spec(repo, branch, cur_time, spec_content, spec_sha)
if ret == 'success':
logger.info('[Patch Tracking %s] Successfully updated spec file.', cur_time)
else:
logger.error('[Patch Tracking %s] Failed to update spec file. Result: %s', cur_time, ret)
def create_spec_to_repo(repo, branch, cur_time, spec_content):
"""
create new spec file
"""
ret = create_spec(repo, branch, spec_content, cur_time)
if ret == 'success':
logger.info('[Patch Tracking %s] Successfully created spec file.', cur_time)
else:
logger.error('[Patch Tracking %s] Failed to create spec file. Result: %s', cur_time, ret)
def create_issue_db(issue):
"""
create issue into database
"""
issue_num = issue['issue']
tracking = Tracking.query.filter_by(repo=issue['repo'], branch=issue['branch']).first()
tracking_repo = tracking.repo
tracking_branch = tracking.branch
data = {'issue': issue_num, 'repo': tracking_repo, 'branch': tracking_branch}
logger.debug('issue data: %s', data)
create_issue(data)
# pylint: disable=R0801
'''
Automated testing of the Issue interface, GET requests
'''
import unittest
import json
from patch_tracking.app import app
from patch_tracking.api.business import create_issue
from patch_tracking.database import reset_db
from patch_tracking.api.constant import ResponseCode
class TestIssue(unittest.TestCase):
'''
Automated testing of the Issue interface, GET requests
'''
def setUp(self) -> None:
'''
Prepare the environment
:return:
'''
self.client = app.test_client()
reset_db.reset()
def test_none_data(self):
'''
In the absence of data, the GET interface queries all the data
:return:
'''
with app.app_context():
resp = self.client.get("/issue")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_query_inserted_data(self):
'''
The GET interface queries existing data
:return:
'''
with app.app_context():
data_insert = {"issue": "A", "repo": "A", "branch": "A"}
create_issue(data_insert)
resp = self.client.get("/issue?repo=A&branch=A")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert, resp_dict.get("data"), msg="Error in data information return")
def test_find_all_data(self):
'''
The GET interface queries all the data
:return:
'''
with app.app_context():
data_insert_c = {"issue": "C", "repo": "C", "branch": "C"}
data_insert_d = {"issue": "D", "repo": "D", "branch": "D"}
create_issue(data_insert_c)
create_issue(data_insert_d)
resp = self.client.get("/issue")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert_c, resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert_d, resp_dict.get("data"), msg="Error in data information return")
def test_find_nonexistent_data(self):
'''
The GET interface queries data that does not exist
:return:
'''
with app.app_context():
resp = self.client.get("/issue?repo=aa&branch=aa")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_get_error_parameters(self):
'''
        The GET interface passes in wrong parameter names
:return:
'''
with app.app_context():
data_insert = {"issue": "BB", "repo": "BB", "branch": "BB"}
create_issue(data_insert)
resp = self.client.get("/issue?oper=BB&chcnsrb=BB")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_get_interface_uppercase(self):
'''
        The GET interface receives parameter names in the wrong case
:return:
'''
with app.app_context():
data_insert = {"issue": "CCC", "repo": "CCC", "branch": "CCC"}
create_issue(data_insert)
resp = self.client.get("/issue?RrPo=CCC&brANch=CCC")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
if __name__ == '__main__':
unittest.main()
[loggers]
keys=root
[handlers]
keys=console
[formatters]
keys=simple
[logger_root]
level=DEBUG
handlers=console
[handler_console]
class=StreamHandler
level=DEBUG
formatter=simple
args=(sys.stdout,)
[formatter_simple]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=
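For reference, an INI configuration like the one above can be loaded with the standard library's `logging.config.fileConfig`. A minimal, self-contained sketch (the temporary file stands in for the installed `logging.conf` path):

```python
import logging
import logging.config
import os
import tempfile

# The same handler/formatter layout as the config above.
CONFIG = """\
[loggers]
keys=root

[handlers]
keys=console

[formatters]
keys=simple

[logger_root]
level=DEBUG
handlers=console

[handler_console]
class=StreamHandler
level=DEBUG
formatter=simple
args=(sys.stdout,)

[formatter_simple]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
"""

# Write the config to a temporary file, then load it.
with tempfile.NamedTemporaryFile("w", suffix=".conf", delete=False) as conf_file:
    conf_file.write(CONFIG)
    conf_path = conf_file.name

logging.config.fileConfig(conf_path)
os.unlink(conf_path)

root = logging.getLogger()
print(logging.getLevelName(root.level))  # → DEBUG
```

`fileConfig` reads the `format` value raw, so the `%(asctime)s`-style placeholders pass through untouched, and `args=(sys.stdout,)` is evaluated in the `logging` module's namespace, where `sys` is available.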
# -*- coding:utf-8 -*-
'''
Automated testing of the Tracking interface, including POST requests and GET requests
'''
import unittest
import json
from base64 import b64encode
from werkzeug.security import generate_password_hash
from patch_tracking.app import app
from patch_tracking.database import reset_db
from patch_tracking.api.business import create_tracking
from patch_tracking.api.constant import ResponseCode
class TestTracking(unittest.TestCase):
'''
Automated testing of the Tracking interface, including POST requests and GET requests
'''
def setUp(self) -> None:
'''
Prepare the environment
:return:
'''
self.client = app.test_client()
reset_db.reset()
app.config["USER"] = "hello"
app.config["PASSWORD"] = generate_password_hash("world")
credentials = b64encode(b"hello:world").decode('utf-8')
self.auth = {"Authorization": f"Basic {credentials}"}
def test_none_data(self):
'''
In the absence of data, the GET interface queries all the data
:return:
'''
with app.app_context():
resp = self.client.get("/tracking")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_find_nonexistent_data(self):
'''
The GET interface queries data that does not exist
:return:
'''
with app.app_context():
resp = self.client.get("/tracking?repo=aa&branch=aa")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_insert_data(self):
'''
The POST interface inserts data
:return:
'''
data = {
"version_control": "github",
"scm_repo": "A",
"scm_branch": "A",
"scm_commit": "A",
"repo": "A",
"branch": "A",
"enabled": 0
}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
def test_query_inserted_data(self):
'''
The GET interface queries existing data
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "B",
"scm_branch": "B",
"scm_commit": "B",
"repo": "B",
"branch": "B",
"enabled": False
}
create_tracking(data_insert)
resp = self.client.get("/tracking?repo=B&branch=B")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert, resp_dict.get("data"), msg="Error in data information return")
def test_only_input_branch(self):
'''
        The GET interface query passes only branch, without repo
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "C",
"scm_branch": "C",
"scm_commit": "C",
"repo": "C",
"branch": "C",
"enabled": 0
}
create_tracking(data_insert)
resp = self.client.get("/tracking?branch=B")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_fewer_parameters(self):
'''
        The POST interface is called with required parameters missing
:return:
'''
data = {"version_control": "github", "scm_commit": "AA", "repo": "AA", "branch": "AA", "enabled": 1}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_error_parameters_value(self):
'''
        The POST interface passes in a wrong parameter value
:return:
'''
data = {"version_control": "github", "scm_commit": "AA", "repo": "AA", "branch": "AA", "enabled": "AA"}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_post_error_parameters(self):
'''
        The POST interface passes in wrong parameter names
:return:
'''
data = {"version_control": "github", "scm_commit": "AA", "oper": "AA", "hcnarb": "AA", "enabled": "AA"}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_get_error_parameters(self):
'''
        The GET interface passes in wrong parameter names
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "BB",
"scm_branch": "BB",
"scm_commit": "BB",
"repo": "BB",
"branch": "BB",
"enabled": True
}
create_tracking(data_insert)
resp = self.client.get("/tracking?oper=B&chcnsrb=B")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_update_data(self):
'''
update data
:return:
'''
with app.app_context():
data_old = {
"version_control": "github",
"scm_repo": "str",
"scm_branch": "str",
"scm_commit": "str",
"repo": "string",
"branch": "string",
"enabled": False
}
self.client.post("/tracking", json=data_old, content_type="application/json", headers=self.auth)
data_new = {
"branch": "string",
"enabled": True,
"repo": "string",
"scm_branch": "string",
"scm_commit": "string",
"scm_repo": "string",
"version_control": "github",
}
            self.client.post("/tracking", json=data_new, content_type="application/json", headers=self.auth)
resp = self.client.get("/tracking?repo=string&branch=string")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
#self.assertIn(data_new, resp_dict.get("data"), msg="Error in data information return")
def test_get_interface_uppercase(self):
'''
        The GET interface receives parameter names in the wrong case
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "BBB",
"scm_branch": "BBB",
"scm_commit": "BBB",
"repo": "BBB",
"branch": "BBB",
"enabled": False
}
create_tracking(data_insert)
resp = self.client.get("/tracking?rep=BBB&BRAnch=BBB")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_version_control_error(self):
'''
        The POST interface receives an invalid version_control value
:return:
'''
data = {
"version_control": "gitgitgit",
"scm_repo": "A",
"scm_branch": "A",
"scm_commit": "A",
"repo": "A",
"branch": "A",
"enabled": 0
}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
if __name__ == '__main__':
unittest.main()
"""
HTTP Basic Auth
"""
from werkzeug.security import check_password_hash
from flask_httpauth import HTTPBasicAuth
from flask import current_app as app
auth = HTTPBasicAuth()
@auth.verify_password
def verify_password(username, password):
"""
verify password
"""
if username == app.config["USER"] and \
check_password_hash(app.config["PASSWORD"], password):
return username
return None
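The `verify_password` callback above decodes the `Authorization: Basic` header (handled by Flask-HTTPAuth) and compares the password against a stored hash. As a stdlib-only sketch of that flow — the PBKDF2 helpers below are stand-ins for werkzeug's `generate_password_hash`/`check_password_hash`, not its actual storage format:

```python
import base64
import hashlib
import hmac
import os

def hash_password(password: str) -> str:
    # Salted PBKDF2 hash stored as "salt$hexdigest" (illustrative format).
    salt = os.urandom(8).hex()
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000).hex()
    return f"{salt}${digest}"

def check_password(stored: str, candidate: str) -> bool:
    # Re-derive the digest with the stored salt and compare in constant time.
    salt, digest = stored.split("$")
    cand = hashlib.pbkdf2_hmac("sha256", candidate.encode(), salt.encode(), 100_000).hex()
    return hmac.compare_digest(digest, cand)

def parse_basic_auth(header: str):
    # "Basic aGVsbG86d29ybGQ=" -> ("hello", "world")
    scheme, _, payload = header.partition(" ")
    if scheme != "Basic":
        return None
    user, _, pwd = base64.b64decode(payload).decode().partition(":")
    return user, pwd

stored = hash_password("world")
creds = "Basic " + base64.b64encode(b"hello:world").decode()
user, pwd = parse_basic_auth(creds)
print(user, check_password(stored, pwd))  # → hello True
```

This is the same credential shape the tests build with `b64encode(b"hello:world")` in `setUp`.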
"""
functions for invoking the Gitee API
"""
import base64
import logging
import requests
from flask import current_app
log = logging.getLogger(__name__)
ORG_URL = "https://gitee.com/api/v5/orgs"
REPO_URL = "https://gitee.com/api/v5/repos"
def get_path_content(repo, branch, path):
"""
get file content
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
url = '/'.join([REPO_URL, repo, 'contents', path])
param = {'access_token': gitee_token, 'ref': branch}
ret = requests.get(url, params=param).json()
return ret
def create_branch(repo, branch, new_branch):
"""
create branch
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
url = '/'.join([REPO_URL, repo, 'branches'])
data = {'access_token': gitee_token, 'refs': branch, 'branch_name': new_branch}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success'
return response.json()
def upload_patch(data):
"""
upload patch
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
patch_file_name = data['latest_commit_id'] + '.patch'
url = '/'.join([REPO_URL, data['repo'], 'contents', patch_file_name])
content = base64.b64encode(data['patch_file_content'].encode("utf-8"))
message = '[patch tracking] ' + data['cur_time'] + ' - ' + data['commit_url'] + '\n'
    post_data = {'access_token': gitee_token, 'content': content, 'message': message, 'branch': data['branch']}
    response = requests.post(url, data=post_data)
if response.status_code == 201:
return 'success'
return response.json()
def create_spec(repo, branch, spec_content, cur_time):
"""
create spec
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
spec_file_name = repo + '.spec'
url = '/'.join([REPO_URL, owner, repo, 'contents', spec_file_name])
content = base64.b64encode(spec_content.encode("utf-8"))
message = '[patch tracking] ' + cur_time + ' - ' + 'create spec file' + '\n'
data = {'access_token': gitee_token, 'content': content, 'message': message, 'branch': branch}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success'
return response.json()
def upload_spec(repo, branch, cur_time, spec_content, spec_sha):
"""
upload spec
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
spec_file_name = repo + '.spec'
url = '/'.join([REPO_URL, owner, repo, 'contents', spec_file_name])
content = base64.b64encode(spec_content.encode("utf-8"))
message = '[patch tracking] ' + cur_time + ' - ' + 'update spec file' + '\n'
data = {
'access_token': gitee_token,
'owner': owner,
'repo': repo,
'path': spec_file_name,
'content': content,
'message': message,
'branch': branch,
'sha': spec_sha
}
response = requests.put(url, data=data)
if response.status_code == 200:
return 'success'
return response.json()
def create_gitee_issue(repo, issue_body, cur_time):
"""
create issue
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
url = '/'.join([REPO_URL, owner, 'issues'])
data = {'access_token': gitee_token, 'repo': repo, 'title': '[patch tracking] ' + cur_time, 'body': issue_body}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success', response.json()['number']
return 'error', response.json()
def create_pull_request(repo, branch, patch_branch, issue_num, cur_time):
"""
create pull request
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
url = '/'.join([REPO_URL, owner, repo, 'pulls'])
data = {
'access_token': gitee_token,
'repo': repo,
'title': '[patch tracking] ' + cur_time,
'head': patch_branch,
'base': branch,
'body': '#' + issue_num,
"prune_source_branch": "true"
}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success'
return response.json()
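The upload helpers above all follow the same shape: join the contents-API URL, then POST a base64-encoded body. A stdlib sketch of that request assembly in isolation (the repo, branch, and token values are placeholders, and no request is actually sent):

```python
import base64

REPO_URL = "https://gitee.com/api/v5/repos"

def build_upload_request(repo, branch, file_name, text, token):
    # Mirrors upload_patch/create_spec above:
    # POST /repos/{owner}/{repo}/contents/{path} with base64-encoded content.
    url = "/".join([REPO_URL, repo, "contents", file_name])
    payload = {
        "access_token": token,
        "content": base64.b64encode(text.encode("utf-8")).decode(),
        "message": "[patch tracking] demo message",
        "branch": branch,
    }
    return url, payload

url, payload = build_upload_request("openeuler/demo", "master", "abc.patch", "diff --git a b", "TOKEN")
print(url)  # → https://gitee.com/api/v5/repos/openeuler/demo/contents/abc.patch
```

A 201 response from this endpoint indicates the file was committed, which is why the helpers above treat `status_code == 201` as success.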
"""
functionality of invoking GitHub API
"""
import time
import logging
import requests
from requests.exceptions import ConnectionError as requests_connectionError
from flask import current_app
logger = logging.getLogger(__name__)
class GitHubApi:
"""
Encapsulates GitHub functionality
"""
def __init__(self):
github_token = current_app.config['GITHUB_ACCESS_TOKEN']
token = 'token ' + github_token
self.headers = {
'User-Agent': 'Mozilla/5.0',
'Authorization': token,
'Content-Type': 'application/json',
'Connection': 'close',
'method': 'GET',
'Accept': 'application/json'
}
def api_request(self, url):
"""
request GitHub API
"""
logger.debug("Connect url: %s", url)
        count = 30
        while count > 0:
            try:
                response = requests.get(url, headers=self.headers)
                return response
            except requests_connectionError as err:
                logger.warning(err)
                time.sleep(10)
                count -= 1
        logger.error('Failed to connect to GitHub: %s after 30 retries.', url)
        return 'connect error'
def get_commit_info(self, repo_url, commit_id):
"""
get commit info
"""
res_dict = dict()
api_url = 'https://api.github.com/repos'
url = '/'.join([api_url, repo_url, 'commits', commit_id])
ret = self.api_request(url)
if ret != 'connect error':
if ret.status_code == 200:
res_dict['commit_id'] = commit_id
res_dict['message'] = ret.json()['commit']['message']
res_dict['time'] = ret.json()['commit']['author']['date']
if 'parents' in ret.json() and ret.json()['parents']:
res_dict['parent'] = ret.json()['parents'][0]['sha']
return 'success', res_dict
logger.error('%s failed. Return val: %s', url, ret)
return 'error', ret.json()
return 'error', 'connect error'
def get_latest_commit(self, repo_url, branch):
"""
get latest commit_ID, commit_message, commit_date
:param repo_url:
:param branch:
:return: res_dict
"""
api_url = 'https://api.github.com/repos'
url = '/'.join([api_url, repo_url, 'branches', branch])
ret = self.api_request(url)
res_dict = dict()
if ret != 'connect error':
if ret.status_code == 200:
res_dict['latest_commit'] = ret.json()['commit']['sha']
res_dict['message'] = ret.json()['commit']['commit']['message']
res_dict['time'] = ret.json()['commit']['commit']['committer']['date']
return 'success', res_dict
logger.error('%s failed. Return val: %s', url, ret)
return 'error', ret.json()
return 'error', 'connect error'
def get_patch(self, repo_url, scm_commit, last_commit):
"""
get patch
"""
api_url = 'https://github.com'
if scm_commit != last_commit:
commit = scm_commit + '...' + last_commit + '.diff'
else:
commit = scm_commit + '^...' + scm_commit + '.diff'
ret_dict = dict()
url = '/'.join([api_url, repo_url, 'compare', commit])
ret = self.api_request(url)
if ret != 'connect error':
if ret.status_code == 200:
patch_content = ret.text
ret_dict['status'] = 'success'
ret_dict['api_ret'] = patch_content
else:
logger.error('%s failed. Return val: %s', url, ret)
ret_dict['status'] = 'error'
ret_dict['api_ret'] = ret.text
else:
ret_dict['status'] = 'error'
            ret_dict['api_ret'] = 'failed to connect to the GitHub API.'
return ret_dict
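`get_patch` relies on GitHub's compare endpoint, whose `.diff` form returns plain patch text rather than JSON. The URL construction on its own, separated from the request logic:

```python
def compare_diff_url(repo_url: str, scm_commit: str, last_commit: str) -> str:
    # Mirrors GitHubApi.get_patch: a two-dot range when the commits differ,
    # otherwise a single-commit diff against its parent (the "^" suffix).
    if scm_commit != last_commit:
        commit = scm_commit + "..." + last_commit + ".diff"
    else:
        commit = scm_commit + "^..." + scm_commit + ".diff"
    return "/".join(["https://github.com", repo_url, "compare", commit])

print(compare_diff_url("python/cpython", "aaa", "bbb"))
# → https://github.com/python/cpython/compare/aaa...bbb.diff
```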
"""
functionality of modifying the spec file
"""
import re
class Spec:
"""
functionality of update spec file
"""
def __init__(self, content):
self._lines = content.splitlines()
self.version = "0.0"
self.release = {"num": 0, "lineno": 0}
self.source_lineno = 0
self.patch = {"threshold": 6000, "max_num": 0, "lineno": 0}
self.changelog_lineno = 0
        # Guard against an empty spec file
if len(self._lines) == 0:
self._lines.append("")
        # Record the line number where each directive last appears
for i, line in enumerate(self._lines):
match_find = re.match(r"[ \t]*Version:[ \t]*([\d.]+)", line)
if match_find:
self.version = match_find[1]
continue
match_find = re.match(r"[ \t]*Release:[ \t]*([\d.]+)", line)
if match_find:
self.release["num"] = int(match_find[1])
self.release["lineno"] = i
continue
match_find = re.match(r"[ \t]*%changelog", line)
if match_find:
self.changelog_lineno = i
continue
match_find = re.match(r"[ \t]*Source([\d]*):", line)
if match_find:
self.source_lineno = i
continue
match_find = re.match(r"[ \t]*Patch([\d]+):", line)
if match_find:
num = int(match_find[1])
self.patch["lineno"] = 0
if num > self.patch["max_num"]:
self.patch["max_num"] = num
self.patch["lineno"] = i
continue
if self.patch["lineno"] == 0:
self.patch["lineno"] = self.source_lineno
if self.patch["max_num"] < self.patch["threshold"]:
self.patch["max_num"] = self.patch["threshold"]
else:
self.patch["max_num"] += 1
def update(self, log_title, log_content, patches):
"""
Update items in spec file
"""
self.release["num"] += 1
        self._lines[self.release["lineno"]] = \
            re.sub(r"[\d]+", str(self.release["num"]), self._lines[self.release["lineno"]])
log_title = "* " + log_title + " " + self.version + "-" + str(self.release["num"])
log_content = "- " + log_content
self._lines.insert(self.changelog_lineno + 1, log_title + "\n" + log_content + "\n")
patch_list = []
for patch in patches:
patch_list.append("Patch" + str(self.patch["max_num"]) + ": " + patch)
self.patch["max_num"] += 1
self._lines.insert(self.patch["lineno"] + 1, "\n".join(patch_list))
return self.__str__()
def __str__(self):
return "\n".join(self._lines)
if __name__ == "__main__":
SPEC_CONTENT = """Name: diffutils
Version: 3.7
Release: 3
Source: ftp://ftp.gnu.org/gnu/diffutils/diffutils-%{version}.tar.xz
Patch: diffutils-cmp-s-empty.patch
%changelog
* Mon Nov 11 2019 shenyangyang<shenyangyang4@huawei.com> 3.7-3
- DESC:delete unneeded comments
* Thu Oct 24 2019 shenyangyang<shenyangyang4@huawei.com> 3.7-2
- Type:enhancement
"""
s = Spec(SPEC_CONTENT)
s.update("Mon Nov 11 2019 patch-tracking", "DESC:add patch files", [
"xxx.patch",
"yyy.patch",
])
print(s)
SPEC_CONTENT = """"""
s = Spec(SPEC_CONTENT)
s.update("Mon Nov 11 2019 patch-tracking", "DESC:add patch files", [
"xxx.patch",
"yyy.patch",
])
print(s)
"""
setup script for building patch-tracking
"""
import setuptools
setuptools.setup(
name='patch-tracking',
version='1.0.0',
packages=setuptools.find_packages(),
url='https://openeuler.org/zh/',
license='Mulan PSL v2',
author='ChenYanpan',
author_email='chenyanpan@huawei.com',
description='This is a tool for automatically tracking upstream repository code patches',
requires=['requests', 'flask', 'flask_restx', 'Flask_SQLAlchemy', 'Flask_APScheduler'],
data_files=[
('/etc/patch-tracking/', ['patch_tracking/settings.conf']),
('/etc/patch-tracking/', ['patch_tracking/logging.conf']),
('/var/patch-tracking/', ['patch_tracking/db.sqlite']),
('/usr/bin/', ['patch_tracking/cli/patch-tracking-cli']),
('/usr/bin/', ['patch_tracking/patch-tracking']),
('/usr/bin/', ['patch_tracking/cli/generate_password']),
('/usr/lib/systemd/system/', ['patch_tracking/patch-tracking.service']),
],
)
---
version_control: github
src_repo: python/cpython
tag_prefix: "^v?3*|^v"
seperator: "."
url: https://github.com/python/cpython.git
---
version_control: github
src_repo: python/cpython
tag_prefix: "^v"
seperator: "."
url: https://github.com/python/cpython.git
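As an illustration of how the `tag_prefix` and `seperator` fields in these YAML files are meant to be applied — `normalize_tag` below is a hypothetical helper for demonstration, not part of openEuler-Advisor:

```python
import re

def normalize_tag(tag, tag_prefix, seperator):
    # Strip the configured prefix regex, map the field separator to ".",
    # then split the tag into version components.
    tag = re.sub(tag_prefix, "", tag)
    if seperator != ".":
        tag = tag.replace(seperator, ".")
    return tag.split(".")

print(normalize_tag("v3.8.2", "^v", "."))  # → ['3', '8', '2']
```

With `tag_prefix: "^v"` and `seperator: "."` as in the second YAML file, an upstream tag like `v3.8.2` normalizes to comparable version fields.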