提交 f869ecea 编写于 作者: M Mars Liu

new pipeline

上级 4f45ddd1
此差异已折叠。
from src.tree import gen_tree
from src.tree import TreeWalker
if __name__ == '__main__':
    # Regenerate node ids / per-directory config files under data/,
    # then rebuild the skill-tree skeleton (tree.json).
    gen_tree('data')
    walker = TreeWalker("data", "java", "Java")
    walker.walk()
# -*- coding: UTF-8 -*-
import importlib
def dispatch(config, options, actions, targets):
    """Route a dotted command-line action through a nested callable table.

    ``actions`` is the action string split on '.' (element 0 is the root,
    already consumed by the caller); ``targets`` is a nested dict whose
    leaves are either callables or dicts holding a ``'run'`` callable.

    Fixes vs. the original: an unmatched route used to keep incrementing
    ``index`` without advancing the node, ending in an IndexError on
    ``actions[action_len]``; it now reports the error once and stops.
    """
    action_len = len(actions)
    if action_len < 2:
        return
    print(f"[命令路由中..]: {actions[0]}")
    node = targets
    index = 1
    while index <= action_len:
        if isinstance(node, dict):
            if index == action_len:
                # Action parts exhausted: execute the node's 'run' hook.
                if node.get('run') is not None:
                    print(f"[命令路由执行]:", '->'.join(actions))
                    node['run']()
                else:
                    print("[命令路由错误]: 未找到支持的命令行路由:", '->'.join(actions))
                break
            step = actions[index]
            if node.get(step) is not None:
                print(f"[命令路由中..]: {step}")
                node = node[step]
                index += 1
            else:
                # Unknown route segment: report once and stop (the original
                # looped and eventually raised IndexError).
                print("[命令路由错误]: 未找到支持的命令行路由:", '->'.join(actions))
                break
        else:
            # Reached a plain callable before exhausting the parts: run it.
            print(f"[命令路由执行]:", '->'.join(actions))
            node()
            break
def dispatch_runner(config, options, actions, targets):
    """Route a dotted action and import/execute its dotted-path target.

    ``targets`` is a nested dict whose leaves are dotted import strings.
    A leaf either names ``module.function`` (called as ``fn(config,
    options)``) or ``module.Class.method`` (called as
    ``Class(config, options).method()`` — detected by a capitalized
    second-to-last segment).

    Fixes vs. the original: an unmatched route segment never advanced
    ``index``, so the while loop spun forever; it now stops.
    """
    action_len = len(actions)
    if action_len < 2:
        return

    def load_and_run(target):
        # Resolve the dotted path; capitalized second-to-last segment means
        # "instantiate this class, then call the trailing method on it".
        modules = target.split('.')
        class_pos = len(modules) - 2
        path_pos = len(modules) - 1
        if class_pos >= 0 and modules[class_pos][0].isupper():
            constructor = modules[class_pos]
            runner = modules[path_pos]
            module_path = '.'.join(modules[:class_pos])
            cls = getattr(importlib.import_module(module_path), constructor)
            getattr(cls(config, options), runner)()
        else:
            runner = modules[path_pos]
            module_path = '.'.join(modules[:path_pos])
            getattr(importlib.import_module(module_path), runner)(config, options)

    node = targets
    index = 1
    while index <= action_len:
        if isinstance(node, dict):
            if index == action_len:
                if node.get('run') is not None:
                    load_and_run(node['run'])
                break
            step = actions[index]
            if node.get(step) is None:
                # Unknown route: bail out (the original looped forever here).
                break
            node = node[step]
            index += 1
        else:
            load_and_run(node)
            break
# -*- coding: UTF-8 -*-
import os
import logging
import platform
from logging.handlers import RotatingFileHandler
def is_osx():
    """Return True when the current platform is macOS (a.k.a. Darwin)."""
    name = platform.platform()
    return ('macOS' in name) or ('Darwin' in name)
def get_root_log_dir(config, options):
    """Choose the root log directory for the current action/environment.

    macOS -> /tmp; dev (or no cluster) -> relative ../log; otherwise /var.
    """
    action = options.action if options.action else 'default'
    if is_osx():
        # Local development on a Mac: keep logs under /tmp.
        return '/tmp/csdn/ai/{}'.format(action)
    if options.cluster is None or options.cluster == 'dev':
        return '../log/csdn/ai/{}'.format(action)
    return '/var/csdn/csdn/ai/{}'.format(action)
class TruncatedFileHandler(RotatingFileHandler):
    """RotatingFileHandler variant that truncates instead of rotating.

    When the log file would exceed ``maxBytes`` it is simply emptied;
    no backup copies are ever kept.
    """

    def __init__(self, filename, mode='a', maxBytes=0, encoding=None, delay=0):
        # backupCount is forced to 0: rotated copies are never retained.
        super(TruncatedFileHandler, self).__init__(
            filename, mode, maxBytes, 0, encoding, delay)

    def doRollover(self):
        """Truncate the log file in place.

        Fix vs. the original: it renamed the live file to ``<name>.1`` and
        then deleted that copy — a pointless two-step that crashed if the
        base file was already gone. Removing the file directly has the same
        net effect; reopening in 'w' mode recreates it empty.
        """
        if self.stream:
            self.stream.close()
        # Clean up any stale ".1" left behind by the old implementation.
        stale = self.baseFilename + ".1"
        if os.path.exists(stale):
            os.remove(stale)
        if os.path.exists(self.baseFilename):
            os.remove(self.baseFilename)
        self.mode = 'w'
        self.stream = self._open()
def init_log(config, options):
    """Initialise logging: a size-truncated file handler plus the console."""
    # Make sure the log directory exists before attaching the file handler.
    root_log_dir = get_root_log_dir(config, options)
    os.makedirs(root_log_dir, exist_ok=True)
    print('root_log_dir:', root_log_dir)

    log_filename = root_log_dir + '/app.log'
    handlers = [
        TruncatedFileHandler(log_filename, "w", 10 * 1024),  # file, 10 KiB cap
        logging.StreamHandler(),                              # console
    ]
    logging.basicConfig(
        format="[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d at %(funcName)s]: %(message)s",
        datefmt='%Y-%m-%d %H:%M:%S',
        level=logging.INFO,
        handlers=handlers,
    )
# -*- coding: UTF-8 -*-
import os
import json
from lib.apollo import ApolloClient
def load_apollo_config(options):
    '''
    阿波罗配置加载
    ===
    * 如果是 `pro` 环境,则使用线上配置
    * 否则,使用内网配置
    '''
    # Pick the Apollo endpoint by cluster; anything unrecognised
    # (including None) falls back to the dev endpoint.
    endpoints = {
        'pro': 'http://pro.config.csdn.net:8080',
        'fat': 'http://fat.config.csdn.net:8080',
        'uat': 'http://uat.config.csdn.net:8080',
    }
    config_url = endpoints.get(options.cluster, 'http://dev.config.csdn.net:8080')
    client = ApolloClient(
        app_id="949",
        cluster="default",
        config_url=config_url,
        start_hot_update=False
    )
    config = client.get_value("csdn-ai", namespace="application")
    return json.loads(config)
def load_config(options, args):
    '''
    配置加载
    ===
    * 如果本地 config/config 目录下存在配置,则使用本地配置文件
    * 如果本地 config/config 目录下不存在配置,
        * 默认使用 阿波罗配置中心 cluster=dev 配置
        * 如果指定 --cluster,则使用指定 cluster 的阿波罗配置中心的配置
    '''
    # Explicit --cluster always wins: pull from the Apollo config centre.
    if options.cluster:
        return load_apollo_config(options)
    # Otherwise prefer a local profile file, falling back to an empty dict.
    profile_path = "config/config/{}.json".format(options.profile)
    if os.path.exists(profile_path):
        with open(profile_path, "r") as f:
            return json.loads(f.read())
    return {}
# -*- coding: UTF-8 -*-
import hashlib
import sys
import socket
import logging
import os
import json
import os
import threading
import inspect
import ctypes
import time
import urllib.request
from urllib.error import HTTPError
from urllib import parse
import yaml
# Constant keys used in Apollo API payloads and the local cache structures.
CONFIGURATIONS = "configurations"
NOTIFICATION_ID = "notificationId"
NAMESPACE_NAME = "namespaceName"
# 对时间戳,uri,秘钥进行加签
def signature(timestamp, uri, secret):
import hmac
import base64
string_to_sign = '' + timestamp + '\n' + uri
hmac_code = hmac.new(
secret.encode(), string_to_sign.encode(), hashlib.sha1).digest()
return base64.b64encode(hmac_code).decode()
def url_encode_wrapper(params):
    # Thin indirection over url_encode (kept as a seam for tests/overrides).
    return url_encode(params)
def no_key_cache_key(namespace, key):
    """Build the cache key used to remember known-missing config keys."""
    return f"{namespace}{len(namespace)}{key}"
# Look up `key` inside a namespace cache entry; None when unavailable.
def get_value_from_dict(namespace_cache, key):
    """Return the cached value for ``key`` or None when absent."""
    if not namespace_cache:
        return None
    kv_data = namespace_cache.get(CONFIGURATIONS)
    if kv_data is None:
        return None
    return kv_data.get(key)
def init_ip():
    """Best-effort detection of the local outbound IP address.

    Fix: the original's ``return ""`` fallback was unreachable (placed
    after a return inside try/finally) and connection errors propagated.
    A UDP connect sends no packets; it only selects the local interface.
    Returns "" when no route is available.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 53))
            return s.getsockname()[0]
    except OSError:
        return ""
def http_request(url, timeout, headers=None):
    """GET ``url`` and return ``(status_code, body)``; 304 -> ``(304, None)``.

    Other HTTP errors are logged and re-raised.

    Fix: the mutable default ``headers={}`` (shared across calls) is
    replaced with None; passing a dict keeps working unchanged.
    """
    try:
        request = urllib.request.Request(url, headers=headers or {})
        res = urllib.request.urlopen(request, timeout=timeout)
        body = res.read().decode("utf-8")
        return res.code, body
    except HTTPError as e:
        if e.code == 304:
            logging.getLogger(__name__).warning(
                "http_request error,code is 304, maybe you should check secret")
            return 304, None
        logging.getLogger(__name__).warning(
            "http_request error,code is %d, msg is %s", e.code, e.msg)
        raise e
def url_encode(params):
    # Encode a mapping as application/x-www-form-urlencoded query text.
    return parse.urlencode(params)
def makedirs_wrapper(path):
    # Create the directory tree; existing directories are not an error.
    os.makedirs(path, exist_ok=True)
class ApolloClient(object):
    """Client for the Apollo configuration centre.

    Lookup order in get_value: in-memory cache -> HTTP -> local file cache
    -> default. Hot-update (long-poll) and heartbeat machinery exists but
    is disabled in __init__.
    """

    def __init__(self, config_url, app_id, cluster='default', secret='', start_hot_update=True,
                 change_listener=None, config_format='json', refresh=False):
        # Core routing parameters.
        self.config_url = config_url
        self.cluster = cluster
        self.app_id = app_id
        # Non-core parameters.
        self.ip = init_ip()
        self.secret = secret
        self.config_format = config_format
        # Private control state.
        self._cycle_time = 2
        self._stopping = False
        self._cache = {}          # namespace -> {CONFIGURATIONS: {...}}
        self._no_key = {}         # known-missing (namespace,key) markers
        self._hash = {}           # namespace -> md5 of last persisted payload
        self._pull_timeout = 75
        self._cache_file_path = os.path.expanduser('~') + '/data/apollo/cache/'
        self._long_poll_thread = None
        self._change_listener = change_listener  # "add" "delete" "update"
        self.refresh = refresh
        if self.refresh:
            try:
                # NOTE(review): _cache_file_path is a directory — os.remove on
                # a directory always raises, so this is effectively a no-op;
                # confirm whether per-file removal was intended.
                os.remove(self._cache_file_path)
            except:
                pass
        # Ensure the cache directory exists.
        self._path_checker()
        # if start_hot_update:
        #     self._start_hot_update()
        # Heartbeat thread (disabled):
        # heartbeat = threading.Thread(target=self._heartBeat)
        # heartbeat.setDaemon(True)
        # heartbeat.start()

    def get_json_from_net(self, namespace='application'):
        """Fetch one namespace over HTTP.

        Returns {CONFIGURATIONS: data} on success, None on any failure.
        """
        url = '{}/configs/{}/{}/{}?releaseKey={}&ip={}'.format(
            self.config_url,
            self.app_id,
            self.cluster,
            namespace,
            "",
            self.ip)
        try:
            print(url)
            code, body = http_request(
                url, timeout=3, headers=self._signHeaders(url))
            if code == 200:
                data = json.loads(body)
                if self.config_format == 'yaml':
                    # yaml namespaces wrap the document in a 'content' field.
                    data = yaml.load(data["configurations"]
                                     ['content'], Loader=yaml.FullLoader)
                else:
                    data = data["configurations"]
                return_data = {CONFIGURATIONS: data}
                return return_data
            else:
                print('http_request error code', code)
                return None
        except Exception as e:
            logging.getLogger(__name__).error(str(e))
            return None

    def get_value(self, key, default_val=None, namespace='application'):
        """Resolve `key`: memory cache -> network -> file cache -> default."""
        try:
            # 1) In-memory cache.
            namespace_cache = self._cache.get(namespace)
            val = get_value_from_dict(namespace_cache, key)
            if val is not None:
                return val
            # Known-missing key: skip the network round trip.
            no_key = no_key_cache_key(namespace, key)
            if no_key in self._no_key:
                return default_val
            # 2) Network.
            namespace_data = self.get_json_from_net(namespace)
            val = get_value_from_dict(namespace_data, key)
            if val is not None:
                self._update_cache_and_file(namespace_data, namespace)
                return val
            # 3) Local file cache.
            namespace_cache = self._get_local_cache(namespace)
            val = get_value_from_dict(namespace_cache, key)
            if val is not None:
                self._update_cache_and_file(namespace_cache, namespace)
                return val
            # 4) Nothing found anywhere: remember the miss, return default.
            self._set_local_cache_none(namespace, key)
            return default_val
        except Exception as e:
            logging.getLogger(__name__).error("get_value has error, [key is %s], [namespace is %s], [error is %s], ",
                                              key, namespace, e)
            return default_val

    # Mark (namespace, key) as known-missing. The default value itself is NOT
    # stored, so each call can supply a different default correctly.
    def _set_local_cache_none(self, namespace, key):
        no_key = no_key_cache_key(namespace, key)
        self._no_key[no_key] = key

    def _start_hot_update(self):
        # Start the long-poll listener as a daemon thread: it exits
        # automatically when the main thread does.
        self._long_poll_thread = threading.Thread(target=self._listener)
        self._long_poll_thread.setDaemon(True)
        self._long_poll_thread.start()

    def stop(self):
        # Ask the long-poll / heartbeat loops to finish their current cycle.
        self._stopping = True
        logging.getLogger(__name__).info("Stopping listener...")

    # Invoke the registered change listener; listener exceptions are logged
    # and swallowed so a bad callback cannot kill the poll loop.
    def _call_listener(self, namespace, old_kv, new_kv):
        if self._change_listener is None:
            return
        if old_kv is None:
            old_kv = {}
        if new_kv is None:
            new_kv = {}
        try:
            for key in old_kv:
                new_value = new_kv.get(key)
                old_value = old_kv.get(key)
                if new_value is None:
                    # Key absent from the new snapshot: it was deleted.
                    self._change_listener("delete", namespace, key, old_value)
                    continue
                if new_value != old_value:
                    self._change_listener("update", namespace, key, new_value)
                    continue
            for key in new_kv:
                new_value = new_kv.get(key)
                old_value = old_kv.get(key)
                if old_value is None:
                    self._change_listener("add", namespace, key, new_value)
        except BaseException as e:
            logging.getLogger(__name__).warning(str(e))

    def _path_checker(self):
        # Create the cache directory on first use.
        if not os.path.isdir(self._cache_file_path):
            makedirs_wrapper(self._cache_file_path)

    # Refresh the in-memory cache and, when contents changed, the cache file.
    def _update_cache_and_file(self, namespace_data, namespace='application'):
        # refresh=True disables local caching entirely.
        if self.refresh:
            return
        self._cache[namespace] = namespace_data
        # Only rewrite the file when the serialized payload actually changed.
        new_string = json.dumps(namespace_data)
        new_hash = hashlib.md5(new_string.encode('utf-8')).hexdigest()
        if self._hash.get(namespace) == new_hash:
            pass
        else:
            with open(os.path.join(self._cache_file_path, '%s_configuration_%s.txt' % (self.app_id, namespace)),
                      'w') as f:
                f.write(new_string)
            self._hash[namespace] = new_hash

    # Load a namespace's configuration from the file cache ({} when absent).
    def _get_local_cache(self, namespace='application'):
        cache_file_path = os.path.join(
            self._cache_file_path, '%s_configuration_%s.txt' % (self.app_id, namespace))
        if os.path.isfile(cache_file_path):
            with open(cache_file_path, 'r') as f:
                result = json.loads(f.readline())
            return result
        return {}

    def _long_poll(self):
        """Run one long-poll cycle against the notifications/v2 endpoint."""
        notifications = []
        for key in self._cache:
            namespace_data = self._cache[key]
            notification_id = -1
            if NOTIFICATION_ID in namespace_data:
                notification_id = self._cache[key][NOTIFICATION_ID]
            notifications.append({
                NAMESPACE_NAME: key,
                NOTIFICATION_ID: notification_id
            })
        try:
            # Nothing cached yet means nothing to watch.
            if len(notifications) == 0:
                return
            url = '{}/notifications/v2'.format(self.config_url)
            params = {
                'appId': self.app_id,
                'cluster': self.cluster,
                'notifications': json.dumps(notifications, ensure_ascii=False)
            }
            param_str = url_encode_wrapper(params)
            url = url + '?' + param_str
            code, body = http_request(
                url, self._pull_timeout, headers=self._signHeaders(url))
            http_code = code
            if http_code == 304:
                logging.getLogger(__name__).debug('No change, loop...')
                return
            if http_code == 200:
                data = json.loads(body)
                for entry in data:
                    namespace = entry[NAMESPACE_NAME]
                    n_id = entry[NOTIFICATION_ID]
                    logging.getLogger(__name__).info(
                        "%s has changes: notificationId=%d", namespace, n_id)
                    self._get_net_and_set_local(
                        namespace, n_id, call_change=True)
                return
            else:
                logging.getLogger(__name__).warning('Sleep...')
        except Exception as e:
            logging.getLogger(__name__).warning(str(e))

    def _get_net_and_set_local(self, namespace, n_id, call_change=False):
        # Pull the namespace from the network, tag it with its notification
        # id, persist it, and optionally fire the change listener.
        # NOTE(review): get_json_from_net can return None on failure, which
        # would raise on the item assignment below — confirm callers only
        # reach here after a successful change notification.
        namespace_data = self.get_json_from_net(namespace)
        namespace_data[NOTIFICATION_ID] = n_id
        old_namespace = self._cache.get(namespace)
        self._update_cache_and_file(namespace_data, namespace)
        if self._change_listener is not None and call_change:
            old_kv = old_namespace.get(CONFIGURATIONS)
            new_kv = namespace_data.get(CONFIGURATIONS)
            self._call_listener(namespace, old_kv, new_kv)

    def _listener(self):
        # Long-poll loop body for the hot-update thread.
        logging.getLogger(__name__).info('start long_poll')
        while not self._stopping:
            self._long_poll()
            time.sleep(self._cycle_time)
        logging.getLogger(__name__).info("stopped, long_poll")

    # Add access-key signature headers when a secret is configured.
    def _signHeaders(self, url):
        headers = {}
        if self.secret == '':
            return headers
        uri = url[len(self.config_url):len(url)]
        time_unix_now = str(int(round(time.time() * 1000)))
        headers['Authorization'] = 'Apollo ' + self.app_id + \
            ':' + signature(time_unix_now, uri, self.secret)
        headers['Timestamp'] = time_unix_now
        return headers

    def _heartBeat(self):
        # Disabled in __init__.
        # NOTE(review): self._notification_map is never initialised anywhere
        # in this class — enabling the heartbeat would raise AttributeError.
        while not self._stopping:
            time.sleep(60 * 10)  # every 10 minutes
            for namespace in self._notification_map:
                self._do_heartBeat(namespace)

    def _do_heartBeat(self, namespace):
        # NOTE(review): self._release_key_map is also never initialised —
        # confirm before enabling the heartbeat thread.
        release_key = self._release_key_map.get(namespace)
        url = '{}/configs/{}/{}/{}?releaseKey={}&ip={}'.format(self.config_url, self.app_id, self.cluster, namespace,
                                                               release_key, self.ip)
        try:
            code, body = http_request(
                url, timeout=3, headers=self._signHeaders(url))
            if code == 200:
                data = json.loads(body)
                self._release_key_map[namespace] = data["releaseKey"]
                data = data["configurations"]
                self._update_cache_and_file(data, namespace)
            else:
                return None
        except Exception as e:
            logging.getLogger(__name__).error(str(e))
            return None
if __name__ == "__main__":
client = ApolloClient(
app_id="949",
cluster="default",
config_url='http://dev.config.csdn.net:8080',
refresh=True
)
val = client.get_value("csdn-ai", namespace="application")
# print(val)
# -*- coding: UTF-8 -*-
from common.logger import init_log
from config.config import load_config
from options import parse_options, show_help
from tree import gen_tree
def test(config, options, actions):
    """Forward a ``test.*`` action to the local test module's dispatcher."""
    # Fix: dropped the redundant self-alias (`import test as test`).
    import test
    test.dispatch(config, options, actions)
def tree(config, options, actions):
    """Regenerate the skill tree from the ../data directory.

    Fix: removed a dead, unused ``import test as test`` statement.
    """
    gen_tree("../data")
def run(options):
    """Entry point: load config, set up logging, then dispatch -a ACTION.

    Fix: renamed locals that shadowed builtins (``next``) and restructured
    with a guard clause; behavior is unchanged.
    """
    if options.action is None:
        show_help()
        return
    actions = options.action.split('.')
    if len(actions) == 0:
        return
    print('@init config...')
    # NOTE(review): `args` is the module-level name bound in the __main__
    # guard; load_config currently ignores it — confirm before refactoring.
    config = load_config(options, args)
    print('')
    print('@init log...')
    init_log(config, options)
    print('')
    print('@dispatch action:{}...'.format(options.action))
    root_action = actions[0]
    rest = actions[1:]
    routes = {
        'test': lambda: test(config, options, rest),
        'tree': lambda: tree(config, options, rest)
    }
    routes[root_action]()
if __name__ == "__main__":
[options, args] = parse_options()
run(options)
from optparse import OptionParser
def parse_common_options(parser):
    '''
    ## 公共选项
    * -t 或者 --tag_id : 某个操作限制到指定tag_id的数据范围
    * --reset: 重置选项
    * --tag_name: 标签名字
    * --model: 模型名字
    * --log: 日志级别
    * --train: 训练
    * --port: 端口
    * --show_config: 显示配置
    * --count: 指定数量
    * --query: 自定义查询
    '''
    # Every option follows the same pattern (help == dest,
    # metavar == dest.upper()), so register them data-driven.
    # (flags, dest, is_boolean_flag)
    specs = [
        (("-t", "--tag_id"), "tag_id", False),
        (("--reset",), "reset", True),
        (("--tag_name",), "tag_name", False),
        (("--model",), "model", False),
        (("--log",), "log", False),
        (("--train",), "train", True),
        (("--port",), "port", False),
        (("--server",), "server", False),
        (("--show_config",), "show_config", False),
        (("--count",), "count", False),
        (("--query",), "query", False),
    ]
    for flags, dest, is_flag in specs:
        kwargs = {"dest": dest, "help": dest, "metavar": dest.upper()}
        if is_flag:
            kwargs["action"] = "store_true"
        parser.add_option(*flags, **kwargs)
def parse_profile_options(parser):
    '''
    ## 环境配置选项
    * -p 或 --profile 指定配置环境,可选的有 `dev`, `fat`, `pre`, `pro`
        * 如果本地 config/config 目录下存在配置,则使用本地配置文件
        * 如果本地 config/config 目录下不存在配置,
            * 默认使用 阿波罗配置中心 cluster=dev 配置
            * 如果指定 --cluster,则使用指定 cluster 的阿波罗配置中心的配置
    '''
    # --profile defaults to the production profile.
    parser.add_option("-p", "--profile", dest="profile", help="profile",
                      default='pro', metavar="PROFILE")
    parser.add_option("--cluster", dest="cluster", help="cluster",
                      metavar="REMOTE")
def parse_action_options(parser):
    '''
    ## 操作选项
    * -a 或 --action 指定了操作目标,多级目标用点号分割,例如:
        * -a dataset.build.tag.all
        * -a server.ask
        * -a test.code
    * 参考[README](./README.md)
    '''
    parser.add_option("-a", "--action", dest="action", help="action",
                      metavar="ACTION")
def parse_test_options(parser):
    '''
    ## 测试选项
    执行 -a test.xx 测试时默认执行冒烟测试,下面的选项改变行为
    * --label 指定执行测试并生成待标注数据
    * --count 指定标注上限
    '''
    parser.add_option("--label", dest="label", help="label",
                      action="store_true", metavar="LABEL")
def parse_db_options(parser):
    '''
    ## 数据库 migrate 选项
    * --message 传入一个消息变量,db.migrate action 接收此参数
    * --revision 传入版本参数,db.upgrade, db.downgrade, db.stamp, db.show, db.edit 接受此参数
    '''
    for dest in ("message", "revision"):
        parser.add_option("--" + dest, dest=dest, help=dest,
                          metavar=dest.upper())
def parse_options():
    """Build the full option parser, parse sys.argv, return [options, args]."""
    parser = OptionParser()
    for register in (
        parse_common_options,
        parse_profile_options,
        parse_action_options,
        parse_test_options,
        parse_db_options,
    ):
        register(parser)
    options, args = parser.parse_args()
    return [options, args]
def show_help():
    '''
    命令行选项说明:
    ==
    '''
    # Concatenate this function's docstring with the option-group docstrings
    # (they double as user-facing help text, so they stay verbatim).
    # Fix: renamed the local from `help`, which shadowed the builtin.
    text = '\n'.join([
        show_help.__doc__,
        parse_common_options.__doc__,
        parse_profile_options.__doc__,
        parse_action_options.__doc__
    ])
    print(text)
import logging
from genericpath import exists
import json
import os
......@@ -7,6 +8,9 @@ import re
# Global registry of node/exercise ids seen so far (duplicate detection).
id_set = set()
logger = logging.getLogger(__name__)
def load_json(p):
    """Read the file at path ``p`` and parse its contents as JSON."""
    with open(p, 'r') as f:
        content = f.read()
    return json.loads(content)
......@@ -18,13 +22,23 @@ def dump_json(p, j, exist_ok=False, override=False):
if not override:
return
else:
print(f"{p} already exist")
logger.error(f"{p} already exist")
sys.exit(0)
with open(p, 'w') as f:
with open(p, 'w+') as f:
f.write(json.dumps(j, indent=2, ensure_ascii=False))
def ensure_config(path):
    # Load <path>/config.json, creating a minimal one when it is missing.
    config_path = os.path.join(path, "config.json")
    if not os.path.exists(config_path):
        node = {"keywords": []}
        # exist_ok=True, override=False: never clobber an existing file.
        dump_json(config_path, node, exist_ok=True, override=False)
        return node
    else:
        return load_json(config_path)
def parse_no_name(d):
p = r'(\d+)\.(.*)'
m = re.search(p, d)
......@@ -37,6 +51,7 @@ def parse_no_name(d):
return no, dir_name
def check_export(base, cfg):
flag = False
exports = []
......@@ -51,142 +66,211 @@ def check_export(base, cfg):
return flag
def gen_tree(data_path):
root = {}
def gen_node_id():
    # Fresh "java-"-prefixed random hex id for a tree node.
    # return ''.join(str(uuid.uuid5(uuid.NAMESPACE_URL, 'skill_tree')).split('-'))
    return "java-" + uuid.uuid4().hex
def list_dir(p):
    """Yield ``(full_path, name)`` for each subdirectory of ``p``, sorted by name."""
    for entry in sorted(os.listdir(p)):
        full_path = os.path.join(p, entry)
        if os.path.isdir(full_path):
            yield full_path, entry
def ensure_id_helper(node):
    """Ensure ``node`` and all its descendants carry unique node_id values.

    Returns True when any id was assigned or replaced.

    Fix: the original accumulated with ``flag or ensure_id_helper(...)``,
    which short-circuits — once one child changed, remaining subtrees were
    never visited. Recursion now always runs for every child.
    """
    flag = False
    if (node.get('node_id') is None) or node.get('node_id') in id_set:
        node['node_id'] = gen_node_id()
        flag = True
    id_set.add(node['node_id'])
    if 'children' in node:
        for c in node["children"]:
            # Each child wrapper is {title: subtree}.
            child_changed = ensure_id_helper(list(c.values())[0])
            flag = flag or child_changed
    return flag
def gen_node_id():
    """Return a fresh "oceanbase-"-prefixed random hex node id."""
    return "-".join(("oceanbase", uuid.uuid4().hex))
def ensure_node_id(cfg):
    # Entry point for the recursive id check over a whole config tree;
    # returns True when anything was (re)assigned.
    return ensure_id_helper(cfg)
def ensure_title_helper(node, cfg_path, title=""):
flag = False
class TreeWalker:
    """Walks a skill-tree data directory and (re)builds its tree.json."""

    def __init__(self, root, tree_name, title=None):
        self.root = root       # data directory to walk
        self.name = tree_name  # short tree name, used as the node-id prefix
        self.title = title if title is not None else tree_name
        self.tree = {}         # accumulated output, dumped to tree.json
if node.get('title') is None:
if cfg_path:
node['title'] = re.sub("^[0-9]{1,3}\.", "", os.path.split(os.path.dirname(cfg_path))[-1])
else:
node['title'] = title
flag = True
    def walk(self):
        # Build the whole tree: root -> levels -> chapters -> sections ->
        # exercises. Side effects: rewrites config.json files that are
        # missing ids and dumps the skeleton to <root>/tree.json.
        root = self.load_root()
        root_node = {
            "node_id": root["node_id"],
            "keywords": root["keywords"],
            "children": []
        }
        self.tree[root["tree_name"]] = root_node
        self.load_levels(root_node)
        # NOTE(review): this also loads chapters directly under the root —
        # its result is immediately overwritten per-level below; presumably
        # redundant, confirm before removing.
        self.load_chapters(self.root, root_node)
        for index, level in enumerate(root_node["children"]):
            level_title = list(level.keys())[0]
            level_node = list(level.values())[0]
            level_path = os.path.join(self.root, f"{index+1}.{level_title}")
            self.load_chapters(level_path, level_node)
            # NOTE: inner loops deliberately rebind `index`; only the
            # innermost value is used for each path.
            for index, chapter in enumerate(level_node["children"]):
                chapter_title = list(chapter.keys())[0]
                chapter_node = list(chapter.values())[0]
                chapter_path = os.path.join(level_path, f"{index+1}.{chapter_title}")
                self.load_sections(chapter_path, chapter_node)
                for index, section_node in enumerate(chapter_node["children"]):
                    section_title = list(section_node.keys())[0]
                    # NOTE(review): sections use f"{index}" while levels and
                    # chapters use f"{index+1}" — looks inconsistent; confirm
                    # against the on-disk numbering convention.
                    full_path = os.path.join(chapter_path, f"{index}.{section_title}")
                    if os.path.isdir(full_path):
                        self.ensure_exercises(full_path)
        tree_path = os.path.join(self.root, "tree.json")
        dump_json(tree_path, self.tree, exist_ok=True, override=True)
        return self.tree
def load_levels(self, root_node):
levels = []
for level in os.listdir(self.root):
if not os.path.isdir(level):
continue
level_path = os.path.join(self.root, level)
num, config = self.load_level_node(level_path)
levels.append((num, config))
levels.sort(key=lambda item: item[0])
root_node["children"] = [item[1] for item in levels]
return root_node
def load_level_node(self, level_path):
config = self.ensure_level_config(level_path)
num, name = self.extract_node_env(level_path)
result = {
name: {
"node_id": config["node_id"],
"keywords": config["keywords"],
"children": [],
}
}
if 'children' in node:
for c in node["children"]:
flag = flag or ensure_title_helper(list(c.values())[0], None, list(c.keys())[0])
return flag
def ensure_title(cfg, cfg_path):
return ensure_title_helper(cfg, cfg_path)
def make_node(name, node_id, keywords, children=None):
node = {}
node_children = children or []
node[name] = {
'node_id': node_id,
'keywords': keywords,
'children': node_children
return num, result
def load_chapters(self, base, level_node):
chapters = []
for name in os.listdir(base):
full_name = os.path.join(base, name)
if os.path.isdir(full_name):
num, chapter = self.load_chapter_node(full_name)
chapters.append((num, chapter))
chapters.sort(key=lambda item: item[0])
level_node["children"] = [item[1] for item in chapters]
return level_node
def load_sections(self, base, chapter_node):
sections = []
for name in os.listdir(base):
full_name = os.path.join(base, name)
if os.path.isdir(full_name):
num, section = self.load_section_node(full_name)
sections.append((num, section))
sections.sort(key=lambda item: item[0])
chapter_node["children"] = [item[1] for item in sections]
return chapter_node
def ensure_chapters(self):
for subdir in os.listdir(self.root):
self.ensure_level_config(subdir)
def load_root(self):
config_path = os.path.join(self.root, "config.json")
if not os.path.exists(config_path):
config = {
"tree_name": self.name,
"keywords": [],
"node_id": self.gen_node_id(),
}
dump_json(config_path, config, exist_ok=True, override=True)
else:
config = load_json(config_path)
flag, result = self.ensure_node_id(config)
if flag:
dump_json(config_path, result, exist_ok=True, override=True)
return config
def ensure_level_config(self, path):
config_path = os.path.join(path, "config.json")
if not os.path.exists(config_path):
config = {
"node_id": self.gen_node_id()
}
dump_json(path, config, exist_ok=True, override=True)
else:
config = load_json(config_path)
flag, result = self.ensure_node_id(config)
if flag:
dump_json(path, config, exist_ok=True, override=True)
return config
def ensure_chapter_config(self, path):
config_path = os.path.join(path, "config.json")
if not os.path.exists(config_path):
config = {
"node_id": self.gen_node_id(),
"keywords": []
}
dump_json(path, config, exist_ok=True, override=True)
else:
config = load_json(config_path)
flag, result = self.ensure_node_id(config)
if flag:
dump_json(path, config, exist_ok=True, override=True)
return config
def ensure_section_config(self, path):
config_path = os.path.join(path, "config.json")
if not os.path.exists(config_path):
config = {
"node_id": self.gen_node_id(),
"keywords": [],
"children":[],
"export":[]
}
dump_json(config_path, config, exist_ok=True, override=True)
else:
config = load_json(config_path)
flag, result = self.ensure_node_id(config)
if flag:
dump_json(config_path, config, exist_ok=True, override=True)
return config
def ensure_node_id(self, config):
if "node_id" not in config:
config["node_id"] = self.gen_node_id()
return True, config
else:
return False, config
def gen_node_id(self):
return f"{self.name}-{uuid.uuid4().hex}"
def extract_node_env(self, path):
_, dir = os.path.split(path)
number, title = dir.split(".", 1)
return int(number), title
def load_chapter_node(self, full_name):
config = self.ensure_chapter_config(full_name)
num, name = self.extract_node_env(full_name)
result = {
name: {
"node_id": config["node_id"],
"keywords": config["keywords"],
"children": [],
}
}
return node, node_children
# 根节点
cfg_path = os.path.join(data_path, 'config.json')
cfg = load_json(cfg_path)
if ensure_node_id(cfg):
dump_json(cfg_path, cfg, exist_ok=True, override=True)
if ensure_title(cfg, cfg_path):
cfg["title"] = "C"
dump_json(cfg_path, cfg, exist_ok=True, override=True)
tree_node = {
"node_id": cfg['node_id'],
"keywords": cfg['keywords'],
"children": []
}
root[cfg['tree_name']] = tree_node
# 难度节点
for level_no_dir, level_no_name in list_dir(data_path):
print(level_no_dir)
no, level_name = parse_no_name(level_no_name)
level_path = os.path.join(level_no_dir, 'config.json')
level_cfg = load_json(level_path)
if ensure_node_id(level_cfg) or check_export(level_no_dir, level_cfg):
dump_json(level_path, level_cfg, exist_ok=True, override=True)
if ensure_title(level_cfg, level_path):
dump_json(level_path, level_cfg, exist_ok=True, override=True)
level_node, level_node_children = make_node(
level_name, level_cfg['node_id'], level_cfg['keywords'])
tree_node['children'].append(level_node)
# 章节点
for chapter_no_dir, chapter_no_name in list_dir(level_no_dir):
no, chapter_name = parse_no_name(chapter_no_name)
chapter_path = os.path.join(chapter_no_dir, 'config.json')
chapter_cfg = load_json(chapter_path)
if ensure_node_id(chapter_cfg) or check_export(chapter_no_dir, chapter_cfg):
dump_json(chapter_path, chapter_cfg, exist_ok=True, override=True)
if ensure_title(chapter_cfg, chapter_path):
dump_json(chapter_path, chapter_cfg, exist_ok=True, override=True)
chapter_node, chapter_node_children = make_node(
chapter_name, chapter_cfg['node_id'], chapter_cfg['keywords'])
level_node_children.append(chapter_node)
# 知识点
for section_no_dir, section_no_name in list_dir(chapter_no_dir):
no, section_name = parse_no_name(section_no_name)
sec_path = os.path.join(section_no_dir, 'config.json')
sec_cfg = load_json(sec_path)
flag = ensure_node_id(sec_cfg) or check_export(section_no_dir, sec_cfg)
section_node, section_node_children = make_node(
section_name, sec_cfg['node_id'], sec_cfg['keywords'], sec_cfg['children'])
chapter_node_children.append(section_node)
# 确保习题分配了习题ID
for export in sec_cfg["export"]:
ecfg_path = os.path.join(section_no_dir, export)
ecfg = load_json(ecfg_path)
if (ecfg.get('exercise_id') is None) or (ecfg.get('exercise_id') in id_set):
ecfg['exercise_id'] = uuid.uuid4().hex
dump_json(ecfg_path, ecfg, exist_ok=True, override=True)
id_set.add(ecfg['exercise_id'])
if flag:
dump_json(sec_path, sec_cfg, exist_ok=True, override=True)
if ensure_title(sec_cfg, sec_path):
dump_json(sec_path, sec_cfg, exist_ok=True, override=True)
# 保存技能树骨架
tree_path = os.path.join(data_path, 'tree.json')
dump_json(tree_path, root, exist_ok=True, override=True)
return num, result
def load_section_node(self, full_name):
config = self.ensure_section_config(full_name)
num, name = self.extract_node_env(full_name)
result = {
name: {
"node_id": config["node_id"],
"keywords": config["keywords"],
"children": config.get("children", [])
}
}
# if "children" in config:
# result["children"] = config["children"]
return num, result
    def ensure_exercises(self, section_path):
        # Assign an exercise_id to every exported exercise file lacking one.
        config = self.ensure_section_config(section_path)
        for e in config.get("export", []):
            full_name = os.path.join(section_path, e)
            exercise = load_json(full_name)
            if "exercise_id" not in exercise:
                exercise["exercise_id"] = uuid.uuid4().hex
                # NOTE(review): unlike the legacy gen_tree path, duplicate ids
                # are not checked against id_set here — confirm acceptable.
                dump_json(full_name, exercise)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册