#!/bin/bash
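#
# Entrypoint for PaddlePaddle jobs running on Kubernetes. It dispatches on the
# first argument (see usage() below) and relies on environment variables
# injected by the Kubernetes job spec, e.g. PADDLE_JOB_NAME, PSERVERS,
# TRAINERS, ENTRY and the PADDLE_INIT_* family. stdbuf -oL is used throughout
# to line-buffer stdout so `kubectl logs` shows output promptly.

# Start the classic (v1/v2 API) parameter server, configured entirely from
# PADDLE_INIT_* environment variables.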
start_pserver() {
    stdbuf -oL paddle pserver \
      --use_gpu=0 \
      --port=$PADDLE_INIT_PORT \
      --ports_num=$PADDLE_INIT_PORTS_NUM \
      --ports_num_for_sparse=$PADDLE_INIT_PORTS_NUM_FOR_SPARSE \
      --nics=$PADDLE_INIT_NICS \
      --comment=paddle_process_k8s \
      --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS
}

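# Start the Go parameter server: wait for the master pod to be running,
# resolve its IP, and connect to the etcd instance served on it (port 2379).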
start_new_pserver() {
  stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-master=${PADDLE_JOB_NAME} 1
  export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)
  stdbuf -oL /usr/bin/pserver \
    -port=$PADDLE_INIT_PORT \
    -num-pservers=$PSERVERS \
    -log-level=debug \
    -etcd-endpoint=http://$MASTER_IP:2379
}

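# Start the Go master server, which hands out data chunks as tasks to
# trainers; it expects a local etcd endpoint at 127.0.0.1:2379.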
start_master() {
  stdbuf -oL /usr/bin/master \
    -port=8080 \
    -chunk-per-task=1 \
    -task-timout-dur=16s \
    -endpoints=http://127.0.0.1:2379
}

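# Abort the pod if more than $1 trainer pods are in the Failed phase. The
# message is written to /dev/termination-log, the default Kubernetes
# terminationMessagePath, and the 0 exit status keeps the pod from being
# restarted under restartPolicy: OnFailure.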
check_failed_cnt() {
  max_failed=$1
  failed_count=$(python /root/k8s_tools.py count_pods_by_phase paddle-job=${PADDLE_JOB_NAME} Failed)
  if [ $failed_count -gt $max_failed ]; then
    stdbuf -oL echo "Failed trainer count beyond the threshold: "$max_failed
    echo "Failed trainer count beyond the threshold: " $max_failed > /dev/termination-log
    exit 0
  fi
}

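# Translate a trainer exit status into a human-readable termination message.
# Codes above 128 follow the shell convention 128 + signal number:
# 136 = SIGFPE, 139 = SIGSEGV, 134 = SIGABRT.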
check_trainer_ret() {
  ret=$1
  stdbuf -oL echo "job returned $ret...setting pod return message..."
  stdbuf -oL echo "==============================="

  if [ $ret -eq 136 ] ; then
    echo "Error Arithmetic Operation(Floating Point Exception)" > /dev/termination-log
  elif [ $ret -eq 139 ] ; then
    echo "Segmentation Fault" > /dev/termination-log
  elif [ $ret -eq 1 ] ; then
    echo "General Error" > /dev/termination-log
  elif [ $ret -eq 134 ] ; then
    echo "Program Abort" > /dev/termination-log
  fi
  stdbuf -oL echo "termination log wroted..."
  exit $ret
}

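# Fluid distributed training: wait until all pserver pods are running; a
# trainer additionally checks the failure threshold and fetches its trainer
# id before both roles run the user-supplied ${ENTRY} command.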
start_fluid_process() {
  stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS}
  if [ "${TRAINING_ROLE}" == "TRAINER" ]; then
    check_failed_cnt ${TRAINERS}
    sleep 5
    export PADDLE_INIT_TRAINER_ID=$(python /root/k8s_tools.py fetch_trainer_id)
  fi
  export PADDLE_INIT_PSERVERS=$(python /root/k8s_tools.py fetch_pserver_ips)
  stdbuf -oL sh -c "${ENTRY}"
  check_trainer_ret $?
}

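# Trainer for the Go master/pserver setup: wait for the pserver and master
# pods, export the master/etcd IP, put the (possibly large) training package
# on PYTHONPATH instead of copying it, then run ${ENTRY}.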
start_new_trainer() {
  # FIXME(Yancey1989): use command-line interface to configure the max failed count
  check_failed_cnt ${TRAINERS}
  stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS}
  sleep 5
  stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-master=${PADDLE_JOB_NAME} 1
  export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)
  export ETCD_IP="$MASTER_IP"

  # NOTE: $TRAINER_PACKAGE may be large, do not copy
  export PYTHONPATH=$TRAINER_PACKAGE:$PYTHONPATH
  cd $TRAINER_PACKAGE

  stdbuf -oL echo "Starting training job: " $TRAINER_PACKAGE, "num_gradient_servers:" \
  $PADDLE_INIT_NUM_GRADIENT_SERVERS, "version: " $1 

  stdbuf -oL sh -c "${ENTRY}"
  check_trainer_ret $?
}

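# Trainer for the classic v1/v2 APIs: wait for all pserver and trainer pods,
# record this pod's trainer id, then either shard the training file list and
# launch `paddle train` (v1) or run ${ENTRY} (v2).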
start_trainer() {
    # paddle v1 and v2 distributed training do not allow any trainer to fail.
    check_failed_cnt 0
    stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS}
    stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job=${PADDLE_JOB_NAME} ${TRAINERS}

    export PADDLE_INIT_PSERVERS=$(python /root/k8s_tools.py fetch_pserver_ips)
    export PADDLE_INIT_TRAINER_ID=$(python /root/k8s_tools.py fetch_trainer_id)
    stdbuf -oL echo $PADDLE_INIT_TRAINER_ID > /trainer_id
    # FIXME: /trainer_count = PADDLE_INIT_NUM_GRADIENT_SERVERS
    stdbuf -oL echo $PADDLE_INIT_NUM_GRADIENT_SERVERS > /trainer_count

    # NOTE: $TRAINER_PACKAGE may be large, do not copy
    export PYTHONPATH=$TRAINER_PACKAGE:$PYTHONPATH
    cd $TRAINER_PACKAGE

    stdbuf -oL echo "Starting training job: " $TRAINER_PACKAGE, "num_gradient_servers:" \
    $PADDLE_INIT_NUM_GRADIENT_SERVERS, "trainer_id: " $PADDLE_INIT_TRAINER_ID, \
    "version: " $1

    # FIXME: If we use the new Golang PServer, add a Kubernetes healthz probe
    # to wait until the PServer process is ready. For now, just sleep 20 seconds.
    sleep 20

    case "$1" in
      "v1")
        FILE_COUNT=$(wc -l $TRAIN_LIST | awk '{print $1}')
        if [ $FILE_COUNT -le $PADDLE_INIT_NUM_GRADIENT_SERVERS ]; then
          echo "file count less than trainers"
          check_trainer_ret 0
        fi
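        # Divide the file list into chunks of FILE_COUNT / (N + 1) lines so
        # that split(1) yields at least N + 1 chunks, guaranteeing each of
        # the N trainers its own train.listXXX shard.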
        let lines_per_node="$FILE_COUNT / ($PADDLE_INIT_NUM_GRADIENT_SERVERS + 1)"
        echo "spliting file to" $lines_per_node
        cp $TRAIN_LIST /
        cd /
        split -l $lines_per_node -d -a 3 $TRAIN_LIST train.list
        CURRENT_LIST=$(printf "train.list%03d" $PADDLE_INIT_TRAINER_ID)
        # paddle v1 always reads /train.list on each node.
        echo "File for current node ${CURRENT_LIST}"
        sleep 10
        cp $CURRENT_LIST train.list

        cd $TRAINER_PACKAGE

        stdbuf -oL  paddle train \
          --port=$PADDLE_INIT_PORT \
          --nics=$PADDLE_INIT_NICS \
          --ports_num=$PADDLE_INIT_PORTS_NUM \
          --ports_num_for_sparse=$PADDLE_INIT_PORTS_NUM_FOR_SPARSE \
          --num_passes=$PADDLE_INIT_NUM_PASSES \
          --trainer_count=$PADDLE_INIT_TRAINER_COUNT \
          --saving_period=1 \
          --log_period=20 \
          --local=0 \
          --rdma_tcp=tcp \
          --config=$TOPOLOGY \
          --use_gpu=$PADDLE_INIT_USE_GPU \
          --trainer_id=$PADDLE_INIT_TRAINER_ID \
          --save_dir=$OUTPUT \
          --pservers=$PADDLE_INIT_PSERVERS \
          --num_gradient_servers=$PADDLE_INIT_NUM_GRADIENT_SERVERS
        # paddle v1 API does not allow any trainer to fail.
        check_trainer_ret $? 
        ;;
      "v2")
        stdbuf -oL sh -c "${ENTRY}"
        # paddle v2 API does not allow any trainer to fail.
        check_trainer_ret $? 
        ;;
      *)
        ;;
    esac
}

usage() {
    echo "usage: paddle_k8s [<args>]:"
    echo "  start_trainer  [v1|v2]    Start a trainer process with v1 or v2 API"
    echo "  start_pserver             Start a pserver process"
    echo "  start_new_pserver         Start a new pserver process"
    echo "  start_new_trainer         Start a new triner process"
}

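# Dispatch on the first sub-command argument.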
case "$1" in
    start_pserver)
        start_pserver
        ;;
    start_trainer)
        start_trainer $2
        ;;
    start_new_trainer)
        start_new_trainer
        ;;
    start_new_pserver)
        start_new_pserver
        ;;
    start_master)
        start_master
        ;;
    start_fluid)
        start_fluid_process
        ;;
    --help)
        usage
        ;;
    *)
        usage
        ;;
esac