Commit 294519a3 authored by liqingping

Merge branch 'docs/usage' into 'master'

feat: many updates, see details in long description

See merge request platform/CloudNative4AI/cluster-lifecycle/di-orchestrator!60
......@@ -6,7 +6,7 @@ name: Release
on: [push]
env:
version: v0.2.1
version: v1.0.1
jobs:
docker:
......@@ -14,7 +14,7 @@ jobs:
strategy:
matrix:
platform: [ linux/amd64 ]
target: [ di-operator, di-server, di-webhook ]
target: [ di-orchestrator ]
steps:
- name: Checkout
uses: actions/checkout@v2
......@@ -26,12 +26,12 @@ jobs:
TARGET: ${{ matrix.target }}
run: |
DOCKER_IMAGE=$DOCKERIO_ORG/$TARGET
VERSION=${version}-edge
VERSION=${version}-nightly
if [[ $GITHUB_REF == refs/tags/* ]]; then
VERSION=${GITHUB_REF#refs/tags/}
fi
if [ "${{ github.event_name }}" = "schedule" ]; then
VERSION=nightly
VERSION=edge
fi
TAGS="${DOCKER_IMAGE}:${VERSION}"
if [[ $VERSION =~ ^v[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
......
variables:
REGISTRY: registry.sensetime.com/cloudnative4ai
VERSION: v1.0.0
VERSION: v1.0.1
PROJECT: di-orchestrator
# dind config
DOCKER_HOST: tcp://localhost:2376
......
# di-operator version
VERSION ?= v1.0.0
VERSION ?= v1.0.1
MASTER_VERSION := $(VERSION)
COMMIT_SHORT_SHA=$(shell git log -n 1 | head -n 1 | sed -e 's/^commit //' | head -c 8)
......
......@@ -29,14 +29,34 @@ di-server-7b86ff8df4-jfgmp 1/1 Running 0 59s
```bash
# submit DIJob
$ kubectl create -f config/samples/dijob-cartpole.yaml
$ kubectl create -f config/samples/dijob-gobigger.yaml
# get pod and you will see coordinator is created by di-operator
# a few seconds later, you will see collectors and learners created by di-server
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
gobigger-test-0-0 1/1 Running 0 4m17s
gobigger-test-0-1 1/1 Running 0 4m17s
# get logs of coordinator
$ kubectl logs cartpole-dqn-coordinator
$ kubectl logs -n xlab gobigger-test-0-0
Bind subprocesses on these addresses: ['tcp://10.148.3.4:22270',
'tcp://10.148.3.4:22271']
[Warning] no enough data: 128/0
...
[Warning] no enough data: 128/120
Current Training: Train Iter(0) Loss(102.256)
Current Training: Train Iter(0) Loss(103.133)
Current Training: Train Iter(20) Loss(28.795)
Current Training: Train Iter(20) Loss(32.837)
...
Current Training: Train Iter(360) Loss(12.850)
Current Training: Train Iter(340) Loss(11.812)
Current Training: Train Iter(380) Loss(12.892)
Current Training: Train Iter(360) Loss(13.621)
Current Training: Train Iter(400) Loss(15.183)
Current Training: Train Iter(380) Loss(14.187)
Current Evaluation: Train Iter(404) Eval Reward(-1788.326)
```
## User Guide
......
......@@ -46,7 +46,7 @@ func NewCreateOptions(genFlags cmdcommon.GenericFlags) *CreateOptions {
GenericFlags: genFlags,
ServerBindAddress: ":8081",
ProbeAddress: ":8080",
MetricAddress: ":8089",
MetricAddress: ":8443",
}
}
......
......@@ -4231,8 +4231,8 @@ subjects:
apiVersion: v1
data:
DI_JOB_DEFAULT_RESOURCES: '{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}'
DI_ORCHESTRATOR_VERSION: v1.0.0
DI_SERVER_URL: http://di-server.di-system:8080
DI_ORCHESTRATOR_VERSION: v1.0.1
DI_SERVER_URL: http://di-server.di-system:8081
kind: ConfigMap
metadata:
name: di-config
......@@ -4260,9 +4260,9 @@ metadata:
namespace: di-system
spec:
ports:
- port: 8080
- port: 8081
protocol: TCP
targetPort: 8080
targetPort: 8081
selector:
control-plane: di-server
---
......@@ -4295,7 +4295,7 @@ spec:
envFrom:
- configMapRef:
name: di-config
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
livenessProbe:
httpGet:
......@@ -4348,7 +4348,7 @@ spec:
envFrom:
- configMapRef:
name: di-config
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
livenessProbe:
httpGet:
......
......@@ -3,6 +3,6 @@ kind: ConfigMap
metadata:
name: di-config
data:
DI_SERVER_URL: "http://di-server.di-system:8080"
DI_ORCHESTRATOR_VERSION: v1.0.0
DI_SERVER_URL: "http://di-server.di-system:8081"
DI_ORCHESTRATOR_VERSION: v1.0.1
DI_JOB_DEFAULT_RESOURCES: '{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}'
......@@ -30,7 +30,7 @@ spec:
- "--probe-addr=:8080"
- "--metric-addr=:8443"
- "--leader-elect"
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
name: manager
envFrom:
......
......@@ -21,7 +21,7 @@ spec:
args:
- --zap-devel=true
- --server-bind-address=:8081
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
name: server
envFrom:
......
......@@ -22,7 +22,7 @@ spec:
- "--probe-addr=:8080"
- "--metric-addr=:8443"
- --port=9443
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
name: webhook
securityContext:
......
......@@ -8,4 +8,4 @@ kind: Kustomization
images:
- name: registry.sensetime.com/cloudnative4ai/di-orchestrator
newName: registry.sensetime.com/cloudnative4ai/di-orchestrator
newTag: v1.0.0
newTag: v1.0.1
......@@ -8,5 +8,5 @@ spec:
control-plane: di-server
ports:
- protocol: TCP
port: 8080
targetPort: 8080
port: 8081
targetPort: 8081
......@@ -326,7 +326,7 @@ spec:
command:
- /di-orchestrator
- operator
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
livenessProbe:
httpGet:
......@@ -376,7 +376,7 @@ spec:
command:
- /di-orchestrator
- server
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
livenessProbe:
httpGet:
......@@ -421,7 +421,7 @@ spec:
command:
- /di-orchestrator
- webhook
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.1
imagePullPolicy: Always
livenessProbe:
httpGet:
......
# DI Orchestrator Architecture
The DI-engine framework consists of 3 important modules, namely coordinator, collector and learner. In general, a DI-engine training job has only one coordinator, while the numbers of learners and collectors can vary. The roles of the three modules are:
- coordinator: maintains connections with collectors and learners, accepts their requests to fetch and push meta-information, and issues tasks to them.
- collector: obtains from the coordinator the location of the RL model in the storage middleware, loads the model, and generates data frames in its own environments according to the model's decisions; it stores the data frames back to the storage middleware and reports their meta-information (storage path, size, etc.) to the coordinator.
- learner: obtains the data frames' storage location from the coordinator, loads the data frames from the storage middleware, and trains the RL model; after training it stores the model in the middleware and reports the model's meta-information (storage path, size, etc.) to the coordinator. Since the learner side often involves the extra distributed mechanism of data-parallel training, to avoid confusion we call the module that interacts with the coordinator the logic learner, the basic unit to which the coordinator issues tasks; a single learner process in data-parallel training is called a ddp learner, and multiple ddp learner processes provide the data-parallel service. One logic learner corresponds to one ddp learner (single-GPU) or multiple ddp learners (multi-GPU). In addition, data-parallel training requires an extra aggregator module: the aggregator summarizes the training results of multiple ddp learners and sends them to the coordinator, i.e. the aggregator and its ddp learners together form a logic learner, and the coordinator only interacts with logic learners.
For a detailed introduction to DI-engine, please refer to the [DI-engine developer tutorial](https://opendilab.github.io/DI-engine/tutorial_dev/index.html).
The v1 version of the DI-engine framework consists of 3 important modules, namely coordinator, collector and learner, corresponding to DI Orchestrator v1.
The v2 version of DI-engine integrates these modules so that a complete training process can run within a single worker, and a new worker can join directly without a restart. This article describes DI Orchestrator v2, which targets DI-engine v2.
To support running DI-engine in Kubernetes (K8s), we designed DI Orchestrator. This article explains how, with DI Orchestrator, each DI-engine module is created on K8s, how the modules discover each other, how training starts, and so on. The architecture of DI Orchestrator is shown in the figure below:
For a detailed introduction to DI-engine, please refer to the [DI-engine developer tutorial](https://opendilab.github.io/DI-engine/tutorial_dev/index.html).
![](images/di-arch.svg)
To support running DI-engine in Kubernetes (K8s), we designed DI Orchestrator. This article explains how, with DI Orchestrator, the DI-engine components are created on K8s, how they discover each other, how training starts, and so on. The architecture of DI Orchestrator is shown in the figure below:
It consists of two main modules: `di-server` and `di-operator`. `DDPL` refers to a ddp learner, `Lm` to a learner, `Cn` to a collector, and `Aggregator+DDPL` forms a logic learner. The following first introduces how DI Orchestrator creates and starts the DI-engine modules (each one a [pod](https://kubernetes.io/docs/concepts/workloads/pods/) in K8s) after a DI-engine job is submitted to K8s, and then introduces di-server and di-operator.
![](images/di-engine-arch.png)
## Job Creation Process
This section describes the job creation process, i.e. the entire life cycle of a DI-engine job in K8s from creation to completion.
- Write an AggregatorConfig yaml file defining the aggregator template; it will be used to create aggregators when a DIJob is created later, and aggregators provide the data-parallel training service for the training side.
- Write a DIJob yaml file defining the templates of coordinator, collector and learner, and submit it to the K8s cluster.
- di-operator watches the DIJob submission, creates the coordinator, and creates an accessible domain name for it.
- After the coordinator starts, it requests di-server to create a certain number of collectors and learners according to its default configuration.
- After di-server receives the coordinator's creation request, it reads the collector and learner templates in the DIJob, creates the corresponding numbers of collectors (Cn in the figure above) and learners (Lm in the figure above), and returns the URLs at which they can be accessed to the requester. Meanwhile, it decides whether to create an aggregator based on the number of GPUs requested by each learner: when a learner requests more than 1 GPU, an aggregator is created for that learner; otherwise none is created.
- The coordinator waits for the collectors and learners (an aggregator and its multiple ddp learners can be regarded as one logic learner) to connect, then issues tasks to start training.
- Users can manually send requests to di-server to add or delete collectors and learners; the coordinator periodically queries the number of available collectors and learners and decides to establish new connections or disconnect.
- After training finishes, di-operator deletes all collectors, learners and aggregators by default, while the coordinator is kept for users to view logs and perform other operations.
It consists of two main modules: `di-server` and `di-operator`. This article introduces the two modules one by one.
## DI Operator
di-operator is the component responsible for orchestrating DIJobs in the K8s system. Using the K8s [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/), it watches the state of DIJobs in the K8s cluster through the control loop of the [controller pattern](https://kubernetes.io/docs/concepts/architecture/controller/) and modifies a DIJob's state when necessary, keeping the actual state of the DIJob as consistent as possible with our predefined state.
di-operator is responsible for orchestrating DIJobs in the K8s system. Using the K8s [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/), it watches the state of DIJobs in the K8s cluster through the control loop of the [controller pattern](https://kubernetes.io/docs/concepts/architecture/controller/) and reconciles a DIJob whenever a DIJob state-change event occurs, keeping the actual state of the DIJob as consistent as possible with the expected state.
### API Definition
Based on the characteristics of each module in the DI framework, we define two Custom Resources, DIJob and AggregatorConfig. The former defines the prerequisites for running the coordinator, collectors and learners of an RL job, including images, startup commands, required computing and storage resources, environment variables, etc.; the latter defines the prerequisites for running the aggregator of an RL job.
The DIJob definition:
Based on the characteristics of the DI-engine framework, we use a K8s Custom Resource to define the DIJob resource, which describes the desired state of a DI-engine reinforcement learning (RL) job, including images, startup commands, mounted volumes, the number of workers, etc.
The fields of DIJobSpec and their meanings are as follows:
```go
type DIJobSpec struct {
// Group is a collection of DIJobs
// Group is a collection of DIJobs.
Group string `json:"group,omitempty"`
//Priority labels the priority of DIJob
PriorityClassName PriorityClassName `json:"priorityClassName,omitempty"`
// Priority labels the priority of DIJob.
Priority Priority `json:"priority,omitempty"`
// EngineFields defines features of the DI-engine framework.
EngineFields EngineFields `json:"engineFields,omitempty"`
// CleanPodPolicy defines the policy to clean pods after DIJob completed
// CleanPodPolicy defines the policy to clean pods after DIJob completed.
CleanPodPolicy CleanPodPolicy `json:"cleanPodPolicy,omitempty"`
// Volumes defines the shared volumes for DI-engine components
Volumes []corev1.Volume `json:"volumes,omitempty"`
// Preemptible defines whether the dijob can be preempted.
Preemptible bool `json:"preemptible,omitempty"`
Coordinator CoordinatorSpec `json:"coordinator"`
// MinReplicas defines the minimum number of replicas of DIJob.
MinReplicas int32 `json:"minReplicas,omitempty"`
Collector CollectorSpec `json:"collector,"`
// MaxReplicas defines the maximum number of replicas of DIJob.
MaxReplicas int32 `json:"maxReplicas,omitempty"`
Learner LearnerSpec `json:"learner,"`
// Template defines the pod template for DIJob.
Template corev1.PodTemplateSpec `json:"template"`
}
```
The AggregatorConfig definition:
```go
type AggregatorConfigSpec struct {
Aggregator AggregatorSpec `json:"aggregator,"`
type EngineFields struct {
// Topology defines the topology among the workers of the job.
Topology Topology `json:"topology,omitempty"`
// ParallelWorkers defines the number of parallel workers in each worker.
ParallelWorkers int32 `json:"parallelWorkers,omitempty"`
}
```
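For illustration, a v2 DIJob spec could be constructed in Go roughly as follows. The field values are arbitrary examples, the `Topology` and `Priority` values are assumptions, and the pod template is elided (`corev1` refers to `k8s.io/api/core/v1`):
```go
spec := DIJobSpec{
	Group:    "xlab",
	Priority: "normal", // assumed Priority value
	EngineFields: EngineFields{
		Topology:        "star", // assumed Topology value
		ParallelWorkers: 2,
	},
	CleanPodPolicy: "Running",
	Preemptible:    false,
	MinReplicas:    2,
	MaxReplicas:    4,
	Template:       corev1.PodTemplateSpec{ /* worker pod template elided */ },
}
```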
> **Why is the aggregator defined separately?**
The aggregator is common to all RL training jobs that use the DI-engine framework, so we define it as a global, shared resource named AggregatorConfig. After RL jobs are submitted, di-server creates aggregators by reading the cluster's unique AggregatorConfig. Note that the aggregator only targets the most common data-parallel training; other parallel training methods require defining new Custom Resources.
### Phase Definition
After a user submits a DIJob, di-operator takes over the management of its life cycle. To help users understand the status of a DIJob, we define the following phases:
After a user submits a DIJob, di-operator takes over the management of its life cycle. We define the following phases to help users understand the status of the DIJob:
```go
const (
// JobCreated means the job has been submitted to the cluster,
// but not all the pods and services have been created,
// or no pods are running
JobCreated Phase = "Created"
// JobPending means the job has been submitted to the cluster,
// but not all the pods and services have been created
JobPending Phase = "Pending"
// JobStarted means the job has been created and waits for running.
JobStarting Phase = "Starting"
// JobRestarting means the job has been rescheduled and waits for restarting.
JobRestarting Phase = "Restarting"
// JobRunning means all the pods are in running state
JobRunning Phase = "Running"
......@@ -84,82 +87,87 @@ const (
JobUnknown Phase = "Unknown"
)
```
A DIJob that runs and ends normally goes through three phases: Created, Running and Succeeded:
- After a DIJob is submitted, di-operator creates the coordinator and the DIJob enters the Created phase.
- After the coordinator pod enters the Running phase, the DIJob enters the Running phase.
- After the coordinator pod enters the Completed phase, the DIJob enters the Succeeded phase.
In addition, when the coordinator pod enters the Failed phase, the DIJob also enters the Failed phase. Aggregators, collectors and learners restart immediately after a failure and do not affect the phase of the DIJob.
The Unknown phase is not defined yet.
A DIJob that runs and ends normally goes through four phases: Pending, Starting, Running and Succeeded. The state transition diagram is shown below:
![](images/di-engine-status-machine.png)
### Control Loop
The project is built with [kubebuilder v3](https://github.com/kubernetes-sigs/kubebuilder/releases/download/v3.0.0/kubebuilder_linux_amd64). The [reflectors, informers, indexers](https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md) and controllers an operator needs are all encapsulated by [controller-runtime](https://github.com/kubernetes-sigs/controller-runtime) in the [manager](https://github.com/kubernetes-sigs/controller-runtime/blob/master/pkg/manager/manager.go), which exposes the Reconcile function for us to implement the reconciliation logic, as shown below:
```go
func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// your reconcile logic here
return ctrl.Result{}, nil
}
```
- When a DIJob is submitted, it enters the Pending phase.
- After di-operator creates the workers, the DIJob enters the Starting phase.
- After all workers are ready, the DIJob enters the Running phase.
- After all workers have Succeeded, the DIJob enters the Succeeded phase.
- When a worker Fails, the DIJob enters the Failed phase.
- When the DIJob is rescheduled or the number of workers does not match expectations, the DIJob enters the Restarting phase.
After a user submits a DIJob, the informer receives the submission event and triggers the handler, after which the Reconcile function is called. The Reconcile function lists pods, finds that the coordinator has not been created, reads the coordinator template in the DIJob, and creates the corresponding coordinator pod (in which the coordinator program runs) and service (used for inter-pod communication), writing some environment variables into the pod, including the pod's name, the pod's namespace, and the URL for accessing the coordinator.
The Unknown phase is not defined yet.
Each DI-engine module listens on a port with a default value, as shown below:
### Control Loop
Borrowing from [Adaptdl](https://github.com/petuum/adaptdl), the v2 architecture refactors the operator's reconciliation logic, splitting scheduling and reconciliation into Allocator and Controller respectively, which gives the components a clearer division of labor.
#### Allocator Control Loop
Allocator, a module new in the v2 architecture, schedules DIJobs, which includes allocating workers and placing workers. It defines two methods (allocate and allocateAll) for scheduling a single job and all jobs. To support different scheduling strategies, we define the scheduling policy as an interface `Policy` with two methods, `Allocate` and `Optimize`: the former performs the initial scheduling for a job when it is submitted; the latter performs global scheduling over all jobs.
The Policy interface is defined as follows:
```go
DefaultCollectorPort = 22270
DefaultLearnerPort = 22271
DefaultAggregatorPort = 22272
DefaultCoordinatorPort = 22273
type Policy interface {
Allocate(job JobInfo, nodes map[string]*NodeInfo) (NodeList, error)
Optimize(jobs map[string]JobInfo, nodes map[string]*NodeInfo, prevAllocations map[string]NodeList) (map[string]NodeList, error)
}
```
Users can implement their own scheduling algorithms against this interface, as in the sketch below.
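Purely as an illustration, here is a minimal round-robin `Policy`. It assumes `NodeList` is a slice of node names and that `JobInfo` carries a `MinReplicas` field; both are assumptions, not the operator's actual definitions:
```go
import (
	"fmt"
	"sort"
)

// RoundRobinPolicy spreads a job's minimum replicas across nodes in a
// deterministic round-robin order. Illustrative only.
type RoundRobinPolicy struct{}

func (p RoundRobinPolicy) Allocate(job JobInfo, nodes map[string]*NodeInfo) (NodeList, error) {
	names := make([]string, 0, len(nodes))
	for name := range nodes {
		names = append(names, name)
	}
	if len(names) == 0 {
		return nil, fmt.Errorf("no nodes available")
	}
	sort.Strings(names) // map iteration order is random; sort for determinism

	var alloc NodeList
	for i := 0; i < int(job.MinReplicas); i++ { // MinReplicas is an assumed field
		alloc = append(alloc, names[i%len(names)])
	}
	return alloc, nil
}

// Optimize leaves previous allocations untouched: a no-op global pass.
func (p RoundRobinPolicy) Optimize(jobs map[string]JobInfo, nodes map[string]*NodeInfo,
	prevAllocations map[string]NodeList) (map[string]NodeList, error) {
	return prevAllocations, nil
}
```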
After the coordinator is created, di-operator watches the pods' states and updates the DIJob's state accordingly. After the DIJob completes (Succeeded or Failed), di-operator by default deletes all of the DIJob's Running pods and all of its services; the coordinator pod is kept.
### Webhook
When users submit a DIJob, some fields in the yaml file may be filled in incorrectly, causing the DIJob's runtime state to fall short of expectations and making troubleshooting harder; in addition, some fields of the DIJob may need default values. Setting defaults for a DIJob and running a correctness check before it is admitted to the K8s cluster helps users find problems early.
In K8s, webhooks can be configured to validate a DIJob before it is admitted to the cluster. K8s webhooks are divided into MutatingWebhook and ValidatingWebhook: the former modifies the values of a K8s resource object, while the latter validates its correctness.
di-operator implements the webhook validation: a MutatingWebhook sets defaults for DIJob, and a ValidatingWebhook validates the DIJob. For example, for the `CleanPodPolicy` field, the MutatingWebhook sets its default value to `Running`, meaning that Running pods are deleted after the DIJob completes; the ValidatingWebhook checks the value of `CleanPodPolicy` and rejects the DIJob if the value is not one of `None`, `ALL` or `Running`.
When `job.spec.preemptible==false`, Allocator does not schedule the job; it only allocates a fixed number of workers according to `job.spec.minReplicas` and writes the result to `job.status.replicas`. However, users can change the number of workers of the job by modifying `job.status.replicas`.
> Note: `job.status.replicas` cannot be modified directly with `kubectl apply` or `kubectl edit`, because `job.status` is defined as a SubResource: the `job.status` field is ignored in all PUT and POST requests on a DIJob, see [Kubernetes API Conventions](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status). You can run `go run ./hack/update_replicas.go --ns [your-job-namespace] --n [your-job-name] --r [expected-replicas]` to modify the replicas.
#### Controller Control Loop
The Controller control loop reconciles the state of DIJobs, including life-cycle management and the creation and deletion of workers, following the state transition diagram described above.
## DI Server
di-server is an HTTP server customized for the DI-engine framework, providing APIs for adding, deleting and querying collectors, learners and aggregators. Through these APIs, di-server gives a DIJob the ability to dynamically scale its collectors and learners. The following briefly introduces the design of di-server, including the local cache that stores AggregatorConfig, DIJob and all of a DIJob's pods, and the HTTP interface for dynamically adding, deleting and querying collectors, learners and aggregators.
### Local Cache
To reduce the frequency of queries from di-server to the K8s api server, and thus its load, we use the informer mechanism of [client-go](https://github.com/kubernetes/client-go) to store AggregatorConfig, DIJob and all of a DIJob's pods in a local cache, as shown in the figure below.
[Diagram](https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md)
![](images/client-go-controller-interaction.jpeg)
We only focus on the upper half of the figure: the reflector is notified of new resource instances via list & watch and puts them into the Delta FIFO queue; the informer takes new resource instances off the queue and stores them in the local cache through the indexer. Queries can then be served from the local cache, reducing the number of requests to the K8s api server, for example:
```go
genericInformer.Informer().GetIndexer().GetByKey(key)
```
When a resource object changes, the reflector is likewise notified and updates the local cache; in addition, the informer periodically resyncs the local cache with the api server to stay consistent with the resource objects in the K8s cluster.
Server is an HTTP server customized for the DI-engine framework, providing functions for adding, deleting and querying workers. Server uses the [gin](https://github.com/gin-gonic/gin) web framework to provide HTTP services.
The following briefly introduces the design of Server, including the HTTP interface for dynamically adding, deleting and querying workers, and the interface through which users report training-job profiling data.
### HTTP Interface
To support dynamically scaling collectors/learners for DIJobs, di-server provides HTTP interfaces for adding, deleting and querying collectors/learners, as shown in the figure below:
![](images/di-api.png)
The following interfaces are provided:
To support dynamically adding and deleting workers for a DIJob, Server provides the following HTTP interfaces for adding, deleting and querying workers:
| method | path | description |
| ------ | ------------------------------------------------ | ------------------------------------------------------------------------- |
| GET | /v2alpha1/[job_id]/replicas | get job replicas |
| DELETE | /v2alpha1/[job_id]/replicas | delete some replicas. put data in request body |
| POST | /v2alpha1/[job_id]/replicas | create replicas. put data in request body |
| POST | /v2alpha1/[job_id]/profilings | post job profiling data. put data in request body |
The job_id is a `namespace.name.generation` triple.
- Create and delete requests: Request Body=`{"replicas": n}`. Server reads the replicas from the request body and directly modifies `job.status.replicas`; the actual creation and deletion are done by the Operator. (Note: Server only operates on preemptible DIJobs; see the sketch after this list.)
- Get requests: Server queries the replicas of the DIJob and returns the `ip:port` of each replica.
- Post profilings requests: Request Body=`{"data": {}}`. Server reads the data from the request body and patches it into `job.status.profilings`.
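For example, a client could call the replicas API with Go's standard library. The job coordinates below are placeholders, and the server address comes from the `DI_SERVER_URL` config shown earlier in this MR:
```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// job_id is the namespace.name.generation triple (placeholder values).
	jobID := fmt.Sprintf("%s.%s.%d", "di-system", "gobigger-test", 1)
	url := fmt.Sprintf("http://di-server.di-system:8081/v2alpha1/%s/replicas", jobID)

	// Ask Server to scale the job to 4 replicas; Server only updates
	// job.status.replicas, and the Operator performs the real changes.
	resp, err := http.Post(url, "application/json", bytes.NewBufferString(`{"replicas": 4}`))
	if err != nil {
		log.Fatalf("request failed: %v", err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```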
## Job Running Process
A submitted job runs in the cluster according to the following flow: Allocator schedules it, Controller orchestrates the containers, and Server receives the job's profiling reports.
![](images/di-engine-schedule.png)
1. The user submits a DIJob to the K8s cluster.
2. Allocator performs the initial allocation:
   1. For non-preemptible jobs, it sets `job.status.replicas` according to `job.spec.minReplicas`.
   2. For preemptible jobs, it sets `job.status.allocation` according to `job.spec.minReplicas`; `job.status.allocation` is a list of nodes indicating the node on which each replica is placed.
3. Controller observes the job change in the K8s cluster.
4. Controller creates the corresponding number of replicas:
   1. For non-preemptible jobs, it creates replicas according to `job.status.replicas`.
   2. For preemptible jobs, it creates replicas according to `job.status.allocation`, specifying the node on which each replica runs.
5. The replicas start up and begin training; after a while they report the collected profiling data to Server.
6. Server patches the profiling data into `job.status.profilings`.
7. At each fixed scheduling interval, Allocator reschedules all jobs:
   1. Non-preemptible jobs are not rescheduled.
   2. For preemptible jobs, it performs global scheduling using each job's `job.status.profilings` and the scheduling strategy defined in the Allocator Policy, and modifies each job's `job.status.allocation`.
8. Controller observes the job changes in the K8s cluster.
9. Controller creates the corresponding number of replicas.
| method | path | description |
|---|---|---|
| GET | /v1alpha2/replicas | list all collectors and learners |
| GET | /v1alpha2/replicas?namespace=xxx | list all collectors and learners in namespace |
| GET | /v1alpha2/replicas?namespace=xxx&coordinator=xxx | list all replicas belongs to coordinator |
| GET | /v1alpha2/replicas?namespace=xxx&aggregator=xxx | get learners belongs to aggregator |
| DELETE | /v1alpha2/replicas | delete some replicas. put data in request body |
| POST | /v1alpha2/replicas | create replicas. put data in request body |
| POST | /v1alpha2/replicas/failed | post failed replicas and request for recreation. put data in request body |
## Advantages of DI Orchestrator
DI Orchestrator provides a K8s-based container-orchestration solution for the DI-engine framework in distributed scenarios. For a submitted DIJob, Operator orchestrates the DI-engine workers so that each worker runs normally and carries out training, and its sub-module Allocator gives the DI-engine framework dynamic resource allocation and scheduling. By calling Server's interfaces, users can add, delete and query the workers of a job. In summary, DI Orchestrator provides the following advantages:
## Advantages of DI Orchestrator
DI Orchestrator provides a K8s-based container-orchestration solution for the DI-engine framework in distributed scenarios. For a submitted DIJob, di-operator orchestrates the DI-engine modules so that each module runs normally and carries out training; by calling di-server's interfaces, the coordinator can add, delete and query all of its collectors, learners and aggregators, improving the dynamic resource allocation of the DI-engine framework. In summary, DI Orchestrator provides the following advantages:
1. Encapsulation. Relying on di-operator's orchestration capabilities, the details of deploying DI-engine distributed RL training (including pod creation and service discovery) are transparent to the user. Following DI-engine's deployment requirements for distributed RL training, di-operator creates the coordinator, the coordinator then asks di-server to create the other modules, and di-operator records each module's pod state in the DIJob status. The life cycle of the DIJob is also maintained by di-operator, presenting the DIJob's status at different stages to the user.
2. Ease of use. Users only need to define the coordinator, collector and learner configurations in the DIJob yaml file and submit it to the K8s cluster with one click; di-operator takes care of the deployment, freeing users from the complexity of deploying distributed RL training in a K8s cluster.
3. Robustness. Relying on the pod restart mechanism of K8s, pods restart automatically after an unexpected exit, and the coordinator can respond quickly and reconnect.
4. Dynamic scaling. The collectors/learners/aggregators a DIJob needs change dynamically, so di-server provides HTTP interfaces to adjust the numbers of collectors and learners, allowing a DIJob to tune the ratio of collectors to learners to its needs and optimize throughput.
1. Encapsulation. Relying on Operator's orchestration capabilities, the details of deploying DI-engine distributed RL training (including pod creation and service discovery) are transparent to the user. Following DI-engine's deployment requirements for distributed RL training, Operator creates workers for the job and records each worker's state in the DIJob status. The life cycle of the DIJob is also maintained by Operator, presenting the DIJob's status at different stages to the user.
2. Ease of use. Users only need to define the job configuration in the DIJob yaml file and submit it to the K8s cluster with one click; Operator takes care of the deployment, freeing users from the complexity of deploying distributed RL training in a K8s cluster. A DIJob can also be submitted with one click via the command-line tool.
3. Robustness. Relying on Operator's restart mechanism, workers restart automatically after an unexpected exit.
4. Dynamic scaling. The number of workers a DIJob needs changes dynamically, so users can modify the DIJob directly through a K8s client to change the number of workers; Server also provides HTTP interfaces to adjust the number of workers dynamically. Dynamic scaling lets users adjust the number of workers to their needs and optimize throughput.
5. Dynamic scheduling. With Operator's sub-module Allocator, dynamic scheduling of DI-engine jobs becomes simple. Allocator provides scheduling strategies for single jobs and for all jobs, optimizing the global job completion time without disturbing normal training.
# DI Orchestrator Architecture
The DI-engine framework consists of 3 important modules, namely coordinator, collector and learner. In general, a DI-engine training job has only one coordinator, while the numbers of learners and collectors can vary. The roles of the three modules are:
- Coordinator. Maintains connections with collectors and learners, accepts requests from them to fetch and push meta-info, and sends tasks to them.
- Collector. Requests the storage path of the RL model from the coordinator, loads the model from the storage middleware, and generates data frames by stepping through its environments according to the model's decisions; it stores the data frames back to the storage middleware and reports their meta-info (storage path, size, etc.) to the coordinator.
- Learner. Requests the data frames' storage path from the coordinator and loads the data frames from the storage middleware to train the RL model. After training completes, it stores the model in the storage middleware and reports the model's meta-info (storage path, size, etc.) to the coordinator. Because the learner side often involves the extra distributed mechanism of data-parallel training, to avoid confusion we call the module interacting with the coordinator the logic learner, which is the basic unit to which the coordinator issues tasks. A single learner process in data-parallel training is called a ddp learner, and multiple ddp learner processes provide the data-parallel service. One logic learner can correspond to one ddp learner (single-GPU) or multiple ddp learners (multi-GPU). In addition, data-parallel training requires an additional aggregator module: the aggregator summarizes the training results of multiple ddp learners and sends them to the coordinator, i.e. the aggregator and multiple ddp learners form a logic learner, and the coordinator only interacts with logic learners.
The v1 version of the DI-engine framework consists of three important modules, namely coordinator, collector and learner, corresponding to DI Orchestrator v1.
For an introduction to DI-engine, please refer to the [DI-engine developer tutorial](https://opendilab.github.io/DI-engine/tutorial_dev/index.html).
The v2 version of the DI-engine framework integrates these modules so that the complete training process can run within a single worker, and a new worker can join directly without a restart. This article describes DI Orchestrator v2, which targets DI-engine v2.
In order to support running DI-engine in Kubernetes (K8s), we designed `DI Orchestrator`. This article explains how, with DI Orchestrator, each DI-engine module is created on K8s, how the modules discover each other, how training starts, etc. The architecture of DI Orchestrator is shown in the figure below:
For more details about the DI-engine framework, please refer to the [DI-engine Documentation](https://opendilab.github.io/DI-engine/index.html).
![](images/di-arch.svg)
In order to support DI-engine running in Kubernetes (K8s), we designed DI Orchestrator. This article explains how DI-engine components are created on the K8s system using DI Orchestrator, how the components discover each other, how they start training, etc. The architecture of DI Orchestrator is shown in the following figure:
There are two main modules, namely `di-server` and `di-operator`.
`DDPL` represents a ddp learner, `Lm` a logic learner, `Cn` a collector, and `Aggregator+DDPL` constructs a logic learner. In the following pages, we first introduce how `DI Orchestrator` creates and starts each DI-engine module after a DI-engine job is submitted to K8s, and then introduce the architecture of `di-server` and `di-operator`.
![](images/di-engine-arch.png)
## Job creation process
Here is a description of the job creation process, illustrating the entire life cycle of a DI-engine job in K8s from creation to completion.
- Edit the AggregatorConfig yaml file to define the aggregator template, which will be used to create aggregators when a DIJob is created later. Aggregators provide data-parallel training services.
- Edit the DIJob yaml file to define the templates of coordinator, collector and learner, and submit it to K8s.
- After di-operator receives the DIJob submission event, it creates a coordinator and an accessible domain name for the coordinator.
- After the coordinator starts, it sends an HTTP request to di-server to create a certain number of collectors and learners according to the coordinator's default configuration.
- After di-server receives the coordinator's creation request, it reads the collector and learner templates from the DIJob object, creates the corresponding numbers of collectors (Cn in the figure above) and learners (Lm in the figure above), and returns the URLs at which the collectors and learners can be accessed. Meanwhile, di-server determines whether to create an aggregator for each learner according to the number of GPUs the learner requests: when a learner requests more than 1 GPU, an aggregator is created for it; otherwise no aggregator is created.
- The coordinator waits for collectors and learners (an aggregator and its multiple ddp learners are regarded as one logic learner) to connect, and then issues tasks to start training.
- We can manually send requests to di-server to add or delete collectors and learners; the coordinator periodically queries the number of available collectors and learners and decides whether to create new connections or disconnect existing ones.
- When training completes, di-operator deletes all collectors and learners by default, while the coordinator is kept for users to view logs and perform other operations.
DI Orchestrator consists of two modules, namely `di-operator` and `di-server`. This article will explain the two modules one by one.
## DI Operator
di-operator is the component responsible for orchestrating DIJobs in K8s. It uses the K8s [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) to monitor the status of DIJob objects in the K8s cluster through the control loop of the [controller pattern](https://kubernetes.io/docs/concepts/architecture/controller/), and modifies the status of a DIJob when necessary, so that the actual status of the DIJob stays as consistent as possible with our predefined status.
### API definition
According to the characteristics of each module, we have defined two Custom Resources, namely DIJob and AggregatorConfig. The former defines the prerequisites for the coordinator, collectors and learners to run, including docker images, startup commands, computing and storage resources, environment variables, etc. The latter defines the prerequisites for the aggregator.
DI Operator is responsible for orchestrating DIJobs in the K8s system using the K8s [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/): it monitors the status of DIJobs in the K8s cluster through the control loop of the [controller pattern](https://kubernetes.io/docs/concepts/architecture/controller/) and reconciles a DIJob whenever a DIJob event occurs, keeping the actual state of the DIJob as consistent as possible with the expected state.
### API Definitions
According to the characteristics of the DI-engine framework, we use a K8s Custom Resource to define the DIJob resource, which describes the desired state of a DI-engine reinforcement learning (RL) job, including images, startup commands, mounted volumes, the number of workers, etc.
The definition and meaning of each field in DIJobSpec are as follows:
The DIJob definition is described below:
```go
type DIJobSpec struct {
// Group is a collection of DIJobs
// Group is a collection of DIJobs.
Group string `json:"group,omitempty"`
//Priority labels the priority of DIJob
PriorityClassName PriorityClassName `json:"priorityClassName,omitempty"`
// Priority labels the priority of DIJob.
Priority Priority `json:"priority,omitempty"`
// CleanPodPolicy defines the policy to clean pods after DIJob completed
// EngineFields defines features of the DI-engine framework.
EngineFields EngineFields `json:"engineFields,omitempty"`
// CleanPodPolicy defines the policy to clean pods after DIJob completed.
CleanPodPolicy CleanPodPolicy `json:"cleanPodPolicy,omitempty"`
// Volumes defines the shared volumes for DI-engine components
Volumes []corev1.Volume `json:"volumes,omitempty"`
// Preemptible defines whether the dijob can be preempted.
Preemptible bool `json:"preemptible,omitempty"`
Coordinator CoordinatorSpec `json:"coordinator"`
// MinReplicas defines the minimum number of replicas of DIJob.
MinReplicas int32 `json:"minReplicas,omitempty"`
Collector CollectorSpec `json:"collector,"`
// MaxReplicas defines the maximum number of replicas of DIJob.
MaxReplicas int32 `json:"maxReplicas,omitempty"`
Learner LearnerSpec `json:"learner,"`
// Template defines the pod template for DIJob.
Template corev1.PodTemplateSpec `json:"template"`
}
```
AggregatorConfig definition is described as below:
```go
type AggregatorConfigSpec struct {
Aggregator AggregatorSpec `json:"aggregator,"`
type EngineFields struct {
// Topology defines the topology among the workers of the job.
Topology Topology `json:"topology,omitempty"`
// ParallelWorkers defines the number of parallel workers in each worker.
ParallelWorkers int32 `json:"parallelWorkers,omitempty"`
}
```
> **Why is the aggregator defined separately?**
The aggregator is a common module for all RL training jobs that use the DI-engine framework, so we define it as a global, shared resource named AggregatorConfig. After RL jobs are submitted, di-server reads the cluster-wide AggregatorConfig to create aggregators for these RL jobs. In addition, the aggregator only targets the most common data-parallel training; you need to define a new Custom Resource if other parallel training methods are used.
### Status definition
After a DIJob is submitted, di-operator takes over the management of the DIJob's life cycle. To give users a better view of the DIJob's status, we define the following phases:
### Phase Definitions
After a DIJob is submitted, di-operator takes over the management of the DIJob's life cycle. We define the following phases so that users can have a clear view of the DIJob's status:
```go
const (
// JobCreated means the job has been submitted to the cluster,
// but not all the pods and services have been created,
// or no pods are running
JobCreated Phase = "Created"
// JobPending means the job has been submitted to the cluster,
// but not all the pods and services have been created
JobPending Phase = "Pending"
// JobStarted means the job has been created and waits for running.
JobStarting Phase = "Starting"
// JobRestarting means the job has been rescheduled and waits for restarting.
JobRestarting Phase = "Restarting"
// JobRunning means all the pods are in running state
JobRunning Phase = "Running"
......@@ -85,84 +86,93 @@ const (
JobUnknown Phase = "Unknown"
)
```
A normal DIJob that runs and ends successfully goes through three phases: Created, Running and Succeeded:
- When a DIJob is submitted, it enters the Created phase after di-operator creates the coordinator.
- When the coordinator pod is in the Running phase, the DIJob enters the Running phase.
- When the coordinator pod is in the Completed phase, the DIJob enters the Succeeded phase.
In addition, when the coordinator pod is in the Failed phase, the DIJob also enters the Failed phase. Aggregators, collectors and learners restart immediately after a failure and do not affect the DIJob's phase.
A DIJob that runs and ends normally goes through four phases: Pending, Starting, Running and Succeeded. The state transition diagram is shown in the following figure:
The Unknown phase has not been defined yet.
![](images/di-engine-status-machine.png)
### Control loop
Built upon [kubebuilder v3](https://github.com/kubernetes-sigs/kubebuilder/releases/download/v3.0.0/kubebuilder_linux_amd64), the components required by an operator, such as [reflectors, informers, indexers](https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md) and controllers, are all encapsulated in the [manager](https://github.com/kubernetes-sigs/controller-runtime/blob/master/pkg/manager/manager.go) of [controller-runtime](https://github.com/kubernetes-sigs/controller-runtime). Kubebuilder only exposes a common function named `Reconcile` for us to implement the reconcile logic of DIJob:
```go
func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// your reconcile logic here
return ctrl.Result{}, nil
}
```
- When a DIJob is submitted, it enters the Pending phase.
- After di-operator creates the workers, the DIJob enters the Starting phase.
- When all workers are ready, the DIJob enters the Running phase.
- When all workers have Succeeded, the DIJob enters the Succeeded phase.
- When a worker fails, the DIJob enters the Failed phase.
- When the DIJob is rescheduled or the number of workers is not as expected, the DIJob enters the Restarting phase.
The Unknown phase is not used yet.
When a DIJob is submitted, the Reconcile function first lists the pods that belong to the DIJob and finds that the coordinator has not been created. It then reads the coordinator template defined in the DIJob and creates the corresponding coordinator pod (which runs the coordinator main process) and service (used for inter-pod communication), writing some environment variables into the pod, including the pod's name, the pod's namespace, the port the coordinator listens on, and the URL for accessing the coordinator. A condensed sketch of this flow follows.
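Purely as an illustration of the flow above, using controller-runtime's client (`ctrl` is `sigs.k8s.io/controller-runtime` and `client` its `pkg/client`); the helpers `listPods`, `hasCoordinator`, `buildCoordinatorPod` and `buildCoordinatorService` are hypothetical:
```go
func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var job DIJob
	if err := r.Get(ctx, req.NamespacedName, &job); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	pods, err := r.listPods(ctx, &job) // pods owned by this DIJob
	if err != nil {
		return ctrl.Result{}, err
	}

	if !hasCoordinator(pods) {
		pod := buildCoordinatorPod(&job)     // from the coordinator template,
		svc := buildCoordinatorService(&job) // with pod name, namespace, port
		// and coordinator URL injected as environment variables
		if err := r.Create(ctx, pod); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, svc); err != nil {
			return ctrl.Result{}, err
		}
	}
	return ctrl.Result{}, nil
}
```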
### Control Loop
Inspired by [Adaptdl](https://github.com/petuum/adaptdl), the v2 architecture refactors the operator's reconciling logic, dividing the scheduling and reconciling logic into Allocator and Controller respectively, which makes the division of the modules' responsibilities clearer.
The port occupied by each module of the DI-engine framework has a default value, as shown below:
#### Allocator Control Loop
Allocator is a new module in the v2 architecture for scheduling DIJobs, responsible for assigning workers and placing workers. We define two methods (allocate and allocateAll) for single-job and multi-job scheduling. In order to provide different scheduling policies, we define the scheduling policy as an interface named `Policy`, in which two methods are defined, `Allocate` and `Optimize`: the former performs the initial scheduling for a job when it is submitted; the latter performs global scheduling over all jobs.
The Policy interface is defined as follows; you can implement your own scheduling algorithm against this interface:
```go
DefaultCollectorPort = 22270
DefaultLearnerPort = 22271
DefaultAggregatorPort = 22272
DefaultCoordinatorPort = 22273
type Policy interface {
Allocate(job JobInfo, nodes map[string]*NodeInfo) (NodeList, error)
Optimize(jobs map[string]JobInfo, nodes map[string]*NodeInfo, prevAllocations map[string]NodeList) (map[string]NodeList, error)
}
```
After the coordinator is created, di-operator monitors the status of the pod and updates the status of the DIJob. After the DIJob completes (Succeeded or Failed), di-operator by default deletes all services of the DIJob and all of its pods that are in the Running phase.
When `job.spec.preemptible==false`, Allocator will not schedule the job; it only allocates a fixed number of workers to the job according to `job.spec.minReplicas`, and the allocation result is written to `job.status.replicas`. However, you can change the number of workers for the job by modifying `job.status.replicas`.
### Webhook
There may be mistakes when submitting a DIJob, such as spelling errors in DIJob's fields or field values that do not match the predefined ones, resulting in potential errors when managing the DIJob's life cycle. On the other hand, it is often necessary to set default values for some fields of DIJob. If defaults can be set and a correctness check performed before the DIJob is admitted to the cluster, it helps us find problems in advance.
> Note: You cannot directly modify `job.status.replicas` through the `kubectl apply` or `kubectl edit` commands, because `job.status` is defined as a SubResource: `job.status` is ignored in all PUT and POST requests on a DIJob. See [Kubernetes API Conventions](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status). You can execute `go run ./hack/update_replicas.go --ns [your-job-namespace] --n [your-job-name] --r [expected-replicas]` to modify the replicas.
To achieve the above goals, we can configure webhooks in K8s. A K8s webhook consists of MutatingWebhook and ValidatingWebhook: the former is used to modify the values of a K8s resource object, and the latter is used to verify its correctness.
#### Controller Loop
The webhook verification is implemented in di-operator: a MutatingWebhook is created to set default values for DIJob, and a ValidatingWebhook is created to verify the correctness of DIJob. For example, for the `CleanPodPolicy` field in DIJob, we set its default value in the MutatingWebhook to `Running`, which means that all running pods will be deleted after the DIJob completes. We verify the value of the `CleanPodPolicy` field in the ValidatingWebhook: if the value set by the user is not equal to any of `None`, `ALL`, or `Running`, the DIJob is rejected. A sketch of both hooks follows.
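A minimal sketch of the two hooks, assuming controller-runtime's `webhook.Defaulter` and `webhook.Validator` interfaces (the `Default` and `ValidateCreate` method names come from those interfaces; the constants below are illustrative, not the project's actual definitions):
```go
// Sketch only: assumes the DIJob API types; constants are illustrative.
const (
	CleanPodPolicyNone    CleanPodPolicy = "None"
	CleanPodPolicyAll     CleanPodPolicy = "ALL"
	CleanPodPolicyRunning CleanPodPolicy = "Running"
)

// Default is invoked by the MutatingWebhook to fill in defaults.
func (job *DIJob) Default() {
	if job.Spec.CleanPodPolicy == "" {
		job.Spec.CleanPodPolicy = CleanPodPolicyRunning
	}
}

// ValidateCreate is invoked by the ValidatingWebhook on admission.
func (job *DIJob) ValidateCreate() error {
	switch job.Spec.CleanPodPolicy {
	case CleanPodPolicyNone, CleanPodPolicyAll, CleanPodPolicyRunning:
		return nil
	default:
		return fmt.Errorf("invalid CleanPodPolicy %q, must be one of None, ALL, Running", job.Spec.CleanPodPolicy)
	}
}
```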
The Controller control loop reconciles the state of DIJobs, including life-cycle management and the creation and deletion of workers, following the state transition diagram described above.
## DI Server
di-server is an HTTP server customized for the DI-engine framework, providing APIs for adding, deleting, and querying collectors, learners, and aggregators. By calling these APIs, a DIJob gains the ability to dynamically scale its collectors and learners. The following briefly introduces the design of di-server, including the local cache for storing AggregatorConfig, DIJob and all pods of a DIJob, and the HTTP interface design for dynamically adding, deleting and querying collectors, learners and aggregators.
### Local cache
In order to reduce the frequency of queries between di-server and the K8s api server, and thereby reduce the burden on the api server, we use the informer mechanism of [client-go](https://github.com/kubernetes/client-go) to store AggregatorConfig, DIJob and all pods of a DIJob in a local cache, as shown in the following figure.
Server is an HTTP server customized for the DI-engine framework, providing functions for adding, deleting and querying workers. Server uses the [gin](https://github.com/gin-gonic/gin) web framework to provide HTTP service capabilities.
[Schematic diagram](https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md)
The following will briefly introduce the design of Server, including the http interface for dynamically adding, deleting, and querying workers, and the interface for users to report training task profilings data.
![](images/client-go-controller-interaction.jpeg)
### HTTP Interface
In the above figure, we only pay attention to the upper part. The reflector receives notifications of new resource instances through the list & watch api and puts them into the Delta FIFO queue; the informer gets the new resource instances from the Delta FIFO queue and stores them in the local cache through the indexer. Query operations can then be served from the local cache, reducing the number of requests to the K8s api server. The query command is as follows:
In order to support DIJobs dynamically adding and deleting workers, Server provides HTTP interfaces for adding, deleting and querying workers. The following interfaces are provided:
```go
genericInformer.Informer().GetIndexer().GetByKey(key)
```
When a resource object changes, the reflector also receives a notification and updates the local cache; in addition, the informer periodically synchronizes the local cache with the K8s api server to stay consistent with the resource objects in the K8s cluster. A fuller sketch of this pattern follows.
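As a standalone illustration with client-go's shared informer factory (the resync period and the pod key are arbitrary examples):
```go
package main

import (
	"fmt"
	"log"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	cfg, err := ctrl.GetConfig()
	if err != nil {
		log.Fatalf("failed to get kubeconfig: %v", err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	// Informer factory with a 30s resync period (arbitrary example).
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	stop := make(chan struct{})
	factory.Start(stop)
	cache.WaitForCacheSync(stop, podInformer.HasSynced)

	// Reads hit the local cache, not the K8s api server.
	obj, exists, err := podInformer.GetIndexer().GetByKey("di-system/gobigger-test-0-0")
	fmt.Println(obj, exists, err)
}
```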
| method | path | description |
| ------ | ------------------------------------------------ | ------------------------------------------------------------------------- |
| GET | /v2alpha1/[job_id]/replicas | get job replicas |
| DELETE | /v2alpha1/[job_id]/replicas | delete some replicas. put data in request body |
| POST | /v2alpha1/[job_id]/replicas | create replicas. put data in request body |
| POST | /v2alpha1/[job_id]/profilings | post job profiling data. put data in request body |
The job_id consists of the `namespace.name.generation` triple.
- Create and delete requests: Request Body=`{"replicas": n}`. Server reads the replicas from the request body and directly modifies `job.status.replicas`; the real create and delete operations are done by the Operator. (Note: Server only operates on preemptible DIJobs.)
- Get requests: Server queries the replicas of the DIJob and returns the `ip:port` of each replica.
- Post profilings requests: Request Body=`{"data": {}}`. Server reads the data from the request body and patches it into `job.status.profilings`.
### HTTP interface
In order to support dynamic scaling of collectors/learners for DIJobs, di-server implements several HTTP interfaces for adding, deleting and querying collectors/learners, as shown in the following figure:
## Job Running Process
![](images/di-api.png)
Submitted jobs run in the cluster according to the process in the following figure: Allocator performs scheduling, Controller performs container orchestration, and Server handles job profilings reporting.
![](images/di-engine-schedule.png)
The following http interfaces are provided:
1. User submits a DIJob to the K8s cluster.
2. Allocator makes the initial allocation:
   1. For jobs that are not preemptible, modify the value of `job.status.replicas` according to `job.spec.minReplicas`.
   2. For jobs that are preemptible, modify the value of `job.status.allocation` according to `job.spec.minReplicas`; `job.status.allocation` is a list of nodes, indicating the node on which each replica is placed.
3. Controller observes the change of the job in the K8s cluster.
4. Controller creates the corresponding number of replicas:
   1. For jobs that are not preemptible, create the corresponding number of replicas according to `job.status.replicas`.
   2. For jobs that are preemptible, create the corresponding number of replicas according to `job.status.allocation`, and specify the node on which each replica runs.
5. The replicas start training, and report the collected profiling data to Server after a period of time.
6. Server updates the profiling data into `job.status.profilings`.
7. At each fixed scheduling interval, Allocator reschedules all jobs:
   1. For jobs that are not preemptible, rescheduling is not performed.
   2. For jobs that are preemptible, use each job's `job.status.profilings` and perform global scheduling according to the scheduling policy defined in the Allocator `Policy`, and modify each job's `job.status.allocation`.
8. Controller observes the changes of the jobs in the K8s cluster.
9. Controller creates the corresponding number of replicas, as sketched below.
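The rescheduling pass in step 7 might look roughly like this; the ticker period and all helper names (`snapshot`, `applyAllocations`, the `policy` field) are assumptions, not the project's actual code:
```go
// Allocator reschedules preemptible jobs at a fixed interval (sketch).
func (a *Allocator) run(ctx context.Context) {
	ticker := time.NewTicker(a.ScheduleInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			jobs, nodes, prev := a.snapshot() // preemptible jobs only
			allocs, err := a.policy.Optimize(jobs, nodes, prev)
			if err != nil {
				continue // keep previous allocations on failure
			}
			a.applyAllocations(allocs) // writes each job.status.allocation
		}
	}
}
```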
| method | path | description |
|---|---|---|
| GET | /v1alpha2/replicas | list all collectors and learners |
| GET | /v1alpha2/replicas?namespace=xxx | list all collectors and learners in namespace |
| GET | /v1alpha2/replicas?namespace=xxx&coordinator=xxx | list all replicas belongs to coordinator |
| GET | /v1alpha2/replicas?namespace=xxx&aggregator=xxx | get learners belongs to aggregator |
| DELETE | /v1alpha2/replicas | delete some replicas. put data in request body |
| POST | /v1alpha2/replicas | create replicas. put data in request body |
| POST | /v1alpha2/replicas/failed | post failed replicas and request for recreation. put data in request body |
## Advantages of DI Orchestrator
DI Orchestrator provides a K8s-based container-orchestration solution for the DI-engine framework in a distributed scenario. For a DIJob, Operator is responsible for orchestrating the DI-engine workers so that each worker can run normally and perform training tasks. The sub-module Allocator in Operator provides the DI-engine framework with the ability to dynamically allocate and schedule resources. By calling Server's HTTP interface, users are given the ability to add, delete, and query workers for each job. In summary, DI Orchestrator provides the following advantages:
## Advantages of DI Orchestrator
DI Orchestrator provides a K8s-based container-orchestration solution for the DI-engine framework in a distributed scenario. For a DIJob, di-operator is responsible for arranging the various modules of DI-engine so that each module runs normally and performs training tasks. By calling di-server's HTTP interface, the coordinator is given the ability to add, delete, and query all of its collectors, learners and aggregators, improving the dynamic resource allocation of the DI-engine framework. In summary, DI Orchestrator provides the following advantages:
1. Encapsulation. Relying on the orchestration capabilities of di-operator, the details of deploying DI-engine distributed RL training (including pod creation and service discovery) are transparent to us. According to the deployment requirements of the DI-engine framework for distributed RL training, di-operator creates the coordinator, and the coordinator then requests di-server to create the other modules; di-operator records the pod status of each module in the DIJob status. The life cycle of the DIJob is also maintained by di-operator, presenting the status of the DIJob at different stages to us.
2. Ease of use. We only need to define the configuration of coordinator, collector, and learner in the yaml file of DIJob and submit it to the K8s cluster with one click; di-operator is responsible for deploying the DI-engine RL training, liberating us from complex distributed RL deployments in the K8s cluster.
3. Robustness. Relying on the pod restart mechanism of K8s, DI Orchestrator ensures that pods automatically restart in the event of an unexpected exit, and the coordinator can respond quickly and reconnect.
4. Dynamic expansion. The collectors/learners required by a DIJob change dynamically, so di-server provides HTTP interfaces that allow us to adjust the number of collectors/learners dynamically, letting a DIJob tune the ratio of collectors to learners to its own needs and optimize throughput.
1. Encapsulation. Relying on the orchestration capabilities of Operator, the details of deploying DI-engine distributed RL training jobs (including pod creation and service discovery) are transparent to users. According to the deployment requirements of DI-engine jobs for distributed RL training, Operator creates workers for jobs and writes the status of each worker to the DIJob status. The life cycle of the DIJob is also maintained by Operator, presenting the status of the DIJob at different stages to users.
2. Ease of use. Users only need to define the configuration of a DI-engine job in the yaml file of DIJob and submit it to the K8s cluster with one click, and Operator will be responsible for completing the deployment, freeing users from the complex distributed RL training deployment in the K8s cluster. A DIJob can also be submitted with one click with the help of the command-line tool.
3. Robustness. Relying on the Operator's restart mechanism, workers automatically restart after an unexpected exit.
4. Dynamic expansion. The number of workers required by a DIJob changes dynamically, so users can directly modify the DIJob through a K8s client to change the number of workers; at the same time, Server provides HTTP interfaces to dynamically adjust the number of workers. Dynamic expansion allows users to adjust the number of workers according to their own needs and optimize throughput.
5. Dynamic scheduling. Relying on Operator's sub-module Allocator, dynamic scheduling for DI-engine jobs becomes simple. Allocator provides scheduling strategies for single jobs and multiple jobs, which can optimize the global job completion time without affecting normal training.
\ No newline at end of file
......@@ -28,11 +28,11 @@ New CRD files will be generated in [./config/crd/bases](./config/crd/bases)
## Controller Logic
See [controllers](./controllers)
See [controllers](./pkg/controllers)
## DI Server Logic
See [server](./server)
See [server](./pkg/server)
## Installation
......
......@@ -3,8 +3,7 @@ module opendilab.org/di-orchestrator
go 1.16
require (
github.com/deckarep/golang-set v1.7.1
github.com/gin-gonic/gin v1.7.7 // indirect
github.com/gin-gonic/gin v1.7.7
github.com/go-logr/logr v0.4.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.15.0
......
......@@ -111,8 +111,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/daviddengcn/go-colortext v0.0.0-20160507010035-511bcaf42ccd/go.mod h1:dv4zxwHi5C/8AeI+4gX4dCWOIvNi7I6JCSX0HvlKPgE=
github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
......@@ -208,6 +206,7 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.8/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
......
......@@ -20,6 +20,17 @@ fi
for f in ".gitlab-ci.yml"; do
echo "update ci version to ${version}"
sed -r "s|^(\s*)VERSION:(\s*)(.*)|\1VERSION: ${version}|" "$f" >.tmp
# sed -r "s|^(\s*)VERSION:(\s*)v[0-9+][\.0-9+]*|\1VERSION: ${version}|" "$f" > .tmp
mv .tmp "$f"
done
for f in "config/manager/di_config.yaml"; do
echo "update config map orchestrator version to ${version}"
sed -r "s|^(\s*)DI_ORCHESTRATOR_VERSION:(\s*)(.*)|\1DI_ORCHESTRATOR_VERSION: ${version}|" "$f" >.tmp
mv .tmp "$f"
done
for f in ".github/workflows/release.yaml"; do
echo "update github action version to ${version}"
sed -r "s|^(\s*)version:(\s*)(.*)|\1version: ${version}|" "$f" >.tmp
mv .tmp "$f"
done
package main
import (
"context"
"flag"
"log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
ctrl "sigs.k8s.io/controller-runtime"
"opendilab.org/di-orchestrator/pkg/api/v2alpha1"
)
var (
namespace string
jobname string
replicas int
)
func main() {
flag.StringVar(&namespace, "ns", "default", "The namespace of the scaling job.")
flag.StringVar(&jobname, "n", "gobigger-test", "The name of the scaling job.")
flag.IntVar(&replicas, "r", 1, "The number of replicas for the job.")
flag.Parse()
cfg, err := ctrl.GetConfig()
if err != nil {
log.Fatalf("Failed to get kubeconfig: %v", err)
}
// create dynamic client for dijob
dclient := dynamic.NewForConfigOrDie(cfg)
gvr := schema.GroupVersionResource{
Group: v2alpha1.GroupVersion.Group,
Version: v2alpha1.GroupVersion.Version,
Resource: "dijobs",
}
diclient := dclient.Resource(gvr)
unjob, err := diclient.Namespace(namespace).Get(context.Background(), jobname, metav1.GetOptions{})
if err != nil {
log.Fatalf("Failed to get job with dynamic client: %v", err)
}
// set job.status.replicas to what we want
err = unstructured.SetNestedField(unjob.Object, int64(replicas), "status", "replicas")
if err != nil {
log.Fatalf("Failed to set nested fields")
}
// update job status
_, err = diclient.Namespace(namespace).UpdateStatus(context.Background(), unjob, metav1.UpdateOptions{})
if err != nil {
log.Fatalf("Failed to update status: %v", err)
}
log.Printf("Successfully update dijob %s/%s replicas to %d", namespace, jobname, replicas)
}
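For example, `go run ./hack/update_replicas.go --ns di-system --n gobigger-test --r 2` updates the replicas of the DIJob `di-system/gobigger-test` to 2 through the status subresource, using the flags defined above.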
......@@ -149,11 +149,10 @@ type Phase string
const (
// JobPending means the job has been submitted to the cluster,
// but not all the pods and services have been created,
// or not pods are running
// but not all the pods and services have been created
JobPending Phase = "Pending"
// JobStarted means the job has been scheduled and waits for running.
// JobStarted means the job has been created and waits for running.
JobStarting Phase = "Starting"
// JobRestarting means the job has been rescheduled and waits for restarting.
......