From 46daa7ac87884497638d89d8864613c2f55cf687 Mon Sep 17 00:00:00 2001
From: Mars Liu
Date: Mon, 8 Nov 2021 12:40:18 +0800
Subject: [PATCH] new pipeline

---
 main.py     |   5 +-
 src/tree.py | 358 +++++++++++++++++++++++++++++++---------------------
 2 files changed, 220 insertions(+), 143 deletions(-)

diff --git a/main.py b/main.py
index 08e822c..9d3092d 100644
--- a/main.py
+++ b/main.py
@@ -1,4 +1,5 @@
-from src.tree import gen_tree
+from src.tree import TreeWalker
 
 if __name__ == '__main__':
-    gen_tree('data')
+    walker = TreeWalker("data", "opencv", "OpenCV")
+    walker.walk()
diff --git a/src/tree.py b/src/tree.py
index 3b72d84..c36a946 100644
--- a/src/tree.py
+++ b/src/tree.py
@@ -1,3 +1,4 @@
+import logging
 from genericpath import exists
 import json
 import os
@@ -7,6 +8,8 @@ import re
 
 id_set = set()
 
+logger = logging.getLogger(__name__)
+
 
 def load_json(p):
     with open(p, 'r') as f:
@@ -19,13 +22,23 @@ def dump_json(p, j, exist_ok=False, override=False):
             if not override:
                 return
         else:
-            print(f"{p} already exist")
+            logger.error(f"{p} already exist")
             sys.exit(0)
 
     with open(p, 'w+') as f:
         f.write(json.dumps(j, indent=2, ensure_ascii=False))
 
 
+def ensure_config(path):
+    config_path = os.path.join(path, "config.json")
+    if not os.path.exists(config_path):
+        node = {"keywords": []}
+        dump_json(config_path, node, exist_ok=True, override=False)
+        return node
+    else:
+        return load_json(config_path)
+
+
 def parse_no_name(d):
     p = r'(\d+)\.(.*)'
     m = re.search(p, d)
@@ -53,148 +66,211 @@ def check_export(base, cfg):
     return flag
 
 
-def gen_tree(data_path):
-    root = {}
-
-    def gen_node_id():
-        # return ''.join(str(uuid.uuid5(uuid.NAMESPACE_URL, 'skill_tree')).split('-'))
-        return "opencv-" + uuid.uuid4().hex
-
-    def list_dir(p):
-        v = os.listdir(p)
-        v.sort()
-        for no_name in v:
-            no_dir = os.path.join(p, no_name)
-            if os.path.isdir(no_dir):
-                yield no_dir, no_name
-
-    def ensure_id_helper(node):
-        flag = False
-
-        if (node.get('node_id') is None) or node.get('node_id') in id_set:
-            node['node_id'] = gen_node_id()
-            flag = True
-
-        id_set.add(node['node_id'])
-
-        if 'children' in node:
-            for c in node["children"]:
-                flag = flag or ensure_id_helper(list(c.values())[0])
-
-        return flag
-
-    def ensure_node_id(cfg):
-        return ensure_id_helper(cfg)
+def gen_node_id():
+    return "oceanbase-" + uuid.uuid4().hex
+
 
-    def ensure_title_helper(node, cfg_path, title=""):
-        flag = False
-        if node.get('title') is None:
-            if cfg_path:
-                node['title'] = re.sub(
-                    "^[0-9]{1,3}\.", "", os.path.split(os.path.dirname(cfg_path))[-1])
-            else:
-                node['title'] = title
-            flag = True
-
-        if 'children' in node:
-            for c in node["children"]:
-                flag = flag or ensure_title_helper(
-                    list(c.values())[0], None, list(c.keys())[0])
-
-        return flag
+class TreeWalker:
+    def __init__(self, root, tree_name, title=None):
+        self.name = tree_name
+        self.root = root
+        self.title = tree_name if title is None else title
+        self.tree = {}
 
-    def ensure_title(cfg, cfg_path):
-        return ensure_title_helper(cfg, cfg_path)
+    def walk(self):
+        root = self.load_root()
+        root_node = {
+            "node_id": root["node_id"],
+            "keywords": root["keywords"],
+            "children": []
+        }
+        self.tree[root["tree_name"]] = root_node
+        self.load_levels(root_node)
+        self.load_chapters(self.root, root_node)
+        for index, level in enumerate(root_node["children"]):
+            level_title = list(level.keys())[0]
+            level_node = list(level.values())[0]
+            level_path = os.path.join(self.root, f"{index+1}.{level_title}")
+            self.load_chapters(level_path, level_node)
+            for index, chapter in enumerate(level_node["children"]):
+                chapter_title = list(chapter.keys())[0]
+                chapter_node = list(chapter.values())[0]
+                chapter_path = os.path.join(level_path, f"{index+1}.{chapter_title}")
+                self.load_sections(chapter_path, chapter_node)
+                for index, section_node in enumerate(chapter_node["children"]):
+                    section_title = list(section_node.keys())[0]
+                    full_path = os.path.join(chapter_path, f"{index+1}.{section_title}")
+                    if os.path.isdir(full_path):
+                        self.ensure_exercises(full_path)
+
+        tree_path = os.path.join(self.root, "tree.json")
+        dump_json(tree_path, self.tree, exist_ok=True, override=True)
+        return self.tree
+
+    def load_levels(self, root_node):
+        levels = []
+        for level in os.listdir(self.root):
+            level_path = os.path.join(self.root, level)
+            if not os.path.isdir(level_path):
+                continue
+            num, config = self.load_level_node(level_path)
+            levels.append((num, config))
+        levels.sort(key=lambda item: item[0])
+        root_node["children"] = [item[1] for item in levels]
+        return root_node
+
+    def load_level_node(self, level_path):
+        config = self.ensure_level_config(level_path)
+        num, name = self.extract_node_env(level_path)
+
+        result = {
+            name: {
+                "node_id": config["node_id"],
+                "keywords": config["keywords"],
+                "children": [],
+            }
+        }
 
-    def make_node(name, node_id, keywords, children=None):
-        node = {}
-        node_children = children or []
-        node[name] = {
-            'node_id': node_id,
-            'keywords': keywords,
-            'children': node_children
+        return num, result
+
+    def load_chapters(self, base, level_node):
+        chapters = []
+        for name in os.listdir(base):
+            full_name = os.path.join(base, name)
+            if os.path.isdir(full_name):
+                num, chapter = self.load_chapter_node(full_name)
+                chapters.append((num, chapter))
+
+        chapters.sort(key=lambda item: item[0])
+        level_node["children"] = [item[1] for item in chapters]
+        return level_node
+
+    def load_sections(self, base, chapter_node):
+        sections = []
+        for name in os.listdir(base):
+            full_name = os.path.join(base, name)
+            if os.path.isdir(full_name):
+                num, section = self.load_section_node(full_name)
+                sections.append((num, section))
+
+        sections.sort(key=lambda item: item[0])
+        chapter_node["children"] = [item[1] for item in sections]
+        return chapter_node
+
+    def ensure_chapters(self):
+        for subdir in os.listdir(self.root):
+            self.ensure_level_config(subdir)
+
+    def load_root(self):
+        config_path = os.path.join(self.root, "config.json")
+        if not os.path.exists(config_path):
+            config = {
+                "tree_name": self.name,
+                "keywords": [],
+                "node_id": self.gen_node_id(),
+            }
+            dump_json(config_path, config, exist_ok=True, override=True)
+        else:
+            config = load_json(config_path)
+            flag, result = self.ensure_node_id(config)
+            if flag:
+                dump_json(config_path, result, exist_ok=True, override=True)
+
+        return config
+
+    def ensure_level_config(self, path):
+        config_path = os.path.join(path, "config.json")
+        if not os.path.exists(config_path):
+            config = {
+                "node_id": self.gen_node_id(),
+                "keywords": []
+            }
+            dump_json(config_path, config, exist_ok=True, override=True)
+        else:
+            config = load_json(config_path)
+            flag, result = self.ensure_node_id(config)
+            if flag:
+                dump_json(config_path, config, exist_ok=True, override=True)
+        return config
+
+    def ensure_chapter_config(self, path):
+        config_path = os.path.join(path, "config.json")
+        if not os.path.exists(config_path):
+            config = {
+                "node_id": self.gen_node_id(),
+                "keywords": []
+            }
+            dump_json(config_path, config, exist_ok=True, override=True)
+        else:
+            config = load_json(config_path)
+            flag, result = self.ensure_node_id(config)
+            if flag:
+                dump_json(config_path, config, exist_ok=True, override=True)
+        return config
+
+    def ensure_section_config(self, path):
+        config_path = os.path.join(path, "config.json")
+        if not os.path.exists(config_path):
+            config = {
+                "node_id": self.gen_node_id(),
+                "keywords": [],
+                "children": [],
+                "export": []
+            }
+            dump_json(config_path, config, exist_ok=True, override=True)
+        else:
+            config = load_json(config_path)
+            flag, result = self.ensure_node_id(config)
+            if flag:
+                dump_json(config_path, config, exist_ok=True, override=True)
+        return config
+
+    def ensure_node_id(self, config):
+        if "node_id" not in config:
+            config["node_id"] = self.gen_node_id()
+            return True, config
+        else:
+            return False, config
+
+    def gen_node_id(self):
+        return f"{self.name}-{uuid.uuid4().hex}"
+
+    def extract_node_env(self, path):
+        _, dir = os.path.split(path)
+        number, title = dir.split(".", 1)
+        return int(number), title
+
+    def load_chapter_node(self, full_name):
+        config = self.ensure_chapter_config(full_name)
+        num, name = self.extract_node_env(full_name)
+        result = {
+            name: {
+                "node_id": config["node_id"],
+                "keywords": config["keywords"],
+                "children": [],
+            }
+        }
+        return num, result
+
+    def load_section_node(self, full_name):
+        config = self.ensure_section_config(full_name)
+        num, name = self.extract_node_env(full_name)
+        result = {
+            name: {
+                "node_id": config["node_id"],
+                "keywords": config["keywords"],
+                "children": config.get("children", [])
+            }
         }
-        return node, node_children
-
-    # 根节点
-    cfg_path = os.path.join(data_path, 'config.json')
-    cfg = load_json(cfg_path)
-    if ensure_node_id(cfg):
-        dump_json(cfg_path, cfg, exist_ok=True, override=True)
-
-    if ensure_title(cfg, cfg_path):
-        cfg["title"] = "C"
-        dump_json(cfg_path, cfg, exist_ok=True, override=True)
-    tree_node = {
-        "node_id": cfg['node_id'],
-        "keywords": cfg['keywords'],
-        "children": []
-    }
-    root[cfg['tree_name']] = tree_node
-
-    # 难度节点
-    for level_no_dir, level_no_name in list_dir(data_path):
-        print(level_no_dir)
-        no, level_name = parse_no_name(level_no_name)
-        level_path = os.path.join(level_no_dir, 'config.json')
-        level_cfg = load_json(level_path)
-        if ensure_node_id(level_cfg) or check_export(level_no_dir, level_cfg):
-            dump_json(level_path, level_cfg, exist_ok=True, override=True)
-        if ensure_title(level_cfg, level_path):
-            dump_json(level_path, level_cfg, exist_ok=True, override=True)
-
-        level_node, level_node_children = make_node(
-            level_name, level_cfg['node_id'], level_cfg['keywords'])
-        tree_node['children'].append(level_node)
-
-        # 章节点
-        for chapter_no_dir, chapter_no_name in list_dir(level_no_dir):
-            no, chapter_name = parse_no_name(chapter_no_name)
-            chapter_path = os.path.join(chapter_no_dir, 'config.json')
-            chapter_cfg = load_json(chapter_path)
-            if ensure_node_id(chapter_cfg) or check_export(chapter_no_dir, chapter_cfg):
-                dump_json(chapter_path, chapter_cfg,
-                          exist_ok=True, override=True)
-            if ensure_title(chapter_cfg, chapter_path):
-                dump_json(chapter_path, chapter_cfg,
-                          exist_ok=True, override=True)
-
-            chapter_node, chapter_node_children = make_node(
-                chapter_name, chapter_cfg['node_id'], chapter_cfg['keywords'])
-            level_node_children.append(chapter_node)
-
-            # 知识点
-            for section_no_dir, section_no_name in list_dir(chapter_no_dir):
-                no, section_name = parse_no_name(section_no_name)
-                sec_path = os.path.join(section_no_dir, 'config.json')
-                sec_cfg = load_json(sec_path)
-                flag = ensure_node_id(sec_cfg) or check_export(
-                    section_no_dir, sec_cfg)
-
-                section_node, section_node_children = make_node(
-                    section_name, sec_cfg['node_id'], sec_cfg['keywords'], sec_cfg.get('children', []))
-                chapter_node_children.append(section_node)
-
-                # 确保习题分配了习题ID
-
-                for export in sec_cfg.get("export", []):
-                    ecfg_path = os.path.join(section_no_dir, export)
-                    ecfg = load_json(ecfg_path)
-
-                    if (ecfg.get('exercise_id') is None) or (ecfg.get('exercise_id') in id_set):
-                        ecfg['exercise_id'] = uuid.uuid4().hex
-                        dump_json(ecfg_path, ecfg,
-                                  exist_ok=True, override=True)
-
-                    id_set.add(ecfg['exercise_id'])
-
-                if flag:
-                    dump_json(sec_path, sec_cfg, exist_ok=True, override=True)
-
-                if ensure_title(sec_cfg, sec_path):
-                    dump_json(sec_path, sec_cfg, exist_ok=True, override=True)
-
-    # 保存技能树骨架
-    tree_path = os.path.join(data_path, 'tree.json')
-    dump_json(tree_path, root, exist_ok=True, override=True)
+        # if "children" in config:
+        #     result["children"] = config["children"]
+        return num, result
+
+    def ensure_exercises(self, section_path):
+        config = self.ensure_section_config(section_path)
+        for e in config.get("export", []):
+            full_name = os.path.join(section_path, e)
+            exercise = load_json(full_name)
+            if "exercise_id" not in exercise:
+                exercise["exercise_id"] = uuid.uuid4().hex
+                dump_json(full_name, exercise, exist_ok=True, override=True)
+
--
GitLab
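
A minimal usage sketch of the new pipeline, mirroring the updated main.py above; it assumes the
numbered "1.xxx" level/chapter/section directory layout under the data root, and the tree name
("opencv" here) doubles as the prefix for generated node ids:

    from src.tree import TreeWalker

    # data root, tree name (used as the node-id prefix), display title
    walker = TreeWalker("data", "opencv", "OpenCV")
    tree = walker.walk()      # ensures node/exercise ids in config.json files, writes data/tree.json
    print(list(tree.keys()))  # the tree_name key taken from data/config.json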