提交 19766ccf 编写于 作者: S Shinwell Hu

sophisticated version recommending

上级 b20a499b
#!/usr/bin/python3
import http.cookiejar
import urllib.request
import re
import yaml
import json
import sys
import subprocess
import requests
from urllib.parse import urljoin
from datetime import datetime
time_format = "%Y-%m-%dT%H:%M:%S%z"
def eprint(*args, **kwargs):
print("DEBUG: ", *args, file=sys.stderr, **kwargs)
def load_last_query_result(info, force_reload=False):
if force_reload:
last_query = info.pop("last_query")
eprint("{repo} > Force reload".format(repo=info["src_repo"]))
return ""
else:
if "last_query" in info.keys():
last_query = info.pop("last_query")
#age = datetime.now() - datetime.strptime(last_query["time_stamp"], time_format)
age = datetime.now() - last_query["time_stamp"].replace(tzinfo=None)
if age.days < 7:
eprint("{repo} > Reuse Last Query".format(repo=info["src_repo"]))
return last_query["raw_data"]
else:
eprint("{repo} > Last Query Too Old.".format(repo=info["src_repo"]))
return ""
else:
return ""
def clean_tags(tags, info):
if info.get("tag_pattern", "") != "":
pattern_regex = re.compile(info["tag_pattern"])
result_list = [pattern_regex.sub("\\1", x) for x in tags]
elif info.get("tag_prefix", "") != "":
prefix_regex = re.compile(info["tag_prefix"])
result_list = [prefix_regex.sub("", x) for x in tags]
else:
result_list = tags
if info.get("seperator", ".") != ".":
seperator_regex = re.compile(info["seperator"])
result_list = [seperator_regex.sub(".", x) for x in result_list]
return result_list
def dirty_redirect_tricks(url, resp):
cookie = set()
href = ""
need_trick = False
for line in resp.splitlines():
line = line.strip()
if line.startswith("Redirecting"):
eprint("Redirecting with document.cookie")
need_trick = True
m = re.search("document\.cookie=\"(.*)\";", line)
if m:
cookie = cookie | set(m.group(1).split(';'))
m = re.search("document\.location\.href=\"(.*)\";", line)
if m:
href = m.group(1)
new_url = urljoin(url, href)
if "" in cookie: cookie.remove("")
return need_trick, new_url, list(cookie)
def check_hg(info):
resp = load_last_query_result(info)
if resp == "":
headers = {
'User-Agent' : 'Mozilla/5.0 (X11; Linux x86_64)'
}
url = urljoin(info["src_repo"] + "/", "json-tags")
resp = requests.get(url, headers=headers)
need_trick, url, cookie = dirty_redirect_tricks(url, resp.text)
if need_trick:
# I dont want to introduce another dependency on requests
# but urllib handling cookie is outragely complex
c_dict = {}
for c in cookie:
k, v = c.split('=')
c_dict[k] = v
resp = requests.get(url, headers=headers, cookies=c_dict)
last_query = {}
last_query["time_stamp"] = datetime.now()
last_query["raw_data"] = resp.text
info["last_query"] = last_query
# try and except ?
tags_json = json.loads(resp.text)
sort_tags = tags_json["tags"]
sort_tags.sort(reverse=True, key=lambda x: x['date'][0])
result_list = [tag['tag'] for tag in sort_tags]
result_list = clean_tags(result_list, info)
return result_list
def check_github(info):
resp = load_last_query_result(info)
if info.get("query_type", "git-ls") != "git-ls":
resp = ""
cmd_list = ["git", "ls-remote", "--tags", "https://github.com/" + info["src_repo"] + ".git"]
if resp == "":
eprint("{repo} > Using git ls-remote".format(repo=info["src_repo"]))
subp = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
resp = subp.stdout.read().decode("utf-8")
if subp.wait() != 0:
eprint("{repo} > git ls-remote encount errors".format(repo=info["src_repo"]))
sys.exit(1)
last_query = {}
last_query["time_stamp"] = datetime.now()
last_query["raw_data"] = resp
info["last_query"] = last_query
info["query_type"] = "git-ls"
tags = []
pattern = re.compile("^([^ \t]*)[ \t]*refs\/tags\/([^ \t]*)")
for line in resp.splitlines():
m = pattern.match(line)
if m:
tags.append(m.group(2))
tags = clean_tags(tags, info)
return tags
if __name__ == "__main__":
pass
"""
def compare_tags (a, b)
arr_a = a.split(".")
arr_b = b.split(".")
len = [arr_a.length, arr_b.length].min
idx = 0
while idx < len do
res1 = arr_a[idx].to_i <=> arr_b[idx].to_i
return res1 if res1 != 0
res2 = arr_a[idx].length <=> arr_b[idx].length
return -res2 if res2 != 0
res3 = arr_a[idx][-1].to_i <=> arr_b[idx][-1].to_i
return res3 if res3 != 0
idx = idx + 1
end
return arr_a.length <=> arr_b.length
end
def sort_tags (tags)
tags.sort! { |a, b|
compare_tags(a,b)
}
return tags
end
"""
#!/usr/bin/python3
from pyrpm.spec import Spec, replace_macros
import yaml
import json
import datetime
import sys
import os
import argparse
import urllib.error
import gitee
import check_upstream
import version_recommend
if __name__ == "__main__":
parameters = argparse.ArgumentParser()
parameters.add_argument("-p", "--push", action="store_true",
help="Push the version bump as an issue to src-openeuler repository")
parameters.add_argument("-d", "--default", type=str, default=os.getcwd(),
help="The fallback place to look for YAML information")
parameters.add_argument("repo", type=str,
help="Repository to be checked for upstream version info")
args = parameters.parse_args()
gitee = gitee.Gitee()
prj_name = args.repo
spec_string = gitee.get_spec(prj_name)
s_spec = Spec.from_string(spec_string)
current_version = s_spec.version
print(prj_name)
print(current_version)
try:
prj_info_string = gitee.get_yaml(prj_name)
except urllib.error.HTTPError:
prj_info_string = ""
if not prj_info_string:
print("Fallback to {dir}".format(dir=args.default))
prj_info_string = open(os.path.join(args.default, prj_name + ".yaml")).read()
if not prj_info_string:
print("Failed to get YAML info for {pkg}".format(pkg=prj_name))
sys.exit(1)
prj_info = yaml.load(prj_info_string, Loader=yaml.Loader)
vc_type = prj_info["version_control"]
if vc_type == "hg":
tags = check_upstream.check_hg(prj_info)
elif vc_type == "github":
tags = check_upstream.check_github(prj_info)
else:
pass
print("tags :", tags)
v = version_recommend.VersionRecommend(tags, current_version, 0)
print("Latest version is ", v.latest_version)
print("Maintain version is", v.maintain_version)
"""
if vc_type == "svn":
tags = check_upstream_svn(prj_info)
elif vc_type == "git":
tags = check_upstream_git(prj_info)
tags = clean_tags(tags.lines)
elif vc_type == "metacpan":
tags = check_upstream_metacpan(prj_info)
tags = clean_tags(tags.lines)
elif vc_type == "gitlab.gnome":
tags = check_upstream_gnome(prj_info)
tags = clean_tags(tags.lines)
elif vc_type == "pypi":
tags = check_upstream_pypi(prj_info)
tags = clean_tags(tags.lines)
else:
print("Unsupport version control method {vc}".format(vc=vc_type))
sys.exit(1)
"""
"""
def compare_tags (a, b)
arr_a = a.split(".")
arr_b = b.split(".")
len = [arr_a.length, arr_b.length].min
idx = 0
while idx < len do
res1 = arr_a[idx].to_i <=> arr_b[idx].to_i
return res1 if res1 != 0
res2 = arr_a[idx].length <=> arr_b[idx].length
return -res2 if res2 != 0
res3 = arr_a[idx][-1].to_i <=> arr_b[idx][-1].to_i
return res3 if res3 != 0
idx = idx + 1
end
return arr_a.length <=> arr_b.length
end
def sort_tags (tags)
tags.sort! { |a, b|
compare_tags(a,b)
}
return tags
end
def clean_tags(tags)
new_tags = []
tags.each{|line|
new_tags = new_tags.append clean_tag(line, Prj_info)
}
return new_tags
end
def upgrade_recommend(tags_param, cur_tag, policy)
tags = tags_param.reverse
tag1 = cur_tag
tag2 = cur_tag
if policy == "latest" then
return tags[0]
elsif policy == "latest-stable" then
tags.each { |tag|
if tag.split(".").count {|f| f.to_i != 0 } >= 3 then
tag1 = tag
break
end
}
tags.each { |tag|
if tag.split(".").count {|f| f.to_i != 0} >= 2 then
tag2 = tag
break
end
}
if tag2[0].to_i > tag1[0].to_i then
return tag2
else
return tag1
end
elsif policy == "perfer-stable" then
tags.each { |tag|
if tag.start_with?(cur_tag) then
return tag
end
}
if cur_tag.split(".").length >= 3 then
search_tag = cur_tag.split(".")[0..1].join(".")
tags.each { |tag|
if tag.start_with?(search_tag) then
return tag
end
}
end
return cur_tag
else
return cur_tag
end
end
print Prj_name, ":\n"
tags = sort_tags(tags)
print "Latest upstream is ", tags[-1], "\n"
#print "Recommended is ", upgrade_recommend(tags, Cur_ver, "latest-stable"), "\n"
print "Current version is ", Cur_ver, "\n"
puts "This package has #{spec_struct.get_diverse} patches"
if tags.length == 0 or compare_tags(tags[-1], Cur_ver) < 0 then
STDERR.puts "DEBUG #{Prj_name} > tags are #{tags}"
File.delete("upstream-info/"+Prj_name+".yaml") if File.exist?("upstream-info/"+Prj_name+".yaml")
File.open("known-issues/"+Prj_name+".yaml", "w") { |file| file.write(Prj_info.to_yaml) }
else
File.open("upstream-info/"+Prj_name+".yaml", "w") { |file| file.write(Prj_info.to_yaml) }
end
File.delete(specfile) if specfile != ""
if options[:push] then
puts "Push to gitee\n"
ad = Advisor.new
ad.new_issue("src-openeuler", Prj_name, "Upgrade to Latest Release", "Dear #{Prj_name} maintainer:\n\n We found the latst version of #{Prj_name} is #{tags[-1]}, while the current version in openEuler is #{Cur_ver}.\n\n Please consider upgrading.\n\n\nYours openEuler Advisor.")
else
puts "keep it to us\n"
end
"""
#!/usr/bin/python3
"""
This is a command line tool to create reminder list for TC member
"""
import urllib
import urllib.request
import urllib.parse
import argparse
import json
import sys
import os
import yaml
from pprint import pprint
from datetime import datetime
class Advisor(object):
"""
This is a object abstract TC robot
"""
def __init__(self):
self.secret = open(os.path.expanduser("~/.gitee_personal_token.json"), "r")
self.token = json.load(self.secret)
self.header = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0"}
self.tc_members = None
self.time_format = "%Y-%m-%dT%H:%M:%S%z"
def get_json(self, url):
"""
Return object parsed from remote json
"""
headers = self.header.copy()
headers["Content-Type"] = "application/json;charset=UTF-8"
req = urllib.request.Request(url = url,
headers = headers,
method = "GET")
with urllib.request.urlopen(req) as u:
resp = json.loads(u.read().decode("utf-8"))
return resp
def get_file(self, repo, path):
"""
Get remote raw file
"""
url = "https://gitee.com/{repo}/raw/master/{path}".format(repo=repo, path=path)
req = urllib.request.Request(url = url,
headers = self.header,
method = "GET")
with urllib.request.urlopen(req) as u:
resp = u.read()
return resp
def get_prs(self):
"""
Get list of PRs
"""
pulls_url = "https://gitee.com/api/v5/repos/openeuler/community/pulls"
list_url = pulls_url + "?access_token={token}&state=open&sort=created&direction=desc&page=1&per_page=100"
url = list_url.format(token=self.token["access_token"])
return self.get_json(url)
def get_recent_prs(self, num):
"""
Get list of _recent_ PRs
"""
pulls_url = "https://gitee.com/api/v5/repos/openeuler/community/pulls"
list_all_url = pulls_url + "?access_token={token}&state=all&sort=created&direction=desc&"
#list_all_url = pulls_url + "?access_token={token}&state=all&sort=created&direction=desc&per_page=100&page="
list_all_url = list_all_url.format(token=self.token["access_token"])
result = []
page = 1
if num <= 100:
list_all_url = list_all_url + "per_page={num}&page=1".format(num=num)
return self.get_json(url)
list_all_url = list_all_url + "per_page=100&page="
while num > 100:
url = list_all_url + str(page)
num -= 100
page += 1
result += self.get_json(url)
url = list_all_url + str(page)
result += self.get_json(url)
return result
def get_pr_comments(self, number):
"""
Get Comments for a specific PR
"""
pulls_url = "https://gitee.com/api/v5/repos/openeuler/community/pulls"
desc_url = pulls_url + "/{number}/comments?access_token={token}&page=1&per_page=100"
url = desc_url.format(number=number, token=self.token["access_token"])
return self.get_json(url)
def get_tc_members(self):
"""
Get list of current TC members
"""
m = yaml.load(adv.get_file("openeuler/community", "sig/TC/OWNERS"), Loader=yaml.Loader)
self.tc_members = m["maintainers"]
return m["maintainers"]
def filter_out_tc(self, users):
"""
Pick TC members from users
"""
if not self.tc_members:
self.get_tc_members()
return [x for x in self.tc_members if x in users]
if __name__ == "__main__":
par = argparse.ArgumentParser()
par.add_argument("-n", "--number", help="Number of recent PRs to be processed", default="100")
args = par.parse_args()
adv = Advisor()
tc_members = adv.get_tc_members()
print("Current TC members :", tc_members)
tc_statistic = {}
for t in tc_members:
tc_statistic[t] = 0
PRs = adv.get_recent_prs(int(args.number))
print("Statistic of recent {num} PRs".format(len(PRs))
for pr in PRs:
commenter = pr["user"]["login"]
if commenter in tc_members:
tc_statistic[commenter] += 1
comments = adv.get_pr_comments(pr["number"])
for comment in comments:
commenter = comment["user"]["login"]
if commenter in tc_members:
tc_statistic[commenter] += 1
for tc in tc_statistic.keys():
print("{tc} mades {num} comments".format(tc=tc, num=tc_statistic[tc]))
#!/usr/bin/python3
import re
import datetime
import time
__ALL__ = ["VersionRecommend"]
class VersionType:
def version_match(self, pkg_version):
pass
def latest_version(self, version_entry):
version_entry.sort(reverse = True)
return version_entry[0]
def maintain_version(self, version_entry, current_version, pkg_type):
return None
def __init__(self):
self._version_type = ''
def get_version_mode(self):
return self._version_type
class VersionType_x_y_z_w(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 4: # 通过 '.'分割后,应该剩下4位
return False
if len(digital_list[0]) > 2: # 第一位版本号不应该大于2位
return False
if len(digital_list[1]) > 2: # 第二位版本号不应该大于2位
return False
if len(digital_list[2]) > 3: # 第三位版本号不应该大于3位
return False
if len(digital_list[3]) > 1: # 第四位版本号不应该大于1位
return False
return True
def maintain_version(self, version_entry, current_version, pkg_type):
# todo 通过软件类型进一步选择版本号
version_entry.sort(True) # 将版本列表降序排序
version_digital = re.split(r'[._]', current_version) # 将版本号做拆分
for version in version_entry:
version_temp = re.split(r'[._]', version)
if version_digital[0:2] == version_temp[0:2]: # 如果版本号与当前版本前两位一致,说明是维护分支的最新版本
return version
return None
def __init__(self):
self._version_type = 'x.y.z.w'
class VersionType_x_y_z(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位
return False
if len(digital_list[0]) > 2: # 第一位版本号不应该大于2位
return False
if len(digital_list[1]) > 2: # 第二位版本号不应该大于2位
return False
if len(digital_list[2]) > 3: # 第三位版本号不应该大于3位
return False
return True
def maintain_version(self, version_entry, current_version, pkg_type):
version_entry.sort(reverse = True) # 将版本列表降序排序
version_digital = re.split(r'[._]', current_version) # 将版本号做拆分
for version in version_entry:
version_temp = re.split(r'[._]', version)
if version_digital[0:2] == version_temp[0:2]: # 如果版本号与当前版本前两位一致,说明是维护分支的最新版本
return version
return None
def __init__(self):
self._version_type = 'x.y.z'
class VersionType_x_y(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 2: # 通过 '.'分割后,应该剩下2位
return False
if len(digital_list[0]) > 2: # 第一位版本号不应该大于2位
return False
if len(digital_list[1]) > 3: # 第二位版本号不应该大于2位
return False
return True
def __init__(self):
self._version_type = 'x.y'
class VersionType_x(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 1: # 通过 '.'分割后,应该剩下1位
return False
if len(digital_list[0]) > 3: # 第一位版本号不应该大于3位
return False
return True
def __init__(self):
self._version_type = 'x'
class VersionType_yyyy_x_y(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位
return False
if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位
return False
year = int(digital_list[0])
if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间
return False
if len(digital_list[1]) > 2: # 第二位版本号不应该大于2位
return False
if len(digital_list[2]) > 2: # 第三位版本号不应该大于2位
return False
return True
def __init__(self):
self._version_type = 'yyyy.x.y'
class VersionType_yyyy_x(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 2: # 通过 '.'分割后,应该剩下2位
return False
if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位
return False
year = int(digital_list[0])
if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间
return False
if len(digital_list[1]) > 2: # 第二位版本号不应该大于2位
return False
return True
def __init__(self):
self._version_type = 'yyyy.x'
class VersionType_yyyy_mm_dd(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位
return False
if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位
return False
if int(digital_list[1]) > 12 or int(digital_list[1]) == 0: # 第二位为发布月份,小于12
return False
if int(digital_list[2]) > 31 or int(digital_list[2]) == 0: # 第三位为发布日期,小于31
return False
# 判断日期是否为合法日期
try:
if '_' in version:
d_time = time.mktime(time.strptime(version, "%Y_%m_%d"))
else:
d_time = time.mktime(time.strptime(version, "%Y.%m.%d"))
now_str = datetime.datetime.now().strftime('%Y-%m-%d')
end_time = time.mktime(time.strptime(now_str, '%Y-%m-%d'))
if d_time > end_time: # 判断日期是否大于当前日期
return False
else:
return True
except: # 时间格式非法
print('Time foramt failed %s.', version)
return False
def __init__(self):
self._version_type = 'yyyy.mm.dd'
class VersionType_yyyy_mm(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 2: # 通过 '.'分割后,应该剩下2位
return False
if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位
return False
year = int(digital_list[0])
if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间
return False
if int(digital_list[1]) > 12 or int(digital_list[1]) == 0: # 第二位为发布月份,小于12
return False
if year == datetime.datetime.now().year and \
int(digital_list[1]) > datetime.datetime.now().month:
return False
return True
def __init__(self):
self._version_type = 'yyyy.mm'
class VersionType_x_yymm_z(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
digital_list = re.split(r'[._]', version)
if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位
return False
if len(digital_list[0]) > 2: # 第一位为主版本号,小于2位
return False
# 将年月拆分后分别判断
year = str(digital_list[1][:2])
month = str(digital_list[1][-2:])
if year > datetime.datetime.now().year[-2:]: # 年份不能大于当前年份,不用考虑20000 年前的情况
return False
if month > 12 or month == 0:
return False
if len(digital_list[2]) > 2: # 迭代号不大于2位
return False
return True
def __init__(self):
self._version_type = 'x.yymm.z'
class VersionType_yyyymmdd(VersionType):
def version_match(self, pkg_version):
version = pkg_version.strip()
if len(version) != 8: # 日期长度满足 8 位要求
return False
if not version.isdigit(): # 连续数字,没有其他分别符号
return False
digital_list = [version[:4], version[4:6], version[6:]]
if int(digital_list[1]) > 12 or int(digital_list[1]) == 0: # 第二位为发布月份,小于12
return False
if int(digital_list[2]) > 31 or int(digital_list[2]) == 0: # 第三位为发布日期,小于31
return False
# 判断日期是否为合法日期
try:
d_time = time.mktime(time.strptime(version, "%Y%m%d"))
now_str = datetime.datetime.now().strftime('%Y-%m-%d')
end_time = time.mktime(time.strptime(now_str, '%Y-%m-%d'))
if d_time > end_time: # 判断日期是否大于当前日期
return False
else:
return True
except: # 时间格式非法
print('Time format failed %s,', version)
return False
def __init__(self):
self._version_type = 'yyyymmdd'
class VersionRecommend:
def __init__(self, version_entry, current_version, pkg_type):
self.version_type = self._version_match(current_version)
self.latest_version = self._get_latest_version(version_entry)
self.maintain_version = self._get_maintain_version(version_entry, current_version, pkg_type)
def _version_match(self, version):
version_method = [VersionType_x_y_z_w(),
VersionType_x_y_z(),
VersionType_x_y(),
VersionType_x(),
VersionType_yyyy_x_y(),
VersionType_yyyy_x(),
VersionType_yyyy_mm_dd(),
VersionType_yyyy_mm(),
VersionType_x_yymm_z(),
VersionType_yyyymmdd()]
for method in version_method:
if method.version_match(version):
print(version, method.get_version_mode())
return method
def _get_latest_version(self, version_entry):
return self.version_type.latest_version(version_entry)
def _get_maintain_version(self, version_entry, current_version, pkg_type):
return self.version_type.maintain_version(version_entry, current_version, pkg_type)
if __name__ == '__main__':
version_recommend = VersionRecommend(['1.2.3','1.2.4','1.3.0','2.0.1'],'1.2.3',0)
print(version_recommend.latest_version)
print(version_recommend.maintain_version)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册