Commit 7b481121 authored by custa

patch tracking codebase

Parent 56bd4d03
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Flask stuff:
instance/
.webassets-cache
# pyenv
.python-version
# dotenv
.env
# virtualenv
venv/
ENV/
# Editors
.idea/
# log file
*.log
[MASTER]
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-whitelist=
# Specify a score threshold to be exceeded before program exits with error.
fail-under=10
# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=CVS
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
ignore-patterns=issue_test,tracking_test
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))"
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use.
jobs=1
# Control the amount of potential inferred values when inferring a single
# object. This can help the performance when dealing with large functions or
# complex, nested conditions.
limit-inference-results=100
# List of plugins (as comma separated values of python module names) to load,
# usually to register additional checkers.
load-plugins=
# Pickle collected data for later comparisons.
persistent=yes
# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
suggestion-mode=yes
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED.
confidence=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once). You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use "--disable=all --enable=classes
# --disable=W".
disable=print-statement,
parameter-unpacking,
unpacking-in-except,
old-raise-syntax,
backtick,
long-suffix,
old-ne-operator,
old-octal-literal,
import-star-module-level,
non-ascii-bytes-literal,
raw-checker-failed,
bad-inline-option,
locally-disabled,
file-ignored,
suppressed-message,
useless-suppression,
deprecated-pragma,
use-symbolic-message-instead,
apply-builtin,
basestring-builtin,
buffer-builtin,
cmp-builtin,
coerce-builtin,
execfile-builtin,
file-builtin,
long-builtin,
raw_input-builtin,
reduce-builtin,
standarderror-builtin,
unicode-builtin,
xrange-builtin,
coerce-method,
delslice-method,
getslice-method,
setslice-method,
no-absolute-import,
old-division,
dict-iter-method,
dict-view-method,
next-method-called,
metaclass-assignment,
indexing-exception,
raising-string,
reload-builtin,
oct-method,
hex-method,
nonzero-method,
cmp-method,
input-builtin,
round-builtin,
intern-builtin,
unichr-builtin,
map-builtin-not-iterating,
zip-builtin-not-iterating,
range-builtin-not-iterating,
filter-builtin-not-iterating,
using-cmp-argument,
eq-without-hash,
div-method,
idiv-method,
rdiv-method,
exception-message-attribute,
invalid-str-codec,
sys-max-int,
bad-python3-import,
deprecated-string-function,
deprecated-str-translate-call,
deprecated-itertools-function,
deprecated-types-field,
next-method-defined,
dict-items-not-iterating,
dict-keys-not-iterating,
dict-values-not-iterating,
deprecated-operator-function,
deprecated-urllib-function,
xreadlines-attribute,
deprecated-sys-function,
exception-escape,
comprehension-escape
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifiers separated by comma (,) or put this option
# multiple times (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=c-extension-no-member
[REPORTS]
# Python expression which should return a score less than or equal to 10. You
# have access to the variables 'error', 'warning', 'refactor', and 'convention'
# which contain the number of messages in each category, as well as 'statement'
# which is the total number of statements analyzed. This score is used by the
# global evaluation report (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details.
#msg-template=
# Set the output format. Available formats are text, parseable, colorized, json
# and msvs (visual studio). You can also give a reporter class, e.g.
# mypackage.mymodule.MyReporterClass.
output-format=text
# Tells whether to display a full report or only the messages.
reports=no
# Activate the evaluation score.
score=yes
[REFACTORING]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
# Complete name of functions that never returns. When checking for
# inconsistent-return-statements if a never returning function is called then
# it will be considered as an explicit return statement and no message will be
# printed.
never-returning-functions=sys.exit
[LOGGING]
# The type of string formatting that logging methods do. `old` means using %
# formatting, `new` is for `{}` formatting.
logging-format-style=old
# Logging modules to check that the string format arguments are in logging
# function parameter format.
logging-modules=logging
[SPELLING]
# Limits count of emitted suggestions for spelling mistakes.
max-spelling-suggestions=4
# Spelling dictionary name. Available dictionaries: none. To make it work,
# install the python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains the private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to the private dictionary (see the
# --spelling-private-dict-file option) instead of raising a message.
spelling-store-unknown-words=no
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,
XXX,
TODO
# Regular expression of note tags to take in consideration.
#notes-rgx=
[TYPECHECK]
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# Tells whether to warn about missing members when the owner of the attribute
# is inferred to be None.
ignore-none=yes
# This flag controls whether pylint should warn about no-member and similar
# checks whenever an opaque object is returned when inferring. The inference
# can return multiple potential results while evaluating a Python object, but
# some branches might not be evaluated, which results in partial inference. In
# that case, it might be useful to still emit no-member and other checks for
# the rest of the inferred objects.
ignore-on-opaque-inference=yes
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# Show a hint with possible names when a member name was not found. The aspect
# of finding the hint is based on edit distance.
missing-member-hint=yes
# The minimum edit distance a name should have in order to be considered a
# similar match for a missing member name.
missing-member-hint-distance=1
# The total number of similar names that should be taken in consideration when
# showing a hint for a missing member.
missing-member-max-choices=1
# List of decorators that change the signature of a decorated function.
signature-mutators=
[VARIABLES]
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid defining new builtins when possible.
additional-builtins=
# Tells whether unused global variables should be treated as a violation.
allow-global-unused-variables=yes
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,
_cb
# A regular expression matching the name of dummy variables (i.e. expected to
# not be used).
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
# Argument names that match this expression will be ignored. Default to name
# with leading underscore.
ignored-argument-names=_.*|^ignored_|^unused_
# Tells whether we should check for unused import in __init__ files.
init-import=no
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[FORMAT]
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# String used as indentation unit. This is usually "    " (4 spaces) or "\t" (1
# tab).
indent-string='    '
# Maximum number of characters on a single line.
max-line-length=120
# Maximum number of lines in a module.
max-module-lines=1000
# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
# `trailing-comma` allows a space between comma and closing bracket: (a, ).
# `empty-line` allows space-only lines.
no-space-check=trailing-comma,
dict-separator
# Allow the body of a class to be on the same line as the declaration if body
# contains single statement.
single-line-class-stmt=no
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=no
[SIMILARITIES]
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
# Minimum lines number of a similarity.
min-similarity-lines=4
[BASIC]
# Naming style matching correct argument names.
argument-naming-style=snake_case
# Regular expression matching correct argument names. Overrides argument-
# naming-style.
#argument-rgx=
# Naming style matching correct attribute names.
attr-naming-style=snake_case
# Regular expression matching correct attribute names. Overrides attr-naming-
# style.
#attr-rgx=
# Bad variable names which should always be refused, separated by a comma.
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
# Bad variable names regexes, separated by a comma. If names match any regex,
# they will always be refused
bad-names-rgxs=
# Naming style matching correct class attribute names.
class-attribute-naming-style=any
# Regular expression matching correct class attribute names. Overrides class-
# attribute-naming-style.
#class-attribute-rgx=
# Naming style matching correct class names.
class-naming-style=PascalCase
# Regular expression matching correct class names. Overrides class-naming-
# style.
#class-rgx=
# Naming style matching correct constant names.
const-naming-style=UPPER_CASE
# Regular expression matching correct constant names. Overrides const-naming-
# style.
#const-rgx=
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=-1
# Naming style matching correct function names.
function-naming-style=snake_case
# Regular expression matching correct function names. Overrides function-
# naming-style.
#function-rgx=
# Good variable names which should always be accepted, separated by a comma.
good-names=i,
j,
k,
ex,
Run,
_
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs=
# Include a hint for the correct naming format with invalid-name.
include-naming-hint=no
# Naming style matching correct inline iteration names.
inlinevar-naming-style=any
# Regular expression matching correct inline iteration names. Overrides
# inlinevar-naming-style.
#inlinevar-rgx=
# Naming style matching correct method names.
method-naming-style=snake_case
# Regular expression matching correct method names. Overrides method-naming-
# style.
#method-rgx=
# Naming style matching correct module names.
module-naming-style=snake_case
# Regular expression matching correct module names. Overrides module-naming-
# style.
#module-rgx=
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=^_
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
# These decorators are taken in consideration only for invalid-name.
property-classes=abc.abstractproperty
# Naming style matching correct variable names.
variable-naming-style=snake_case
# Regular expression matching correct variable names. Overrides variable-
# naming-style.
#variable-rgx=
[STRING]
# This flag controls whether inconsistent-quotes generates a warning when the
# character used as a quote delimiter is used inconsistently within a module.
check-quote-consistency=no
# This flag controls whether the implicit-str-concat should generate a warning
# on implicit string concatenation in sequences defined over several lines.
check-str-concat-over-line-jumps=no
[IMPORTS]
# List of modules that can be imported at any level, not just the top level
# one.
allow-any-import-level=
# Allow wildcard imports from modules that define __all__.
allow-wildcard-with-all=no
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
# Deprecated modules which should not be used, separated by a comma.
deprecated-modules=optparse,tkinter.tix
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled).
ext-import-graph=
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled).
import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled).
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Couples of modules and preferred modules, separated by a comma.
preferred-modules=
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=cls
[DESIGN]
# Maximum number of arguments for function / method.
max-args=5
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
# Maximum number of branch for function / method body.
max-branches=12
# Maximum number of locals for function / method body.
max-locals=15
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
# Maximum number of return / yield for function / method body.
max-returns=6
# Maximum number of statements in function / method body.
max-statements=50
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "BaseException, Exception".
overgeneral-exceptions=BaseException,
Exception
[style]
based_on_style = pep8
column_limit = 120
dedent_closing_brackets = True
[[source]]
name = "pypi"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
verify_ssl = true
[dev-packages]
pylint = "*"
yapf = "*"
pyopenssl = "*"
[packages]
flask = "*"
flask-sqlalchemy = "*"
flask-apscheduler = "*"
requests = "*"
werkzeug = "*"
flask-httpauth = "*"
[requires]
python_version = "3.7"
Patch Tracking
===
# 1 Introduction
During the development of openEuler releases, the latest code of each package needs to be pulled in from its upstream community promptly so that functional bugs and security issues are fixed, keeping the released openEuler distribution as free of defects and vulnerabilities as possible.
This tool manages patches for packages: it proactively monitors upstream community commits, automatically generates patch files, automatically submits issues to the corresponding maintainer, and automatically verifies the basic functionality of the patches, reducing the verification workload and helping maintainers make decisions quickly.
# 2 Architecture
### 2.1 Client/Server Architecture
Patch Tracking uses a client/server (C/S) architecture. The server (patch-tracking) performs the patch tracking tasks: it maintains tracking items, detects code changes on upstream repository branches and turns them into patch files, and submits issues and PRs to Gitee. patch-tracking also exposes a RESTful API for adding, deleting, modifying, and querying tracking items. The client, i.e. the command-line tool (patch-tracking-cli), calls the RESTful API of patch-tracking to add, delete, modify, and query tracking items.
### 2.2 Core Workflows
* Patch tracking service workflow
**Main steps:**
1. The command-line tool writes a tracking item.
2. Patch files are automatically fetched from the upstream repository (for example, GitHub) configured in the tracking item.
3. A temporary branch is created, and the fetched patch files are committed to it.
4. An issue is automatically submitted to the corresponding project, and a PR associated with the issue is generated.
![PatchTracking](images/PatchTracking.jpg)
* Maintainer workflow for handling submitted patches
**Main steps:**
1. The maintainer analyzes the patch files on the temporary branch and decides whether to merge them.
2. A build is executed; after the build succeeds, the maintainer decides whether to merge the PR.
![Maintainer](images/Maintainer.jpg)
### 2.3 Data Structures
* Tracking table
| No. | Name | Description | Type | Key | Nullable |
|:----:| ----| ----| ----| ----| ----|
| 1 | id | auto-incremented ID of the patch tracking item | int | - | NO |
| 2 | version_control | version control system type of the upstream SCM | String | - | NO |
| 3 | scm_repo | upstream SCM repository address | String | - | NO |
| 4 | scm_branch | upstream SCM branch being tracked | String | - | NO |
| 5 | scm_commit | latest upstream commit ID that has been processed | String | - | YES |
| 6 | repo | address of the package source repository on Gitee | String | Primary | NO |
| 7 | branch | branch of the package source repository on Gitee | String | Primary | NO |
| 8 | enabled | whether tracking is enabled | Boolean | -| NO |
* Issue table
| No. | Name | Description | Type | Key | Nullable |
|:----:| ----| ----| ----| ----| ----|
| 1 | issue | issue number | String | Primary | NO |
| 2 | repo | address of the package source repository on Gitee | String | - | NO |
| 3 | branch | branch of the package source repository on Gitee | String | - | NO |
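For illustration, a tracking item with these fields can be created through the SQLAlchemy model defined in `patch_tracking/database/models.py` (shown later in this commit). The following is a minimal sketch, intended to run inside the Flask application context; the field values are only examples borrowed from the usage section below.
```python
# Hypothetical example of creating one Tracking row; values are illustrative only.
from patch_tracking.database import db
from patch_tracking.database.models import Tracking

item = Tracking(
    version_control="github",          # upstream SCM type
    scm_repo="BJMX/testPatch01",       # upstream repository being tracked
    scm_branch="test",                 # upstream branch being tracked
    scm_commit=None,                   # latest processed commit, empty when first created
    repo="testPatchTrack/testPatch1",  # package source repository on Gitee
    branch="master",                   # package source branch on Gitee
    enabled=True,                      # whether tracking is enabled
)
db.session.add(item)
db.session.commit()
```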
# 3 Deployment
>Python >= 3.7 and pip3 must already be installed in the environment.
### 3.1 Install Dependencies
```shell script
yum install -y gcc python3-devel openssl-devel
pip3 install flask flask-sqlalchemy flask-apscheduler requests flask_httpauth
pip3 install -I uwsgi
```
### 3.2 Installation
```shell script
rpm -ivh patch-tracking-xxx.rpm
```
### 3.3 Configuration
Set the corresponding parameters in the configuration file.
The configuration file path is `/etc/patch-tracking/settings.conf`.
- Service listening address
```python
LISTEN = "127.0.0.1:5001"
```
- GitHub token, used to access repository information of upstream open source software hosted on GitHub.
For how to generate a GitHub token, see [Creating a personal access token](https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token).
```python
GITHUB_ACCESS_TOKEN = ""
```
- For tracked repositories hosted on Gitee, configure a Gitee token that has permissions on those repositories; it is used to commit patch files, submit issues, submit PRs, and so on.
```python
GITEE_ACCESS_TOKEN = ""
```
- The database is scanned periodically for newly added or modified tracking items, and the task of fetching upstream patches is executed for each tracking item found. Configure the scan interval here; the value is in seconds.
```python
SCAN_DB_INTERVAL = 3600
```
- Username and password used to authenticate against the POST interfaces when the command-line tool is run.
```python
USER = "admin"
PASSWORD = ""
```
The default value of `USER` is `admin`.
>Complexity requirements for the `PASSWORD` password:
>* at least 6 characters long
>* at least one digit
>* at least one uppercase letter
>* at least one lowercase letter
>* at least one special character from (~!@#%^*_+=-)
Configure the hash of the password here. Generate it with the command-line tool by running `generate_password <password>`, for example:
[root]# generate_password Test@123
pbkdf2:sha256:150000$w38eLeRm$ebb5069ba3b4dda39a698bd1d9d7f5f848af3bd93b11e0cde2b28e9e34bfbbae
Put `pbkdf2:sha256:150000$w38eLeRm$ebb5069ba3b4dda39a698bd1d9d7f5f848af3bd93b11e0cde2b28e9e34bfbbae` between the quotes of `PASSWORD = ""`.
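The hash is produced by werkzeug's `generate_password_hash` (see the `generate_password` tool shipped with this package). As a minimal sketch, and assuming the stored hash is validated with werkzeug's matching `check_password_hash` (an assumption; the auth module is not shown in this section), the configured value can be checked against a plaintext password like this:
```python
# Minimal sketch: verify a plaintext password against the configured PASSWORD hash.
# The hash below is the example generated from "Test@123" above.
from werkzeug.security import check_password_hash

PASSWORD = "pbkdf2:sha256:150000$w38eLeRm$ebb5069ba3b4dda39a698bd1d9d7f5f848af3bd93b11e0cde2b28e9e34bfbbae"
print(check_password_hash(PASSWORD, "Test@123"))  # prints True when the password matches
```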
### 3.4 Start the Patch Tracking Service
The service can be started in either of the following two ways:
1. Using systemd
```shell script
systemctl start patch-tracking
```
2. Running the executable directly
```shell script
/usr/bin/patch-tracking
```
# 4 Usage
### 4.1 Add a Tracking Item
Associate the software repository and branch to be tracked with its upstream open source repository and branch. There are three ways to do this.
#### 4.1.1 Add Directly on the Command Line
Parameter meanings:
>--user: username for authenticating against the POST interfaces, same as the USER parameter in settings.conf \
--password: password for authenticating against the POST interfaces, i.e. the actual password string corresponding to the PASSWORD hash in settings.conf \
--server: URL where the Patch Tracking service is running, for example 127.0.0.1:5001 \
--version_control: version control tool of the upstream repository; only github is supported \
--repo: name of the repository to be tracked, in the format organization/repository \
--branch: branch name of the repository to be tracked \
--scm_repo: name of the tracked upstream repository, in GitHub format organization/repository \
--scm_branch: branch of the tracked upstream repository \
--enabled: whether to track this repository automatically
For example:
```shell script
patch-tracking-cli --server 127.0.0.1:5001 --user admin --password Test@123 --version_control github --repo testPatchTrack/testPatch1 --branch master --scm_repo BJMX/testPatch01 --scm_branch test --enabled true
```
#### 4.1.2 Add from a Specified File
Parameter meanings:
>--server: URL where the Patch Tracking service is running, for example 127.0.0.1:5001 \
--user: username for authenticating against the POST interfaces, same as the USER parameter in settings.conf \
--password: password for authenticating against the POST interfaces, i.e. the actual password string corresponding to the PASSWORD hash in settings.conf \
--file: path to a yaml file
The file records the repository, branch, version control tool, whether monitoring is enabled, and so on. Write this information into a file named xxx.yaml, for example tracking.yaml, and pass the file path as the argument of `--file` when invoking the command.
For example:
```shell script
patch-tracking-cli --server 127.0.0.1:5001 --user admin --password Test@123 --file tracking.yaml
```
The yaml content format is as follows; the text on the left of each colon must not be modified, and the value on the right should be filled in according to the actual situation.
```shell script
version_control: github
scm_repo: xxx/xxx
scm_branch: master
repo: xxx/xxx
branch: master
enabled: true
```
>version_control: version control tool of the upstream repository; only github is supported \
scm_repo: name of the tracked upstream repository, in GitHub format organization/repository \
scm_branch: branch of the tracked upstream repository \
repo: name of the repository to be tracked, in the format organization/repository \
branch: branch name of the repository to be tracked \
enabled: whether to track this repository automatically
#### 4.1.3 Add from a Specified Directory
Place multiple `xxx.yaml` files in a specified directory, for example `test_yaml`, and run the command to record tracking items for all yaml files in that directory. The yaml files must be placed directly in that directory; files in subdirectories are not read.
Parameter meanings:
>--user: username for authenticating against the POST interfaces, same as the USER parameter in settings.conf \
--password: password for authenticating against the POST interfaces, i.e. the actual password string corresponding to the PASSWORD hash in settings.conf \
--server: URL where the Patch Tracking service is running, for example 127.0.0.1:5001 \
--dir: path to the directory where the yaml files are stored
```shell script
patch-tracking-cli --server 127.0.0.1:5001 --user admin --password Test@123 --dir /home/Work/test_yaml/
```
### 4.2 Query Tracking Items
```shell script
curl -k https://<LISTEN>/tracking
```
For example:
```shell script
curl -k https://127.0.0.1:5001/tracking
```
### 4.3 Query the List of Generated Issues
```shell script
curl -k https://<LISTEN>/issue
```
For example:
```shell script
curl -k https://127.0.0.1:5001/issue
```
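The same queries can also be made from Python. The sketch below is illustrative: it assumes the service listens on 127.0.0.1:5001 with the default self-signed certificate (hence verify=False); the response body is the JSON envelope built by ResponseCode.gen_dict, i.e. a code, a message, and a data list.
```python
# Minimal sketch: query tracking items and issues over the RESTful API.
import requests

SERVER = "127.0.0.1:5001"  # value of LISTEN in settings.conf

# verify=False because the service uses a self-signed certificate by default
trackings = requests.get(f"https://{SERVER}/tracking", verify=False).json()
issues = requests.get(f"https://{SERVER}/issue", verify=False).json()

print(trackings["code"], trackings["msg"])  # "2001 Successful Operation!" on success
for item in issues["data"]:
    print(item["issue"], item["repo"], item["branch"])
```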
### 4.4 View Issues and PRs on Gitee
Log in to Gitee and open the software project being tracked. Under the project's Issues and Pull Requests tabs, you can find entries named `[patch tracking] TIME`, for example `[patch tracking] 20200713101548`.
These are the issue and the corresponding PR for the newly generated patch files.
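The `TIME` suffix in these titles is the timestamp taken when the tracking task creates the temporary branch, issue, and PR (see `upload_patch_to_gitee` in patch_tracking/task/task_apscheduler.py later in this commit). A minimal illustration of how that suffix and the temporary branch name are formed:
```python
# Minimal sketch: how a suffix such as 20200713101548 is produced.
import time

cur_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
print(f"[patch tracking] {cur_time}")  # issue/PR title pattern described above
print(f"patch-tracking/{cur_time}")    # temporary branch name
```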
# 5 FAQ and Solutions
%define name patch-tracking
%define version 1.0.0
%define release 1
Summary: This is a tool for automatically tracking upstream repository code patches
Name: %{name}
Version: %{version}
Release: %{release}
Source0: %{name}-%{version}.tar
License: Mulan PSL v2
Group: Development/Libraries
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-buildroot
Prefix: %{_prefix}
BuildArch: noarch
Vendor: ChenYanpan <chenyanpan@huawei.com>
Url: https://openeuler.org/zh/
BuildRequires: python3-setuptools
# Requires: python3.7 python3-flask python3-sqlalchemy python3-requests
%description
This is a tool for automatically tracking upstream repository code patches
%prep
%setup -n %{name}-%{version}
%build
%py3_build
%install
%py3_install
%post
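# Rewrite the relative paths for logging.conf, the SQLite database, and settings.conf in the installed app.py to their absolute system locations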
sed -i "s|\blogging.conf\b|/etc/patch-tracking/logging.conf|" %{python3_sitelib}/patch_tracking/app.py
sed -i "s|\bsqlite:///db.sqlite\b|sqlite:////var/patch-tracking/db.sqlite|" %{python3_sitelib}/patch_tracking/app.py
sed -i "s|\bsettings.conf\b|/etc/patch-tracking/settings.conf|" %{python3_sitelib}/patch_tracking/app.py
chmod +x /usr/bin/patch-tracking-cli
chmod +x /usr/bin/patch-tracking
chmod +x /usr/bin/generate_password
sed -i "s|\bpatch-tracking.log\b|/var/log/patch-tracking.log|" /etc/patch-tracking/logging.conf
%preun
%systemd_preun patch-tracking.service
%clean
rm -rf $RPM_BUILD_ROOT
%files
%{python3_sitelib}/*
/etc/patch-tracking/logging.conf
/etc/patch-tracking/settings.conf
/usr/bin/patch-tracking
/usr/bin/patch-tracking-cli
/var/patch-tracking/db.sqlite
/etc/patch-tracking/self-signed.crt
/etc/patch-tracking/self-signed.key
/usr/bin/generate_password
/usr/lib/systemd/system/patch-tracking.service
"""
API action methods
"""
from sqlalchemy import and_
from patch_tracking.database import db
from patch_tracking.database.models import Tracking, Issue
def create_tracking(data):
"""
create tracking
"""
version_control = data.get("version_control")
scm_repo = data.get('scm_repo')
scm_branch = data.get('scm_branch')
scm_commit = data.get('scm_commit')
repo = data.get('repo')
branch = data.get('branch')
enabled = data.get('enabled')
tracking = Tracking(version_control, scm_repo, scm_branch, scm_commit, repo, branch, enabled)
db.session.add(tracking)
db.session.commit()
def update_tracking(data):
"""
update tracking
"""
repo = data.get('repo')
branch = data.get('branch')
tracking = Tracking.query.filter(and_(Tracking.repo == repo, Tracking.branch == branch)).one()
tracking.version_control = data.get("version_control")
tracking.scm_repo = data.get('scm_repo')
tracking.scm_branch = data.get('scm_branch')
tracking.scm_commit = data.get('scm_commit')
tracking.enabled = data.get('enabled')
db.session.commit()
def delete_tracking(id_):
"""
delete tracking
"""
post = Tracking.query.filter(Tracking.id == id_).one()
db.session.delete(post)
db.session.commit()
def create_issue(data):
"""
create issue
"""
issue = data.get('issue')
repo = data.get('repo')
branch = data.get('branch')
issue_ = Issue(issue, repo, branch)
db.session.add(issue_)
db.session.commit()
def update_issue(data):
"""
update issue
"""
issue = data.get('issue')
issue_ = Issue.query.filter(Issue.issue == issue).one()
issue_.issue = data.get('issue')
db.session.add(issue_)
db.session.commit()
def delete_issue(issue):
"""
delete issue
"""
issue_ = Issue.query.filter(Issue.issue == issue).one()
db.session.delete(issue_)
db.session.commit()
'''
Response content and response code IDs
'''
import json
class ResponseCode:
"""
Description: response code to web
changeLog:
"""
SUCCESS = "2001"
INPUT_PARAMETERS_ERROR = "4001"
TRACKING_NOT_FOUND = "4002"
ISSUE_NOT_FOUND = "4003"
GITHUB_ADDRESS_ERROR = "5001"
GITEE_ADDRESS_ERROR = "5002"
GITHUB_CONNECTION_ERROR = "5003"
GITEE_CONNECTION_ERROR = "5004"
INSERT_DATA_ERROR = "6004"
DELETE_DB_ERROR = "6001"
CONFIGFILE_PATH_EMPTY = "6002"
DIS_CONNECTION_DB = "6003"
CODE_MSG_MAP = {
SUCCESS: "Successful Operation!",
INPUT_PARAMETERS_ERROR: "Please enter the correct parameters",
TRACKING_NOT_FOUND: "The tracking you are looking for does not exist",
ISSUE_NOT_FOUND: "The issue you are looking for does not exist",
GITHUB_ADDRESS_ERROR: "The Github address is wrong",
GITEE_ADDRESS_ERROR: "The Gitee address is wrong",
GITHUB_CONNECTION_ERROR: "Unable to connect to the github",
GITEE_CONNECTION_ERROR: "Unable to connect to the gitee",
INSERT_DATA_ERROR: "Failed to insert data into database",
DELETE_DB_ERROR: "Failed to delete database",
CONFIGFILE_PATH_EMPTY: "Initialization profile does not exist or cannot be found",
DIS_CONNECTION_DB: "Unable to connect to the database, check the database configuration"
}
@classmethod
def gen_dict(cls, code, data=None):
"""
generate response dictionary
"""
return json.dumps({"code": code, "msg": cls.CODE_MSG_MAP[code], "data": data})
def __str__(self):
return 'ResponseCode'
"""
module of issue API
"""
import logging
from flask import request
from flask import Blueprint
from patch_tracking.database.models import Issue
from patch_tracking.api.constant import ResponseCode
log = logging.getLogger(__name__)
issue = Blueprint('issue', __name__)
@issue.route('', methods=["GET"])
def get():
"""
Returns list of issue.
"""
if not request.args:
issues = Issue.query.all()
else:
required_params = ['repo', 'branch']
input_params = request.args
data = dict()
for k, param in input_params.items():
if k in required_params:
data[k] = param
else:
return ResponseCode.gen_dict(ResponseCode.INPUT_PARAMETERS_ERROR)
issues = Issue.query.filter_by(**data).all()
resp_data = list()
for item in issues:
resp_data.append(item.to_json())
return ResponseCode.gen_dict(code=ResponseCode.SUCCESS, data=resp_data)
"""
module of tracking API
"""
import logging
from flask import request, Blueprint
from patch_tracking.database.models import Tracking
from patch_tracking.api.business import create_tracking, update_tracking
from patch_tracking.api.constant import ResponseCode
from patch_tracking.util.auth import auth
logger = logging.getLogger(__name__)
tracking = Blueprint('tracking', __name__)
@tracking.route('', methods=["GET"])
def get():
"""
Returns list of tracking
"""
if not request.args:
trackings = Tracking.query.all()
else:
required_params = ['repo', 'branch', 'enabled']
input_params = request.args
data = dict()
for k, param in input_params.items():
if k in required_params:
if k == 'enabled':
param = bool(param == 'true')
data[k] = param
required_params.remove(k)
else:
return ResponseCode.gen_dict(ResponseCode.INPUT_PARAMETERS_ERROR)
if 'repo' in required_params and 'branch' not in required_params:
return ResponseCode.gen_dict(ResponseCode.INPUT_PARAMETERS_ERROR)
trackings = Tracking.query.filter_by(**data).all()
resp_data = list()
for item in trackings:
resp_data.append(item.to_json())
return ResponseCode.gen_dict(code=ResponseCode.SUCCESS, data=resp_data)
@tracking.route('', methods=["POST"])
@auth.login_required
def post():
"""
Creates or updates a tracking item.
"""
required_params = ['version_control', 'scm_repo', 'scm_branch', 'scm_commit', 'repo', 'branch', 'enabled']
input_params = request.json
data = dict()
for item in input_params:
if item in required_params:
data[item] = input_params[item]
required_params.remove(item)
else:
return ResponseCode.gen_dict(ResponseCode.INPUT_PARAMETERS_ERROR)
if required_params:
if len(required_params) == 1 and required_params[0] == 'scm_commit':
pass
else:
return ResponseCode.gen_dict(ResponseCode.INPUT_PARAMETERS_ERROR)
if data['version_control'] != 'github':
return ResponseCode.gen_dict(ResponseCode.INPUT_PARAMETERS_ERROR)
track = Tracking.query.filter_by(repo=data['repo'], branch=data['branch']).first()
if track:
try:
update_tracking(data)
logger.info('Update tracking. Data: %s.', data)
except Exception as exception:
return ResponseCode.gen_dict(code=ResponseCode.INSERT_DATA_ERROR, data=exception)
else:
try:
create_tracking(data)
logger.info('Create tracking. Data: %s.', data)
except Exception as exception:
return ResponseCode.gen_dict(code=ResponseCode.INSERT_DATA_ERROR, data=exception)
return ResponseCode.gen_dict(code=ResponseCode.SUCCESS, data=request.json)
"""
flask app
"""
import logging.config
import sys
from flask import Flask
from patch_tracking.api.issue import issue
from patch_tracking.api.tracking import tracking
from patch_tracking.database import db
from patch_tracking.task import task
logging.config.fileConfig('logging.conf', disable_existing_loggers=False)
app = Flask(__name__)
logger = logging.getLogger(__name__)
app.config.from_pyfile("settings.conf")
def check_settings_conf():
"""
check settings.conf
"""
flag = 0
required_settings = ['LISTEN', 'GITHUB_ACCESS_TOKEN', 'GITEE_ACCESS_TOKEN', 'SCAN_DB_INTERVAL', 'USER', 'PASSWORD']
for setting in required_settings:
if setting in app.config:
if not app.config[setting]:
logger.error('%s is empty in settings.conf.', setting)
flag = 1
else:
logger.error('%s not configured in settings.conf.', setting)
flag = 1
if flag:
sys.exit()
check_settings_conf()
GITHUB_ACCESS_TOKEN = app.config['GITHUB_ACCESS_TOKEN']
GITEE_ACCESS_TOKEN = app.config['GITEE_ACCESS_TOKEN']
SCAN_DB_INTERVAL = app.config['SCAN_DB_INTERVAL']
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite?check_same_thread=False'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SWAGGER_UI_DOC_EXPANSION'] = 'list'
app.config['ERROR_404_HELP'] = False
app.config['RESTX_MASK_SWAGGER'] = False
app.config['SCHEDULER_EXECUTORS'] = {'default': {'type': 'threadpool', 'max_workers': 100}}
app.register_blueprint(issue, url_prefix="/issue")
app.register_blueprint(tracking, url_prefix="/tracking")
db.init_app(app)
task.job_init(app)
if __name__ == "__main__":
app.run(ssl_context="adhoc")
#!/usr/bin/env python3
"""
command line to generate password hash by pbkdf2
"""
import sys
import re
from werkzeug.security import generate_password_hash
def password_strength_check(password):
"""
Verify the strength of 'password'
Returns a dict indicating the wrong criteria
"""
# calculating the length
length_error = len(password) < 6
# searching for digits
digit_error = re.search(r"\d", password) is None
# searching for uppercase
uppercase_error = re.search(r"[A-Z]", password) is None
# searching for lowercase
lowercase_error = re.search(r"[a-z]", password) is None
# searching for symbols
symbol_error = re.search(r"[~!@#%^*_+=-]", password) is None
# overall result
password_ok = not (length_error or digit_error or uppercase_error or lowercase_error or symbol_error)
return {
'ok': password_ok,
'error': {
'length': length_error,
'digit': digit_error,
'uppercase': uppercase_error,
'lowercase': lowercase_error,
'symbol': symbol_error,
}
}
ret = password_strength_check(sys.argv[1])
if not ret['ok']:
print("Password strength is not satisfied.")
for item in ret['error']:
if ret['error'][item]:
print("{} not satisfied.".format(item))
print(
"""
password strength require:
6 characters or more
at least 1 digit [0-9]
at least 1 alphabet [a-z]
at least 1 alphabet of Upper Case [A-Z]
at least 1 special character from [~!@#%^*_+=-]
"""
)
else:
print(generate_password_hash(sys.argv[1]))
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import sys
from patch_tracking.cli.patch_tracking_cli import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
sys.exit(main())
#!/usr/bin/env python3
"""
command line of creating tracking item
"""
import argparse
import sys
import os
import requests
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
USAGE = """
patch-tracking-cli --help
patch-tracking-cli --server SERVER --version_control github --scm_repo SCM_REPO --scm_branch SCM_BRANCH \\
--repo REPO --branch BRANCH --enabled True --user USER --password PWD
patch-tracking-cli --server SERVER --file FILE --user USER --password PWD
patch-tracking-cli --server SERVER --dir DIR --user USER --password PWD
"""
parser = argparse.ArgumentParser(
usage=USAGE, allow_abbrev=False, description="command line to create/update patch tracking item"
)
parser.add_argument("--server", help="patch tracking daemon server")
parser.add_argument("--version_control", choices=['github'], help="upstream version control system")
parser.add_argument("--scm_repo", help="upstream scm repository")
parser.add_argument("--scm_branch", help="upstream scm branch")
parser.add_argument("--repo", help="source package repository")
parser.add_argument("--branch", help="source package branch")
parser.add_argument("--enabled", choices=["True", "true", "False", "false"], help="whether tracing is enabled")
parser.add_argument('--file', help='import patch tracking from file')
parser.add_argument('--dir', help='import patch tracking from files in directory')
parser.add_argument('--user', help='Authentication username')
parser.add_argument('--password', help='Authentication password')
args = parser.parse_args()
style1 = args.version_control or args.repo or args.branch or args.scm_repo or args.scm_branch or args.enabled
style2 = bool(args.file)
style3 = bool(args.dir)
if [bool(style1), bool(style2), bool(style3)].count(True) >= 2:
print("mix different usage style")
parser.print_usage()
sys.exit(-1)
def single_input_track(params, file_path=None):
"""
load tracking from command line arguments
"""
if param_check(params, file_path) == 'error':
return 'error', 'Check input params error.'
if param_check_url(params, file_path) == 'error':
return 'error', 'Check input params error.'
repo = params['repo']
branch = params['branch']
scm_repo = params['scm_repo']
scm_branch = params['scm_branch']
version_control = params['version_control'].lower()
enabled = params['enabled'].lower()
server = params['server']
user = params['user']
password = params['password']
enabled = bool(enabled == 'true')
url = '/'.join(['https:/', server, 'tracking'])
data = {
'version_control': version_control,
'scm_repo': scm_repo,
'scm_branch': scm_branch,
'repo': repo,
'branch': branch,
'enabled': enabled
}
try:
ret = requests.post(url, json=data, verify=False, auth=HTTPBasicAuth(user, password))
except Exception as exception:
return 'error', 'Connect server error: ' + str(exception)
if ret.status_code == 401 or ret.status_code == 403:
return 'error', 'Authenticate Error. Please make sure user and password are correct.'
if ret.status_code == 200 and ret.json()['code'] == '2001':
return 'success', 'created'
else:
print("status_code: {}, return text: {}".format(ret.status_code, ret.text))
return 'error', 'Unexpected Error.'
def file_input_track(file_path):
"""
load tracking from file
"""
if os.path.exists(file_path) and os.path.isfile(file_path):
if os.path.splitext(file_path)[-1] != ".yaml":
print('Please input yaml file. Error in {}'.format(file_path))
return None
with open(file_path) as file:
content = file.readlines()
params = dict()
for item in content:
if ":" in item:
k = item.split(':')[0]
value = item.split(':')[1].strip(' ').strip('\n')
params.update({k: value})
params.update({'server': args.server, 'user': args.user, 'password': args.password})
ret = single_input_track(params, file_path)
if ret[0] == 'success':
print('Tracking successfully {} for {}'.format(ret[1], file_path))
else:
print('Tracking failed for {}: {}'.format(file_path, ret[1]))
else:
print('yaml path error. Params error in {}'.format(file_path))
def dir_input_track(dir_path):
"""
load tracking from dir
"""
if os.path.exists(dir_path) and os.path.isdir(dir_path):
for root, _, files in os.walk(dir_path):
if not files:
print('error: dir path empty')
return None
for file in files:
if os.path.splitext(file)[-1] == ".yaml":
file_path = os.path.join(root, file)
file_input_track(file_path)
else:
print('Please input yaml file. Error in {}'.format(file))
else:
print('error: dir path error. Params error in {}'.format(dir_path))
def patch_tracking_server_check(url):
"""
check whether the patch_tracking server has started
"""
try:
ret = requests.head(url=url, verify=False)
except Exception as exception:
print(f"Error: Cannot connect to {url}, please make sure patch-tracking service is running.")
return 'error', exception
if ret.status_code == 200 or ret.status_code == 404:
return 'success', ret
print(f"Unexpected Error: {ret.text}")
return 'error', ret.text
def repo_branch_check(url):
"""
check if repo/branch exist
"""
headers = {
"User-Agent":
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " +
"Ubuntu Chromium/83.0.4103.61 Chrome/83.0.4103.61 Safari/537.36"
}
try:
ret = requests.get(url=url, headers=headers)
except Exception as exception:
return 'error', exception
if ret.status_code == 404:
return 'error', f'{url} not exist.'
if ret.status_code == 200:
return 'success', ret
return 'error', ret.text
def command_default_param_check():
"""
check that --server, --user and --password are provided
"""
flag = 0
if not args.server:
print("Error: --server not configure.")
flag = 1
if not args.user:
print("Error: --user not configure.")
flag = 1
if not args.password:
print("Error: --password not configure.")
flag = 1
if flag == 1:
return 'error'
else:
return 'success'
def param_check(params, file_path=None):
"""
check if param is valid
"""
flag = 0
required_param = ['version_control', 'scm_repo', 'scm_branch', 'repo', 'branch', 'enabled', 'user', 'password']
for req in required_param:
if req not in params:
if file_path:
print(f'param: --{req} must be configured. Error in {file_path}')
else:
print(f'param: --{req} must be configured.')
flag = 1
for k, value in params.items():
if not value:
if file_path:
print(f'param: --{k} must be configured. Error in {file_path}')
else:
print(f'param: --{k} cannot be empty.')
flag = 1
if flag:
return 'error'
return None
def param_check_url(params, file_path=None):
"""
check url
"""
scm_url = f"https://github.com/{params['scm_repo']}/tree/{params['scm_branch']}"
url = f"https://gitee.com/{params['repo']}/tree/{params['branch']}"
patch_tracking_url = f"https://{params['server']}"
server_ret = patch_tracking_server_check(patch_tracking_url)
if server_ret[0] != 'success':
return 'error'
scm_ret = repo_branch_check(scm_url)
if scm_ret[0] != 'success':
if file_path:
print(
f"scm_repo: {params['scm_repo']} and scm_branch: {params['scm_branch']} check failed. \n"
f"Error in {file_path}. {scm_ret[1]}"
)
else:
print(f"scm_repo: {params['scm_repo']} and scm_branch: {params['scm_branch']} check failed. {scm_ret[1]}")
return 'error'
ret = repo_branch_check(url)
if ret[0] != 'success':
if file_path:
print(f"repo: {params['repo']} and branch: {params['branch']} check failed. {ret[1]}. Error in {file_path}")
else:
print(f"repo: {params['repo']} and branch: {params['branch']} check failed. {ret[1]}.")
return 'error'
return None
def main():
"""
main
"""
if command_default_param_check() == 'error':
return None
if style2:
file_input_track(args.file)
elif style3:
dir_input_track(args.dir)
else:
params = {
'repo': args.repo,
'branch': args.branch,
'scm_repo': args.scm_repo,
'scm_branch': args.scm_branch,
'version_control': args.version_control,
'enabled': args.enabled,
'server': args.server,
'user': args.user,
'password': args.password
}
ret = single_input_track(params)
if ret[0] == 'success':
print('Tracking successfully.')
else:
print(ret[1])
if __name__ == '__main__':
main()
"""
database init
"""
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def reset_database():
"""
reset database
"""
db.drop_all()
db.create_all()
"""
module of database model
"""
from patch_tracking.database import db
class Tracking(db.Model):
"""
database model of tracking
"""
id = db.Column(db.Integer, autoincrement=True)
version_control = db.Column(db.String(80))
scm_repo = db.Column(db.String(80))
scm_branch = db.Column(db.String(80))
scm_commit = db.Column(db.String(80))
repo = db.Column(db.String(80), primary_key=True)
branch = db.Column(db.String(80), primary_key=True)
enabled = db.Column(db.Boolean)
def __init__(self, version_control, scm_repo, scm_branch, scm_commit, repo, branch, enabled=True):
self.version_control = version_control
self.scm_repo = scm_repo
self.scm_branch = scm_branch
self.scm_commit = scm_commit
self.repo = repo
self.branch = branch
self.enabled = enabled
def __repr__(self):
return '<Tracking %r %r>' % (self.repo, self.branch)
def to_json(self):
"""
convert to json
"""
return {
'version_control': self.version_control,
'scm_repo': self.scm_repo,
'scm_branch': self.scm_branch,
'scm_commit': self.scm_commit,
'repo': self.repo,
'branch': self.branch,
'enabled': self.enabled
}
class Issue(db.Model):
"""
database model of issue
"""
issue = db.Column(db.String(80), primary_key=True)
repo = db.Column(db.String(80))
branch = db.Column(db.String(80))
def __init__(self, issue, repo, branch):
self.issue = issue
self.repo = repo
self.branch = branch
def __repr__(self):
return '<Issue %r %r %r>' % (self.issue, self.repo, self.branch)
def to_json(self):
"""
convert to json
"""
return {'issue': self.issue, 'repo': self.repo, 'branch': self.branch}
"""
reset database
"""
from patch_tracking.app import app
from patch_tracking.database import reset_database
def reset():
"""
reset database
"""
with app.app_context():
reset_database()
if __name__ == "__main__":
reset()
[loggers]
keys=root
[handlers]
keys=console,logfile
[formatters]
keys=simple
[logger_root]
level=DEBUG
handlers=console,logfile
[handler_console]
class=StreamHandler
level=DEBUG
formatter=simple
args=(sys.stdout,)
[formatter_simple]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=
[handler_logfile]
class=handlers.RotatingFileHandler
level=DEBUG
args=('patch-tracking.log', 'a', 1024*1024*100, 10)
formatter=simple
#!/bin/bash
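# Start patch-tracking under uwsgi over HTTPS, using the LISTEN address from settings.conf and the self-signed certificate installed under /etc/patch-tracking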
app_file=`rpm -ql patch-tracking | grep app.py`
app_path=${app_file%/app.py}
chdir_path=${app_file%/patch_tracking/app.py}
settings_file='/etc/patch-tracking/settings.conf'
server=`grep 'LISTEN' $settings_file | awk -F'=' '{print $2}' | sed -e 's/^[ ]"//g' | sed -e 's/"$//g'`
/usr/local/bin/uwsgi --master --https ${server},/etc/patch-tracking/self-signed.crt,/etc/patch-tracking/self-signed.key --wsgi-file ${app_file} --callable app --chdir ${chdir_path} --threads 100 --lazy
[Unit]
Description=uWSGI Emperor
After=syslog.target
[Service]
ExecStart=/usr/bin/patch-tracking
RuntimeDirectory=patch-tracking
Restart=always
RestartSec=10
KillSignal=SIGQUIT
Type=notify
StandardError=syslog
NotifyAccess=all
[Install]
WantedBy=multi-user.target
-----BEGIN CERTIFICATE-----
MIIFDTCCAvWgAwIBAgIUUYmYR5HWybac4V6yIDD4I9fiKCwwDQYJKoZIhvcNAQEL
BQAwFjEUMBIGA1UEAwwLc2VsZi1zaWduZWQwHhcNMjAwNzA0MDM0MDQzWhcNMzAw
NzAyMDM0MDQzWjAWMRQwEgYDVQQDDAtzZWxmLXNpZ25lZDCCAiIwDQYJKoZIhvcN
AQEBBQADggIPADCCAgoCggIBALN1yRKuGsXiYL40CNnbPuGMZrcSJvH2T14TuvlK
6GyFd6KQcBMgDTcwzferw/dQS5IeGD+jpfP2qNGeH7jrti9BZj12vZWSAb4Cx/Re
5RbK3B6M7s45MCmWfMjs1J8hc42mZKr8VZ+x0xUAzQbyLd+MIBS/T7nigqaAzHBg
U9P3mB+cUDYb0YbOCP8uXif/TjRtlCYpDrX37EGOgBZFt6SFaiAOzW/JLm9szV9+
S7zCn/lWaZb4rMd9ieoKAseCZqDz09J6sq8ncws4g/g+k3WezzUd/PlrWf+Bo+HK
q2q7rsnCnfQa51JNji8wrsM34Mm/giVtx1MpKCOr2mckbP03ouqonqb7CwqRBbsl
KIMwuYBfzZ0saurPI4AYvanTxzZDQg+PGWUIbYPGq6PFwxPYFJzRteuSempXWpny
pCNPNYow/BgZKUoiZHPRYY4vh2GfDOJQrV/islgiIg27AuCKHzSSfU1F/wNT18zh
aIEJTmRAnFIe5THqlFLe3Q4HMJ7om21KA/SuERB7VWKod2lxJ2UGb/Peg3od2AjS
w6dU4iYGtXL2fbsrtrphMK9cg964LkJOevCr0bjZXPkUst9tvBcqwDVhUJodiwqQ
jULsios7DHnZK4IteZHcaqzh7PFUpSZQFKRR6mxKSd7G52ta1+QCXNTE/sUZA1Kf
FfcNAgMBAAGjUzBRMB0GA1UdDgQWBBSeDa5DTb3b9EPGHke3Aw08o3I4LjAfBgNV
HSMEGDAWgBSeDa5DTb3b9EPGHke3Aw08o3I4LjAPBgNVHRMBAf8EBTADAQH/MA0G
CSqGSIb3DQEBCwUAA4ICAQCs0/SZEa1schHjvldJq3gd7MsZHBMAPZkbvVO7NcjF
uZ8ZnNYHQFhQNA1h40EzOnyfA2Xb0jFJE2TEFzjYVjRi7VUDM/EIh5i+ebmfS92b
mGQsGmL0AKCszwpQriuHpc9KiCQViUSnO2gWAO5TcfHbXzKkXQL6Yqk6QA3kd4lO
2v8gEyaAG/Og/rafqcOciyNqcmLCtfewfn6lxy+sEducPj5vbStqFq3is/PtDRoV
Mzef3xFt+ndGhSsegqVCAa4eLgdqGum0NA5zOqzjb+5MLVRAnF5XPITV/kPoXHWp
iQOLxtjm+bGPewEhEZMu1fOSjSHNosIFw8RBOaoPfamBI+LGCda1RZgxnOg3L1rZ
zV4DEzok9d8a7appqblI1WbhTBeTjema/82HAZxoR2W0EAG4cyVlw1um02Jw0Kqp
i9NuLscWNzWRnWpWTATlHMqA9q/Xh8F8eLKOsf3WHiY1PD2nKLZddIzqVUiLMQJV
tYB697J1tdetggt+IHHkb1xoqHj3RAwyrTODkgw5eHutOeFbiJNoGbMblhcN+z9y
EINRiPnbLYbB8FPfba9wQSHoqUORzhhOM50sUrUJx/QukqSYQ86p2tsT5tQ6Ic1i
yrhezqdRmOW2aX+2P23Y+yzBvkP5PysWcyiHjzRUHNyC9MNC7XRUIEQ/Fo+QNODb
oQ==
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCzdckSrhrF4mC+
NAjZ2z7hjGa3Eibx9k9eE7r5SuhshXeikHATIA03MM33q8P3UEuSHhg/o6Xz9qjR
nh+467YvQWY9dr2VkgG+Asf0XuUWytwejO7OOTAplnzI7NSfIXONpmSq/FWfsdMV
AM0G8i3fjCAUv0+54oKmgMxwYFPT95gfnFA2G9GGzgj/Ll4n/040bZQmKQ619+xB
joAWRbekhWogDs1vyS5vbM1ffku8wp/5VmmW+KzHfYnqCgLHgmag89PSerKvJ3ML
OIP4PpN1ns81Hfz5a1n/gaPhyqtqu67Jwp30GudSTY4vMK7DN+DJv4IlbcdTKSgj
q9pnJGz9N6LqqJ6m+wsKkQW7JSiDMLmAX82dLGrqzyOAGL2p08c2Q0IPjxllCG2D
xqujxcMT2BSc0bXrknpqV1qZ8qQjTzWKMPwYGSlKImRz0WGOL4dhnwziUK1f4rJY
IiINuwLgih80kn1NRf8DU9fM4WiBCU5kQJxSHuUx6pRS3t0OBzCe6JttSgP0rhEQ
e1ViqHdpcSdlBm/z3oN6HdgI0sOnVOImBrVy9n27K7a6YTCvXIPeuC5CTnrwq9G4
2Vz5FLLfbbwXKsA1YVCaHYsKkI1C7IqLOwx52SuCLXmR3Gqs4ezxVKUmUBSkUeps
SknexudrWtfkAlzUxP7FGQNSnxX3DQIDAQABAoICAQCrsFUVDQJKLSDW91s8abfH
+xXNsY0W0cnuvDuWAqdII4xoN30xnull0shKWcca1XPnL+mNANhlBadPG9NHjCJ5
JT1WMkKAVPZbvbdkwGC1pJBgnf5dx3KfZvytEX79Wvh9HSKUPuL/7BWAs4pzScC/
bQTINJtmwCC0gOaV4GJymR6tp1NJ4OVc7cLHt6mW5HcCS49/zqnnR3B864L5S+u8
d3MnhmHev38wVMxKvr5gsWZxGc3dBL3wANev07IDA2uCMqOFa6OFVN2Ib6I6Hkvf
LHcaXz1FtgGdI38RJl9GtpYrKokJH7ANGmucFBwuYkgpW5F8k4Etu2NOdTx2ju/A
2x/3WWJwy3iZowj6fP72147znhsmACm9klhM7UPaV0EQqwVmJDKAmSqPkK09LJv+
O0ehnTpVxO9U6W4a+Wwx87PrjxpI8eTZNiktdei5Qxl+2R2XVISk6PwTZsX69atI
/ZocKWAic0/5G0h791X1981hG3TFRkbQjlbORPHm4ZUCx5//bbtnfppiuKlsiTS4
VALC2xXvTynY9p+tJC69Zy5epTD9b8OMzKLOOi9qPEG0cAc2Zs/uPZEc9RfPq7Ml
1NbDpyLJ3TRp40BE28Y8bjurM3NJ0l0B2us8YQWZj+SfjyO4/RJ28uJDnX0FEbxw
aH+gbat2vUXvH7BlSd3tNQKCAQEA4OCdZOAyf3gZJP7ytcLBotkKJJcO+7m9K1n/
G6o4+dDtcl6/TfomGPib0KnHNAbyvrTAYO9oVRXrm1DStc0mv2gFOQ3KKPrSa5gZ
TrP3Xi9yEP/Dbqe9Evt0GMeSOe98YAvqmlh4CUl8LltwMnNEKTNbHO9GAF0CMgIj
XrFmmtROoZT1uovhpiWQX7BmbpUkkyIB0jz4EYgap0ur0kGp5NcB2c5YdwR0L9ah
rU4JFq85r/qja0DsXkdyS+i8x+iho6Mg8ynJIcuMoho6d781/WlovvWt2160m8gR
lglQa6frccT6u+uKpIk7UJxbxL69uHJA0irOqRzfLDiB7+m5EwKCAQEAzEwKNUVF
VjA9fFTcKg/tkRpsYw9f4SocEf74OefGVjTBMMweB/MC7tboSFVmf2ntSfAu7hEL
MtI+HSXv49JQaBNkBc0svyySm9YsujjzCosRP9f7j3fLfvPmxdPF6xXz4/c7RcoA
1WY426JYXUYVgx4yQX6e4vb4m6dckNUhwZLAMJs+P5szD+Hb8EQejrceIP1zYaGt
GsVFy6CSLfbcAJc+/ozI9RWob6ia4YjlMXHIw3IC3ztqxI/trEgZarPq3SA9YVFw
yMwWm+uYrZwDNNvGZ4iB3KSF+E2IHPyo5uLbkBViKT/a0ngAQP4xkeYZq7jygN8w
vIuzR+5L6YazXwKCAQBfR8xoiXXb/I7q1fsQeEyDK2LYzghTMAeu7prgpecuMg/p
faug5nRt8ChU6Rq2OJtxojRA3i9encMOM9iTnzDjuLc9zVHyuxOc8v0GE8qj5YZ3
HWc442mBOXmfZi/WzFnueB4W95UkmjY7jhKjzaL7sf7Q67DFRqM/fRhvbssCnyIR
5IOZXttlAlWBtcQw5pBwpuAOrDaPdxOT/sP2ekv54f+uwXdKNmDkRBSM0ZuYOPZA
Ufob345HBA6xixMxwKd8Jgo3/WRzJUOZC3PqeOHvVVJEVuQrJp1vw/1vjNK+So+/
zK5QISTFQkAXj6hyXD8Wf5JneivGC62jlu23MVhxAoIBAAnYKC3E9sBedrgFBs7o
EZSKZ2qmlQIum0eqt59iscX5qM2HKHNNnHiR1oOVyeid3BdSAZDrNVTvmJqi91pN
Ch7ZwFofNubHaRElUuZuVBfP97bR24dgSGgHrLkfqUvYtPXpNev4/e1KjbbXrdZg
yCyXSeiqB1H8gKJPgEBiZMwFHEm7UVaTTfSX95cuUSKjZEpGrEaqGcNOejyDskeQ
u60znI97jTtyHbmzsDLp+9FUIE56sfS70jtCjDtfBgqEPO8G3K5R1FN4siY1Rhgn
imgDpx3aEBfnvaTnZ4WuDx2BFP9uaFqAfzThH3ICTbUwF1CVCup21sxfFvaCXxoT
qZUCggEAJ0Z4PJigFtKkEyskZVlHoAPokc3PiKUq3CjECL2L6LvJ7zAj3e0PyJKX
4XKkR/cslB0enWDBeLls2yMHbol2h8nxguzS9PVQyHUdK6NqY1wKiMuzBnpTr8iP
QaJ9vpT5lXkVE8FrNsj5wlwEwxZoaAU4VUGErd8Yx0iDV3HwKi2jkY5/pL2/ZD4L
TxqvoDiTri7RFzIFWzqLawHMYZFF/FSaON9a0uRz7CTZmom/XYuHRadLPbzHPDFr
2duRr5E74jYYtTUbOKXPsXH+HiUtaRzwyiDT81N3vb+eJhbNRkp6KOdFWopXdSmc
HWHZfW1YKIWIprRdHko8qpGgYcCzSQ==
-----END PRIVATE KEY-----
# server settings
LISTEN = "127.0.0.1:5001"
# GitHub API settings
GITHUB_ACCESS_TOKEN = ""
# Gitee API settings
GITEE_ACCESS_TOKEN = ""
# Time interval
SCAN_DB_INTERVAL = 3600
# username
USER = "admin"
# password
PASSWORD = ""
"""
apscheduler init
"""
from flask_apscheduler import APScheduler
scheduler = APScheduler()
"""
load job/task of tracking
"""
import datetime
import logging
from patch_tracking.task import scheduler
from patch_tracking.task import task_apscheduler
from patch_tracking.util.github_api import GitHubApi
from patch_tracking.api.business import update_tracking
logger = logging.getLogger(__name__)
def job_init(app):
"""
jobs init
"""
scan_db_interval = app.config['SCAN_DB_INTERVAL']
with app.app_context():
new_track = task_apscheduler.get_track_from_db()
scheduler.init_app(app)
scheduler.add_job(
id='Add Tracking job - Update DB',
func=load,
trigger='interval',
args=(new_track, ),
seconds=int(scan_db_interval),
next_run_time=datetime.datetime.now()
)
scheduler.add_job(
id=str("Check empty commitID"),
func=get_commit_id_empty,
trigger='interval',
args=(new_track, app),
seconds=600,
next_run_time=datetime.datetime.now(),
misfire_grace_time=300,
)
scheduler.start()
def add_job(job_id, func, args):
"""
add job
"""
logger.info("Add Tracking job - %s", job_id)
scheduler.add_job(
id=job_id, func=func, args=args, trigger='date', run_date=datetime.datetime.now(), misfire_grace_time=600
)
def get_commit_id_empty(new_track, flask_app):
"""
check commit ID for empty tracking
"""
with flask_app.app_context():
github_api = GitHubApi()
for item in new_track:
if item.scm_commit:
continue
status, result = github_api.get_latest_commit(item.scm_repo, item.scm_branch)
if status == 'success':
commit_id = result['latest_commit']
data = {
'version_control': item.version_control,
'repo': item.repo,
'branch': item.branch,
'enabled': item.enabled,
'scm_commit': commit_id,
'scm_branch': item.scm_branch,
'scm_repo': item.scm_repo
}
update_tracking(data)
else:
logger.error(
'Check empty CommitID: Fail to get latest commit id of scm_repo: %s scm_branch: %s. Return val: %s',
item.scm_repo, item.scm_branch, result
)
def load(all_track):
"""
load trackings to jobs
"""
all_job_id = list()
for item in scheduler.get_jobs():
all_job_id.append(item.id)
for track in all_track:
if track.branch.split('/')[0] != 'patch-tracking':
job_id = str(track.repo + ":" + track.branch)
if job_id not in all_job_id:
add_job(
job_id=job_id, func='patch_tracking.task.task_apscheduler:upload_patch_to_gitee', args=(track, )
)
"""
tracking job
"""
import logging
import base64
import time
from patch_tracking.util.gitee_api import post_create_branch, post_upload_patch, post_create_issue, \
post_create_pull_request, get_path_content, put_upload_spec, post_create_spec
from patch_tracking.util.github_api import GitHubApi
from patch_tracking.database.models import Tracking
from patch_tracking.api.business import update_tracking, create_issue
from patch_tracking.task import scheduler
from patch_tracking.util.spec import Spec
logger = logging.getLogger(__name__)
def get_track_from_db():
"""
query all trackings from database
"""
all_track = Tracking.query.filter_by(enabled=True)
return all_track
def upload_patch_to_gitee(track):
"""
upload a patch file to Gitee
"""
cur_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
with scheduler.app.app_context():
patch = get_scm_patch(track)
if patch:
issue = upload_patch(patch, cur_time)
if issue:
create_issue_db(issue)
else:
logger.debug('No issue need to create.')
else:
logger.debug('No new commit.')
def get_all_commit_info(scm_repo, db_commit, latest_commit):
"""
get all commit information between two commits
"""
commit_list = list()
github_api = GitHubApi()
while db_commit != latest_commit:
status, result = github_api.get_commit_info(scm_repo, latest_commit)
logger.debug('get_commit_info: %s %s', status, result)
if status == 'success':
if 'parent' in result:
ret = github_api.get_patch(scm_repo, latest_commit, latest_commit)
logger.debug('get patch api ret: %s', ret)
if ret['status'] == 'success':
result['patch_content'] = ret['api_ret']
# insert at the front so commit_list ends up in chronological order
commit_list.insert(0, result)
else:
logger.error('Get scm: %s commit: %s patch failed. Result: %s', scm_repo, latest_commit, result)
latest_commit = result['parent']
else:
logger.info('Successful get scm commit from %s to %s ID/message/time/patch.', db_commit, latest_commit)
break
else:
logger.error('Get scm: %s commit: %s ID/message/time failed. Result: %s', scm_repo, latest_commit, result)
return commit_list
def get_scm_patch(track):
"""
Traverse the Tracking data table and get the patch files of the enabled repositories.
Different repositories have different acquisition methods.
:return:
"""
scm_dict = dict()
github_api = GitHubApi()
scm_dict['scm_repo'] = track.scm_repo
scm_dict['scm_branch'] = track.scm_branch
scm_dict['scm_commit'] = track.scm_commit
scm_dict['enabled'] = track.enabled
scm_dict['repo'] = track.repo
scm_dict['branch'] = track.branch
scm_dict['version_control'] = track.version_control
status, result = github_api.get_latest_commit(scm_dict['scm_repo'], scm_dict['scm_branch'])
logger.debug('get_latest_commit: %s %s', status, result)
if status == 'success':
commit_id = result['latest_commit']
if not scm_dict['scm_commit']:
data = {
'version_control': scm_dict['version_control'],
'repo': scm_dict['repo'],
'branch': scm_dict['branch'],
'enabled': scm_dict['enabled'],
'scm_commit': commit_id,
'scm_branch': scm_dict['scm_branch'],
'scm_repo': scm_dict['scm_repo']
}
update_tracking(data)
else:
if commit_id != scm_dict['scm_commit']:
commit_list = get_all_commit_info(scm_dict['scm_repo'], scm_dict['scm_commit'], commit_id)
scm_dict['commit_list'] = commit_list
return scm_dict
logger.info(
'Latest commit id of scm_repo: %s scm_branch: %s has not changed. Nothing to do.', scm_dict['scm_repo'],
scm_dict['scm_branch']
)
else:
logger.error(
'Fail to get latest commit id of scm_repo: %s scm_branch: %s. Return val: %s', scm_dict['scm_repo'],
scm_dict['scm_branch'], result
)
return None
def upload_patch(patch, cur_time):
"""
Create a temporary branch, commit patch files, and create the PR and issue
:return:
"""
issue_dict = dict()
if not patch:
return None
issue_dict['repo'] = patch['repo']
issue_dict['branch'] = patch['branch']
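# All patch commits are pushed to a temporary branch named
# patch-tracking/<timestamp>; the pull request created later merges it back
# into the tracked branch.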
new_branch = 'patch-tracking/' + cur_time
result = post_create_branch(patch['repo'], patch['branch'], new_branch)
if result == 'success':
logger.info('Successfully created branch: %s', new_branch)
else:
logger.error('Fail to create branch: %s', new_branch)
patch_lst = list()
# The Markdown table format can trigger Gitee's sensitive-word filter, so it is dropped for now
issue_table = ""
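# Each upstream commit becomes one line of the issue body:
# [short-sha](commit-url) | commit time | commit message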
for latest_commit in patch['commit_list']:
scm_commit_url = '/'.join(['https://github.com', patch['scm_repo'], 'commit', latest_commit['commit_id']])
issue_table += '[{}]({}) | {} | {}'.format(
latest_commit['commit_id'][0:7], scm_commit_url, latest_commit['time'], latest_commit['message']
) + '\n'
patch_file_content = latest_commit['patch_content']
post_data = {
'repo': patch['repo'],
'branch': new_branch,
'latest_commit_id': latest_commit['commit_id'],
'patch_file_content': str(patch_file_content),
'cur_time': cur_time,
'commit_url': scm_commit_url
}
result = post_upload_patch(post_data)
if result == 'success':
logger.info('Successfully uploaded patch file of commit: %s', latest_commit['commit_id'])
else:
logger.error('Fail to upload patch file of commit: %s', latest_commit['commit_id'])
patch_lst.append(str(latest_commit['commit_id']))
logger.debug(issue_table)
result = post_create_issue(patch['repo'], issue_table, cur_time)
if result[0] == 'success':
issue_num = result[1]
logger.info('Successfully created issue: %s', issue_num)
ret = post_create_pull_request(patch['repo'], patch['branch'], new_branch, issue_num, cur_time)
if ret == 'success':
logger.info('Successfully created PR for issue: %s.', issue_num)
else:
logger.error('Fail to create PR of issue: %s. Result: %s', issue_num, ret)
issue_dict['issue'] = issue_num
upload_spec(patch, patch_lst, cur_time)
data = {
'version_control': patch['version_control'],
'repo': patch['repo'],
'branch': patch['branch'],
'enabled': patch['enabled'],
'scm_commit': patch['commit_list'][-1]['commit_id'],
'scm_branch': patch['scm_branch'],
'scm_repo': patch['scm_repo']
}
update_tracking(data)
else:
logger.error('Fail to create issue: %s. Result: %s', issue_table, result[1])
return issue_dict if 'issue' in issue_dict else None
def upload_spec(patch, patch_lst, cur_time):
"""
update and upload spec file
"""
new_branch = 'patch-tracking/' + cur_time
_, repo_name = patch['repo'].split('/')
spec_file = repo_name + '.spec'
patch_file_lst = [patch + '.patch' for patch in patch_lst]
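# The spec file is assumed to be named after the repository (<repo>.spec) and
# the uploaded patch files after their upstream commit id (<commit_id>.patch).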
log_title = "{} patch-tracking".format(cur_time)
log_content = "append patch files of upstream repository from <{}> to <{}>".format(patch_lst[0], patch_lst[-1])
ret = get_path_content(patch['repo'], patch['branch'], spec_file)
if 'content' in ret:
spec_content = str(base64.b64decode(ret['content']), encoding='utf-8')
spec_sha = ret['sha']
new_spec = modify_spec(log_title, log_content, patch_file_lst, spec_content)
update_spec(patch['repo'], new_branch, cur_time, new_spec, spec_sha)
else:
if 'message' in ret and 'File Not Found' in ret['message']:
spec_content = ''
new_spec = modify_spec(log_title, log_content, patch_file_lst, spec_content)
create_spec(patch['repo'], new_branch, cur_time, new_spec)
else:
logger.error('Fail to update spec: %s. Result: %s', spec_file, ret)
def modify_spec(log_title, log_content, patch_file_lst, spec_content):
"""
modify spec file
"""
spec = Spec(spec_content)
return spec.update(log_title, log_content, patch_file_lst)
def update_spec(repo, branch, cur_time, spec_content, spec_sha):
"""
update spec file
"""
ret = put_upload_spec(repo, branch, cur_time, spec_content, spec_sha)
if ret == 'success':
logger.info('Successfully updated spec file.')
else:
logger.error('Fail to update spec file. Result: %s', ret)
def create_spec(repo, branch, cur_time, spec_content):
"""
create new spec file
"""
ret = post_create_spec(repo, branch, spec_content, cur_time)
if ret == 'success':
logger.info('Successfully created spec file.')
else:
logger.error('Fail to create spec file. Result: %s', ret)
def create_issue_db(issue):
"""
create issue into database
"""
issue_num = issue['issue']
tracking = Tracking.query.filter_by(repo=issue['repo'], branch=issue['branch']).first()
tracking_repo = tracking.repo
tracking_branch = tracking.branch
data = {'issue': issue_num, 'repo': tracking_repo, 'branch': tracking_branch}
logger.debug('issue data: %s', data)
create_issue(data)
# pylint: disable=R0801
'''
Automated testing of the Issue interface, GET requests
'''
import unittest
import json
from patch_tracking.app import app
from patch_tracking.api.business import create_issue
from patch_tracking.database import reset_db
from patch_tracking.api.constant import ResponseCode
class TestIssue(unittest.TestCase):
'''
Automated testing of the Issue interface, GET requests
'''
def setUp(self) -> None:
'''
Prepare the environment
:return:
'''
self.client = app.test_client()
reset_db.reset()
def test_none_data(self):
'''
In the absence of data, the GET interface queries all the data
:return:
'''
with app.app_context():
resp = self.client.get("/issue")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_query_inserted_data(self):
'''
The GET interface queries existing data
:return:
'''
with app.app_context():
data_insert = {"issue": "A", "repo": "A", "branch": "A"}
create_issue(data_insert)
resp = self.client.get("/issue?repo=A&branch=A")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert, resp_dict.get("data"), msg="Error in data information return")
def test_find_all_data(self):
'''
The GET interface queries all the data
:return:
'''
with app.app_context():
data_insert_c = {"issue": "C", "repo": "C", "branch": "C"}
data_insert_d = {"issue": "D", "repo": "D", "branch": "D"}
create_issue(data_insert_c)
create_issue(data_insert_d)
resp = self.client.get("/issue")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert_c, resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert_d, resp_dict.get("data"), msg="Error in data information return")
def test_find_nonexistent_data(self):
'''
The GET interface queries data that does not exist
:return:
'''
with app.app_context():
resp = self.client.get("/issue?repo=aa&branch=aa")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_get_error_parameters(self):
'''
The GET interface passes in wrong parameter names
:return:
'''
with app.app_context():
data_insert = {"issue": "BB", "repo": "BB", "branch": "BB"}
create_issue(data_insert)
resp = self.client.get("/issue?oper=BB&chcnsrb=BB")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_get_interface_uppercase(self):
'''
The GET interface with uppercase parameter names
:return:
'''
with app.app_context():
data_insert = {"issue": "CCC", "repo": "CCC", "branch": "CCC"}
create_issue(data_insert)
resp = self.client.get("/issue?RrPo=CCC&brANch=CCC")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
if __name__ == '__main__':
unittest.main()
[loggers]
keys=root
[handlers]
keys=console
[formatters]
keys=simple
[logger_root]
level=DEBUG
handlers=console
[handler_console]
class=StreamHandler
level=DEBUG
formatter=simple
args=(sys.stdout,)
[formatter_simple]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=
# -*- coding:utf-8 -*-
'''
Automated testing of the Tracking interface, including POST requests and GET requests
'''
import unittest
import json
from base64 import b64encode
from werkzeug.security import generate_password_hash
from patch_tracking.app import app
from patch_tracking.database import reset_db
from patch_tracking.api.business import create_tracking
from patch_tracking.api.constant import ResponseCode
class TestTracking(unittest.TestCase):
'''
Automated testing of the Tracking interface, including POST requests and GET requests
'''
def setUp(self) -> None:
'''
Prepare the environment
:return:
'''
self.client = app.test_client()
reset_db.reset()
app.config["USER"] = "hello"
app.config["PASSWORD"] = generate_password_hash("world")
credentials = b64encode(b"hello:world").decode('utf-8')
self.auth = {"Authorization": f"Basic {credentials}"}
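# POST requests to /tracking require HTTP Basic authentication; the header is
# pre-built here from the test credentials configured above.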
def test_none_data(self):
'''
In the absence of data, the GET interface queries all the data
:return:
'''
with app.app_context():
resp = self.client.get("/tracking")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_find_nonexistent_data(self):
'''
The GET interface queries data that does not exist
:return:
'''
with app.app_context():
resp = self.client.get("/tracking?repo=aa&branch=aa")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertEqual(resp_dict.get("data"), [], msg="Error in data information return")
def test_insert_data(self):
'''
The POST interface inserts data
:return:
'''
data = {
"version_control": "github",
"scm_repo": "A",
"scm_branch": "A",
"scm_commit": "A",
"repo": "A",
"branch": "A",
"enabled": 0
}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
def test_query_inserted_data(self):
'''
The GET interface queries existing data
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "B",
"scm_branch": "B",
"scm_commit": "B",
"repo": "B",
"branch": "B",
"enabled": False
}
create_tracking(data_insert)
resp = self.client.get("/tracking?repo=B&branch=B")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
self.assertIn(data_insert, resp_dict.get("data"), msg="Error in data information return")
def test_only_input_branch(self):
'''
The GET interface query passes only branch without repo
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "C",
"scm_branch": "C",
"scm_commit": "C",
"repo": "C",
"branch": "C",
"enabled": 0
}
create_tracking(data_insert)
resp = self.client.get("/tracking?branch=B")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_fewer_parameters(self):
'''
The POST interface is called with fewer parameters than required
:return:
'''
data = {"version_control": "github", "scm_commit": "AA", "repo": "AA", "branch": "AA", "enabled": 1}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_error_parameters_value(self):
'''
The POST interface passes in a wrong parameter value
:return:
'''
data = {"version_control": "github", "scm_commit": "AA", "repo": "AA", "branch": "AA", "enabled": "AA"}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_post_error_parameters(self):
'''
The POST interface passes in wrong parameter names
:return:
'''
data = {"version_control": "github", "scm_commit": "AA", "oper": "AA", "hcnarb": "AA", "enabled": "AA"}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_get_error_parameters(self):
'''
The GET interface passes in wrong parameter names
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "BB",
"scm_branch": "BB",
"scm_commit": "BB",
"repo": "BB",
"branch": "BB",
"enabled": True
}
create_tracking(data_insert)
resp = self.client.get("/tracking?oper=B&chcnsrb=B")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_update_data(self):
'''
update data
:return:
'''
with app.app_context():
data_old = {
"version_control": "github",
"scm_repo": "str",
"scm_branch": "str",
"scm_commit": "str",
"repo": "string",
"branch": "string",
"enabled": False
}
self.client.post("/tracking", json=data_old, content_type="application/json", headers=self.auth)
data_new = {
"branch": "string",
"enabled": True,
"repo": "string",
"scm_branch": "string",
"scm_commit": "string",
"scm_repo": "string",
"version_control": "github",
}
self.client.post("/tracking", json=data_new, content_type="application/json", headers=self.auth)
resp = self.client.get("/tracking?repo=string&branch=string")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.SUCCESS, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.SUCCESS),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertIsNotNone(resp_dict.get("data"), msg="Error in data information return")
#self.assertIn(data_new, resp_dict.get("data"), msg="Error in data information return")
def test_get_interface_uppercase(self):
'''
The GET interface with uppercase parameter names
:return:
'''
with app.app_context():
data_insert = {
"version_control": "github",
"scm_repo": "BBB",
"scm_branch": "BBB",
"scm_commit": "BBB",
"repo": "BBB",
"branch": "BBB",
"enabled": False
}
create_tracking(data_insert)
resp = self.client.get("/tracking?rep=BBB&BRAnch=BBB")
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return"
)
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
def test_version_control_error(self):
'''
The POST interface with an invalid version_control value
:return:
'''
data = {
"version_control": "gitgitgit",
"scm_repo": "A",
"scm_branch": "A",
"scm_commit": "A",
"repo": "A",
"branch": "A",
"enabled": 0
}
resp = self.client.post("/tracking", json=data, content_type="application/json", headers=self.auth)
resp_dict = json.loads(resp.data)
self.assertIn("code", resp_dict, msg="Error in data format return")
self.assertEqual(ResponseCode.INPUT_PARAMETERS_ERROR, resp_dict.get("code"), msg="Error in status code return")
self.assertIn("msg", resp_dict, msg="Error in data format return")
self.assertEqual(
ResponseCode.CODE_MSG_MAP.get(ResponseCode.INPUT_PARAMETERS_ERROR),
resp_dict.get("msg"),
msg="Error in status code return"
)
self.assertIn("data", resp_dict, msg="Error in data format return")
self.assertEqual(resp_dict.get("data"), None, msg="Error in data information return")
if __name__ == '__main__':
unittest.main()
"""
http basic auth
"""
from werkzeug.security import check_password_hash
from flask_httpauth import HTTPBasicAuth
from flask import current_app as app
auth = HTTPBasicAuth()
@auth.verify_password
def verify_password(username, password):
"""
verify password
"""
if username == app.config["USER"] and \
check_password_hash(app.config["PASSWORD"], password):
return username
return None
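# Minimal usage sketch (hypothetical route, not part of this module): a view is
# protected by decorating it with the verifier above, e.g.
#   @app.route('/tracking', methods=['POST'])
#   @auth.login_required
#   def add_tracking():
#       ...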
"""
functions for invoking the Gitee API
"""
import base64
import logging
import requests
from flask import current_app
log = logging.getLogger(__name__)
ORG_URL = "https://gitee.com/api/v5/orgs"
REPO_URL = "https://gitee.com/api/v5/repos"
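# All helpers below call the Gitee OpenAPI v5 REST endpoints and authenticate
# with the access_token read from the Flask config (GITEE_ACCESS_TOKEN).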
def get_path_content(repo, branch, path):
"""
get file content
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
url = '/'.join([REPO_URL, repo, 'contents', path])
param = {'access_token': gitee_token, 'ref': branch}
ret = requests.get(url, params=param).json()
return ret
def post_create_branch(repo, branch, new_branch):
"""
create branch
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
url = '/'.join([REPO_URL, repo, 'branches'])
data = {'access_token': gitee_token, 'refs': branch, 'branch_name': new_branch}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success'
return response.json()
def post_upload_patch(data):
"""
upload patch
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
patch_file_name = data['latest_commit_id'] + '.patch'
url = '/'.join([REPO_URL, data['repo'], 'contents', patch_file_name])
content = base64.b64encode(data['patch_file_content'].encode("utf-8"))
message = '[patch tracking] ' + data['cur_time'] + ' - ' + data['commit_url'] + '\n'
data = {'access_token': gitee_token, 'content': content, 'message': message, 'branch': data['branch']}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success'
return response.json()
def post_create_spec(repo, branch, spec_content, cur_time):
"""
create spec
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
spec_file_name = repo + '.spec'
url = '/'.join([REPO_URL, owner, repo, 'contents', spec_file_name])
content = base64.b64encode(spec_content.encode("utf-8"))
message = '[patch tracking] ' + cur_time + ' - ' + 'create spec file' + '\n'
data = {'access_token': gitee_token, 'content': content, 'message': message, 'branch': branch}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success'
return response.json()
def put_upload_spec(repo, branch, cur_time, spec_content, spec_sha):
"""
upload spec
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
spec_file_name = repo + '.spec'
url = '/'.join([REPO_URL, owner, repo, 'contents', spec_file_name])
content = base64.b64encode(spec_content.encode("utf-8"))
message = '[patch tracking] ' + cur_time + ' - ' + 'update spec file' + '\n'
data = {
'access_token': gitee_token,
'owner': owner,
'repo': repo,
'path': spec_file_name,
'content': content,
'message': message,
'branch': branch,
'sha': spec_sha
}
response = requests.put(url, data=data)
if response.status_code == 200:
return 'success'
return response.json()
def post_create_issue(repo, issue_body, cur_time):
"""
create issue
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
url = '/'.join([REPO_URL, owner, 'issues'])
data = {'access_token': gitee_token, 'repo': repo, 'title': '[patch tracking] ' + cur_time, 'body': issue_body}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success', response.json()['number']
return 'error', response.json()
def post_create_pull_request(repo, branch, patch_branch, issue_num, cur_time):
"""
create pull request
"""
gitee_token = current_app.config['GITEE_ACCESS_TOKEN']
owner, repo = repo.split('/')
url = '/'.join([REPO_URL, owner, repo, 'pulls'])
data = {
'access_token': gitee_token,
'repo': repo,
'title': '[patch tracking] ' + cur_time,
'head': patch_branch,
'base': branch,
'body': '#' + issue_num,
"prune_source_branch": "true"
}
response = requests.post(url, data=data)
if response.status_code == 201:
return 'success'
return response.json()
"""
functions for invoking the GitHub API
"""
import time
import logging
import requests
from requests.exceptions import ConnectionError as requests_connectionError
from flask import current_app
logger = logging.getLogger(__name__)
class GitHubApi:
"""
Encapsulates GitHub functionality
"""
def __init__(self):
github_token = current_app.config['GITHUB_ACCESS_TOKEN']
token = 'token ' + github_token
self.headers = {
'User-Agent': 'Mozilla/5.0',
'Authorization': token,
'Content-Type': 'application/json',
'Connection': 'close',
'method': 'GET',
'Accept': 'application/json'
}
def api_request(self, url):
"""
request GitHub API
"""
logger.debug("Connect url: %s", url)
count = 30
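# Retry on connection errors up to 30 times with a 10-second pause between
# attempts (roughly 5 minutes in total) before giving up.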
while count > 0:
try:
response = requests.get(url, headers=self.headers)
return response
except requests_connectionError as err:
logger.warning(err)
time.sleep(10)
count -= 1
continue
if count == 0:
logger.error('Fail to connect to github: %s after 30 retries.', url)
return 'connect error'
def get_commit_info(self, repo_url, commit_id):
"""
get commit info
"""
res_dict = dict()
api_url = 'https://api.github.com/repos'
url = '/'.join([api_url, repo_url, 'commits', commit_id])
ret = self.api_request(url)
if ret != 'connect error':
if ret.status_code == 200:
res_dict['commit_id'] = commit_id
res_dict['message'] = ret.json()['commit']['message']
res_dict['time'] = ret.json()['commit']['author']['date']
if 'parents' in ret.json() and ret.json()['parents']:
res_dict['parent'] = ret.json()['parents'][0]['sha']
return 'success', res_dict
logger.error('%s failed. Return val: %s', url, ret)
return 'error', ret.json()
return 'error', 'connect error'
def get_latest_commit(self, repo_url, branch):
"""
get latest commit_ID, commit_message, commit_date
:param repo_url:
:param branch:
:return: res_dict
"""
api_url = 'https://api.github.com/repos'
url = '/'.join([api_url, repo_url, 'branches', branch])
ret = self.api_request(url)
res_dict = dict()
if ret != 'connect error':
if ret.status_code == 200:
res_dict['latest_commit'] = ret.json()['commit']['sha']
res_dict['message'] = ret.json()['commit']['commit']['message']
res_dict['time'] = ret.json()['commit']['commit']['committer']['date']
return 'success', res_dict
logger.error('%s failed. Return val: %s', url, ret)
return 'error', ret.json()
return 'error', 'connect error'
def get_patch(self, repo_url, scm_commit, last_commit):
"""
get patch
"""
api_url = 'https://github.com'
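# The diff is fetched from GitHub's compare view as plain text
# (https://github.com/<repo>/compare/<base>...<head>.diff); for a single commit
# the parent notation <sha>^...<sha> is used.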
if scm_commit != last_commit:
commit = scm_commit + '...' + last_commit + '.diff'
else:
commit = scm_commit + '^...' + scm_commit + '.diff'
ret_dict = dict()
url = '/'.join([api_url, repo_url, 'compare', commit])
ret = self.api_request(url)
if ret != 'connect error':
if ret.status_code == 200:
patch_content = ret.text
ret_dict['status'] = 'success'
ret_dict['api_ret'] = patch_content
else:
logger.error('%s failed. Return val: %s', url, ret)
ret_dict['status'] = 'error'
ret_dict['api_ret'] = ret.text
else:
ret_dict['status'] = 'error'
ret_dict['api_ret'] = 'fail to connect github by api.'
return ret_dict
"""
functionality for modifying the spec file
"""
import re
class Spec:
"""
functionality for updating the spec file
"""
def __init__(self, content):
self._lines = content.splitlines()
self.version = "0.0"
self.release = {"num": 0, "lineno": 0}
self.source_lineno = 0
self.patch = {"threshold": 6000, "max_num": 0, "lineno": 0}
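# Newly added patches are numbered starting from the threshold (6000),
# presumably to keep them clearly separated from manually numbered Patch
# entries; this is an assumption based on how max_num is initialised below.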
self.changelog_lineno = 0
# Avoid an exception when the file content is empty
if len(self._lines) == 0:
self._lines.append("")
# Record the line number of the last occurrence of each configuration item
for i, line in enumerate(self._lines):
match_find = re.match(r"[ \t]*Version:[ \t]*([\d.]+)", line)
if match_find:
self.version = match_find[1]
continue
match_find = re.match(r"[ \t]*Release:[ \t]*([\d.]+)", line)
if match_find:
self.release["num"] = int(match_find[1])
self.release["lineno"] = i
continue
match_find = re.match(r"[ \t]*%changelog", line)
if match_find:
self.changelog_lineno = i
continue
match_find = re.match(r"[ \t]*Source([\d]*):", line)
if match_find:
self.source_lineno = i
continue
match_find = re.match(r"[ \t]*Patch([\d]+):", line)
if match_find:
num = int(match_find[1])
self.patch["lineno"] = 0
if num > self.patch["max_num"]:
self.patch["max_num"] = num
self.patch["lineno"] = i
continue
if self.patch["lineno"] == 0:
self.patch["lineno"] = self.source_lineno
if self.patch["max_num"] < self.patch["threshold"]:
self.patch["max_num"] = self.patch["threshold"]
else:
self.patch["max_num"] += 1
def update(self, log_title, log_content, patches):
"""
Update items in spec file
"""
self.release["num"] += 1
self._lines[self.release["lineno"]
] = re.sub(r"[\d]+", str(self.release["num"]), self._lines[self.release["lineno"]])
log_title = "* " + log_title + " " + self.version + "-" + str(self.release["num"])
log_content = "- " + log_content
self._lines.insert(self.changelog_lineno + 1, log_title + "\n" + log_content + "\n")
patch_list = []
for patch in patches:
patch_list.append("Patch" + str(self.patch["max_num"]) + ": " + patch)
self.patch["max_num"] += 1
self._lines.insert(self.patch["lineno"] + 1, "\n".join(patch_list))
return self.__str__()
def __str__(self):
return "\n".join(self._lines)
if __name__ == "__main__":
SPEC_CONTENT = """Name: diffutils
Version: 3.7
Release: 3
Source: ftp://ftp.gnu.org/gnu/diffutils/diffutils-%{version}.tar.xz
Patch: diffutils-cmp-s-empty.patch
%changelog
* Mon Nov 11 2019 shenyangyang<shenyangyang4@huawei.com> 3.7-3
- DESC:delete unneeded comments
* Thu Oct 24 2019 shenyangyang<shenyangyang4@huawei.com> 3.7-2
- Type:enhancement
"""
s = Spec(SPEC_CONTENT)
s.update("Mon Nov 11 2019 patch-tracking", "DESC:add patch files", [
"xxx.patch",
"yyy.patch",
])
print(s)
SPEC_CONTENT = """"""
s = Spec(SPEC_CONTENT)
s.update("Mon Nov 11 2019 patch-tracking", "DESC:add patch files", [
"xxx.patch",
"yyy.patch",
])
print(s)
"""
setup script for building and installing patch-tracking
"""
from setuptools import setup, find_packages
setup(
name='patch-tracking',
version='1.0.0',
packages=find_packages(),
url='https://openeuler.org/zh/',
license='Mulan PSL v2',
author='ChenYanpan',
author_email='chenyanpan@huawei.com',
description='This is a tool for automatically tracking upstream repository code patches',
requires=['requests', 'flask', 'flask_restx', 'Flask_SQLAlchemy', 'Flask_APScheduler'],
data_files=[
('/etc/patch-tracking/', ['patch_tracking/settings.conf']),
('/etc/patch-tracking/', ['patch_tracking/logging.conf']),
('/var/patch-tracking/', ['patch_tracking/db.sqlite']),
('/usr/bin/', ['patch_tracking/cli/patch-tracking-cli']),
('/usr/bin/', ['patch_tracking/patch-tracking']),
('/usr/bin/', ['patch_tracking/cli/generate_password']),
('/etc/patch-tracking/', ['patch_tracking/self-signed.crt']),
('/etc/patch-tracking/', ['patch_tracking/self-signed.key']),
('/usr/lib/systemd/system/', ['patch_tracking/patch-tracking.service']),
],
)