#!/bin/bash start_pserver() { stdbuf -oL paddle pserver \ --use_gpu=0 \ --port=$PADDLE_INIT_PORT \ --ports_num=$PADDLE_INIT_PORTS_NUM \ --ports_num_for_sparse=$PADDLE_INIT_PORTS_NUM_FOR_SPARSE \ --nics=$PADDLE_INIT_NICS \ --comment=paddle_process_k8s \ --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS } start_new_pserver() { stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-master=${PADDLE_JOB_NAME} 1 export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip) stdbuf -oL /usr/bin/pserver \ -port=$PADDLE_INIT_PORT \ -num-pservers=$PSERVERS \ -log-level=debug \ -etcd-endpoint=http://$MASTER_IP:2379 } start_master() { stdbuf -oL /usr/bin/master \ -port=8080 \ -chunk-per-task=1\ -task-timout-dur=16s\ -endpoints=http://127.0.0.1:2379 } check_failed_cnt() { max_failed=$1 failed_count=$(python /root/k8s_tools.py count_pods_by_phase paddle-job=${PADDLE_JOB_NAME} Failed) if [ $failed_count -gt $max_failed ]; then stdbuf -oL echo "Failed trainer count beyond the threadhold: "$max_failed echo "Failed trainer count beyond the threshold: " $max_failed > /dev/termination-log exit 0 fi } check_trainer_ret() { ret=$1 stdbuf -oL echo "job returned $ret...setting pod return message..." stdbuf -oL echo "===============================" if [ $ret -eq 136 ] ; then echo "Error Arithmetic Operation(Floating Point Exception)" > /dev/termination-log elif [ $ret -eq 139 ] ; then echo "Segmentation Fault" > /dev/termination-log elif [ $ret -eq 1 ] ; then echo "General Error" > /dev/termination-log elif [ $ret -eq 134 ] ; then echo "Program Abort" > /dev/termination-log fi stdbuf -oL echo "termination log wroted..." exit $ret } start_fluid_process() { stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS} if [ "${TRAINING_ROLE}" == "TRAINER" ]; then check_failed_cnt ${TRAINERS} sleep 5 export PADDLE_INIT_TRAINER_ID=$(python /root/k8s_tools.py fetch_trainer_id) fi export PADDLE_INIT_PSERVERS=$(python /root/k8s_tools.py fetch_pserver_ips) stdbuf -oL sh -c "${ENTRY}" check_trainer_ret $? } start_new_trainer() { # FIXME(Yancey1989): use command-line interface to configure the max failed count check_failed_cnt ${TRAINERS} stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS} sleep 5 stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-master=${PADDLE_JOB_NAME} 1 export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip) export ETCD_IP="$MASTER_IP" # NOTE: $TRAINER_PACKAGE may be large, do not copy export PYTHONPATH=$TRAINER_PACKAGE:$PYTHONPATH cd $TRAINER_PACKAGE stdbuf -oL echo "Starting training job: " $TRAINER_PACKAGE, "num_gradient_servers:" \ $PADDLE_INIT_NUM_GRADIENT_SERVERS, "version: " $1 stdbuf -oL sh -c "${ENTRY}" check_trainer_ret $? } start_trainer() { # paddle v1 and V2 distributed training does not allow any trainer failed. check_failed_cnt 0 stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS} stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job=${PADDLE_JOB_NAME} ${TRAINERS} export PADDLE_INIT_PSERVERS=$(python /root/k8s_tools.py fetch_pserver_ips) export PADDLE_INIT_TRAINER_ID=$(python /root/k8s_tools.py fetch_trainer_id) stdbuf -oL echo $PADDLE_INIT_TRAINER_ID > /trainer_id # FIXME: /trainer_count = PADDLE_INIT_NUM_GRADIENT_SERVERS stdbuf -oL echo $PADDLE_INIT_NUM_GRADIENT_SERVERS > /trainer_count # NOTE: $TRAINER_PACKAGE may be large, do not copy export PYTHONPATH=$TRAINER_PACKAGE:$PYTHONPATH cd $TRAINER_PACKAGE stdbuf -oL echo "Starting training job: " $TRAINER_PACKAGE, "num_gradient_servers:" \ $PADDLE_INIT_NUM_GRADIENT_SERVERS, "trainer_id: " $PADDLE_INIT_TRAINER_ID, \ "version: " $1 # FIXME: If we use the new PServer by Golang, add Kubernetes healthz # to wait PServer process get ready.Now only sleep 20 seconds. sleep 20 case "$1" in "v1") FILE_COUNT=$(wc -l $TRAIN_LIST | awk '{print $1}') if [ $FILE_COUNT -le $PADDLE_INIT_NUM_GRADIENT_SERVERS ]; then echo "file count less than trainers" check_trainer_ret 0 fi let lines_per_node="$FILE_COUNT / ($PADDLE_INIT_NUM_GRADIENT_SERVERS + 1)" echo "spliting file to" $lines_per_node cp $TRAIN_LIST / cd / split -l $lines_per_node -d -a 3 $TRAIN_LIST train.list CURRENT_LIST=$(printf "train.list%03d" $PADDLE_INIT_TRAINER_ID) # always use /train.list for paddle v1 for each node. echo "File for current node ${CURRENT_LIST}" sleep 10 cp $CURRENT_LIST train.list cd $TRAINER_PACKAGE stdbuf -oL paddle train \ --port=$PADDLE_INIT_PORT \ --nics=$PADDLE_INIT_NICS \ --ports_num=$PADDLE_INIT_PORTS_NUM \ --ports_num_for_sparse=$PADDLE_INIT_PORTS_NUM_FOR_SPARSE \ --num_passes=$PADDLE_INIT_NUM_PASSES \ --trainer_count=$PADDLE_INIT_TRAINER_COUNT \ --saving_period=1 \ --log_period=20 \ --local=0 \ --rdma_tcp=tcp \ --config=$TOPOLOGY \ --use_gpu=$PADDLE_INIT_USE_GPU \ --trainer_id=$PADDLE_INIT_TRAINER_ID \ --save_dir=$OUTPUT \ --pservers=$PADDLE_INIT_PSERVERS \ --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS # paddle v1 API does not allow any trainer failed. check_trainer_ret $? ;; "v2") stdbuf -oL sh -c "${ENTRY}" # paddle v2 API does not allow any trainer failed. check_trainer_ret $? ;; *) ;; esac } usage() { echo "usage: paddle_k8s []:" echo " start_trainer [v1|v2] Start a trainer process with v1 or v2 API" echo " start_pserver Start a pserver process" echo " start_new_pserver Start a new pserver process" echo " start_new_trainer Start a new triner process" } case "$1" in start_pserver) start_pserver ;; start_trainer) start_trainer $2 ;; start_new_trainer) start_new_trainer ;; start_new_pserver) start_new_pserver ;; start_master) start_master ;; start_fluid) start_fluid_process ;; --help) usage ;; *) usage ;; esac