diff --git a/core/engine/cluster/cluster.py b/core/engine/cluster/cluster.py index 8c45335799afb165b66c133bd217caf3320f703f..4c392e5470c58f213562d49a3f78f7d870462981 100644 --- a/core/engine/cluster/cluster.py +++ b/core/engine/cluster/cluster.py @@ -29,8 +29,13 @@ class ClusterEngine(Engine): abs_dir = os.path.dirname(os.path.abspath(__file__)) backend = envs.get_runtime_environ("engine_backend") - if backend == "PaddleCloud": + if not backend: + backend = "" + backend = backend.upper() + if backend == "PADDLECLOUD": self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh") + elif backend == "KUBERNETES": + self.submit_script = os.path.join(abs_dir, "k8s/cluster.sh") else: raise ValueError("{} can not be supported now".format(backend)) @@ -48,6 +53,14 @@ class ClusterEngine(Engine): proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) proc.wait() + @staticmethod + def workspace_replace(): + workspace = envs.get_runtime_environ("engine_workspace") + + for k, v in os.environ.items(): + v = v.replace("{workspace}", workspace) + os.environ[k] = str(v) + def run(self): role = envs.get_runtime_environ("engine_role") diff --git a/core/engine/cluster/k8s/__init__.py b/core/engine/cluster/k8s/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..abf198b97e6e818e1fbe59006f98492640bcee54 --- /dev/null +++ b/core/engine/cluster/k8s/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/core/engine/cluster/k8s/cluster.sh b/core/engine/cluster/k8s/cluster.sh new file mode 100644 index 0000000000000000000000000000000000000000..6c79b57adfea616dd39d302b2cd8302ba5059864 --- /dev/null +++ b/core/engine/cluster/k8s/cluster.sh @@ -0,0 +1,77 @@ +#!/bin/bash + +################################################### +# Usage: submit.sh +# Description: run k8s submit client implementation +################################################### + +# ---------------------------------------------------------------------------- # +# variable define # +# ---------------------------------------------------------------------------- # + +#----------------------------------------------------------------------------------------------------------------- +#fun : create ConfigMap for k8s pod +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function create_config_map() { + echo "Create configmap" + echo "Delete existing configmap named 'modelconfig'" + kubectl delete configmap modelconfig + kubectl create configmap modelconfig --from-file=${abs_dir}/k8s/set_k8s_env.sh,${paddlerec_model_config} +} + +#----------------------------------------------------------------------------------------------------------------- +#fun : create yaml config for k8s job +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function create_k8s_yaml() { + echo "Create k8s.yaml" + if [ -f ${PWD}/k8s.yaml ]; then + echo "Delete existing k8s.yaml at ${PWD}" + rm ${PWD}/k8s.yaml + fi + + let total_pod_num=${engine_submit_trainer_num}+${engine_submit_server_num} + echo "--K8S ENV-- Job name: ${engine_job_name}" + echo "--K8S ENV-- Total pod nums: ${total_pod_num}" + echo "--K8S ENV-- Trainer nums: ${engine_submit_trainer_num}" + echo "--K8S ENV-- Pserver nums: ${engine_submit_server_num}" + echo "--K8S ENV-- Docker image: ${engine_submit_docker_image}" + echo "--K8S ENV-- Threads(cpu_num): ${CPU_NUM}" + echo "--K8S ENV-- Memory: ${engine_submit_memory}" + echo "--K8S ENV-- Storage: ${engine_submit_storage}" + echo "--K8S ENV-- Log level: ${engine_submit_log_level}" + + sed -e "s#<$ JOB_NAME $>#$engine_job_name#g" \ + -e "s#<$ TOTAL_POD_NUM $>#$total_pod_num#g" \ + -e "s#<$ TRAINER_NUM $>#$engine_submit_trainer_num#g" \ + -e "s#<$ PSERVER_NUM $>#$engine_submit_server_num#g" \ + -e "s#<$ IMAGE $>#$engine_submit_docker_image#g" \ + -e "s#<$ CPU_NUM $>#$CPU_NUM#g" \ + -e "s#<$ MEMORY $>#$engine_submit_memory#g" \ + -e "s#<$ STORAGE $>#$engine_submit_storage#g" \ + -e "s#<$ GLOG_V $>#$engine_submit_log_level#g" \ + ${abs_dir}/k8s/k8s.yaml.template >${PWD}/k8s.yaml +} + +#----------------------------------------------------------------------------------------------------------------- +#fun : submit to k8s cluster +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function submit() { + echo "Submit" + echo "Delete existing job named ${engine_job_name}" + kubectl delete jobs.batch.volcano.sh $engine_job_name + kubectl apply -f ${PWD}/k8s.yaml +} + +function main() { + create_config_map + create_k8s_yaml + submit +} + +main diff --git a/core/engine/cluster/k8s/k8s.yaml.template b/core/engine/cluster/k8s/k8s.yaml.template new file mode 100644 index
0000000000000000000000000000000000000000..264f0f1100429d9404eb3c76ef830eab66799f93 --- /dev/null +++ b/core/engine/cluster/k8s/k8s.yaml.template @@ -0,0 +1,191 @@ +apiVersion: batch.volcano.sh/v1alpha1 +kind: Job +metadata: + name: <$ JOB_NAME $> +spec: + minAvailable: <$ TOTAL_POD_NUM $> + schedulerName: volcano + policies: + - event: PodEvicted + action: RestartJob + - event: PodFailed + action: RestartJob + tasks: + - replicas: <$ PSERVER_NUM $> + name: pserver + template: + metadata: + labels: + paddle-job-pserver: paddle-rec + spec: + imagePullSecrets: + - name: default-secret + containers: + - image: <$ IMAGE $> + command: + - '/bin/bash' + args: + - "-c" + - | + set -ex + sh /usr/paddlerec/set_k8s_env.sh start_fluid + imagePullPolicy: Always + volumeMounts: + - name: model-config + mountPath: "/usr/paddlerec" + name: pserver + resources: + limits: + cpu: <$ CPU_NUM $> + memory: <$ MEMORY $> + ephemeral-storage: <$ STORAGE $> + requests: + cpu: 1 + memory: 1Gi + ephemeral-storage: 1Gi + env: + - name: GLOG_v + value: "<$ GLOG_V $>" + - name: GLOG_logtostderr + value: "1" + - name: TOPOLOGY + value: "" + - name: TRAINER_PACKAGE + value: /root/paddlejob + - name: PADDLE_INIT_NICS + value: eth2 + - name: NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: PADDLE_CURRENT_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: PADDLE_JOB_NAME + value: paddle-rec + - name: PADDLE_IS_LOCAL + value: "0" + - name: PADDLE_TRAINERS_NUM + value: "<$ TRAINER_NUM $>" + - name: PADDLE_PSERVERS_NUM + value: "<$ PSERVER_NUM $>" + - name: FLAGS_rpc_deadline + value: "100000" + - name: ENTRY + value: python -m paddlerec.run -m /usr/paddlerec/config.yaml -r WORKER + - name: PADDLE_PORT + value: "30240" + - name: PADDLE_TRAINING_ROLE + value: PSERVER + - name: TRAINING_ROLE + value: PSERVER + volumes: + - name: model-config + configMap: + name: modelconfig + defaultMode: 0777 + restartPolicy: OnFailure + + - replicas: <$ TRAINER_NUM $> + policies: + - event: TaskCompleted + action: CompleteJob + name: trainer + template: + metadata: + labels: + paddle-job: paddle-rec + spec: + imagePullSecrets: + - name: default-secret + containers: + - image: <$ IMAGE $> + command: + - '/bin/bash' + args: + - "-c" + - | + set -ex + sh /usr/paddlerec/set_k8s_env.sh start_fluid + imagePullPolicy: Always + volumeMounts: + - name: model-config + mountPath: "/usr/paddlerec" + name: trainer + resources: + limits: + cpu: <$ CPU_NUM $> + memory: <$ MEMORY $> + ephemeral-storage: <$ STORAGE $> + requests: + cpu: 1 + memory: 1Gi + ephemeral-storage: 1Gi + env: + - name: GLOG_v + value: "<$ GLOG_V $>" + - name: GLOG_logtostderr + value: "1" + - name: TRAINER_PACKAGE + value: /root/paddlejob + - name: PADDLE_INIT_NICS + value: eth2 + - name: NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: PADDLE_CURRENT_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: PADDLE_JOB_NAME + value: paddle-rec + - name: PADDLE_IS_LOCAL + value: "0" + - name: FLAGS_rpc_deadline + value: "3600" + - name: PADDLE_PORT + value: "30240" + - name: PADDLE_PSERVERS_NUM + value: "<$ 
PSERVER_NUM $>" + - name: PADDLE_TRAINERS_NUM + value: "<$ TRAINER_NUM $>" + - name: PADDLE_TRAINING_ROLE + value: TRAINER + - name: TRAINING_ROLE + value: TRAINER + - name: ENTRY + value: python -m paddlerec.run -m /usr/paddlerec/config.yaml -r WORKER + volumes: + - name: model-config + configMap: + name: modelconfig + defaultMode: 0777 + restartPolicy: OnFailure + + diff --git a/core/engine/cluster/k8s/set_k8s_env.sh b/core/engine/cluster/k8s/set_k8s_env.sh new file mode 100644 index 0000000000000000000000000000000000000000..686aa2f8ca30fe5ee98a6bc03a2f66a63ce1008e --- /dev/null +++ b/core/engine/cluster/k8s/set_k8s_env.sh @@ -0,0 +1,74 @@ +#!/bin/bash +set -x + +check_failed_cnt() { + max_failed=$1 + failed_count=$(python -m paddlerec.tools.k8s_tools count_pods_by_phase paddle-job=${PADDLE_JOB_NAME} Failed) + if [ $failed_count -gt $max_failed ]; then + stdbuf -oL echo "Failed trainer count beyond the threshold: "$max_failed + echo "Failed trainer count beyond the threshold: " $max_failed >/dev/termination-log + exit 0 + fi +} + +check_trainer_ret() { + ret=$1 + stdbuf -oL echo "job returned $ret...setting pod return message..." + stdbuf -oL echo "===============================" + + if [ $ret -eq 136 ]; then + echo "Error Arithmetic Operation(Floating Point Exception)" >/dev/termination-log + elif [ $ret -eq 139 ]; then + echo "Segmentation Fault" >/dev/termination-log + elif [ $ret -eq 1 ]; then + echo "General Error" >/dev/termination-log + elif [ $ret -eq 134 ]; then + echo "Program Abort" >/dev/termination-log + fi + stdbuf -oL echo "termination log written..." + exit $ret +} + +start_fluid_process() { + pserver_label="paddle-job-pserver=${PADDLE_JOB_NAME}" + trainer_label="paddle-job=${PADDLE_JOB_NAME}" + hostname=${HOSTNAME} + task_index="" + + if [ "${PADDLE_TRAINING_ROLE}" == "TRAINER" ] || [ "${PADDLE_TRAINING_ROLE}" == "PSERVER" ]; then + stdbuf -oL python -m paddlerec.tools.k8s_tools wait_pods_running ${pserver_label} ${PADDLE_PSERVERS_NUM} + fi + + export PADDLE_PSERVERS_IP_PORT_LIST=$(python -m paddlerec.tools.k8s_tools fetch_endpoints ${pserver_label} ${PADDLE_PORT}) + export PADDLE_TRAINER_IPS=$(python -m paddlerec.tools.k8s_tools fetch_ips ${trainer_label}) + + if [ "${PADDLE_TRAINING_ROLE}" == "TRAINER" ]; then + check_failed_cnt 1 + task_index=$(python -m paddlerec.tools.k8s_tools fetch_id ${trainer_label}) + else + task_index=$(python -m paddlerec.tools.k8s_tools fetch_id ${pserver_label}) + fi + + export PADDLE_TRAINER_ID=${task_index} + export PADDLE_PSERVER_ID=${task_index} + + stdbuf -oL sh -c "${ENTRY}" + check_trainer_ret $?
+} + +usage() { + echo "usage: paddle_k8s []:" + echo " start_fluid Start paddle fluid distributed training, set env" +} + +case "$1" in +start_fluid) + start_fluid_process + ;; +--help) + usage + ;; +*) + usage + ;; +esac diff --git a/core/engine/local_cluster.py b/core/engine/local_cluster.py index 68071299a95ff88161cb26d2d88252f3e5d00477..cf9b6032162a61b16e4f01552c23cff7312b3965 100755 --- a/core/engine/local_cluster.py +++ b/core/engine/local_cluster.py @@ -31,7 +31,7 @@ class LocalClusterEngine(Engine): server_num = self.envs["server_num"] ports = [self.envs["start_port"]] logs_dir = self.envs["log_dir"] - selected_gpus = self.envs["selected_gpus"].split(",") + default_env = os.environ.copy() current_env = copy.copy(default_env) current_env["CLUSTER_INSTANCE"] = "1" @@ -97,25 +97,8 @@ class LocalClusterEngine(Engine): stderr=fn, cwd=os.getcwd()) procs.append(proc) - - # only wait worker to finish here - for i, proc in enumerate(procs): - if i < server_num: - continue - procs[i].wait() - if len(log_fns) > 0: - log_fns[i].close() - - for i in range(server_num): - if len(log_fns) > 0: - log_fns[i].close() - procs[i].terminate() - print( - "all workers already completed, you can view logs under the `{}` directory". - format(logs_dir), - file=sys.stderr) elif fleet_mode.upper() == "COLLECTIVE": - + selected_gpus = self.envs["selected_gpus"].split(",") selected_gpus_num = len(selected_gpus) for i in range(selected_gpus_num - 1): @@ -150,5 +133,22 @@ class LocalClusterEngine(Engine): cwd=os.getcwd()) procs.append(proc) + # only wait worker to finish here + for i, proc in enumerate(procs): + if i < server_num: + continue + procs[i].wait() + if len(log_fns) > 0: + log_fns[i].close() + + for i in range(server_num): + if len(log_fns) > 0: + log_fns[i].close() + procs[i].terminate() + print( + "all workers already completed, you can view logs under the `{}` directory". 
+ format(logs_dir), + file=sys.stderr) + def run(self): self.start_procs() diff --git a/core/utils/envs.py b/core/utils/envs.py index 36993a93a9ea1f83f1cd24fc80a1b9623b61b445..bfc18b148e9db719f0dff6cda7e5fee4f7ee2d2d 100755 --- a/core/utils/envs.py +++ b/core/utils/envs.py @@ -134,6 +134,10 @@ def os_path_adapter(value): def workspace_adapter(value): workspace = global_envs.get("workspace") + return workspace_adapter_by_specific(value, workspace) + + +def workspace_adapter_by_specific(value, workspace): workspace = paddlerec_adapter(workspace) value = value.replace("{workspace}", workspace) return value diff --git a/core/utils/validation.py b/core/utils/validation.py index 43c21d3e12d2ca84246e1d298b296c8eb5e76868..a2911cb36899430ec3a0f3b5a5c842a01470ccd9 100755 --- a/core/utils/validation.py +++ b/core/utils/validation.py @@ -117,7 +117,8 @@ def register(): in_value_handler) validations["train.epochs"] = ValueFormat("int", 1, ge_value_handler) validations["train.engine"] = ValueFormat( - "str", ["single", "local_cluster", "cluster"], in_value_handler) + "str", ["train", "infer", "local_cluster_train", "cluster_train"], + in_value_handler) requires = ["workspace", "dataset", "mode", "runner", "phase"] return validations, requires diff --git a/doc/design.md b/doc/design.md index 2a9d94533d2c40bb2f96b97fd2873364ee62f0cd..d9aef40e5bbc817a512d5304534e344ec79823b5 100644 --- a/doc/design.md +++ b/doc/design.md @@ -56,9 +56,9 @@ engine = which_engine(args) engine.run() ``` -我们以`single engine`为例,概览engine的行为: +我们以`单机 engine`为例,概览engine的行为: ```python -def single_train_engine(args): +def train_engine(args): _envs = envs.load_yaml(args.model) run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."]) trainer_class = run_extras.get( diff --git a/doc/model_develop.md b/doc/model_develop.md index 16eb7f57c3619b3221ebe0d4d90d84bb351887bc..da9523fac2e20258cd488f61ca07900772f5ce78 100644 --- a/doc/model_develop.md +++ b/doc/model_develop.md @@ -136,7 +136,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 10 save_checkpoint_interval: 2 diff --git a/doc/train.md b/doc/train.md index e629eb628521e0cd6012348db7222517218690d6..a448aaa024f8dc621cf1597f2f57b4ed446ddce4 100644 --- a/doc/train.md +++ b/doc/train.md @@ -77,7 +77,7 @@ mode: single_cpu_train # 执行名为 single_cpu_train 的运行器 runner: - name: single_cpu_train # 定义 runner 名为 single_cpu_train - class: train # 执行单机训练,亦可为 single_train + class: train # 执行单机训练 device: cpu # 执行在 cpu 上 epochs: 10 # 训练轮数 @@ -120,7 +120,7 @@ mode: single_gpu_train # 执行名为 single_gpu_train 的运行器 runner: - name: single_gpu_train # 定义 runner 名为 single_gpu_train - class: train # 执行单机训练,亦可为 single_train + class: train # 执行单机训练 device: gpu # 执行在 gpu 上 selected_gpus: "0" # 默认选择在id=0的卡上执行训练 epochs: 10 # 训练轮数 @@ -135,7 +135,7 @@ mode: single_multi_gpu_train # 执行名为 single_multi_gpu_train 的运行器 runner: - name: single_multi_gpu_train # 定义 runner 名为 single_multi_gpu_train - class: train # 执行单机训练,亦可为 single_train + class: train # 执行单机训练 device: gpu # 执行在 gpu 上 selected_gpus: "0,1,2,3" # 选择多卡执行训练 epochs: 10 # 训练轮数 @@ -149,7 +149,7 @@ mode: local_cluster_cpu_train # 执行名为 local_cluster_cpu_train 的运行 runner: - name: local_cluster_cpu_train # 定义 runner 名为 runner_train - class: local_cluster # 执行本地模拟分布式——参数服务器训练 + class: local_cluster_train # 执行本地模拟分布式——参数服务器训练 device: cpu # 执行在 cpu 上(paddle后续版本会支持PS-GPU) worker_num: 1 # (可选)worker进程数量,默认1 server_num: 1 # (可选)server进程数量,默认1 diff --git a/doc/trainer_develop.md b/doc/trainer_develop.md 
index be72e004c86750aade7bcf028799ffe8d6cd1c00..1b9f51b0dbf31d83baa36664f20add227b1fbce1 100644 --- a/doc/trainer_develop.md +++ b/doc/trainer_develop.md @@ -137,7 +137,7 @@ class TerminalBase(object): ```yaml runner: - name: train_runner - class: single_train + class: train epochs: 2 device: cpu instance_class_path: "{workspace}/your_instance.py" @@ -174,7 +174,7 @@ class Startup(StartupBase): ```yaml runner: - name: runner1 - class: single_train + class: train startup_class_path: "{workspace}/tdm_startup.py" epochs: 10 device: cpu diff --git a/doc/yaml.md b/doc/yaml.md index a369590941e447f3d25aee522957941822c149c4..4d08ef72253b5fa1371faa5890c53ba32c0ca2e6 100644 --- a/doc/yaml.md +++ b/doc/yaml.md @@ -15,7 +15,7 @@ | 名称 | 类型 | 取值 | 是否必须 | 作用描述 | | :---------------------------: | :----------: | :-------------------------------------------: | :------: | :------------------------------------------------------------------: | | name | string | 任意 | 是 | 指定runner名称 | -| class | string | train(默认) / infer / local_cluster / cluster | 是 | 指定运行runner的类别(单机/分布式, 训练/预测) | +| class | string | train(默认) / infer / local_cluster_train / cluster_train | 是 | 指定运行runner的类别(单机/分布式, 训练/预测) | | device | string | cpu(默认) / gpu | 否 | 程序执行设备 | | fleet_mode | string | ps(默认) / pslib / collective | 否 | 分布式运行模式 | | selected_gpus | string | "0"(默认) | 否 | 程序运行GPU卡号,若以"0,1"的方式指定多卡,则会默认启用collective模式 | diff --git a/models/contentunderstanding/classification/config.yaml b/models/contentunderstanding/classification/config.yaml index 9e0bdd1e851ada704eb2377efe0a82154fd2b371..3d1d387a6700edb3a322463e267045d9c2ff2e28 100644 --- a/models/contentunderstanding/classification/config.yaml +++ b/models/contentunderstanding/classification/config.yaml @@ -31,7 +31,7 @@ mode: runner1 runner: - name: runner1 - class: single_train + class: train epochs: 10 device: cpu save_checkpoint_interval: 2 diff --git a/models/contentunderstanding/tagspace/config.yaml b/models/contentunderstanding/tagspace/config.yaml index 8ca28f2977dd4bfd382e250e5c6513b156360404..3ac1e5c7866628a1abcf3216c12b392b4fe0d358 100644 --- a/models/contentunderstanding/tagspace/config.yaml +++ b/models/contentunderstanding/tagspace/config.yaml @@ -38,7 +38,7 @@ mode: runner1 runner: - name: runner1 - class: single_train + class: train epochs: 10 device: cpu save_checkpoint_interval: 2 diff --git a/models/demo/movie_recommand/rank/config.yaml b/models/demo/movie_recommand/rank/config.yaml index 4ad9e458a3a9bbf2a6c658c2d046807581b6c1f5..8f11d51f3ab8f4627e899dc208c8feb2109c8eab 100644 --- a/models/demo/movie_recommand/rank/config.yaml +++ b/models/demo/movie_recommand/rank/config.yaml @@ -55,7 +55,7 @@ mode: runner_train #mode: runner_infer runner: - name: runner_train - class: single_train + class: train save_checkpoint_interval: 1 # save model interval of epochs save_inference_interval: 1 # save inference save_checkpoint_path: "increment" # save checkpoint path diff --git a/models/demo/movie_recommand/recall/config.yaml b/models/demo/movie_recommand/recall/config.yaml index 91575611c41fea3e1a90d1a5eb77ad4c99ea7fbe..ca24df12d872bfe0937aecedce332e2219f783f1 100644 --- a/models/demo/movie_recommand/recall/config.yaml +++ b/models/demo/movie_recommand/recall/config.yaml @@ -55,7 +55,7 @@ mode: runner_train #mode: runner_infer runner: - name: runner_train - class: single_train + class: train save_checkpoint_interval: 1 # save model interval of epochs save_inference_interval: 1 # save inference save_checkpoint_path: "increment" # save checkpoint path diff --git 
a/models/match/dssm/config.yaml b/models/match/dssm/config.yaml index 5bc3ff7103d13944f10ae610f1cce09ed7fc3435..7d28f3ded2324dc1bd712652551e92b8d3d53f1e 100755 --- a/models/match/dssm/config.yaml +++ b/models/match/dssm/config.yaml @@ -42,7 +42,7 @@ mode: train_runner # runner is a kind of paddle training class, which wraps the train/infer process. runner: - name: train_runner - class: single_train + class: train # num of epochs epochs: 4 # device to run training or infer diff --git a/models/match/multiview-simnet/config.yaml b/models/match/multiview-simnet/config.yaml index 8370776a8118a525a5ca7c9bf59d0d0fbb38624c..3cf6c97f1ccda281f24eafd4babd81075ad28fdf 100755 --- a/models/match/multiview-simnet/config.yaml +++ b/models/match/multiview-simnet/config.yaml @@ -49,7 +49,7 @@ mode: train_runner # runner is a kind of paddle training class, which wraps the train/infer process. runner: - name: train_runner - class: single_train + class: train # num of epochs epochs: 2 # device to run training or infer diff --git a/models/multitask/mmoe/config.yaml b/models/multitask/mmoe/config.yaml index 9f36f84991ea30ffeb1745bc2d769b19a9887ab2..751ed1986d769e409db61ef54e528d6af4a8f6e2 100644 --- a/models/multitask/mmoe/config.yaml +++ b/models/multitask/mmoe/config.yaml @@ -43,7 +43,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 3 save_checkpoint_interval: 2 diff --git a/models/multitask/share-bottom/config.yaml b/models/multitask/share-bottom/config.yaml index 3a44b8e7b23a545e5daf67a789a0c3537f614c4e..ebbdfcc20b356f61f5b469f9e46e3e79415fe362 100644 --- a/models/multitask/share-bottom/config.yaml +++ b/models/multitask/share-bottom/config.yaml @@ -42,7 +42,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 3 save_checkpoint_interval: 2 diff --git a/models/rank/afm/config.yaml b/models/rank/afm/config.yaml index 4204cb74106eaa4ff64864dd2d7e36863b2b01b0..81cc01a6ea095a1f9118e251ccc006569105f985 100644 --- a/models/rank/afm/config.yaml +++ b/models/rank/afm/config.yaml @@ -49,7 +49,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 2 device: cpu init_model_path: "" diff --git a/models/rank/dcn/config.yaml b/models/rank/dcn/config.yaml index 6fbdadc3fb52b0f3e0d63355e79eff02050fd98c..e7538744f264301c7974e1f4e20e9901aa7be76f 100755 --- a/models/rank/dcn/config.yaml +++ b/models/rank/dcn/config.yaml @@ -50,7 +50,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/deep_crossing/config.yaml b/models/rank/deep_crossing/config.yaml index a46aa25aec59a7e83c69d23be24122553a608d06..5033cd5627fb2e3a8fcce7af32a1b9fb73fca6fb 100755 --- a/models/rank/deep_crossing/config.yaml +++ b/models/rank/deep_crossing/config.yaml @@ -48,7 +48,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/deepfm/config.yaml b/models/rank/deepfm/config.yaml index c4a22480e8ebd355cb2c8b58e6fe46557932d4f5..8443d45ae269320fa7af8f4a8ed4827bd55d03d4 100755 --- a/models/rank/deepfm/config.yaml +++ b/models/rank/deepfm/config.yaml @@ -48,7 +48,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 2 device: cpu init_model_path: "" diff --git a/models/rank/din/config.yaml b/models/rank/din/config.yaml index 
3156255adcd3447cb3a0a7bb8419cecfda5c8e44..ab327885fba35d08aca86799b5c14fb5b159d03d 100755 --- a/models/rank/din/config.yaml +++ b/models/rank/din/config.yaml @@ -46,7 +46,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/ffm/config.yaml b/models/rank/ffm/config.yaml index 84968b3516ecd94ece1e2220652710a0cae2f952..863ff6c01e87e43e0cdab72da9d96ab41c351117 100644 --- a/models/rank/ffm/config.yaml +++ b/models/rank/ffm/config.yaml @@ -47,7 +47,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/fgcnn/config.yaml b/models/rank/fgcnn/config.yaml index 74df4274b5934e2f1b8f97e3543ffd5398030afa..24ee2636bc0e8edec50b5d2c021235f847ccca56 100755 --- a/models/rank/fgcnn/config.yaml +++ b/models/rank/fgcnn/config.yaml @@ -52,7 +52,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/fm/config.yaml b/models/rank/fm/config.yaml index 10dac82a51444591127388e2de3f485a68ff9959..617d727c7b2086c8f1eabb01d0f2829a933526f6 100644 --- a/models/rank/fm/config.yaml +++ b/models/rank/fm/config.yaml @@ -47,7 +47,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/fnn/config.yaml b/models/rank/fnn/config.yaml index bf8dd1e49644b5166eb580c9a8e222ca19174b31..16e985e24fa5d1dc4d144098ebd768dd1940d4da 100755 --- a/models/rank/fnn/config.yaml +++ b/models/rank/fnn/config.yaml @@ -47,7 +47,7 @@ mode: train_FM_runner #for FM phase: train_FM_runner for dnn phase: train_DNN_ru runner: - name: train_FM_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" @@ -57,7 +57,7 @@ runner: save_inference_path: "inference" print_interval: 1 - name: train_DNN_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "increment/0" diff --git a/models/rank/logistic_regression/config.yaml b/models/rank/logistic_regression/config.yaml index deddadc55c7c137dffd99494957f8a8da6a6b248..4dbb48cab1c343a8df74330adf511238c2861e8b 100644 --- a/models/rank/logistic_regression/config.yaml +++ b/models/rank/logistic_regression/config.yaml @@ -46,7 +46,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 2 device: cpu init_model_path: "" diff --git a/models/rank/nfm/config.yaml b/models/rank/nfm/config.yaml index 7a12129f680817d2d0b84119a39345544f403739..ffea34621030917b848b5ba2ab8a65f5ddae3d7e 100644 --- a/models/rank/nfm/config.yaml +++ b/models/rank/nfm/config.yaml @@ -53,7 +53,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/pnn/config.yaml b/models/rank/pnn/config.yaml index 7c9a6631e9920c3634e57ae4dbab24542e094617..836e3175fd580f424208d39106438ff916e4149a 100644 --- a/models/rank/pnn/config.yaml +++ b/models/rank/pnn/config.yaml @@ -50,7 +50,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/wide_deep/config.yaml b/models/rank/wide_deep/config.yaml index 9f16c8e84298db02cf080a6719151786ba22d617..16c112050a2b6a7a8e9193c5124327695b72afb1 100755 --- a/models/rank/wide_deep/config.yaml +++ b/models/rank/wide_deep/config.yaml @@ 
-45,7 +45,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/rank/xdeepfm/config.yaml b/models/rank/xdeepfm/config.yaml index 1ab1c1b4f844afdade9cde580acfe658bcf43a75..0571a88bd7da3d5c5661d2bee4246bd942d48300 100755 --- a/models/rank/xdeepfm/config.yaml +++ b/models/rank/xdeepfm/config.yaml @@ -46,7 +46,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train epochs: 1 device: cpu init_model_path: "" diff --git a/models/recall/fasttext/config.yaml b/models/recall/fasttext/config.yaml index f11ef416077a78d82deabc511127d8017e30678c..87da67f7da5e674eafbb13758fab468c748f1a12 100644 --- a/models/recall/fasttext/config.yaml +++ b/models/recall/fasttext/config.yaml @@ -50,7 +50,7 @@ mode: train_runner # runner is a kind of paddle training class, which wraps the train/infer process. runner: - name: train_runner - class: single_train + class: train # num of epochs epochs: 2 # device to run training or infer diff --git a/models/recall/gnn/config.yaml b/models/recall/gnn/config.yaml index d87854d8b8ab35206201272ec0e920c46432ac6b..88ff55f5ef50ad1a34081c5c47c12fe233738cb2 100755 --- a/models/recall/gnn/config.yaml +++ b/models/recall/gnn/config.yaml @@ -47,7 +47,7 @@ mode: train_runner # runner is a kind of paddle training class, which wraps the train/infer process. runner: - name: train_runner - class: single_train + class: train # num of epochs epochs: 2 # device to run training or infer diff --git a/models/recall/gru4rec/config.yaml b/models/recall/gru4rec/config.yaml index d59d03a50c4bb3b147041bcf0e0c6e596632fe20..b74db3dddcda54fff83740280e5b8b9a159d9d95 100644 --- a/models/recall/gru4rec/config.yaml +++ b/models/recall/gru4rec/config.yaml @@ -45,7 +45,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 3 save_checkpoint_interval: 2 diff --git a/models/recall/ncf/config.yaml b/models/recall/ncf/config.yaml index 779525c26cf0f232b4fdaccfe1d49f0ec1d655cf..2d603397ae8e4dd82dcf1659af70759228f75a25 100644 --- a/models/recall/ncf/config.yaml +++ b/models/recall/ncf/config.yaml @@ -42,7 +42,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 3 save_checkpoint_interval: 2 diff --git a/models/recall/ssr/config.yaml b/models/recall/ssr/config.yaml index 5aa86943dcf0804891fb19d20dd004bbc1ac9194..ae23609ec939fdd7c64f2fa9337e80426ced0aea 100644 --- a/models/recall/ssr/config.yaml +++ b/models/recall/ssr/config.yaml @@ -41,7 +41,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 3 save_checkpoint_interval: 2 diff --git a/models/recall/word2vec/config.yaml b/models/recall/word2vec/config.yaml index a3d799aa5c2d4dbaf11a4568aa988ca8f1af188a..3626198626ceed12bb18d05ba9f51dbd8d6f7cdd 100755 --- a/models/recall/word2vec/config.yaml +++ b/models/recall/word2vec/config.yaml @@ -47,7 +47,7 @@ mode: train_runner # runner is a kind of paddle training class, which wraps the train/infer process. 
runner: - name: train_runner - class: single_train + class: train # num of epochs epochs: 2 # device to run training or infer diff --git a/models/recall/youtube_dnn/config.yaml b/models/recall/youtube_dnn/config.yaml index 5bbc41a9e850044101fa844fca256db358dc1754..94aff6c692fc878a7e169af623f609eee0c767b8 100644 --- a/models/recall/youtube_dnn/config.yaml +++ b/models/recall/youtube_dnn/config.yaml @@ -38,7 +38,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 3 save_checkpoint_interval: 2 diff --git a/models/rerank/listwise/config.yaml b/models/rerank/listwise/config.yaml index 2ddfa32fe08aa8bece00727aefc46bb893b4d090..8891a057e2e93b026653642ed97dd8325f5f0afa 100644 --- a/models/rerank/listwise/config.yaml +++ b/models/rerank/listwise/config.yaml @@ -43,7 +43,7 @@ mode: train_runner runner: - name: train_runner - class: single_train + class: train device: cpu epochs: 3 save_checkpoint_interval: 2 diff --git a/models/treebased/tdm/config.yaml b/models/treebased/tdm/config.yaml index 74e82bba3ae710edea99234e0a56e790613e17d5..3ed4a2572163eb44815407af4062973077e69d66 100755 --- a/models/treebased/tdm/config.yaml +++ b/models/treebased/tdm/config.yaml @@ -64,7 +64,7 @@ mode: runner1 # runner is a kind of paddle training class, which wraps the train/infer process. runner: - name: runner1 - class: single_train + class: train startup_class_path: "{workspace}/tdm_startup.py" # num of epochs epochs: 10 diff --git a/run.py b/run.py index 7d703178446913df89447028a8e3a44210da12ba..1abefd5f391ee30591d2dd3a3e583927b1bd4f11 100755 --- a/run.py +++ b/run.py @@ -26,10 +26,8 @@ from paddlerec.core.utils import validation engines = {} device = ["CPU", "GPU"] -engine_choices = [ - "TRAIN", "SINGLE_TRAIN", "INFER", "SINGLE_INFER", "LOCAL_CLUSTER", - "LOCAL_CLUSTER_TRAIN", "CLUSTER_TRAIN" -] + +engine_choices = ["TRAIN", "INFER", "LOCAL_CLUSTER_TRAIN", "CLUSTER_TRAIN"] def engine_registry(): @@ -37,16 +35,11 @@ def engine_registry(): engines["PSLIB"] = {} engines["TRANSPILER"]["TRAIN"] = single_train_engine - engines["TRANSPILER"]["SINGLE_TRAIN"] = single_train_engine engines["TRANSPILER"]["INFER"] = single_infer_engine - engines["TRANSPILER"]["SINGLE_INFER"] = single_infer_engine - engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine engines["TRANSPILER"]["CLUSTER"] = cluster_engine - engines["PSLIB"]["SINGLE_TRAIN"] = local_mpi_engine engines["PSLIB"]["TRAIN"] = local_mpi_engine engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine - engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine @@ -142,7 +135,7 @@ def get_engine(args, running_config, mode): selected_gpus_num = len(selected_gpus.split(",")) if selected_gpus_num > 1: - engine = "LOCAL_CLUSTER" + engine = "LOCAL_CLUSTER_TRAIN" if engine not in engine_choices: raise ValueError("{} can not be chosen in {}".format(engine_class, @@ -219,7 +212,6 @@ def single_train_engine(args): def single_infer_engine(args): - _envs = envs.load_yaml(args.model) run_extras = get_all_inters_from_yaml(args.model, ["runner."]) mode = envs.get_runtime_environ("mode") @@ -260,48 +252,60 @@ def single_infer_engine(args): def cluster_engine(args): def master(): - role = "MASTER" from paddlerec.core.engine.cluster.cluster import ClusterEngine _envs = envs.load_yaml(args.backend) flattens = envs.flatten_environs(_envs, "_") - 
flattens["engine_role"] = role + flattens["engine_role"] = "MASTER" + flattens["engine_mode"] = envs.get_runtime_environ("mode") flattens["engine_run_config"] = args.model flattens["engine_temp_path"] = tempfile.mkdtemp() envs.set_runtime_environs(flattens) + ClusterEngine.workspace_replace() print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value"))) launch = ClusterEngine(None, args.model) return launch - def worker(): - role = "WORKER" + def worker(mode): + if not mode: + raise ValueError("mode: {} can not be recognized".format(mode)) + + run_extras = get_all_inters_from_yaml(args.model, ["runner."]) + + trainer_class = ".".join(["runner", mode, "trainer_class"]) + fleet_class = ".".join(["runner", mode, "fleet_mode"]) + device_class = ".".join(["runner", mode, "device"]) + selected_gpus_class = ".".join(["runner", mode, "selected_gpus"]) + strategy_class = ".".join(["runner", mode, "distribute_strategy"]) + worker_class = ".".join(["runner", mode, "worker_num"]) + server_class = ".".join(["runner", mode, "server_num"]) + + trainer = run_extras.get(trainer_class, "GeneralTrainer") + fleet_mode = run_extras.get(fleet_class, "ps") + device = run_extras.get(device_class, "cpu") + selected_gpus = run_extras.get(selected_gpus_class, "0") + distributed_strategy = run_extras.get(strategy_class, "async") + worker_num = run_extras.get(worker_class, 1) + server_num = run_extras.get(server_class, 1) + executor_mode = "train" - _envs = envs.load_yaml(args.model) - run_extras = get_all_inters_from_yaml(args.model, - ["train.", "runner."]) - trainer_class = run_extras.get( - "runner." + _envs["mode"] + ".trainer_class", None) + device = device.upper() + fleet_mode = fleet_mode.upper() - if trainer_class: - trainer = trainer_class - else: - trainer = "GeneralTrainer" + if fleet_mode == "COLLECTIVE" and device != "GPU": + raise ValueError("COLLECTIVE can only be used with GPU") - executor_mode = "train" + cluster_envs = {} - distributed_strategy = run_extras.get( - "runner." + _envs["mode"] + ".distribute_strategy", "async") - selected_gpus = run_extras.get( - "runner." + _envs["mode"] + ".selected_gpus", "0") - fleet_mode = run_extras.get("runner."
+ _envs["mode"] + ".fleet_mode", - "ps") + if device == "GPU": + cluster_envs["selected_gpus"] = selected_gpus - cluster_envs = {} - cluster_envs["selected_gpus"] = selected_gpus + cluster_envs["server_num"] = server_num + cluster_envs["worker_num"] = worker_num cluster_envs["fleet_mode"] = fleet_mode cluster_envs["train.trainer.trainer"] = trainer - cluster_envs["train.trainer.executor_mode"] = executor_mode cluster_envs["train.trainer.engine"] = "cluster" + cluster_envs["train.trainer.executor_mode"] = executor_mode cluster_envs["train.trainer.strategy"] = distributed_strategy cluster_envs["train.trainer.threads"] = envs.get_runtime_environ( "CPU_NUM") @@ -316,7 +320,8 @@ def cluster_engine(args): role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER") if role == "WORKER": - return worker() + mode = os.getenv("PADDLE_PADDLEREC_MODE", None) + return worker(mode) else: return master() @@ -336,39 +341,83 @@ def cluster_mpi_engine(args): def local_cluster_engine(args): - from paddlerec.core.engine.local_cluster import LocalClusterEngine + def get_worker_num(run_extras, workers): + _envs = envs.load_yaml(args.model) + mode = envs.get_runtime_environ("mode") + workspace = envs.get_runtime_environ("workspace") + phases_class = ".".join(["runner", mode, "phases"]) + phase_names = run_extras.get(phases_class) + phases = [] + all_phases = _envs.get("phase") + if phase_names is None: + phases = all_phases + else: + for phase in all_phases: + if phase["name"] in phase_names: + phases.append(phase) + + dataset_names = [] + for phase in phases: + dataset_names.append(phase["dataset_name"]) + + datapaths = [] + for dataset in _envs.get("dataset"): + if dataset["name"] in dataset_names: + datapaths.append(dataset["data_path"]) + + if not datapaths: + raise ValueError("data path must exist for training/inference") + + datapaths = [ + envs.workspace_adapter_by_specific(path, workspace) + for path in datapaths + ] + all_workers = [len(os.listdir(path)) for path in datapaths] + all_workers.append(workers) + return min(all_workers) - _envs = envs.load_yaml(args.model) - run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."]) - trainer_class = run_extras.get("runner." + _envs["mode"] + ".runner_class", - None) + from paddlerec.core.engine.local_cluster import LocalClusterEngine - if trainer_class: - trainer = trainer_class - else: - trainer = "GeneralTrainer" + run_extras = get_all_inters_from_yaml(args.model, ["runner."]) + mode = envs.get_runtime_environ("mode") + trainer_class = ".".join(["runner", mode, "trainer_class"]) + fleet_class = ".".join(["runner", mode, "fleet_mode"]) + device_class = ".".join(["runner", mode, "device"]) + selected_gpus_class = ".".join(["runner", mode, "selected_gpus"]) + strategy_class = ".".join(["runner", mode, "distribute_strategy"]) + worker_class = ".".join(["runner", mode, "worker_num"]) + server_class = ".".join(["runner", mode, "server_num"]) + trainer = run_extras.get(trainer_class, "GeneralTrainer") + fleet_mode = run_extras.get(fleet_class, "ps") + device = run_extras.get(device_class, "cpu") + selected_gpus = run_extras.get(selected_gpus_class, "0") + distributed_strategy = run_extras.get(strategy_class, "async") executor_mode = "train" - distributed_strategy = run_extras.get( - "runner." + _envs["mode"] + ".distribute_strategy", "async") - - worker_num = run_extras.get("runner." + _envs["mode"] + ".worker_num", 1) - server_num = run_extras.get("runner." + _envs["mode"] + ".server_num", 1) - selected_gpus = run_extras.get( - "runner." 
+ _envs["mode"] + ".selected_gpus", "0") - - fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode", "") - if fleet_mode == "": - device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu") - if len(selected_gpus.split(",")) > 1 and device.upper() == "GPU": - fleet_mode = "COLLECTIVE" - else: - fleet_mode = "PS" + + worker_num = run_extras.get(worker_class, 1) + server_num = run_extras.get(server_class, 1) + max_worker_num = get_worker_num(run_extras, worker_num) + + if max_worker_num < worker_num: + print( + "some phase does not have enough data for training, set worker num from {} to {}". + format(worker_num, max_worker_num)) + worker_num = max_worker_num + + device = device.upper() + fleet_mode = fleet_mode.upper() + + if fleet_mode == "COLLECTIVE" and device != "GPU": + raise ValueError("COLLECTIVE can only be used with GPU") cluster_envs = {} + + if device == "GPU": + cluster_envs["selected_gpus"] = selected_gpus + cluster_envs["server_num"] = server_num cluster_envs["worker_num"] = worker_num - cluster_envs["selected_gpus"] = selected_gpus cluster_envs["start_port"] = envs.find_free_port() cluster_envs["fleet_mode"] = fleet_mode cluster_envs["log_dir"] = "logs" @@ -376,10 +425,10 @@ cluster_envs["train.trainer.executor_mode"] = executor_mode cluster_envs["train.trainer.strategy"] = distributed_strategy cluster_envs["train.trainer.threads"] = "2" + cluster_envs["CPU_NUM"] = cluster_envs["train.trainer.threads"] cluster_envs["train.trainer.engine"] = "local_cluster" cluster_envs["train.trainer.platform"] = envs.get_platform() - cluster_envs["CPU_NUM"] = "2" print("launch {} engine with cluster to run model: {}".format(trainer, args.model)) @@ -400,20 +449,16 @@ def local_mpi_engine(args): if not mpi: raise RuntimeError("can not find mpirun, please check environment") - _envs = envs.load_yaml(args.model) - run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."]) - trainer_class = run_extras.get("runner." + _envs["mode"] + ".runner_class", - None) + run_extras = get_all_inters_from_yaml(args.model, ["runner."]) + + mode = envs.get_runtime_environ("mode") + trainer_class = ".".join(["runner", mode, "trainer_class"]) + fleet_class = ".".join(["runner", mode, "fleet_mode"]) + distributed_strategy = "async" executor_mode = "train" - distributed_strategy = run_extras.get( - "runner." + _envs["mode"] + ".distribute_strategy", "async") - fleet_mode = run_extras.get("runner."
+ _envs["mode"] + ".fleet_mode", - "ps") - if trainer_class: - trainer = trainer_class - else: - trainer = "GeneralTrainer" + trainer = run_extras.get(trainer_class, "GeneralTrainer") + fleet_mode = run_extras.get(fleet_class, "ps") cluster_envs = {} cluster_envs["mpirun"] = mpi @@ -424,7 +469,6 @@ def local_mpi_engine(args): cluster_envs["fleet_mode"] = fleet_mode cluster_envs["train.trainer.strategy"] = distributed_strategy cluster_envs["train.trainer.threads"] = "2" - cluster_envs["train.trainer.engine"] = "local_cluster" cluster_envs["train.trainer.platform"] = envs.get_platform() set_runtime_envs(cluster_envs, args.model) @@ -458,11 +502,15 @@ if __name__ == "__main__": sys.exit(-1) engine_registry() - running_config = get_all_inters_from_yaml(args.model, ["mode", "runner."]) + running_config = get_all_inters_from_yaml( + args.model, ["workspace", "mode", "runner."]) modes = get_modes(running_config) for mode in modes: - envs.set_runtime_environs({"mode": mode}) + envs.set_runtime_environs({ + "mode": mode, + "workspace": running_config["workspace"] + }) which_engine = get_engine(args, running_config, mode) engine = which_engine(args) engine.run() diff --git a/tools/k8s_tools.py b/tools/k8s_tools.py new file mode 100644 index 0000000000000000000000000000000000000000..c3b66fbd89358a70731230b875f27f68c34f13e4 --- /dev/null +++ b/tools/k8s_tools.py @@ -0,0 +1,185 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# !/bin/env python +import os +import sys +import time +import socket +from kubernetes import client, config + +NAMESPACE = os.getenv("NAMESPACE") +if os.getenv("KUBERNETES_SERVICE_HOST", None): + config.load_incluster_config() +else: + config.load_kube_config() +v1 = client.CoreV1Api() + + +def get_pod_status(item): + phase = item.status.phase + + # check terminate time although phase is Running. 
+ if item.metadata.deletion_timestamp is not None: + return "Terminating" + + return phase + + +def containers_all_ready(label_selector): + def container_statuses_ready(item): + container_statuses = item.status.container_statuses + + for status in container_statuses: + if not status.ready: + return False + return True + + api_response = v1.list_namespaced_pod( + namespace=NAMESPACE, pretty=True, label_selector=label_selector) + + for item in api_response.items: + if not container_statuses_ready(item): + return False + + return True + + +def fetch_pods_info(label_selector, phase=None): + api_response = v1.list_namespaced_pod( + namespace=NAMESPACE, pretty=True, label_selector=label_selector) + pod_list = [] + for item in api_response.items: + if phase is not None and get_pod_status(item) != phase: + continue + + pod_list.append( + (item.status.phase, item.status.pod_ip, item.metadata.name)) + return pod_list + + +def wait_pods_running(label_selector, desired): + print("label selector: %s, desired: %s" % (label_selector, desired)) + while True: + count = count_pods_by_phase(label_selector, 'Running') + # NOTE: pods may be scaled. + if count >= int(desired): + break + print('current cnt: %d sleep for 5 seconds...' % count) + time.sleep(5) + + +def wait_containers_ready(label_selector): + print("label selector: %s, wait all containers ready" % (label_selector)) + while True: + if containers_all_ready(label_selector): + break + print('not all containers ready, sleep for 5 seconds...') + time.sleep(5) + + +def count_pods_by_phase(label_selector, phase): + pod_list = fetch_pods_info(label_selector, phase) + return len(pod_list) + + +def fetch_ips_list(label_selector, phase=None): + pod_list = fetch_pods_info(label_selector, phase) + ips = [item[1] for item in pod_list] + ips.sort() + return ips + + +def fetch_name_list(label_selector, phase=None): + pod_list = fetch_pods_info(label_selector, phase) + names = [item[2] for item in pod_list] + names.sort() + return names + + +def fetch_ips_string(label_selector, phase=None): + ips = fetch_ips_list(label_selector, phase) + return ",".join(ips) + + +def fetch_endpoints_string(label_selector, port, phase=None, sameport=True): + ips = fetch_ips_list(label_selector, phase) + if sameport: + ips = ["{0}:{1}".format(ip, port) for ip in ips] + else: + srcips = ips + ips = [] + port = int(port) + for ip in srcips: + ips.append("{0}:{1}".format(ip, port)) + port = port + 1 + return ",".join(ips) + + +def fetch_pod_id(label_selector, phase=None, byname=True): + if byname: + names = fetch_name_list(label_selector, phase=phase) + + local_name = os.getenv('POD_NAME') + for i in range(len(names)): + if names[i] == local_name: + return i + + return None + else: + ips = fetch_ips_list(label_selector, phase=phase) + + local_ip = socket.gethostbyname(socket.gethostname()) + for i in range(len(ips)): + if ips[i] == local_ip: + return i + + # in minikube there can be one node only + local_ip = os.getenv("POD_IP") + for i in range(len(ips)): + if ips[i] == local_ip: + return i + + return None + + +def fetch_ips(label_selector): + return fetch_ips_string(label_selector, phase="Running") + + +def fetch_endpoints(label_selector, port): + return fetch_endpoints_string( + label_selector, port=port, phase="Running", sameport=True) + + +def fetch_id(label_selector): + return fetch_pod_id(label_selector, phase="Running") + + +if __name__ == "__main__": + command = sys.argv[1] + if command == "fetch_ips": + print(fetch_ips(sys.argv[2])) + elif command == "fetch_ips_string": +
print(fetch_ips_string(sys.argv[2], sys.argv[3])) + elif command == "fetch_endpoints": + print(fetch_endpoints(sys.argv[2], sys.argv[3])) + elif command == "fetch_id": + print(fetch_id(sys.argv[2])) + elif command == "count_pods_by_phase": + print(count_pods_by_phase(sys.argv[2], sys.argv[3])) + elif command == "wait_pods_running": + wait_pods_running(sys.argv[2], sys.argv[3]) + elif command == "wait_containers_ready": + wait_containers_ready(sys.argv[2])