Commit 97a57469 authored by liqingping

refactor: merge from opensource repo

1. merge from opensource repo
2. update e2e test
name: Go
on:
- push
- pull_request
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15
- name: lint
shell: bash
run: |
# binary will be $(go env GOPATH)/bin/golangci-lint
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.41.1
# or install it into ./bin/:
# curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.41.1
# in Alpine Linux (which does not ship curl by default):
# wget -O- -nv https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.41.1
golangci-lint --version
make lint
unit-test:
needs: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15
- name: Test
shell: bash
run: |
# download etcd to bootstrap test environment
curl -L https://github.com/kubernetes-sigs/kubebuilder/releases/download/v2.3.2/kubebuilder_2.3.2_linux_amd64.tar.gz | tar -xz -C /tmp/
mv /tmp/kubebuilder_2.3.2_linux_amd64 /tmp/kubebuilder
export KUBEBUILDER_ASSETS=/tmp/kubebuilder/bin
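# envtest reads KUBEBUILDER_ASSETS to locate the etcd and kube-apiserver test binaries it launches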
make test
build:
needs: unit-test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15
- name: Build Bin
run: make build
\ No newline at end of file
name: Publish Releases to Hub
# When it's time to do a release, do a full cross-platform build for all supported
# architectures and push all of them to Docker Hub.
# Only trigger on semver-shaped tags.
on:
push:
tags:
- "v*.*.*"
jobs:
docker:
runs-on: ubuntu-latest
strategy:
matrix:
platform: [ linux/amd64 ]
target: [ di-operator, di-server ]
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Prepare
id: prep
env:
DOCKERIO_ORG: diorchestrator
TARGET: ${{ matrix.target }}
run: |
DOCKER_IMAGE=$DOCKERIO_ORG/$TARGET
VERSION=edge
if [[ $GITHUB_REF == refs/tags/* ]]; then
VERSION=${GITHUB_REF#refs/tags/}
fi
if [ "${{ github.event_name }}" = "schedule" ]; then
VERSION=nightly
fi
TAGS="${DOCKER_IMAGE}:${VERSION}"
if [[ $VERSION =~ ^v[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
TAGS="$TAGS,${DOCKER_IMAGE}:latest"
fi
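# Worked example (illustrative): pushing tag refs/tags/v1.2.3 with target di-operator
# yields TAGS=diorchestrator/di-operator:v1.2.3,diorchestrator/di-operator:latest,
# while a non-semver ref falls through to TAGS=diorchestrator/di-operator:edge.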
echo ::set-output name=tags::${TAGS}
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
with:
platforms: all
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v1
- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
- name: Login to DockerHub
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERIO_USERNAME }}
password: ${{ secrets.DOCKERIO_PASSWORD }}
- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
builder: ${{ steps.buildx.outputs.name }}
context: ./
file: ./Dockerfile
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.prep.outputs.tags }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
target: ${{ matrix.target }}
- name: Image digest
run: echo ${{ steps.docker_build.outputs.digest }}
\ No newline at end of file
......@@ -26,4 +26,6 @@ bin
*.vscode
*.coverprofile
coverage.out.*
\ No newline at end of file
coverage.out.*
config/webhook/manifests.yaml
variables:
REGISTRY: registry.sensetime.com/cloudnative4ai
VERSION: v0.0.1-alpha.0
VERSION: v0.2.0-alpha.0
PROJECT: di-orchestrator
# dind config
DOCKER_HOST: tcp://localhost:2376
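# (2376 is the TLS port exposed by the docker:dind service; 2375 is the plaintext port)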
......@@ -33,7 +33,7 @@ stages:
lint:
stage: lint
image: registry.sensetime.com/cloudnative4ai/golang-build-kit:v1.14
image: registry.sensetime.com/cloudnative4ai/ci-deploy-kit:latest
tags:
- cloudnative4ai-group-runner-phoenix
script:
......@@ -42,7 +42,7 @@ lint:
unit-test:
stage: test
image: registry.sensetime.com/cloudnative4ai/golang-build-kit:v1.14
image: registry.sensetime.com/cloudnative4ai/ci-deploy-kit:latest
tags:
- cloudnative4ai-group-runner-phoenix
script:
......@@ -53,7 +53,7 @@ unit-test:
build-manual:
stage: build
image: registry.sensetime.com/cloudnative4ai/golang-build-kit:v1.14
image: registry.sensetime.com/cloudnative4ai/ci-deploy-kit:latest
tags:
- cloudnative4ai-group-runner-phoenix
allow_failure: false
......@@ -67,14 +67,14 @@ build-manual:
- develop
- /^release.*$/
script:
- make dev-images
- make docker-build
- make docker-push
after_script:
- docker image prune -f
build-release:
stage: build
image: registry.sensetime.com/cloudnative4ai/golang-build-kit:v1.14
image: registry.sensetime.com/cloudnative4ai/ci-deploy-kit:latest
tags:
- cloudnative4ai-group-runner-phoenix
allow_failure: false
......@@ -87,14 +87,14 @@ build-release:
- develop
- /^release.*$/
script:
- make dev-images
- make docker-build
- make docker-push
after_script:
- docker image prune -f
tag:
stage: release
image: registry.sensetime.com/cloudnative4ai/golang-build-kit:v1.14
image: registry.sensetime.com/cloudnative4ai/ci-deploy-kit:latest
tags:
- cloudnative4ai-group-runner-phoenix
services:
......@@ -106,7 +106,7 @@ tag:
- master
- /^release.*$/
script:
- make dev-images
- make docker-build
- make docker-release
after_script:
- docker image prune -f
\ No newline at end of file
......@@ -10,11 +10,11 @@ run:
tests: true
skip-dirs:
- manifests # deploy phoenix-rubber yaml
- manifests
- third_party # from go-ethereum
- _out #phoenix-rubber executable binary file
- _out
- doc # user tutorial
- deployment # deploy phoenix-rubber yaml
- deployment
- config # the crd config yaml
- cluster # the logging bash
- vendor # the third library
......
# Build di-operator with local di-operator binary
FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0 as dev-di-operator
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
WORKDIR /
COPY /bin/di-operator .
ENTRYPOINT ["/di-operator"]
# Build the di-operator binary
FROM registry.sensetime.com/cloudnative4ai/golang:1.14 as builder
LABEL maintainer="liqingping@sensetime.com"
FROM registry.sensetime.com/cloudnative4ai/golang:1.15 as builder
WORKDIR /workspace
# Copy the Go Modules manifests
......@@ -23,17 +14,30 @@ RUN go mod download
# Copy the go source
COPY main.go main.go
COPY api/ api/
COPY common/ common/
COPY controllers/ controllers/
COPY server/ server/
COPY utils/ utils/
# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o di-operator main.go
# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o di-server server/main.go
# Use distroless as minimal base image to package the di-operator binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0 as di-operator
LABEL maintainer="opendilab.contact@gmail.com"
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
WORKDIR /
COPY --from=builder /workspace/di-operator .
ENTRYPOINT ["/di-operator"]
FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0 as di-server
LABEL maintainer="opendilab.contact@gmail.com"
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
WORKDIR /
COPY --from=builder /workspace/di-server .
ENTRYPOINT ["/di-server"]
# Build di-server with local nerver-server binary
FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0 as dev-di-server
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
WORKDIR /
COPY /bin/di-server .
ENTRYPOINT ["/di-server"]
# Build the manager binary
FROM registry.sensetime.com/cloudnative4ai/golang:1.14 as builder
WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download
# Copy the go source
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
COPY server/ server/
COPY utils/ utils/
# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o di-server server/main.go
# Use distroless as minimal base image to package the di-server binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM registry.sensetime.com/cloudnative4ai/ubi:v1.0.0 as di-server
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
WORKDIR /
COPY --from=builder /workspace/di-server .
ENTRYPOINT ["/di-server"]
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
# di-operator version
VERSION ?= v0.0.1-alpha.0
VERSION ?= v0.2.0-alpha.0
MASTER_VERSION := $(VERSION)
COMMIT_SHORT_SHA=$(shell git log -n 1 | head -n 1 | sed -e 's/^commit //' | head -c 8)
......@@ -58,6 +58,7 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust
$(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=di-operator-cluster-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
cd config/manager && $(KUSTOMIZE) edit set image ${IMG_BASE}=${MASTER_IMG} ${SERVER_IMG_BASE}=${MASTER_SERVER_IMG}
./hack/update-version.sh ${MASTER_VERSION}
./hack/update-image-tags.sh config/manager ${MASTER_VERSION}
# dev-manifests appends COMMIT_SHORT_SHA to the CI version and image tag, so it is only used for development.
# Use `make manifests` when committing to git.
......@@ -65,6 +66,7 @@ dev-manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and
$(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=di-operator-cluster-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
cd config/manager && $(KUSTOMIZE) edit set image ${IMG_BASE}=${IMG} ${SERVER_IMG_BASE}=${SERVER_IMG}
./hack/update-version.sh ${VERSION}
./hack/update-image-tags.sh config/manager ${VERSION}
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
......@@ -81,7 +83,16 @@ lint:
.PHONY: test
test: ginkgo ## Run tests.
$(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./...
$(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./api/v1alpha1 ./controllers ./server/http ./common/gpuallocator
go tool cover -func=./api/v1alpha1/coverage.out
go tool cover -func=./controllers/coverage.out
go tool cover -func=./server/http/coverage.out
go tool cover -func=./common/gpuallocator/coverage.out
.PHONY: e2e-test
e2e-test: ## Run e2e tests.
go test -timeout 20m -v ./e2e -shared-volumes-dir /data/nfs/ding/cartpole --kubeconfig ~/.kube/config
go tool cover -func=./e2e/coverage.out
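# Note: -shared-volumes-dir should be a directory visible to both the test runner and
# the cluster nodes (an NFS export in the example above), and --kubeconfig should point
# to a cluster with the orchestrator already deployed.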
##@ Build
......@@ -92,13 +103,9 @@ build: generate ## Build di-operator binary.
run: manifests generate fmt vet ## Run a controller from your host.
go run ./main.go
dev-images: build
docker build -t ${IMG} --target dev-di-operator .
docker build -t ${SERVER_IMG} -f Dockerfile.server --target dev-di-server .
docker-build: build ## Build docker image with the di-operator.
docker-build: ## Build docker image with the di-operator.
docker build -t ${IMG} --target di-operator .
docker build -t ${SERVER_IMG} -f Dockerfile.server --target di-server .
docker build -t ${SERVER_IMG} --target di-server .
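# --target picks the di-operator / di-server stage from the multi-stage Dockerfile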
docker-push: ## Push docker image with the di-operator.
docker push ${IMG}
......
......@@ -2,7 +2,7 @@ domain: opendilab.org
layout:
- go.kubebuilder.io/v3
projectName: di
repo: go-sensephoenix.sensetime.com/di-orchestrator
repo: opendilab.org/di-orchestrator
resources:
- api:
crdVersion: v1
......@@ -11,7 +11,7 @@ resources:
domain: opendilab.org
group: diengine
kind: DIJob
path: go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1
path: opendilab.org/di-orchestrator/api/v1alpha1
version: v1alpha1
webhooks:
defaulting: true
......@@ -23,6 +23,6 @@ resources:
domain: opendilab.org
group: diengine
kind: AggregatorConfig
path: go-sensephoenix.sensetime.com/di-operator/api/v1alpha1
path: opendilab.org/di-operator/api/v1alpha1
version: v1alpha1
version: "3"
# di-orchestrator
# DI Orchestrator
DI Orchestrator is designed to manage DI (Decision Intelligence) jobs using the Kubernetes Custom Resource and Operator pattern.
## developer guide
Refers to [developer-guide](./docs/developer-guide.md)
## user guide
Refers to [user-guide](./docs/architecture.md)
### prerequisites
- a well prepared kubernetes cluster. Follow the [instructions](https://kubernetes.io/docs/setup/production-environment/tools/kubeadm/create-cluster-kubeadm/) to create a kubernetes cluster, or create a local kubernetes node referring to [kind](https://kind.sigs.k8s.io/docs/user/quick-start/) or [minikube](https://minikube.sigs.k8s.io/docs/start/)
- cert-manager. Installation on kubernetes referenced to [cert-manager docs](https://cert-manager.io/docs/installation/kubernetes/). Or you can install by the following command.
### Prerequisites
- A well-prepared Kubernetes cluster. Follow the [instructions](https://kubernetes.io/docs/setup/production-environment/tools/kubeadm/create-cluster-kubeadm/) to create one, or create a local Kubernetes node with [kind](https://kind.sigs.k8s.io/docs/user/quick-start/) or [minikube](https://minikube.sigs.k8s.io/docs/start/)
- Cert-manager. To install it on Kubernetes, refer to the [cert-manager docs](https://cert-manager.io/docs/installation/kubernetes/), or install it with the following command.
```bash
kubectl create -f ./config/certmanager/cert-manager.yaml
```
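Before installing the orchestrator, you can check that cert-manager's pods are ready (a quick sanity check, assuming the default `cert-manager` namespace):
```bash
kubectl get pods -n cert-manager
```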
### install
Install `di-operator` and `di-server` with the following command.
### Install DI Orchestrator
DI Orchestrator consists of two components: `di-operator` and `di-server`. Install them with the following command.
```bash
kubectl create -f ./config/-manager.yaml
kubectl create -f ./config/di-manager.yaml
```
`di-operator` and `di-server` will be installed in the `di-system` namespace.
```bash
$ kubectl get pod -n -system
$ kubectl get pod -n di-system
NAME READY STATUS RESTARTS AGE
di-operator-57cc65d5c9-5vnvn 1/1 Running 0 59s
di-server-7b86ff8df4-jfgmp 1/1 Running 0 59s
......@@ -28,107 +24,25 @@ di-server-7b86ff8df4-jfgmp 1/1 Running 0 59s
Install the global components of DIJob defined in AggregatorConfig:
```bash
kubectl create -f examples/-mock-agconfig.yaml -n -system
kubectl create -f config/samples/agconfig.yaml -n di-system
```
### submit DIJob
### Submit DIJob
```bash
# submit DIJob
$ kubectl create -f examples/-mock-dijob.yaml
dijob.sensetime.com/dijob-example-1 created
$ kubectl create -f config/samples/dijob-cartpole.yaml
# get pod and you will see coordinator and aggregator are created
# get pods and you will see that the coordinator is created by di-operator
# a few seconds later, you will see the collectors and learners created by di-server
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
dijob-example-1-aggregator 1/1 Running 0 8s
dijob-example-1-coordinator 1/1 Running 0 8s
# few seconds later, you will see collectors and learners created by di-server
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
dijob-example-1-aggregator 1/1 Running 0 80s
dijob-example-1-collector-pm5gv 1/1 Running 0 66s
dijob-example-1-coordinator 1/1 Running 0 80s
dijob-example-1-learner-rcwmc 1/1 Running 0 66s
dijob-example-1-learner-txjks 1/1 Running 0 66s
# get logs of coordinator
$ kubectl logs cartpole-dqn-coordinator
```
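To tear the job down, delete it with the same manifest (illustrative; use whichever sample you applied):
```bash
kubectl delete -f config/samples/dijob-cartpole.yaml
```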
## User Guide
Refer to the [user-guide](./docs/architecture.md). For the Chinese version, see the [中文手册](./docs/architecture-cn.md)
## Contributing
Refer to the [developer-guide](./docs/developer-guide.md).
# get logs
$ kubectl logs dijob-example-1-coordinator
* Serving Flask app "interaction.master.master" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
try to connect to dijob-example-1-aggregator.default:80
can't acquire resource for learner(dijob-example-1-aggregator.default:80)
Successed to connect to dijob-example-1-aggregator.default:80
have connected to aggregator
Recevied replicas response from server {'namespace': 'default', 'coordinator': 'dijob-example-1-coordinator', 'collectors': ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'], 'learners': ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']}
try to connect to dijob-example-1-collector-pm5gv.default:80
try to connect to dijob-example-1-collector-dz9jl.default:80
failed list:Only can connect 0 collectors, 1 learners.
[] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
Only can connect 0 collectors, 1 learners.
failed list: [] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
Only can connect 0 collectors, 1 learners.
failed list: [] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
Only can connect 0 collectors, 1 learners.
failed list: [] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
Only can connect 0 collectors, 1 learners.
Only can connect 0 collectors, 1 learners.
failed list: [] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-txjks.default:80', 'dijob-example-1-learner-rcwmc.default:80']
Only can connect 0 collectors, 1 learners.
failed list: [] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-txjks.default:80', 'dijob-example-1-learner-rcwmc.default:80']
Only can connect 0 collectors, 1 learners.
Successed to connect to dijob-example-1-collector-dz9jl.default:80
failed list: [] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
Have connected 1 collectors, 1 learners, match limit requests.
Start...
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: learner task(learner_task_PID1UUIDb0059cb6-b13a-11eb-8766-8a7e232739e4_1620615118.4310277) put into queue
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector task(collector_task_PID1UUIDb006f46c-b13a-11eb-a8f4-8a7e232739e4_1620615118.43981) put into queue
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector_task(collector_task_PID1UUIDb006f46c-b13a-11eb-a8f4-8a7e232739e4_1620615118.43981) can't find proper buffer_id(buffer_PID1UUIDb005a51c-b13a-11eb-8766-8a7e232739e4_1620615118.431186)
failed list: [] []
currnet list: ['dijob-example-1-collector-dz9jl.default:80', 'dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
send delete and received {'namespace': 'default', 'coordinator': 'dijob-example-1-coordinator', 'collectors': ['dijob-example-1-collector-dz9jl.default:80'], 'learners': []}
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector_task(collector_task_PID1UUIDb006f46c-b13a-11eb-a8f4-8a7e232739e4_1620615118.43981) can't find proper buffer_id(buffer_PID1UUIDb005a51c-b13a-11eb-8766-8a7e232739e4_1620615118.431186)
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector task(collector_task_PID1UUIDb006f46c-b13a-11eb-a8f4-8a7e232739e4_1620615118.43981) reput into queue
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector_task(collector_task_PID1UUIDb006f46c-b13a-11eb-a8f4-8a7e232739e4_1620615118.43981) can't find proper buffer_id(buffer_PID1UUIDb005a51c-b13a-11eb-8766-8a7e232739e4_1620615118.431186)
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: learner task(learner_task_PID1UUIDb0059cb6-b13a-11eb-8766-8a7e232739e4_1620615118.4310277) reput into queue
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: replay_buffer(buffer_PID1UUIDb005a51c-b13a-11eb-8766-8a7e232739e4_1620615118.431186) is created
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: learner_task(learner_task_PID1UUIDb0059cb6-b13a-11eb-8766-8a7e232739e4_1620615118.4310277) is successful to be assigned
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-txjks.default:80', 'dijob-example-1-learner-rcwmc.default:80']
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector task(collector_task_PID1UUIDb006f46c-b13a-11eb-a8f4-8a7e232739e4_1620615118.43981) timeout: [1620615124.456363, 1620615118.439875, 6.016488075256348/5]
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-txjks.default:80', 'dijob-example-1-learner-rcwmc.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-txjks.default:80', 'dijob-example-1-learner-rcwmc.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-txjks.default:80', 'dijob-example-1-learner-rcwmc.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
failed list: [] []
currnet list: ['dijob-example-1-collector-pm5gv.default:80'] ['dijob-example-1-learner-rcwmc.default:80', 'dijob-example-1-learner-txjks.default:80']
Successed to connect to dijob-example-1-collector-pm5gv.default:80
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector task(collector_task_PID1UUIDbe480962-b13a-11eb-a8f4-8a7e232739e4_1620615142.3544374) put into queue
collector task(collector_task_PID1UUIDbe480962-b13a-11eb-a8f4-8a7e232739e4_1620615142.3544374) is assigned to collector(dijob-example-1-collector-pm5gv.default:80)
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector_task(collector_task_PID1UUIDbe480962-b13a-11eb-a8f4-8a7e232739e4_1620615142.3544374) is successful to be assigned
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector task(collector_task_PID1UUIDbe480962-b13a-11eb-a8f4-8a7e232739e4_1620615142.3544374) send data(be49a1aa-b13a-11eb-84d1-5afa89bc32d3)
[Coordinator(PID1UUID9ddfbc06-b13a-11eb-8692-8a7e232739e4_1620615087.9837542)]: collector task(collector_task_PID1UUIDbe480962-b13a-11eb-a8f4-8a7e232739e4_1620615142.3544374) send data(bee26a66-b13a-11eb-84d1-5afa89bc32d3)
```
\ No newline at end of file
Contact us through <opendilab.contact@gmail.com>
......@@ -38,7 +38,7 @@ type DIJobSpec struct {
// CleanPodPolicy defines the policy to clean pods after DIJob completed
CleanPodPolicy CleanPodPolicy `json:"cleanPodPolicy,omitempty"`
// Volumes defines the shared volumes for nerveX components
// Volumes defines the shared volumes for DI-engine components
Volumes []corev1.Volume `json:"volumes,omitempty"`
Coordinator CoordinatorSpec `json:"coordinator"`
......
/*
Copyright 2021 The SensePhoenix authors.
Copyright 2021 The OpenDILab authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
/*
Copyright 2021 The SensePhoenix authors.
Copyright 2021 The OpenDILab authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
/*
Copyright 2021 The SensePhoenix authors.
Copyright 2021 The OpenDILab authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......@@ -73,7 +73,8 @@ var _ = BeforeSuite(func() {
},
}
cfg, err := testEnv.Start()
var err error
cfg, err = testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
......
// +build !ignore_autogenerated
/*
Copyright 2021 The SensePhoenix authors.
Copyright 2021 The OpenDILab authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
package util
const (
GroupNameLabel = "group-name"
JobNameLabel = "dijob-name"
ControllerNameLabel = "controller-name"
ReplicaTypeLabel = "replica-type"
PodNameLabel = "pod-name"
ControllerName = "di-operator"
CollectorName = "collector"
LearnerName = "learner"
DDPLearnerName = "ddp-learner"
AggregatorName = "aggregator"
CoordinatorName = "coordinator"
DefaultContainerName = "di-container"
DefaultPortName = "di-port"
DefaultCollectorPort = 22270
DefaultLearnerPort = 22271
DefaultAggregatorPort = 22272
DefaultCoordinatorPort = 22273
DDPLearnerPortPrefix = "gpu-port"
PodNamespaceEnv = "KUBERNETES_POD_NAMESPACE"
PodNameEnv = "KUBERNETES_POD_NAME"
CoordinatorURLEnv = "KUBERNETES_COORDINATOR_URL"
AggregatorURLEnv = "KUBERNETES_AGGREGATOR_URL"
ServerURLEnv = "KUBERNETES_SERVER_URL"
WorldSize = "WORLD_SIZE"
LocalWorldSize = "LOCAL_WORLD_SIZE"
StartRank = "START_RANK"
MasterAddr = "MASTER_ADDR"
MasterPort = "MASTER_PORT"
DefaultMasterPort = 10314
)
var (
DefaultServerURL = "di-server.di-system:8080"
)
......@@ -26689,7 +26689,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: registry.sensetime.com/cloudnative4ai/jetstack/cert-manager-cainjector:v1.3.1
image: quay.io/jetstack/cert-manager-cainjector:v1.3.1
imagePullPolicy: IfNotPresent
name: cert-manager
resources: {}
......@@ -26734,7 +26734,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: registry.sensetime.com/cloudnative4ai/jetstack/cert-manager-controller:v1.3.1
image: quay.io/jetstack/cert-manager-controller:v1.3.1
imagePullPolicy: IfNotPresent
name: cert-manager
ports:
......@@ -26780,7 +26780,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: registry.sensetime.com/cloudnative4ai/jetstack/cert-manager-webhook:v1.3.1
image: quay.io/jetstack/cert-manager-webhook:v1.3.1
imagePullPolicy: IfNotPresent
livenessProbe:
failureThreshold: 3
......@@ -19748,7 +19748,7 @@ spec:
description: Priority labels the priority of DIJob
type: string
volumes:
description: Volumes defines the shared volumes for nerveX components
description: Volumes defines the shared volumes for DI-engine components
items:
description: Volume represents a named volume in a pod that may
be accessed by any container in the pod.
......@@ -15114,7 +15114,11 @@ spec:
description: Priority labels the priority of DIJob
type: string
volumes:
description: Volumes defines the shared volumes for DI-engine components
items:
description: Volume represents a named volume in a pod that may be accessed by any container in the pod.
properties:
......@@ -16320,7 +16324,11 @@ spec:
- --leader-elect
command:
- /di-operator
image: diorchestrator/di-operator:v0.1.0
imagePullPolicy: Always
livenessProbe:
httpGet:
......@@ -16384,7 +16392,11 @@ spec:
- --lease-lock-name=di-server
command:
- /di-server
image: diorchestrator/di-server:v0.1.0
imagePullPolicy: Always
livenessProbe:
httpGet:
......@@ -27,7 +27,7 @@ spec:
- /di-operator
args:
- "--server-address=http://di-server.di-system:8080"
image: registry.sensetime.com/cloudnative4ai/di-operator:v0.0.1-alpha.0
image: registry.sensetime.com/cloudnative4ai/di-operator:v0.2.0-alpha.0
imagePullPolicy: Always
name: manager
securityContext:
......
......@@ -23,7 +23,7 @@ spec:
- "--leader-elect"
- "--lease-lock-namespace=di-system"
- "--lease-lock-name=di-server"
image: registry.sensetime.com/cloudnative4ai/di-server:v0.0.1-alpha.0
image: registry.sensetime.com/cloudnative4ai/di-server:v0.2.0-alpha.0
imagePullPolicy: Always
name: server
securityContext:
......
......@@ -6,7 +6,7 @@ kind: Kustomization
images:
- name: registry.sensetime.com/cloudnative4ai/di-operator
newName: registry.sensetime.com/cloudnative4ai/di-operator
newTag: v0.0.1-alpha.0
newTag: v0.2.0-alpha.0
- name: registry.sensetime.com/cloudnative4ai/di-server
newName: registry.sensetime.com/cloudnative4ai/di-server
newTag: v0.0.1-alpha.0
newTag: v0.2.0-alpha.0
......@@ -7,17 +7,11 @@ spec:
aggregator:
template:
spec:
# nodeSelector:
# kubernetes.io/hostname: "liqingping-gpu.novalocal"
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/nervex:v0.0.5-torch1.4-cuda10.1-cudnn7-devel-d6c260bf
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: LC_ALL
value: "en_US.utf-8"
- name: LANG
value: "en_US.utf-8"
- name: PYTHONUNBUFFERED
value: "1"
resources:
......@@ -34,7 +28,7 @@ spec:
# pip install --no-cache-dir -e .;
# pip install --no-cache-dir -e .[common_env]
nervex -m dist --module learner_aggregator
ding -m dist --module learner_aggregator
ports:
- name: di-port
containerPort: 22270
\ No newline at end of file
......@@ -11,4 +11,4 @@ COPY ./utils ./utils
COPY ./worker ./worker
COPY ./main.py ./main.py
# docker build -t registry.sensetime.com/cloudnative4ai/nervex-mock:v0.0.1 .
\ No newline at end of file
# docker build -t diorchestrator/di-mock:v0.0.1 .
\ No newline at end of file
......@@ -9,7 +9,7 @@ spec:
spec:
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/di-mock:v0.0.5
image: diorchestrator/di-mock:v0.0.5
imagePullPolicy: IfNotPresent
command: ["/bin/bash", "-c",]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 5; python3 -u main.py aggregator -sl $HOSTNAME.default -sp $AGGREGATOR_PORT -sl $HOSTNAME.default -ml $HOSTNAME.default -mp 81"]
......
......@@ -11,7 +11,7 @@ spec:
spec:
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/-mock:v0.0.5
image: diorchestrator/di-mock:v0.0.5
imagePullPolicy: Always
command: ["/bin/bash", "-c",]
args: ["python3 -u main.py coordinator -l $HOSTNAME -p $COORDINATOR_PORT"]
......@@ -21,7 +21,7 @@ spec:
spec:
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/-mock:v0.0.5
image: diorchestrator/di-mock:v0.0.5
imagePullPolicy: Always
command: ["/bin/bash", "-c",]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 10; python3 -u main.py collector -l $HOSTNAME.default -p $COLLECTOR_PORT"]
......@@ -33,7 +33,7 @@ spec:
spec:
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/-mock:v0.0.5
image: diorchestrator/di-mock:v0.0.5
imagePullPolicy: Always
command: ["/bin/bash", "-c",]
args: ["until ping -c 1 $HOSTNAME.default ; do sleep 1 ; done ; sleep 10; python3 -u main.py learner -l $HOSTNAME.default -p $LEARNER_PORT"]
......
......@@ -75,7 +75,7 @@ class Master(ControllableService):
self.__shutdown_event = Event()
self.__lock = Lock()
# k8s: nervex-server
# k8s: di-server
self.__server_http_engine = None
# slave connection
......
......@@ -14,9 +14,9 @@ def start_cooridnator(args):
log.disabled = True
host, port = args.listen, args.port
nervex_server = args.server
di_server = args.server
coordinator = MockCoordinator(host, port, nervex_server)
coordinator = MockCoordinator(host, port, di_server)
coordinator.start()
system_shutdown_event = Event()
......@@ -31,7 +31,7 @@ def start_cooridnator(args):
shutdown_monitor_thread = Thread(target=shutdown_monitor, args=(), daemon=True, name='shutdown_monitor')
shutdown_monitor_thread.start()
system_shutdown_event.wait()
print("[nerveX parallel pipeline]Your RL agent is converged, you can refer to 'log' and 'tensorboard' for details")
print("[DI parallel pipeline]Your RL agent is converged, you can refer to 'log' and 'tensorboard' for details")
def start_collector(args):
host, port = args.listen, args.port
......@@ -50,13 +50,13 @@ def start_aggregator(args):
aggregator.start()
def parse_args():
parser = argparse.ArgumentParser(prog="mock_nervex", description="")
parser = argparse.ArgumentParser(prog="mock_di", description="")
parser.set_defaults(func=info)
subparsers = parser.add_subparsers()
# Coordinator
parser_coordinator = subparsers.add_parser("coordinator", help="start a new coordiantor")
parser_coordinator.add_argument("--server", type=str, default='http://nervex-server.nervex-system:8080', help="ip address")
parser_coordinator.add_argument("--server", type=str, default='http://di-server.di-system:8080', help="ip address")
parser_coordinator.add_argument("-l", "--listen", default="localhost", help="Specify the IP address on which the server listens")
parser_coordinator.add_argument("-p", "--port", type=int, default=8000, help="Specify the port on which the server listens")
parser_coordinator.add_argument('--disable_flask_log', default=True, type=bool, help='whether disable flask log')
......
"""
Copyright 2020 Sensetime X-lab. All Rights Reserved
Copyright 2020 OpenDILab. All Rights Reserved
"""
from typing import Union, Mapping, List, NamedTuple, Tuple, Callable, Optional, Any
import copy
......@@ -23,7 +23,7 @@ def lists_to_dicts(
- newdata (:obj:`Union[Mapping[object, object], NamedTuple]`): A list of dicts as a result
Example:
>>> from nervex.utils import *
>>> from di.utils import *
>>> lists_to_dicts([{1: 1, 10: 3}, {1: 2, 10: 4}])
{1: [1, 2], 10: [3, 4]}
"""
......@@ -59,7 +59,7 @@ def dicts_to_lists(data: Mapping[object, List[object]]) -> List[Mapping[object,
- newdata (:obj:`List[Mapping[object, object]]`): A dict of lists as a result
Example:
>>> from nervex.utils import *
>>> from di.utils import *
>>> dicts_to_lists({1: [1, 2], 10: [3, 4]})
[{1: 1, 10: 3}, {1: 2, 10: 4}]
"""
......
......@@ -54,7 +54,7 @@ from typing import List
# global linklink_flag
# """
# Overview:
# Try import linklink module, if failed, import nervex.tests.fake_linklink instead
# Try import linklink module, if failed, import di.tests.fake_linklink instead
# Returns:
# module: imported module (may be fake_linklink)
......@@ -66,7 +66,7 @@ from typing import List
# warnings.warn(
# "You have not installed linklink package! If you are not run locally and testing, "
# "ask coworker for help. We will run a fake linklink."
# "Refer to nervex.utils.fake_linklink.py for details."
# "Refer to di.utils.fake_linklink.py for details."
# )
# from .fake_linklink import link
# linklink_flag = False
......
"""
Copyright 2020 Sensetime X-lab. All Rights Reserved
Copyright 2020 OpenDILab. All Rights Reserved
"""
import os
import socket
......
......@@ -85,7 +85,7 @@ class MockLearnerAggregator(object):
if self._task != None:
start_task = conn.new_task({'name': 'learner_start_task', 'task_info': self._task['task_info']})
start_task.start().join()
# In real nerveX, here also need to reconstruct the communcation
# In real DI, the communication also needs to be reconstructed here
if start_task.status != TaskStatus.COMPLETED:
print("can't send start task to learner({})".format(_id))
time.sleep(1)
......@@ -120,7 +120,7 @@ class MockLearnerAggregator(object):
conn = self._learner_connection.pop(learner_id)
# conn.disconnect()
# assert not conn.is_connected
# In real nerveX, here also need to reconstruct the communcation
# In real DI, the communication also needs to be reconstructed here
self._world_size -= 1
else:
print("cannot find learner {}".format(learner))
......
......@@ -70,14 +70,14 @@ class TaskState(object):
class MockCoordinator(object):
def __init__(self, host, port, nervex_server) -> None:
def __init__(self, host, port, di_server) -> None:
self.__host = host
self.__port = port
cfg.interaction.host = host
cfg.interaction.port = port
self.__nerver_server = nervex_server
self.__nerver_server = di_server
self._coordinator_uid = get_task_uid()
self._cfg = cfg
......
......@@ -16,7 +16,7 @@ from .resource_manager import NaiveResourceManager
from interaction.base import get_http_engine_class, split_http_address
DEFAULT_NAMESPACE = 'default'
DEFAULT_POD_NAME = 'nervexjob-example-coordinator'
DEFAULT_POD_NAME = 'dijob-example-coordinator'
init_replicas_request = {
"collectors": {
......@@ -198,7 +198,7 @@ class CoordinatorInteraction(object):
def _period_sync_with_server(self):
start_time = time.time()
while not self._end_flag:
# First: send failed list to notify nerveX-server which replicas are failed, and then terminate such replicas.
# First: send the failed list to notify di-server which replicas have failed, and then terminate such replicas.
# print("failed list:", list(self._failed_collector_conn), list(self._failed_learner_conn))
if len(self._failed_learner_conn) > 0 or len(self._failed_collector_conn) > 0:
success, _, message, _ = self._server_conn.post_replicas_failed(learners=list(self._failed_learner_conn), collectors=list(self._failed_collector_conn))
......
......@@ -4,7 +4,7 @@ from typing import Union
from collections import defaultdict
import random
# from nervex.policy import create_policy
# from di.policy import create_policy
from utils import LimitedSpaceContainer, get_task_uid
from .base_parallel_commander import register_parallel_commander, BaseCommander
......
apiVersion: diengine.opendilab.org/v1alpha1
kind: DIJob
metadata:
name: dijob-example
name: cartpole-dqn
spec:
group: xxx
priorityClassName: ""
cleanPodPolicy: "Running"
volumes:
- name: data-dir
hostPath:
path: /data/nfs/di/cartpole
- name: cache-volume
emptyDir:
medium: Memory
sizeLimit: 128Mi
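# cache-volume is mounted at /dev/shm in the learner to enlarge shared memory (see the learner's volumeMounts below)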
- name: work-dir
hostPath:
path: /data/nfs/ding/cartpole
coordinator:
template:
spec:
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/nervex:v0.0.5-torch1.4-cuda10.1-cudnn7-devel-d6c260bf
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: LC_ALL
value: "en_US.utf-8"
- name: LANG
value: "en_US.utf-8"
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 3
memory: "10Gi"
limits:
cpu: 3
memory: "10Gi"
command: ["/bin/bash", "-c",]
args:
- |
cat <<EOF > cartpole_dqn_config.py
cat <<EOF > cartpole_dqn_config_k8s.py
from easydict import EasyDict
cartpole_dqn_config = dict(
......@@ -47,13 +50,12 @@ spec:
model=dict(
obs_shape=4,
action_shape=2,
hidden_size_list=[128, 128, 64],
encoder_hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
multi_gpu=True,
batch_size=32,
learning_rate=0.001,
learner=dict(
......@@ -81,7 +83,8 @@ spec:
enable_track_used_data=False,
),
commander=dict(
collector_task_space=2,
# increase collector task space after getting resources from the server
collector_task_space=0,
learner_task_space=1,
eval_interval=5,
),
......@@ -94,26 +97,26 @@ spec:
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['app_zoo.classic_control.cartpole.envs.cartpole_env'],
import_names=['dizoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['nervex.worker.learner.base_learner']),
learner=dict(type='base', import_names=['ding.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['nervex.worker.collector.zergling_collector'],
import_names=['ding.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['nervex.worker.coordinator.solo_parallel_commander'],
import_names=['ding.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['nervex.worker.learner.comm.flask_fs_learner'],
import_names=['ding.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['nervex.worker.collector.comm.flask_fs_collector'],
import_names=['ding.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
......@@ -129,7 +132,7 @@ spec:
"replicas": 2,
},
learners={
"gpus": "2",
"gpus": "0",
"replicas": 1,
},
),
......@@ -137,303 +140,76 @@ spec:
learner_target_num=1,
),
),
path_data='/data/nfs/di/cartpole/data',
path_policy='/data/nfs/di/cartpole/policy',
path_data='./data',
path_policy='./policy',
communication_mode='auto',
learner_multi_gpu=False,
learner_gpu_num=2
learner_gpu_num=1,
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
nervex -m dist --module config -p k8s -c cartpole_dqn_config.py -s 0;
nervex -m dist --module coordinator -p k8s -c cartpole_dqn_config.py.pkl -s 0 --disable_flask_log 0
ding -m dist --module config -P k8s -c ./cartpole_dqn_config_k8s.py -s 0;
ding -m dist --module coordinator -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -cdp $COORDINATOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: data-dir
mountPath: /data/nfs/di/cartpole
- name: work-dir
mountPath: /ding
collector:
template:
spec:
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/nervex:v0.0.5-torch1.4-cuda10.1-cudnn7-devel-d6c260bf
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: LC_ALL
value: "en_US.utf-8"
- name: LANG
value: "en_US.utf-8"
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 8
memory: "10Gi"
limits:
cpu: 8
memory: "10Gi"
command: ["/bin/bash", "-c",]
args:
- |
cat <<EOF > cartpole_dqn_config.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
multi_gpu=True,
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
collector_task_space=2,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['app_zoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['di.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['nervex.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['nervex.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['nervex.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['nervex.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='di-server.di-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"replicas": 2,
},
learners={
"gpus": "2",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='/data/nfs/di/cartpole/data',
path_policy='/data/nfs/di/cartpole/policy',
communication_mode='auto',
learner_multi_gpu=False,
learner_gpu_num=2
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
nervex -m dist --module config -p k8s -c cartpole_dqn_config.py -s 0;
nervex -m dist --module collector -c cartpole_dqn_config.py.pkl -s 0
ding -m dist --module collector -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -clp $COLLECTOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: data-dir
mountPath: /data/nfs/di/cartpole
- name: work-dir
mountPath: /ding
learner:
template:
spec:
containers:
- name: di-container
image: registry.sensetime.com/cloudnative4ai/nervex:v0.0.5-torch1.4-cuda10.1-cudnn7-devel-d6c260bf
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: LC_ALL
value: "en_US.utf-8"
- name: LANG
value: "en_US.utf-8"
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 8
memory: "20Gi"
limits:
cpu: 8
memory: "20Gi"
command: ["/bin/bash", "-c",]
args:
- |
cat <<EOF > cartpole_dqn_config.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
multi_gpu=True,
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
collector_task_space=2,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['app_zoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['di.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['nervex.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['nervex.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['nervex.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['nervex.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='di-server.di-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"replicas": 2,
},
learners={
"gpus": "2",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='/data/nfs/di/cartpole/data',
path_policy='/data/nfs/di/cartpole/policy',
communication_mode='auto',
learner_multi_gpu=False,
learner_gpu_num=2
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
nervex -m dist --module config -p k8s -c cartpole_dqn_config.py -s 0;
nervex -m dist --module learner -c cartpole_dqn_config.py.pkl -s 0
ding -m dist --module learner -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -lp $LEARNER_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: cache-volume
mountPath: /dev/shm
- name: data-dir
mountPath: /data/nfs/di/cartpole
\ No newline at end of file
- name: work-dir
mountPath: /ding
\ No newline at end of file
apiVersion: diengine.opendilab.org/v1alpha1
kind: DIJob
metadata:
name: qbert-dqn
spec:
group: xxx
priorityClassName: ""
cleanPodPolicy: "Running"
volumes:
- name: cache-volume
emptyDir:
medium: Memory
sizeLimit: 128Mi
- name: work-dir
hostPath:
path: /data/nfs/ding/qbert
coordinator:
template:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 3
memory: "10Gi"
limits:
cpu: 3
memory: "10Gi"
command: ["/bin/bash", "-c",]
args:
- |
cat <<EOF > qbert_dqn_config_k8s.py
from easydict import EasyDict
qbert_dqn_config = dict(
env=dict(
collector_env_num=16,
collector_episode_num=2,
evaluator_env_num=8,
evaluator_episode_num=1,
stop_value=30000,
env_id='QbertNoFrameskip-v4',
frame_stack=4,
manager=dict(
shared_memory=False,
),
),
policy=dict(
cuda=True,
priority=True,
model=dict(
obs_shape=[4, 84, 84],
action_shape=6,
encoder_hidden_size_list=[128, 128, 512],
),
nstep=3,
discount_factor=0.99,
learn=dict(
multi_gpu=True,
batch_size=32,
learning_rate=0.0001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=500, )),
other=dict(
eps=dict(
type='exp',
start=1.,
end=0.05,
decay=250000,
),
replay_buffer=dict(
replay_buffer_size=400000,
enable_track_used_data=True,
),
commander=dict(
collector_task_space=0,
learner_task_space=1,
eval_interval=30,
),
),
),
)
qbert_dqn_config = EasyDict(qbert_dqn_config)
main_config = qbert_dqn_config
qbert_dqn_create_config = dict(
env=dict(
type='atari',
import_names=['dizoo.atari.envs.atari_env'],
),
env_manager=dict(type='subprocess'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['ding.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['ding.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['ding.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['ding.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['ding.worker.collector.comm.flask_fs_collector'],
),
)
qbert_dqn_create_config = EasyDict(qbert_dqn_create_config)
create_config = qbert_dqn_create_config
qbert_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='di-server.di-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"replicas": 2,
},
learners={
"gpus": "2",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='./data',
path_policy='./policy',
communication_mode='auto',
learner_gpu_num=2,
)
qbert_dqn_system_config = EasyDict(qbert_dqn_system_config)
system_config = qbert_dqn_system_config
EOF
# if code has been changed in the mount path, we have to reinstall ding cli
# pip install --no-cache-dir -e .;
ding -m dist --module config -P k8s -c qbert_dqn_config_k8s.py -s 0;
ding -m dist --module coordinator -c qbert_dqn_config_k8s.py.pkl -s 0 --disable-flask-log 0 -cdp $COORDINATOR_PORT
ports:
- name: di-port
containerPort: 22273
volumeMounts:
- name: work-dir
mountPath: /ding
collector:
template:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 8
memory: "10Gi"
limits:
cpu: 8
memory: "10Gi"
command: ["/bin/bash", "-c",]
args:
- |
# if code has been changed in the mount path, we have to reinstall ding cli
# pip install --no-cache-dir -e .;
ding -m dist --module collector -c qbert_dqn_config_k8s.py.pkl -s 0 -clp $COLLECTOR_PORT --disable-flask-log 0
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: work-dir
mountPath: /ding
learner:
template:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 3
nvidia.com/gpu: 2
memory: "30Gi"
limits:
cpu: 3
nvidia.com/gpu: 2
memory: "30Gi"
command: ["/bin/bash", "-c",]
args:
- |
# if code has been changed in the mount path, we have to reinstall ding cli
# pip install --no-cache-dir -e .;
ding -m dist --module spawn_learner -c qbert_dqn_config_k8s.py.pkl -s 0 -lp $LEARNER_PORT --disable-flask-log 0
ports:
- name: di-port
containerPort: 22271
volumeMounts:
- name: cache-volume
mountPath: /dev/shm
- name: work-dir
mountPath: /ding
\ No newline at end of file
package controllers
import (
div1alpha1 "go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
)
func isSucceeded(job *div1alpha1.DIJob) bool {
......
/*
Copyright 2021 The SensePhoenix authors.
Copyright 2021 The OpenDILab authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......@@ -32,8 +32,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
div1alpha1 "go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1"
diutil "go-sensephoenix.sensetime.com/di-orchestrator/utils"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
diutil "opendilab.org/di-orchestrator/utils"
)
// DIJobReconciler reconciles a DIJob object
......
......@@ -12,9 +12,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
div1alpha1 "go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1"
diutil "go-sensephoenix.sensetime.com/di-orchestrator/utils"
testutil "go-sensephoenix.sensetime.com/di-orchestrator/utils/testutils"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
dicommon "opendilab.org/di-orchestrator/common"
diutil "opendilab.org/di-orchestrator/utils"
testutil "opendilab.org/di-orchestrator/utils/testutils"
)
var _ = Describe("DIJob Controller", func() {
......@@ -36,59 +37,59 @@ var _ = Describe("DIJob Controller", func() {
Expect(err).NotTo(HaveOccurred())
}
var createdNvxjob div1alpha1.DIJob
var createdDIjob div1alpha1.DIJob
By("Checking the created DIJob has enough coordinator")
for _, rtype := range []div1alpha1.ReplicaType{div1alpha1.ReplicaTypeCoordinator} {
Eventually(func() int {
err := k8sClient.Get(ctx, jobKey, &createdNvxjob)
err := k8sClient.Get(ctx, jobKey, &createdDIjob)
if err != nil {
return -1
}
if createdNvxjob.Status.ReplicaStatus == nil {
if createdDIjob.Status.ReplicaStatus == nil {
return -1
}
return int(createdNvxjob.Status.ReplicaStatus[rtype].Active)
return int(createdDIjob.Status.ReplicaStatus[rtype].Active)
}, timeout, interval).Should(Equal(1))
}
By("Checking the created DIJob is in Running state")
Eventually(func() bool {
err := k8sClient.Get(ctx, jobKey, &createdNvxjob)
err := k8sClient.Get(ctx, jobKey, &createdDIjob)
if err != nil {
return false
}
return createdNvxjob.Status.Phase == div1alpha1.JobRunning
return createdDIjob.Status.Phase == div1alpha1.JobRunning
}, duration, interval).Should(BeTrue())
By("Update coordinator to Succeeded")
for _, replicaName := range []string{
diutil.ReplicaPodName(createdNvxjob.Name, "coordinator"),
diutil.ReplicaPodName(createdDIjob.Name, "coordinator"),
} {
podKey := types.NamespacedName{Namespace: createdNvxjob.Namespace, Name: replicaName}
podKey := types.NamespacedName{Namespace: createdDIjob.Namespace, Name: replicaName}
err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded)
Expect(err).NotTo(HaveOccurred())
}
By("Checking the job is succeeded")
Eventually(func() div1alpha1.Phase {
err := k8sClient.Get(ctx, jobKey, &createdNvxjob)
err := k8sClient.Get(ctx, jobKey, &createdDIjob)
if err != nil {
return div1alpha1.JobUnknown
}
return createdNvxjob.Status.Phase
return createdDIjob.Status.Phase
}, timeout, interval).Should(Equal(div1alpha1.JobSucceeded))
By("Checking the coordinator is succeeded")
Eventually(func() int {
err := k8sClient.Get(ctx, jobKey, &createdNvxjob)
err := k8sClient.Get(ctx, jobKey, &createdDIjob)
if err != nil {
return -1
}
return int(createdNvxjob.Status.ReplicaStatus[div1alpha1.ReplicaTypeCoordinator].Succeeded)
return int(createdDIjob.Status.ReplicaStatus[div1alpha1.ReplicaTypeCoordinator].Succeeded)
}, timeout, interval).Should(Equal(1))
By("Cleaning up")
err = testutil.CleanUpJob(ctx, k8sClient, createdNvxjob.DeepCopy(), timeout, interval)
err = testutil.CleanUpJob(ctx, k8sClient, createdDIjob.DeepCopy())
Expect(err).NotTo(HaveOccurred())
})
......@@ -156,7 +157,7 @@ var _ = Describe("DIJob Controller", func() {
}, timeout, interval).Should(Equal(c.expectStatus))
By("Cleaning up")
err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy(), timeout, interval)
err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy())
Expect(err).NotTo(HaveOccurred())
}
})
......@@ -220,12 +221,12 @@ var _ = Describe("DIJob Controller", func() {
By(fmt.Sprintf("Create replicas for DIJob %s", dijob.Name))
colStatus := make([]int, 3)
for _, col := range c.collectors {
createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, diutil.CollectorName, ownRefer, colStatus)
createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, dicommon.CollectorName, ownRefer, colStatus)
}
lrStatus := make([]int, 3)
for _, lr := range c.learners {
createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, diutil.LearnerName, ownRefer, lrStatus)
createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, dicommon.LearnerName, ownRefer, lrStatus)
}
By("Checking the ReplicaStatus is as expected")
......@@ -314,7 +315,7 @@ var _ = Describe("DIJob Controller", func() {
}, timeout, interval).Should(Equal(status))
}
err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy(), timeout, interval)
err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy())
Expect(err).NotTo(HaveOccurred())
}
})
......@@ -331,9 +332,9 @@ func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div1alpha1
By(fmt.Sprintf("Checking the DIJob %s is successfully created", name))
key := types.NamespacedName{Namespace: dijob.Namespace, Name: dijob.Name}
createdNvxjob := div1alpha1.DIJob{}
createdDIjob := div1alpha1.DIJob{}
Eventually(func() bool {
err := k8sClient.Get(ctx, key, &createdNvxjob)
err := k8sClient.Get(ctx, key, &createdDIjob)
if err != nil {
return false
}
......@@ -352,7 +353,7 @@ func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div1alpha1
return true
}, timeout, interval).Should(BeTrue())
return createdNvxjob, key
return createdDIjob, key
}
func createAndUpdatePodPhase(
......@@ -362,8 +363,8 @@ func createAndUpdatePodPhase(
pod := testutil.NewPod(name, jobName, ownRefer)
labs := diutil.GenLabels(jobName)
labs[diutil.ReplicaTypeLabel] = replicaType
labs[diutil.PodNameLabel] = pod.Name
labs[dicommon.ReplicaTypeLabel] = replicaType
labs[dicommon.PodNameLabel] = pod.Name
pod.SetLabels(labs)
err := k8sClient.Create(ctx, pod, &client.CreateOptions{})
......
......@@ -10,9 +10,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
div1alpha1 "go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1"
diutil "go-sensephoenix.sensetime.com/di-orchestrator/utils"
testutil "go-sensephoenix.sensetime.com/di-orchestrator/utils/testutils"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
dicommon "opendilab.org/di-orchestrator/common"
diutil "opendilab.org/di-orchestrator/utils"
testutil "opendilab.org/di-orchestrator/utils/testutils"
)
var _ = Describe("DIJob Specification", func() {
......@@ -87,13 +88,13 @@ var _ = Describe("DIJob Specification", func() {
colStatus := make([]int, 3)
for _, col := range c.collectors {
By(fmt.Sprintf("Create pod %s", col.name))
createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, diutil.CollectorName, ownRefer, colStatus)
createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, dicommon.CollectorName, ownRefer, colStatus)
}
lrStatus := make([]int, 3)
for _, lr := range c.learners {
By(fmt.Sprintf("Create pod %s", lr.name))
createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, diutil.LearnerName, ownRefer, lrStatus)
createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, dicommon.LearnerName, ownRefer, lrStatus)
}
By("Get the number of pods")
......@@ -170,7 +171,7 @@ var _ = Describe("DIJob Specification", func() {
}
By("Clean up pods")
err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy(), timeout, interval)
err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy())
Expect(err).NotTo(HaveOccurred())
}
}
......
......@@ -9,8 +9,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
div1alpha1 "go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1"
diutil "go-sensephoenix.sensetime.com/di-orchestrator/utils"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
dicommon "opendilab.org/di-orchestrator/common"
diutil "opendilab.org/di-orchestrator/utils"
)
func (r *DIJobReconciler) reconcilePods(ctx context.Context, job *div1alpha1.DIJob, pods []*corev1.Pod) error {
......@@ -32,8 +33,8 @@ func (r *DIJobReconciler) reconcilePods(ctx context.Context, job *div1alpha1.DIJ
// build coordinator pod
volumes := job.Spec.Volumes
template := job.Spec.Coordinator.Template.DeepCopy()
coorpod, coorsvc, coorurl, err := buildPodAndServiceForReplica(template, job, diutil.CoordinatorName,
diutil.DefaultCoordinatorPort, volumes)
coorpod, coorsvc, coorurl, err := buildPodAndServiceForReplica(template, job, dicommon.CoordinatorName,
dicommon.DefaultCoordinatorPort, volumes)
if err != nil {
msg := fmt.Sprintf("build coordinator pod for job %s failed", job.Name)
log.Error(err, msg)
......@@ -41,8 +42,8 @@ func (r *DIJobReconciler) reconcilePods(ctx context.Context, job *div1alpha1.DIJ
}
// add env
envs := make(map[string]string)
envs[diutil.CoordinatorURLEnv] = coorurl
diutil.SetPodEnv(coorpod, envs)
envs[dicommon.CoordinatorURLEnv] = coorurl
diutil.AddEnvsToPod(coorpod, envs)
if coordinator == nil {
if err := r.createPodAndService(ctx, job, coorpod, coorsvc); err != nil {
......@@ -140,7 +141,7 @@ func buildPodAndServiceForReplica(template *corev1.PodTemplateSpec, job *div1alp
}
// set restart policy for coordinator
if replicaType == diutil.CoordinatorName && template.Spec.RestartPolicy == "" {
if replicaType == dicommon.CoordinatorName && template.Spec.RestartPolicy == "" {
template.Spec.RestartPolicy = corev1.RestartPolicyNever
}
......@@ -161,10 +162,10 @@ func buildPodAndServiceForReplica(template *corev1.PodTemplateSpec, job *div1alp
// add env
envs := make(map[string]string)
envs[diutil.PodNamespaceEnv] = pod.Namespace
envs[diutil.PodNameEnv] = pod.Name
envs[diutil.ServerURLEnv] = diutil.DefaultServerURL
diutil.SetPodEnv(pod, envs)
envs[dicommon.PodNamespaceEnv] = pod.Namespace
envs[dicommon.PodNameEnv] = pod.Name
envs[dicommon.ServerURLEnv] = dicommon.DefaultServerURL
diutil.AddEnvsToPod(pod, envs)
// add volumes
pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...)
......
......@@ -10,8 +10,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
div1alpha1 "go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1"
diutil "go-sensephoenix.sensetime.com/di-orchestrator/utils"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
dicommon "opendilab.org/di-orchestrator/common"
diutil "opendilab.org/di-orchestrator/utils"
)
const (
......@@ -134,13 +135,13 @@ func updateReplicaStatus(pod *corev1.Pod, job *div1alpha1.DIJob, replicaType div
containerName := ""
switch replicaType {
case div1alpha1.ReplicaTypeCoordinator:
containerName = diutil.CoordinatorName
containerName = dicommon.CoordinatorName
case div1alpha1.ReplicaTypeAggregator:
containerName = diutil.AggregatorName
containerName = dicommon.AggregatorName
case div1alpha1.ReplicaTypeCollector:
containerName = diutil.CollectorName
containerName = dicommon.CollectorName
case div1alpha1.ReplicaTypeLearner:
containerName = diutil.LearnerName
containerName = dicommon.LearnerName
}
switch pod.Status.Phase {
case corev1.PodRunning:
......
/*
Copyright 2021 The SensePhoenix authors.
Copyright 2021 The OpenDILab authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......@@ -35,8 +35,8 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
div1alpha1 "go-sensephoenix.sensetime.com/di-orchestrator/api/v1alpha1"
testutil "go-sensephoenix.sensetime.com/di-orchestrator/utils/testutils"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
testutil "opendilab.org/di-orchestrator/utils/testutils"
//+kubebuilder:scaffold:imports
)
......@@ -53,7 +53,7 @@ const (
var k8sClient client.Client
var testEnv *envtest.Environment
func TestAPIs(t *testing.T) {
func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t,
......
# DI Orchestrator Architecture
The nerveX framework consists of 3 important modules, namely coordinator, collector and learner. In general, a nerveX training job has only one coordinator, while the numbers of learners and collectors can vary. The roles of the three modules are:
The DI-engine framework consists of 3 important modules, namely coordinator, collector and learner. In general, a DI-engine training job has only one coordinator, while the numbers of learners and collectors can vary. The roles of the three modules are:
- coordinator: maintains connections with collectors and learners, accepts their requests to fetch and push meta-info, and sends tasks to learners and collectors.
- collector: fetches the RL model's location in the storage middleware from the coordinator, loads the model, generates data frames from the model's decisions in the environments it constructs, stores the data frames back to the storage middleware, and reports the data frames' meta-info (storage path, size, etc.) to the coordinator.
- learner: fetches the data frames' storage location from the coordinator, loads the data frames from the storage middleware and trains the RL model; after training it stores the model in the middleware and reports the model's meta-info (storage path, size, etc.) to the coordinator. Since the learner side often involves the extra distributed mechanism of data-parallel training, to avoid confusion we call the module that interacts with the coordinator the logic learner, the basic unit to which the coordinator issues tasks; a single learner process in data-parallel training is called a ddp learner, and multiple ddp learner processes provide the data-parallel service. One logic learner can correspond to one ddp learner (single GPU) or multiple ddp learners (multiple GPUs). In addition, data-parallel training requires an extra aggregator module: the aggregator summarizes the training results of multiple ddp learners and sends them to the coordinator, i.e. the aggregator and multiple ddp learners together form a logic learner, and the coordinator only interacts with logic learners.
For a detailed introduction to nerveX, please refer to the [nerveX developer tutorial](http://open-xlab.pages.gitlab.bj.sensetime.com/cell/nerveX/tutorial_dev/index.html)
For a detailed introduction to DI-engine, please refer to the [DI-engine developer tutorial](https://opendilab.github.io/DI-engine/tutorial_dev/index.html)
To support running nerveX in Kubernetes (K8s), we designed DI Orchestrator. This article explains how, with DI Orchestrator, the nerveX modules are created on K8s, how they discover each other, how training starts, and so on. The architecture of DI Orchestrator is shown below:
To support running DI-engine in Kubernetes (K8s), we designed DI Orchestrator. This article explains how, with DI Orchestrator, the DI-engine modules are created on K8s, how they discover each other, how training starts, and so on. The architecture of DI Orchestrator is shown below:
![](images/di-arch.png)
![](images/di-arch.svg)
It consists of two main modules: `di-server` and `di-operator`. `DDPL` stands for ddp learner, `Lm` for learner, `Cn` for collector, and `Aggregator+DDPL` forms a logic learner. The following first describes how DI Orchestrator creates and starts each nerveX module (a [pod](https://kubernetes.io/docs/concepts/workloads/pods/) in K8s) after a nerveX job is submitted to K8s, and then introduces di-server and di-operator.
It consists of two main modules: `di-server` and `di-operator`. `DDPL` stands for ddp learner, `Lm` for learner, `Cn` for collector, and `Aggregator+DDPL` forms a logic learner. The following first describes how DI Orchestrator creates and starts each DI-engine module (a [pod](https://kubernetes.io/docs/concepts/workloads/pods/) in K8s) after a DI-engine job is submitted to K8s, and then introduces di-server and di-operator.
## Job Creation Process
This section walks through the job creation process, covering the whole life cycle of a nerveX job in K8s from creation to completion.
This section walks through the job creation process, covering the whole life cycle of a DI-engine job in K8s from creation to completion.
- Write the AggregatorConfig yaml file defining the aggregator template, which will later be used to create aggregators when a DIJob is created; aggregators provide data-parallel training services for the training side.
- Write the DIJob yaml file defining the templates of coordinator, collector and learner, and submit it to the K8s cluster.
- di-operator watches the DIJob submission, creates the coordinator, and creates an accessible domain name for it.
......@@ -27,7 +27,7 @@ The nerveX framework consists of 3 important modules, namely coordinator, collector and learner
di-operator is the component responsible for orchestrating DIJobs in K8s. Following the K8s [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/), it uses the control loop of the [controller pattern](https://kubernetes.io/docs/concepts/architecture/controller/) to watch the state of DIJobs in the cluster and modifies it when necessary, keeping the actual state of each DIJob as consistent as possible with the predefined desired state.
### API Definition
Based on the characteristics of each module in the nerveX framework, we define two Custom Resources: DIJob and AggregatorConfig. The former defines the prerequisites for an RL job's coordinator, collector and learner to run, including images, startup commands, required compute and storage resources, environment variables, etc.; the latter defines the prerequisites for an RL job's aggregator.
Based on the characteristics of each module in the DI framework, we define two Custom Resources: DIJob and AggregatorConfig. The former defines the prerequisites for an RL job's coordinator, collector and learner to run, including images, startup commands, required compute and storage resources, environment variables, etc.; the latter defines the prerequisites for an RL job's aggregator.
DIJob definition:
```go
......@@ -41,7 +41,7 @@ type DIJobSpec struct {
// CleanPodPolicy defines the policy to clean pods after DIJob completed
CleanPodPolicy CleanPodPolicy `json:"cleanPodPolicy,omitempty"`
// Volumes defines the shared volumes for nerveX components
// Volumes defines the shared volumes for DI-engine components
Volumes []corev1.Volume `json:"volumes,omitempty"`
Coordinator CoordinatorSpec `json:"coordinator"`
......@@ -60,7 +60,7 @@ type AggregatorConfigSpec struct {
```
> **Why is the aggregator defined separately?**
The aggregator is common to all RL training jobs that use the nerveX framework, so we define it as a global, shared resource named AggregatorConfig. After RL jobs are submitted, di-server creates aggregators by reading the cluster's unique AggregatorConfig. Note that the aggregator only targets the most common data-parallel training; other parallel training methods require defining a new Custom Resource.
The aggregator is common to all RL training jobs that use the DI-engine framework, so we define it as a global, shared resource named AggregatorConfig. After RL jobs are submitted, di-server creates aggregators by reading the cluster's unique AggregatorConfig. Note that the aggregator only targets the most common data-parallel training; other parallel training methods require defining a new Custom Resource.
### Status Definition
After a user submits a DIJob, di-operator takes over the management of its life cycle. To help users understand the DIJob's status, we define the following phases:
......@@ -103,7 +103,7 @@ func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
When a user submits a DIJob, the informer receives the submission event and triggers the handler, after which the Reconcile function is called. Reconcile lists the job's pods and, finding that the coordinator has not been created, reads the coordinator template defined in the DIJob and creates the corresponding coordinator pod (in which the coordinator process runs) and service (for inter-pod communication), writing environment variables into the pod, including the pod's name, the pod's namespace, and the URL for accessing the coordinator.
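A condensed sketch of this control loop is shown below, assuming a standard controller-runtime setup; the label keys and the elided coordinator-building step are illustrative, not the actual implementation:

```go
package controllers

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
)

// reconcileOnce sketches one pass of the control loop: fetch the DIJob,
// list its pods, and create the coordinator if it does not exist yet.
func reconcileOnce(ctx context.Context, c client.Client, req ctrl.Request) (ctrl.Result, error) {
	var job div1alpha1.DIJob
	if err := c.Get(ctx, req.NamespacedName, &job); err != nil {
		// The DIJob may have been deleted in the meantime; nothing to do.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// List the pods that belong to this job (the label key is an assumption).
	var pods corev1.PodList
	if err := c.List(ctx, &pods, client.InNamespace(job.Namespace),
		client.MatchingLabels{"diengine/job-name": job.Name}); err != nil {
		return ctrl.Result{}, err
	}

	for i := range pods.Items {
		if pods.Items[i].Labels["diengine/replica-type"] == "coordinator" {
			return ctrl.Result{}, nil // coordinator already exists
		}
	}

	// Coordinator not found: build its pod and service from
	// job.Spec.Coordinator.Template, inject env vars (pod name, namespace,
	// coordinator URL), and create both objects -- elided here.
	return ctrl.Result{}, nil
}
```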
Each module of the nerveX framework has a default port, as shown below:
Each module of the DI-engine framework has a default port, as shown below:
```go
DefaultCollectorPort = 22270
......@@ -122,7 +122,7 @@ After the coordinator is created, di-operator watches the pod status and updates the DIJob status
di-operator implements webhook verification: a MutatingWebhook is created to set default values for DIJob, and a ValidatingWebhook is created to verify DIJob's correctness. For example, for the `CleanPodPolicy` field, the MutatingWebhook sets its default value to `Running`, meaning that all Running pods are deleted after the DIJob completes; the ValidatingWebhook checks the field's value and rejects the DIJob if it is not one of `None`, `ALL` or `Running`.
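As a rough illustration, assuming the standard kubebuilder webhook interfaces (the actual implementation may differ), the two webhooks amount to something like:

```go
package v1alpha1

import "fmt"

// Default is called via the MutatingWebhook to fill in unset fields.
func (r *DIJob) Default() {
	if r.Spec.CleanPodPolicy == "" {
		// Default: delete all Running pods once the DIJob completes.
		r.Spec.CleanPodPolicy = CleanPodPolicy("Running")
	}
}

// ValidateCreate is called via the ValidatingWebhook on job submission.
func (r *DIJob) ValidateCreate() error {
	switch r.Spec.CleanPodPolicy {
	case CleanPodPolicy("None"), CleanPodPolicy("ALL"), CleanPodPolicy("Running"):
		return nil
	default:
		return fmt.Errorf("cleanPodPolicy must be one of None, ALL or Running, got %q",
			r.Spec.CleanPodPolicy)
	}
}
```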
## DI Server
di-server is an http server customized for the nerveX framework, providing APIs for adding, deleting and querying collectors, learners and aggregators. Through these interfaces, di-server gives DIJobs the ability to dynamically scale collectors and learners. The following briefly introduces the design of di-server, including the local cache that stores AggregatorConfig, DIJob and all of a DIJob's pods, and the http interface design for dynamically adding, deleting and querying collectors, learners and aggregators.
di-server is an http server customized for the DI-engine framework, providing APIs for adding, deleting and querying collectors, learners and aggregators. Through these interfaces, di-server gives DIJobs the ability to dynamically scale collectors and learners. The following briefly introduces the design of di-server, including the local cache that stores AggregatorConfig, DIJob and all of a DIJob's pods, and the http interface design for dynamically adding, deleting and querying collectors, learners and aggregators.
### Local Cache
To reduce the query frequency between di-server and the K8s api server, and thus the api server's load, we use the informer mechanism provided by [client-go](https://github.com/kubernetes/client-go) to store AggregatorConfig, DIJob and all of a DIJob's pods in a local cache, as shown in the following figure
......@@ -156,11 +156,10 @@ genericInformer.Informer().GetIndexer().GetByKey(key)
| POST | /v1alpha1/replicas | create replicas. put data in request body |
| POST | /v1alpha1/replicas/failed | post failed replicas and request for recreation. put data in request body |
For each interface's request format, request parameters, request body and return values, see the [http interface definition](https://gitlab.bj.sensetime.com/platform/CloudNative4AI/cluster-lifecycle/di-operator/issues/6)
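As a hypothetical example, a request against the replicas interface might look like the following Go client call; the body shape mirrors `init_replicas_request` in the example system configs, and the real schema may carry additional fields:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Same shape as init_replicas_request in the example system configs.
	body := []byte(`{"collectors": {"replicas": 2}, "learners": {"gpus": "2", "replicas": 1}}`)

	// system_addr and api_version are taken from the example configs.
	resp, err := http.Post("http://di-server.di-system:8080/v1alpha1/replicas",
		"application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```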
## Advantages of DI Orchestrator
DI Orchestrator provides a K8s-based container-orchestration solution for the nerveX framework in distributed scenarios. For a submitted DIJob, di-operator orchestrates the nerveX modules so that each module runs normally and carries out its training task. By calling di-server's interfaces, the coordinator gains the ability to add, delete and query all of its collectors, learners and aggregators, improving the nerveX framework's capacity for dynamic resource allocation. In summary, DI Orchestrator provides the following advantages:
1. Encapsulation. Relying on di-operator's orchestration capability, the details of deploying nerveX distributed RL training (including pod creation and service discovery) are transparent to the user. Based on the nerveX framework's deployment requirements for distributed RL training, di-operator creates the coordinator, the coordinator then asks di-server to create the other modules, and di-operator records the pod status of each module into the DIJob's status. The DIJob's life cycle is also maintained by di-operator, showing users the DIJob's state at its different stages.
DI Orchestrator provides a K8s-based container-orchestration solution for the DI-engine framework in distributed scenarios. For a submitted DIJob, di-operator orchestrates the DI-engine modules so that each module runs normally and carries out its training task. By calling di-server's interfaces, the coordinator gains the ability to add, delete and query all of its collectors, learners and aggregators, improving the DI-engine framework's capacity for dynamic resource allocation. In summary, DI Orchestrator provides the following advantages:
1. Encapsulation. Relying on di-operator's orchestration capability, the details of deploying DI-engine distributed RL training (including pod creation and service discovery) are transparent to the user. Based on the DI-engine framework's deployment requirements for distributed RL training, di-operator creates the coordinator, the coordinator then asks di-server to create the other modules, and di-operator records the pod status of each module into the DIJob's status. The DIJob's life cycle is also maintained by di-operator, showing users the DIJob's state at its different stages.
2. Ease of use. Users only need to define the coordinator, collector and learner configurations in the DIJob yaml file and submit it to the K8s cluster with one click; di-operator takes care of the deployment, freeing users from the complexity of deploying distributed RL training in a K8s cluster.
3. Robustness. Relying on the K8s pod restart mechanism, pods automatically restart after unexpected exits, and the coordinator responds quickly and reconnects.
4. Dynamic scaling. The collectors/learners/aggregators a DIJob needs change dynamically, so di-server provides http interfaces to dynamically adjust the number of collectors/learners, allowing a DIJob to tune the ratio of collectors to learners to its own needs and optimize throughput.
# DI Operator architecture
DI framework consists of 3 important modules, namely coordinator, collector and learner. In general, a nerveX training job has only one coordinator, and the number of learners and collectors can vary. The roles of the three modules are:
DI-engine framework consists of 3 important modules, namely coordinator, collector and learner. In general, a DI-engine training job has only one coordinator, and the number of learners and collectors can vary. The roles of the three modules are:
- Coordinator. Maintain connections with collectors and learners, accept meta-info requests and posts from collectors and learners, and send tasks to collectors and learners.
- Collector. Request the path to the RL model stored in storage middleware from coordinator, load the RL model, and then generate data frames by stepping through the environment according to the RL model. Store the data frames back to the storage middleware, and report meta-info (the storage path, size, etc.) of the data frames to coordinator.
- Learner. Request the data frames' storage path from coordinator and load the data frames from storage middleware to start training the RL model. After the training is completed, store the model into the storage middleware, and report model meta-info (storage path, size, etc.) to coordinator. Because the learner side often involves the distributed mechanism of data parallel training, to avoid confusion, we call the module interacting with coordinator the logic learner, which is the basic unit for coordinator to issue tasks. A single learner process in the data parallel training is called a ddp learner, and multiple ddp learner processes provide data parallel services. One logic learner can correspond to one ddp learner (single-gpu) or multiple ddp learners (multi-gpu). In addition, to provide data parallel training services, an additional aggregator module needs to be introduced. The aggregator is responsible for summarizing the training results of multiple ddp learners and sending them to coordinator. That is, the aggregator and multiple ddp learners form a logic learner, and coordinator will only interact with logic learners.
For the introduction of nerveX, please refer to [nerveX developer tutorial](http://open-xlab.pages.gitlab.bj.sensetime.com/cell/nerveX/tutorial_dev/index.html).
For the introduction of DI-engine, please refer to [DI-engine developer tutorial](https://opendilab.github.io/DI-engine/tutorial_dev/index.html).
In order to provide running support for nerveX in Kubernetes (K8s), we designed `DI Orchestrator`. This article will explain how to use DI Orchestrator, how each module of nerveX is created on K8s and discovers each other, how to start training, etc. The architecture of DI Orchestrator is shown in the figure below:
In order to provide running support for DI-engine in Kubernetes (K8s), we designed `DI Orchestrator`. This article will explain how to use DI Orchestrator, how each module of DI-engine is created on K8s and discovers each other, how to start training, etc. The architecture of DI Orchestrator is shown in the figure below:
![](images/di-arch.png)
![](images/di-arch.svg)
There are two main modules, namely `di-server` and `di-operator`.
`DDPL` represents ddp learner, `Lm` represents logic learner, `Cn` represents collector, and `Aggregator+DDPL` constructs a logic learner. In the following pages, we will first introduce how `DI Orchestrator` creates and starts each module of nerveX after a nerveX job is submitted to K8s, and then introduce the architecture of `di-server` and `di-operator`.
`DDPL` represents ddp learner, `Lm` represents logic learner, `Cn` represents collector, and `Aggregator+DDPL` constructs a logic learner. In the following pages, we will first introduce how `DI Orchestrator` creates and starts each module of DI-engine after a DI-engine job is submitted to K8s, and then introduce the architecture of `di-server` and `di-operator`.
## Job creation process
Here is a description of the job creation process, illustrating the entire life cycle of a nerveX job from creation to execution in K8s.
Here is a description of the job creation process, illustrating the entire life cycle of a DI-engine job from creation to execution in K8s.
- Edit the AggregatorConfig yaml file to define the aggregator template, which will be used to create aggregators when DIJob is created later. Aggregator can provide data parallel training services.
- Edit the DIJob yaml file to define the template of coordinator, collector and learner, and submit it to K8s.
- After di-operator received the event of DIJob submission, it creates a coordinator, and creates an accessible domain name for the coordinator.
......@@ -25,7 +25,7 @@ Here is a description of the job creation process, illustrating the entire life
- When the training is completed, di-operator will delete all collectors and learners by default, while the coordinator will be kept for users to view logs and perform other operations.
## DI Operator
Ding-operator is a component responsible for orchestrating DIJob in K8s. It uses K8s [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) to monitor the status of DIJob objects in K8s cluster through the control loop in [controller pattern](https://kubernetes.io/docs/concepts/architecture/controller/), and to update the status of DIJob when necessary. The status is modified so that the actual status of DIJob is as consistent as possible with our predefined status.
Di-operator is a component responsible for orchestrating DIJob in K8s. It uses K8s [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) to monitor the status of DIJob objects in K8s cluster through the control loop in [controller pattern](https://kubernetes.io/docs/concepts/architecture/controller/), and to update the status of DIJob when necessary. The status is modified so that the actual status of DIJob is as consistent as possible with our predefined status.
### API definition
According to the characteristics of each module, we have defined two Custom Resources, namely DIJob and AggregatorConfig. The former is used to define the prerequisites for coordinator, collector and learner to start running, including docker images, startup commands, computing and storage resources, environment variables, etc. The latter is used to define the prerequisites for aggregator.
......@@ -42,7 +42,7 @@ type DIJobSpec struct {
// CleanPodPolicy defines the policy to clean pods after DIJob completed
CleanPodPolicy CleanPodPolicy `json:"cleanPodPolicy,omitempty"`
// Volumes defines the shared volumes for nerveX components
// Volumes defines the shared volumes for DI-engine components
Volumes []corev1.Volume `json:"volumes,omitempty"`
Coordinator CoordinatorSpec `json:"coordinator"`
......@@ -61,7 +61,7 @@ type AggregatorConfigSpec struct {
```
> **Why should aggregator be defined alone?**
Aggregator is a common module for all RL training jobs using the nerveX framework, so we define the aggregator as a global and shared resource named AggregatorConfig. After RL jobs are submitted, di-server will read the global AggregatorConfig in the K8s cluster to create aggregators for these RL jobs. In addition, the aggregator only targets the most common data parallel training. You need to define a new Custom Resource if other parallel training methods are used.
Aggregator is a common module for all RL training jobs using the DI-engine framework, so we define the aggregator as a global and shared resource named AggregatorConfig. After RL jobs are submitted, di-server will read the global AggregatorConfig in the K8s cluster to create aggregators for these RL jobs. In addition, the aggregator only targets the most common data parallel training. You need to define a new Custom Resource if other parallel training methods are used.
### Status definition
After DIJob is submitted, di-operator takes over the management of the DIJob's life cycle. To give users a better view of the DIJob's status, we define the following phases:
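The phase list itself is collapsed in this diff; judging from the names used in the controller tests above (JobRunning, JobSucceeded, JobUnknown), the type plausibly looks like the sketch below, in which the remaining names are assumptions:

```go
package v1alpha1

// Phase describes the observed state of a DIJob. JobRunning, JobSucceeded
// and JobUnknown appear in the controller tests in this repo; the other
// names here are assumptions for illustration.
type Phase string

const (
	JobPending   Phase = "Pending" // assumed
	JobRunning   Phase = "Running"
	JobFailed    Phase = "Failed" // assumed
	JobSucceeded Phase = "Succeeded"
	JobUnknown   Phase = "Unknown"
)
```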
......@@ -105,7 +105,7 @@ func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
When DIJob is submitted, we first list the pods that belong to the DIJob in the Reconcile function and find that the coordinator has not been created. Then we read the coordinator template defined in the DIJob and create the corresponding coordinator pod (used to run the coordinator main process) and service (used for inter-pod communication), and write some environment variables into the pod, including the name of the pod, the namespace of the pod, the port which the coordinator listens to, and the URL to access the coordinator.
The port occupied by each module of the nerveX framework has a default value, as shown below:
The port occupied by each module of the DI-engine framework has a default value, as shown below:
```go
DefaultCollectorPort = 22270
......@@ -124,7 +124,7 @@ To achieve the above goals, we can configure webhooks in K8s. K8s webhook consis
The webhook verification is implemented in di-operator. MutatingWebhook is created to set the default value for DIJob; ValidatingWebhook is created to verify the correctness of DIJob. For example, for the `CleanPodPolicy` field in DIJob, we set its default value in MutatingWebhook to `Running`, which means that all running pods will be deleted after DIJob is completed. We verify the value of the `CleanPodPolicy` field in ValidatingWebhook, if the value set by the user is not equal to any of `None`, `ALL`, or `Running`, the DIJob will be rejected.
## DI Server
Ding-server is an http server customized for the nerveX framework, providing APIs for adding, deleting, and querying collectors, learners, and aggregators. By calling these APIs, di-server provides DIJob with the ability to dynamically scale collectors and learners. The following will briefly introduce the design of di-server, including the local cache for storing AggregatorConfig, DIJob and all pods of DIJob, and the http interface design for dynamically adding, deleting and querying collectors, learners and aggregators.
Di-server is an http server customized for the DI-engine framework, providing APIs for adding, deleting, and querying collectors, learners, and aggregators. By calling these APIs, di-server provides DIJob with the ability to dynamically scale collectors and learners. The following will briefly introduce the design of di-server, including the local cache for storing AggregatorConfig, DIJob and all pods of DIJob, and the http interface design for dynamically adding, deleting and querying collectors, learners and aggregators.
### Local cache
In order to reduce the frequency of queries between di-server and K8s api server, thereby reducing the burden of K8s api server, we use [client-go](https://github.com/kubernetes/client-go)'s informer mechanism to store AggregatorConfig, DIJob and all pods of DIJob in local cache, as shown in the following figure
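A minimal sketch of this cache path with client-go's dynamic informers is shown below; the GVR matches the CRDs in this repo, while the resync setting and the namespace/name key are illustrative:

```go
package server

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
)

// lookupDIJob reads a DIJob from the informer's local cache instead of
// querying the K8s api server directly.
func lookupDIJob(ctx context.Context, dc dynamic.Interface) error {
	factory := dynamicinformer.NewDynamicSharedInformerFactory(dc, 0)
	gvr := schema.GroupVersionResource{
		Group:    "diengine.opendilab.org",
		Version:  "v1alpha1",
		Resource: "dijobs",
	}
	informer := factory.ForResource(gvr)

	// Start the informers and wait until the local cache has synced.
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// Cache keys for namespaced objects are "<namespace>/<name>".
	obj, exists, err := informer.Informer().GetIndexer().GetByKey("di-system/dijob-cartpole-dqn")
	if err != nil {
		return err
	}
	if !exists {
		return fmt.Errorf("dijob not found in local cache")
	}
	fmt.Printf("got %T from local cache\n", obj)
	return nil
}
```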
......@@ -147,7 +147,7 @@ In order to support dynamic scaling of collectors/learners for DIJobs, di-server
![](images/di-api.png)
The following interfaces are provided:
The following http interfaces are provided:
| method | path | description |
|---|---|---|
......@@ -159,11 +159,10 @@ In order to support dynamic scaling of collectors/learners for DIJobs, di-server
| POST | /v1alpha1/replicas | create replicas. put data in request body |
| POST | /v1alpha1/replicas/failed | post failed replicas and request for recreation. put data in request body |
For the request format, request parameters, request body, and return value of each interface, please refer to [http interface definition](https://gitlab.bj.sensetime.com/platform/CloudNative4AI/cluster-lifecycle/di-operator/issues/6)
## Advantages of DI Orchestrator
DI Orchestrator provides a K8s-based container-orchestration solution for the nerveX framework in a distributed scenario. For a DIJob, di-operator is responsible for arranging the various modules of nerveX so that each module can run normally and perform training tasks. By calling di-server’s HTTP interface, coordinator is given the ability to add, delete, and query all its collectors, learners, aggregators and improve the dynamic allocation of nerveX framework resources. In summary, DI Orchestrator provides the following advantages:
1. Encapsulation. Relying on the orchestration capabilities of di-operator, deploying nerveX distributed RL training (including pod creation and service discovery) are transparent to us. According to the deployment requirements of the nerveX framework for distributed RL training, di-operator will create coordinator, and then the coordinator will request di-server to create other modules. Ding-operator will record the status of the pod of each module into the status of the DIJob. The life cycle of DIJob is also maintained by di-operator, providing us with status of DIJob in different stages.
2. Ease of use. We only need to define the configuration of coordinator, collector, and learner in the yaml file of DIJob, and submit them to K8s cluster with one click. Ding-operator will be responsible for deploying nerveX RL trainings and liberating us from the complex distributed RL deployments in K8s cluster.
3. Robustness. Relying on the pod restart mechanism of K8s, it ensures that pods can automatically restart in the event of an unexpected exit, and the coordinator can respond quickly and reconnect.
4. Dynamic expansion. Collectors/learners required by DIJob is dynamically changing, so di-server provides HTTP interfaces to allow us to dynamically adjust the number of collectors/learners, so that DIJob can adjust the ratio of collectors and learners according to its own needs to optimize throughput.
DI Orchestrator provides a K8s-based container-orchestration solution for the DI-engine framework in a distributed scenario. For a DIJob, di-operator is responsible for arranging the various modules of DI-engine so that each module can run normally and perform training tasks. By calling di-server’s HTTP interface, coordinator is given the ability to add, delete, and query all its collectors, learners, aggregators and improve the dynamic allocation of DI-engine framework resources. In summary, DI Orchestrator provides the following advantages:
1. Encapsulation. Relying on the orchestration capabilities of di-operator, deploying DI-engine distributed RL training (including pod creation and service discovery) is transparent to us. According to the deployment requirements of the DI-engine framework for distributed RL training, di-operator will create coordinator, and then the coordinator will request di-server to create other modules. Di-operator will record the status of the pod of each module into the status of the DIJob. The life cycle of DIJob is also maintained by di-operator, providing us with status of DIJob in different stages.
2. Ease of use. We only need to define the configuration of coordinator, collector, and learner in the yaml file of DIJob, and submit them to K8s cluster with one click. Di-operator will be responsible for deploying DI-engine RL training and liberating us from the complex distributed RL deployments in K8s cluster.
3. Robustness. Relying on the pod restart mechanism of K8s, DI Orchestrator ensures that pods can automatically restart in the event of an unexpected exit, and the coordinator can respond quickly and reconnect.
4. Dynamic expansion. Collectors/learners required by DIJob are dynamically changing, so di-server provides HTTP interfaces to allow us to dynamically adjust the number of collectors/learners, so that DIJob can adjust the ratio of collectors and learners according to its own needs to optimize throughput.
# developer guide
# Developer Guide
## prerequisites
## Prerequisites
- a well-prepared Kubernetes cluster. Follow the [instructions](https://kubernetes.io/docs/setup/production-environment/tools/kubeadm/create-cluster-kubeadm/) to create a Kubernetes cluster, or create a local Kubernetes node with [kind](https://kind.sigs.k8s.io/docs/user/quick-start/) or [minikube](https://minikube.sigs.k8s.io/docs/start/)
- kustomize. Installed by the following command
```bash
......@@ -11,14 +11,14 @@ kubernetes-sigs/kustomize/master/hack/install_kustomize.sh" | bash
```bash
kubectl create -f ./config/certmanager/cert-manager.yaml
```
## project initialization
## Project Initialization
This project is based on [kubebuilder v3](https://github.com/kubernetes-sigs/kubebuilder/releases/download/v3.0.0/kubebuilder_linux_amd64), since CRDs generated by kubebuilder v2 are not compatible with Kubernetes v1.20.
```bash
kubebuilder init --domain sensetime.com --license apache2 --owner "The SensePhoenix authors"
kubebuilder init --domain opendilab.org --license apache2 --owner "The OpenDILab authors"
kubebuilder create api --group di --version v1alpha1 --kind DIJob
kubebuilder create api --group diengine --version v1alpha1 --kind DIJob
kubebuilder create api --group di --version v1alpha1 --kind AggregatorConfig
kubebuilder create api --group diengine --version v1alpha1 --kind AggregatorConfig
```
## CRD Design
......@@ -32,26 +32,28 @@ make manifests
```
New CRD files will be generated in [./config/crd/bases](./config/crd/bases)
## controller logic
## Controller Logic
Referenced to [controllers](./controllers)
## di-server logic
## DI Server Logic
Referenced to [server](./server)
## Installation
Run the following command in the project root directory.
```bash
# build images. If you are not working in Linux, here you should use `make docker-build`
make dev-images
# build images.
make docker-build
make docker-push
# deploy di-operator and server to cluster
make dev-deploy
```
Since the CustomResourceDefinitions are too long, you will probably find the following error:
![](docs/images/deploy-failed.png)
```bash
The CustomResourceDefinition "dijobs.diengine.opendilab.org" is invalid: metadata.annotations: Too long: must have at most 262144 bytes
```
Then run the following command will solve the problem:
Then running the following command will solve the problem:
```bash
kustomize build config/crd | kubectl create -f -
```
......@@ -66,5 +68,5 @@ di-server-7b86ff8df4-jfgmp 1/1 Running 0 59s
Install global components of DIJob defined in AggregatorConfig:
```bash
kubectl create -f examples/di-mock-agconfig.yaml -n di-system
kubectl create -f config/samples/agconfig.yaml -n di-system
```
apiVersion: diengine.opendilab.org/v1alpha1
kind: DIJob
metadata:
name: dijob-cartpole-dqn
spec:
group: xxx
priorityClassName: ""
cleanPodPolicy: "Running"
volumes:
- name: cache-volume
emptyDir:
medium: Memory
sizeLimit: 128Mi
- name: work-dir
hostPath:
path: /data/nfs/ding/cartpole
coordinator:
template:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 3
memory: "10Gi"
limits:
cpu: 3
memory: "10Gi"
command: ["/bin/bash", "-c",]
args:
- |
cat <<EOF > cartpole_dqn_config_k8s.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
encoder_hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
# increase collector task space when get rs from server
collector_task_space=0,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['dizoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['ding.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['ding.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['ding.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['ding.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['ding.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='di-server.di-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"replicas": 2,
},
learners={
"gpus": "1",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='./data',
path_policy='./policy',
communication_mode='auto',
learner_gpu_num=1,
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
ding -m dist --module config -P k8s -c ./cartpole_dqn_config_k8s.py -s 0;
ding -m dist --module coordinator -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -cdp $COORDINATOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: work-dir
mountPath: /ding
collector:
template:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 16
memory: "5Gi"
limits:
cpu: 16
memory: "5Gi"
command: ["/bin/bash", "-c",]
args:
- |
ding -m dist --module collector -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -clp $COLLECTOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: work-dir
mountPath: /ding
learner:
template:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 16
memory: "5Gi"
limits:
cpu: 16
memory: "5Gi"
command: ["/bin/bash", "-c",]
args:
- |
ding -m dist --module learner -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -lp $LEARNER_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: cache-volume
mountPath: /dev/shm
- name: work-dir
mountPath: /ding
\ No newline at end of file
package e2e
import (
"context"
"flag"
"os"
"path/filepath"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
testutil "opendilab.org/di-orchestrator/utils/testutils"
)
func TestE2E(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t,
"E2E Suite",
[]Reporter{printer.NewlineReporter{}})
}
var (
k8sClient client.Client
clientset *kubernetes.Clientset
kubeconfig string
exampleJobsDir string
sharedVolumesDir string
)
func init() {
testing.Init()
if flag.Lookup("kubeconfig") == nil {
flag.StringVar(&kubeconfig, "kubeconfig", "", "kubeconfig file path")
}
flag.StringVar(&sharedVolumesDir, "shared-volumes-dir", "/data/nfs/ding/", "dir to shared volumes")
flag.StringVar(&exampleJobsDir, "example-jobs-dir", "./config", "dir to the example jobs")
flag.Parse()
kubeconfig = flag.Lookup("kubeconfig").Value.String()
if kubeconfig == "" {
kubeconfig = os.Getenv("KUBECONFIG")
if kubeconfig == "" {
kubeconfig = filepath.Join(homeDir(), ".kube", "config")
}
}
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}
var _ = BeforeSuite(func() {
// uses the current context in kubeconfig
cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
Expect(err).NotTo(HaveOccurred())
err = div1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
//+kubebuilder:scaffold:scheme
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
clientset, err = kubernetes.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())
agconfig := testutil.NewAggregatorConfig()
err = k8sClient.Create(context.Background(), agconfig, &client.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
Expect(err).NotTo(HaveOccurred())
}
})