# -*- coding: utf-8 -*-
"""Build a ``tree.json`` index from a hierarchy of ``<number>.<title>`` folders.

The hierarchy is ``root/level/chapter/section``.  Every directory gets a
``config.json`` carrying a generated ``node_id``; directories are renumbered
so that their numeric prefixes are consecutive starting at 1.
"""
import json
import logging
import os
import re
import sys
import uuid
from genericpath import exists  # noqa: F401  (unused here; kept from the original import list)

id_set = set()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)


def load_json(p):
    """Read the UTF-8 JSON file at ``p`` and return the decoded object."""
    with open(p, 'r', encoding='utf-8') as f:
        return json.load(f)


def dump_json(p, j, exist_ok=False, override=False):
    """Write ``j`` as pretty-printed UTF-8 JSON to ``p``.

    Args:
        p: Destination file path.
        j: JSON-serialisable object.
        exist_ok: When False, an already existing ``p`` is treated as an
            error: it is logged and the process exits with status 1.
        override: Only consulted when ``p`` exists and ``exist_ok`` is True;
            the file is rewritten when True and left untouched when False.

    Raises:
        SystemExit: ``p`` exists and ``exist_ok`` is False.
    """
    if os.path.exists(p):
        if not exist_ok:
            logger.error(f"{p} already exist")
            # Non-zero status: this is a failure, not a clean shutdown.
            sys.exit(1)
        if not override:
            return

    with open(p, 'w', encoding='utf-8') as f:
        json.dump(j, f, indent=2, ensure_ascii=False)


def ensure_config(path):
    """Return the config stored in ``path/config.json``, creating it if absent.

    A newly created config is ``{"keywords": []}``.
    """
    config_path = os.path.join(path, "config.json")
    if os.path.exists(config_path):
        return load_json(config_path)

    node = {"keywords": []}
    dump_json(config_path, node, exist_ok=True, override=False)
    return node


def parse_no_name(d):
    """Split a ``<number>.<name>`` string into ``(int(number), name)``.

    Raises:
        SystemExit: ``d`` contains no ``<digits>.`` prefix (status 1).
    """
    m = re.search(r'(\d+)\.(.*)', d)
    if m is None:
        logger.error(f"cannot parse [{d}]: expected '<number>.<name>'")
        sys.exit(1)

    return int(m.group(1)), m.group(2)


def check_export(base, cfg):
    """Drop entries of ``cfg["export"]`` whose file is missing under ``base``.

    ``cfg`` is modified in place only when at least one entry is missing.

    Returns:
        True if any entry was removed, False otherwise.
    """
    declared = cfg.get('export', [])
    existing = [
        export for export in declared
        if os.path.exists(os.path.join(base, export))
    ]
    changed = len(existing) != len(declared)
    if changed:
        cfg["export"] = existing
    return changed


class TreeWalker:
    """Scan ``root`` and produce the nested node tree written to ``tree.json``."""

    def __init__(self, root, tree_name, title=None, log=None):
        """Store the walk parameters.

        Args:
            root: Directory that contains the numbered level directories.
            tree_name: Name of the tree; also the prefix of generated node ids.
            title: Display title; defaults to ``tree_name``.
            log: Logger to use instead of the module-level one.
        """
        self.name = tree_name
        self.root = root
        self.title = tree_name if title is None else title
        self.tree = {}
        self.logger = logger if log is None else log

    def walk(self):
        """Build the whole tree, write ``root/tree.json`` and return the tree."""
        root = self.load_root()
        root_node = {
            "node_id": root["node_id"],
            "keywords": root.get("keywords", []),
            "children": [],
        }
        self.tree[root.get("tree_name", self.name)] = root_node
        self.load_levels(root_node)

        # After resort_children every directory is named "<position>.<title>"
        # with positions starting at 1, hence enumerate(..., start=1).
        for level_no, level in enumerate(root_node["children"], start=1):
            level_title, level_node = next(iter(level.items()))
            level_path = os.path.join(self.root, f"{level_no}.{level_title}")
            self.load_chapters(level_path, level_node)

            for chapter_no, chapter in enumerate(level_node["children"], start=1):
                chapter_title, chapter_node = next(iter(chapter.items()))
                chapter_path = os.path.join(
                    level_path, f"{chapter_no}.{chapter_title}")
                self.load_sections(chapter_path, chapter_node)

                for section_no, section in enumerate(
                        chapter_node["children"], start=1):
                    section_title = next(iter(section))
                    section_path = os.path.join(
                        chapter_path, f"{section_no}.{section_title}")
                    if os.path.isdir(section_path):
                        self.ensure_exercises(section_path)

        tree_path = os.path.join(self.root, "tree.json")
        dump_json(tree_path, self.tree, exist_ok=True, override=True)
        return self.tree

    def _load_children(self, base, parent_node, load_node):
        """Fill ``parent_node["children"]`` from the sub-directories of ``base``.

        ``load_node`` maps a directory path to a ``(number, node)`` pair.
        """
        children = []
        for name in os.listdir(base):
            full_name = os.path.join(base, name)
            if os.path.isdir(full_name):
                children.append(load_node(full_name))

        children = self.resort_children(base, children)
        parent_node["children"] = [node for _, node in children]
        return parent_node

    def load_levels(self, root_node):
        """Load every level directory under the root into ``root_node``."""
        return self._load_children(self.root, root_node, self.load_level_node)

    def load_chapters(self, base, level_node):
        """Load every chapter directory under ``base`` into ``level_node``."""
        return self._load_children(base, level_node, self.load_chapter_node)

    def load_sections(self, base, chapter_node):
        """Load every section directory under ``base`` into ``chapter_node``."""
        return self._load_children(base, chapter_node, self.load_section_node)

    def resort_children(self, base, children):
        """Sort ``(number, node)`` pairs and renumber their directories from 1.

        A directory whose prefix differs from its 1-based position in the
        sorted list is renamed on disk.  ``children`` is sorted in place and
        returned.
        """
        children.sort(key=lambda item: item[0])
        for position, (number, element) in enumerate(children, start=1):
            title = next(iter(element))
            origin = os.path.join(base, f"{number}.{title}")
            posted = os.path.join(base, f"{position}.{title}")
            if origin != posted:
                self.logger.info(f"rename [{origin}] to [{posted}]")
                os.rename(origin, posted)
        return children

    def ensure_chapters(self):
        """Make sure every directory directly under the root has a config."""
        for subdir in os.listdir(self.root):
            full_name = os.path.join(self.root, subdir)
            if os.path.isdir(full_name):
                self.ensure_level_config(full_name)

    def _load_or_create_config(self, path, make_default):
        """Return ``path/config.json``, creating it or adding a ``node_id``.

        ``make_default`` is called only when the file does not exist yet, so
        no node id is generated needlessly.
        """
        config_path = os.path.join(path, "config.json")
        if not os.path.exists(config_path):
            config = make_default()
            dump_json(config_path, config, exist_ok=True, override=True)
            return config

        config = load_json(config_path)
        changed, config = self.ensure_node_id(config)
        if changed:
            dump_json(config_path, config, exist_ok=True, override=True)
        return config

    def load_root(self):
        """Load (or create) the root config and return it."""
        return self._load_or_create_config(self.root, lambda: {
            "tree_name": self.name,
            "keywords": [],
            "node_id": self.gen_node_id(),
        })

    def ensure_level_config(self, path):
        """Load (or create) the config of the level directory ``path``."""
        return self._load_or_create_config(path, lambda: {
            "node_id": self.gen_node_id(),
            # load_level_node reads "keywords", so new configs must carry it.
            "keywords": [],
        })

    def ensure_chapter_config(self, path):
        """Load (or create) the config of the chapter directory ``path``."""
        return self._load_or_create_config(path, lambda: {
            "node_id": self.gen_node_id(),
            "keywords": [],
        })

    def ensure_section_config(self, path):
        """Load (or create) the config of the section directory ``path``."""
        return self._load_or_create_config(path, lambda: {
            "node_id": self.gen_node_id(),
            "keywords": [],
            "children": [],
            "export": [],
        })

    def ensure_node_id(self, config):
        """Add a ``node_id`` to ``config`` if it has none.

        Returns:
            ``(changed, config)`` where ``changed`` tells whether an id was added.
        """
        if "node_id" in config:
            return False, config
        config["node_id"] = self.gen_node_id()
        return True, config

    def gen_node_id(self):
        """Return a fresh id of the form ``<tree name>-<32 hex chars>``."""
        return f"{self.name}-{uuid.uuid4().hex}"

    def extract_node_env(self, path):
        """Parse the last component of ``path`` as ``<number>.<title>``.

        Returns:
            ``(int(number), title)``.

        Raises:
            SystemExit: the directory name has no valid numeric prefix
                (status 1).
        """
        _, dir_name = os.path.split(path)
        self.logger.info(path)
        try:
            # ValueError covers both a missing "." and a non-numeric prefix.
            number, title = dir_name.split(".", 1)
            return int(number), title
        except ValueError:
            self.logger.error(f"目录 [{path}] 解析失败,结构不合法,可能是缺少序号")
            sys.exit(1)

    def _build_node(self, path, config, children=None):
        """Return ``(number, {title: node})`` for the directory ``path``."""
        num, name = self.extract_node_env(path)
        node = {
            "node_id": config["node_id"],
            # Configs written by hand or by older runs may lack "keywords".
            "keywords": config.get("keywords", []),
            "children": [] if children is None else children,
        }
        return num, {name: node}

    def load_level_node(self, level_path):
        """Return ``(number, node)`` for a level directory."""
        config = self.ensure_level_config(level_path)
        return self._build_node(level_path, config)

    def load_chapter_node(self, full_name):
        """Return ``(number, node)`` for a chapter directory."""
        config = self.ensure_chapter_config(full_name)
        return self._build_node(full_name, config)

    def load_section_node(self, full_name):
        """Return ``(number, node)`` for a section directory.

        Unlike levels and chapters, a section takes its children from the
        ``children`` list of its config.
        """
        config = self.ensure_section_config(full_name)
        return self._build_node(
            full_name, config, children=config.get("children", []))

    def ensure_exercises(self, section_path):
        """Give every exported exercise of a section an ``exercise_id``.

        Each file listed under ``export`` in the section config is loaded;
        files lacking an ``exercise_id`` get a new one and are rewritten.
        """
        config = self.ensure_section_config(section_path)
        for e in config.get("export", []):
            full_name = os.path.join(section_path, e)
            exercise = load_json(full_name)
            if "exercise_id" not in exercise:
                exercise["exercise_id"] = uuid.uuid4().hex
                # The file necessarily exists, so overwriting must be allowed.
                dump_json(full_name, exercise, exist_ok=True, override=True)


if __name__ == '__main__':
    walker = TreeWalker("data", "dailycode", "dailycode")
    walker.walk()