Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
OpenDILab开源决策智能平台
DI-orchestrator
提交
cb748239
D
DI-orchestrator
项目概览
OpenDILab开源决策智能平台
/
DI-orchestrator
上一次同步 大约 2 年
通知
1
Star
78
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
DI-orchestrator
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
cb748239
编写于
1月 17, 2022
作者:
L
liqingping
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: update api version to v2alpha1
上级
7f4b2065
变更
35
隐藏空白更改
内联
并排
Showing
35 changed file
with
283 addition
and
292 deletion
+283
-292
PROJECT
PROJECT
+2
-10
cmd/operator/operator.go
cmd/operator/operator.go
+2
-2
cmd/server/server.go
cmd/server/server.go
+3
-3
config/crd/bases/diengine.opendilab.org_dijobs.yaml
config/crd/bases/diengine.opendilab.org_dijobs.yaml
+1
-1
config/crd/minimal/crds.yaml
config/crd/minimal/crds.yaml
+1
-1
config/di-manager.yaml
config/di-manager.yaml
+2
-1
config/manager/di_operator.yaml
config/manager/di_operator.yaml
+1
-0
config/samples/dijob-gobigger.yaml
config/samples/dijob-gobigger.yaml
+1
-1
config/webhook/manifests_stable.yaml
config/webhook/manifests_stable.yaml
+4
-4
docs/developer-guide.md
docs/developer-guide.md
+1
-1
e2e/config/dijob-sidecar.yaml
e2e/config/dijob-sidecar.yaml
+2
-2
e2e/e2e_suite_test.go
e2e/e2e_suite_test.go
+3
-3
e2e/e2e_test.go
e2e/e2e_test.go
+12
-12
pkg/allocator/allocator.go
pkg/allocator/allocator.go
+6
-6
pkg/allocator/job.go
pkg/allocator/job.go
+3
-3
pkg/api/v2alpha1/dijob_types.go
pkg/api/v2alpha1/dijob_types.go
+1
-1
pkg/api/v2alpha1/groupversion_info.go
pkg/api/v2alpha1/groupversion_info.go
+3
-6
pkg/api/v2alpha1/zz_generated.deepcopy.go
pkg/api/v2alpha1/zz_generated.deepcopy.go
+1
-1
pkg/context/job.go
pkg/context/job.go
+7
-7
pkg/context/pod.go
pkg/context/pod.go
+14
-14
pkg/context/status.go
pkg/context/status.go
+23
-23
pkg/controllers/dijob.go
pkg/controllers/dijob.go
+17
-17
pkg/controllers/dijob_controller.go
pkg/controllers/dijob_controller.go
+10
-10
pkg/controllers/dijob_controller_test.go
pkg/controllers/dijob_controller_test.go
+35
-35
pkg/controllers/dijob_test.go
pkg/controllers/dijob_test.go
+12
-12
pkg/controllers/handler.go
pkg/controllers/handler.go
+55
-55
pkg/controllers/suite_test.go
pkg/controllers/suite_test.go
+2
-2
pkg/server/dynamic/handler.go
pkg/server/dynamic/handler.go
+3
-3
pkg/server/dynamic/informer.go
pkg/server/dynamic/informer.go
+3
-3
pkg/server/http/dijob.go
pkg/server/http/dijob.go
+7
-7
pkg/server/http/server.go
pkg/server/http/server.go
+1
-1
pkg/server/http/server_test.go
pkg/server/http/server_test.go
+29
-29
pkg/server/http/suite_test.go
pkg/server/http/suite_test.go
+4
-4
pkg/utils/testutils/dijob.go
pkg/utils/testutils/dijob.go
+6
-6
pkg/utils/util.go
pkg/utils/util.go
+6
-6
未找到文件。
PROJECT
浏览文件 @
cb748239
...
...
@@ -11,18 +11,10 @@ resources:
domain: opendilab.org
group: diengine
kind: DIJob
path: opendilab.org/di-orchestrator/api/v
1alpha2
version: v
1alpha2
path: opendilab.org/di-orchestrator/api/v
2alpha1
version: v
2alpha1
webhooks:
defaulting: true
validation: true
webhookVersion: v1
- api:
crdVersion: v1
namespaced: true
domain: opendilab.org
group: diengine
kind: AggregatorConfig
path: opendilab.org/di-operator/api/v1alpha2
version: v1alpha2
version: "3"
cmd/operator/operator.go
浏览文件 @
cb748239
...
...
@@ -30,7 +30,7 @@ import (
cmdcommon
"opendilab.org/di-orchestrator/cmd/common"
alloc
"opendilab.org/di-orchestrator/pkg/allocator"
alloctypes
"opendilab.org/di-orchestrator/pkg/allocator/types"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicontext
"opendilab.org/di-orchestrator/pkg/context"
"opendilab.org/di-orchestrator/pkg/controllers"
)
...
...
@@ -95,7 +95,7 @@ var (
func
init
()
{
utilruntime
.
Must
(
clientgoscheme
.
AddToScheme
(
scheme
))
utilruntime
.
Must
(
div
1alpha2
.
AddToScheme
(
scheme
))
utilruntime
.
Must
(
div
2alpha1
.
AddToScheme
(
scheme
))
//+kubebuilder:scaffold:scheme
}
...
...
cmd/server/server.go
浏览文件 @
cb748239
...
...
@@ -28,7 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
cmdcommon
"opendilab.org/di-orchestrator/cmd/common"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
gpualloc
"opendilab.org/di-orchestrator/pkg/common/gpuallocator"
serverdynamic
"opendilab.org/di-orchestrator/pkg/server/dynamic"
serverhttp
"opendilab.org/di-orchestrator/pkg/server/http"
...
...
@@ -93,8 +93,8 @@ func runCommand(cmd *cobra.Command, options *CreateOptions) error {
go
dif
.
Start
(
stopCh
)
diGVR
:=
schema
.
GroupVersionResource
{
Group
:
div
1alpha2
.
GroupVersion
.
Group
,
Version
:
div
1alpha2
.
GroupVersion
.
Version
,
Group
:
div
2alpha1
.
GroupVersion
.
Group
,
Version
:
div
2alpha1
.
GroupVersion
.
Version
,
Resource
:
"dijobs"
,
}
diclient
:=
dynamicClient
.
Resource
(
diGVR
)
...
...
config/crd/bases/diengine.opendilab.org_dijobs.yaml
浏览文件 @
cb748239
...
...
@@ -34,7 +34,7 @@ spec:
-
jsonPath
:
.metadata.creationTimestamp
name
:
Age
type
:
date
name
:
v
1alpha2
name
:
v
2alpha1
schema
:
openAPIV3Schema
:
description
:
DIJob is the Schema for the dijobs API
...
...
config/crd/minimal/crds.yaml
浏览文件 @
cb748239
...
...
@@ -23,7 +23,7 @@ spec:
-
jsonPath
:
.metadata.creationTimestamp
name
:
Age
type
:
date
name
:
v
1alpha2
name
:
v
2alpha1
schema
:
openAPIV3Schema
:
description
:
DIJob is the Schema for the dijobs API
...
...
config/di-manager.yaml
浏览文件 @
cb748239
...
...
@@ -39,7 +39,7 @@ spec:
-
jsonPath
:
.metadata.creationTimestamp
name
:
Age
type
:
date
name
:
v
1alpha2
name
:
v
2alpha1
schema
:
openAPIV3Schema
:
description
:
DIJob is the Schema for the dijobs API
...
...
@@ -4290,6 +4290,7 @@ spec:
-
--leader-elect
command
:
-
/di-orchestrator
-
--zap-devel=true
-
operator
envFrom
:
-
configMapRef
:
...
...
config/manager/di_operator.yaml
浏览文件 @
cb748239
...
...
@@ -24,6 +24,7 @@ spec:
containers
:
-
command
:
-
/di-orchestrator
-
--zap-devel=true
-
operator
args
:
-
"
--probe-addr=:8080"
...
...
config/samples/dijob-gobigger.yaml
浏览文件 @
cb748239
apiVersion
:
diengine.opendilab.org/v
1alpha2
apiVersion
:
diengine.opendilab.org/v
2alpha1
kind
:
DIJob
metadata
:
name
:
gobigger-test
...
...
config/webhook/manifests_stable.yaml
浏览文件 @
cb748239
...
...
@@ -12,14 +12,14 @@ webhooks:
service
:
name
:
di-webhook-service
namespace
:
di-system
path
:
/mutate-diengine-opendilab-org-v
1alpha2
-dijob
path
:
/mutate-diengine-opendilab-org-v
2alpha1
-dijob
failurePolicy
:
Fail
name
:
mdijob.kb.io
rules
:
-
apiGroups
:
-
diengine.opendilab.org
apiVersions
:
-
v
1alpha2
-
v
2alpha1
operations
:
-
CREATE
-
UPDATE
...
...
@@ -40,14 +40,14 @@ webhooks:
service
:
name
:
di-webhook-service
namespace
:
di-system
path
:
/validate-diengine-opendilab-org-v
1alpha2
-dijob
path
:
/validate-diengine-opendilab-org-v
2alpha1
-dijob
failurePolicy
:
Fail
name
:
vdijob.kb.io
rules
:
-
apiGroups
:
-
diengine.opendilab.org
apiVersions
:
-
v
1alpha2
-
v
2alpha1
operations
:
-
CREATE
-
UPDATE
...
...
docs/developer-guide.md
浏览文件 @
cb748239
...
...
@@ -12,7 +12,7 @@ kubernetes-sigs/kustomize/master/hack/install_kustomize.sh" | bash
## CRD Design
Update codes in
[
dijob_types.go
](
./api/v
1alpha2
/dijob_types.go
)
with your requirements, and generate deepcopy functions.
Update codes in
[
dijob_types.go
](
./api/v
2alpha1
/dijob_types.go
)
with your requirements, and generate deepcopy functions.
```
bash
make generate
...
...
e2e/config/dijob-sidecar.yaml
浏览文件 @
cb748239
apiVersion
:
diengine.opendilab.org/v
1alpha2
apiVersion
:
diengine.opendilab.org/v
2alpha1
kind
:
DIJob
metadata
:
name
:
dijob-cartpole-dqn
...
...
@@ -129,7 +129,7 @@ spec:
coordinator=dict(
operator_server=dict(
system_addr='di-server.di-system:8080',
api_version='/v
1alpha2
',
api_version='/v
2alpha1
',
init_replicas_request=dict(
collectors={
"replicas": 2,
...
...
e2e/e2e_suite_test.go
浏览文件 @
cb748239
...
...
@@ -15,7 +15,7 @@ package e2e
// "sigs.k8s.io/controller-runtime/pkg/client"
// "sigs.k8s.io/controller-runtime/pkg/envtest/printer"
// div
1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
// div
2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
// )
// func TestE2E(t *testing.T) {
...
...
@@ -66,7 +66,7 @@ package e2e
// // uses the current context in kubeconfig
// cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
// Expect(err).NotTo(HaveOccurred())
// err = div
1alpha2
.AddToScheme(scheme.Scheme)
// err = div
2alpha1
.AddToScheme(scheme.Scheme)
// Expect(err).NotTo(HaveOccurred())
// //+kubebuilder:scaffold:scheme
...
...
@@ -78,7 +78,7 @@ package e2e
// clientset, err = kubernetes.NewForConfig(cfg)
// Expect(err).NotTo(HaveOccurred())
// k8sClient.DeleteAllOf(context.Background(), &div
1alpha2
.DIJob{},
// k8sClient.DeleteAllOf(context.Background(), &div
2alpha1
.DIJob{},
// client.InNamespace(namespace), client.MatchingLabels{"stability-test": "dijobs"})
// })
...
...
e2e/e2e_test.go
浏览文件 @
cb748239
...
...
@@ -16,7 +16,7 @@ package e2e
// "k8s.io/apimachinery/pkg/util/yaml"
// "sigs.k8s.io/controller-runtime/pkg/client"
// div
1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
// div
2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
// dicommon "opendilab.org/di-orchestrator/pkg/common"
// diutil "opendilab.org/di-orchestrator/pkg/utils"
// testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
...
...
@@ -81,13 +81,13 @@ package e2e
// By("Waiting for job to be succeeded")
// jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}
// Eventually(func() div
1alpha2
.Phase {
// Eventually(func() div
2alpha1
.Phase {
// err := k8sClient.Get(ctx, jobKey, job)
// if err != nil {
// return div
1alpha2
.JobUnknown
// return div
2alpha1
.JobUnknown
// }
// return job.Status.Phase
// }, timeout, interval).Should(Equal(div
1alpha2
.JobSucceeded))
// }, timeout, interval).Should(Equal(div
2alpha1
.JobSucceeded))
// testutil.CleanUpJob(ctx, k8sClient, job)
// }
...
...
@@ -132,13 +132,13 @@ package e2e
// By("Waiting for job to be succeeded")
// jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}
// Eventually(func() div
1alpha2
.Phase {
// Eventually(func() div
2alpha1
.Phase {
// err := k8sClient.Get(ctx, jobKey, job)
// if err != nil {
// return div
1alpha2
.JobUnknown
// return div
2alpha1
.JobUnknown
// }
// return job.Status.Phase
// }, timeout, interval).Should(Equal(div
1alpha2
.JobSucceeded))
// }, timeout, interval).Should(Equal(div
2alpha1
.JobSucceeded))
// testutil.CleanUpJob(ctx, k8sClient, job)
// })
...
...
@@ -190,13 +190,13 @@ package e2e
// By("Waiting for job to be succeeded")
// jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}
// Eventually(func() div
1alpha2
.Phase {
// Eventually(func() div
2alpha1
.Phase {
// err := k8sClient.Get(ctx, jobKey, job)
// if err != nil {
// return div
1alpha2
.JobUnknown
// return div
2alpha1
.JobUnknown
// }
// return job.Status.Phase
// }, timeout, interval).Should(Equal(div
1alpha2
.JobSucceeded))
// }, timeout, interval).Should(Equal(div
2alpha1
.JobSucceeded))
// testutil.CleanUpJob(ctx, k8sClient, job)
// }
...
...
@@ -281,11 +281,11 @@ package e2e
// })
// })
// func buildDIJob(jobPath, sharedVolumePath string) *div
1alpha2
.DIJob {
// func buildDIJob(jobPath, sharedVolumePath string) *div
2alpha1
.DIJob {
// yamlFile, err := ioutil.ReadFile(jobPath)
// Expect(err).NotTo(HaveOccurred())
// var job div
1alpha2
.DIJob
// var job div
2alpha1
.DIJob
// err = yaml.Unmarshal(yamlFile, &job)
// Expect(err).NotTo(HaveOccurred())
...
...
pkg/allocator/allocator.go
浏览文件 @
cb748239
...
...
@@ -13,7 +13,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
ditypes
"opendilab.org/di-orchestrator/pkg/allocator/types"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dihandler
"opendilab.org/di-orchestrator/pkg/common/handler"
dicontext
"opendilab.org/di-orchestrator/pkg/context"
diutil
"opendilab.org/di-orchestrator/pkg/utils"
...
...
@@ -46,7 +46,7 @@ func (a *Allocator) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resul
a
.
updateLastTime
()
jobkey
:=
req
.
NamespacedName
job
:=
&
div
1alpha2
.
DIJob
{}
job
:=
&
div
2alpha1
.
DIJob
{}
if
err
:=
a
.
ctx
.
Get
(
ctx
,
jobkey
,
job
);
err
!=
nil
{
return
ctrl
.
Result
{},
err
}
...
...
@@ -81,9 +81,9 @@ func (a *Allocator) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resul
// SetupWithManager sets up the controller with the Manager.
func
(
a
*
Allocator
)
SetupWithManager
(
mgr
ctrl
.
Manager
)
error
{
return
ctrl
.
NewControllerManagedBy
(
mgr
)
.
For
(
&
div
1alpha2
.
DIJob
{})
.
For
(
&
div
2alpha1
.
DIJob
{})
.
Watches
(
&
source
.
Kind
{
Type
:
&
div
1alpha2
.
DIJob
{}},
&
source
.
Kind
{
Type
:
&
div
2alpha1
.
DIJob
{}},
&
dihandler
.
EventHandler
{
OnCreateHandlers
:
[]
func
(
obj
client
.
Object
){
a
.
onJobAddHandler
,
...
...
@@ -105,7 +105,7 @@ func (a *Allocator) SetupWithManager(mgr ctrl.Manager) error {
// onJobAddHandler handle the event when a job is created.
func
(
a
*
Allocator
)
onJobAddHandler
(
obj
client
.
Object
)
{
log
:=
a
.
ctx
.
Log
.
WithName
(
"onJobAddHandler"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
obj
.
GetNamespace
(),
obj
.
GetName
()))
job
:=
obj
.
(
*
div
1alpha2
.
DIJob
)
job
:=
obj
.
(
*
div
2alpha1
.
DIJob
)
if
err
:=
a
.
allocate
(
job
);
err
!=
nil
{
log
.
Error
(
err
,
"failed to allocate"
)
...
...
@@ -121,7 +121,7 @@ func (a *Allocator) updateLastTime() {
a
.
last
=
time
.
Now
()
}
func
(
a
*
Allocator
)
allocate
(
job
*
div
1alpha2
.
DIJob
)
error
{
func
(
a
*
Allocator
)
allocate
(
job
*
div
2alpha1
.
DIJob
)
error
{
log
:=
a
.
ctx
.
Log
.
WithName
(
"allocate"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
status
:=
job
.
Status
.
DeepCopy
()
// allocate job if preemptible, otherwise just update status.replicas
...
...
pkg/allocator/job.go
浏览文件 @
cb748239
...
...
@@ -4,12 +4,12 @@ import (
corev1
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
ditypes
"opendilab.org/di-orchestrator/pkg/allocator/types"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
"opendilab.org/di-orchestrator/pkg/common"
diutil
"opendilab.org/di-orchestrator/pkg/utils"
)
func
getJobInfo
(
job
*
div
1alpha2
.
DIJob
)
(
ditypes
.
JobInfo
,
error
)
{
func
getJobInfo
(
job
*
div
2alpha1
.
DIJob
)
(
ditypes
.
JobInfo
,
error
)
{
res
,
err
:=
getJobResources
(
job
)
if
err
!=
nil
{
return
ditypes
.
JobInfo
{},
err
...
...
@@ -24,7 +24,7 @@ func getJobInfo(job *div1alpha2.DIJob) (ditypes.JobInfo, error) {
return
*
jobinfo
,
nil
}
func
getJobResources
(
job
*
div
1alpha2
.
DIJob
)
(
corev1
.
ResourceRequirements
,
error
)
{
func
getJobResources
(
job
*
div
2alpha1
.
DIJob
)
(
corev1
.
ResourceRequirements
,
error
)
{
res
,
err
:=
common
.
GetDIJobDefaultResources
()
if
err
!=
nil
{
return
corev1
.
ResourceRequirements
{},
err
...
...
pkg/api/v
1alpha2
/dijob_types.go
→
pkg/api/v
2alpha1
/dijob_types.go
浏览文件 @
cb748239
...
...
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package
v
1alpha2
package
v
2alpha1
import
(
corev1
"k8s.io/api/core/v1"
...
...
pkg/api/v
1alpha2
/groupversion_info.go
→
pkg/api/v
2alpha1
/groupversion_info.go
浏览文件 @
cb748239
...
...
@@ -14,10 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package v
1alpha2 contains API Schema definitions for the v1alpha2
API group
// Package v
2alpha1 contains API Schema definitions for the v2alpha1
API group
//+kubebuilder:object:generate=true
//+groupName=diengine.opendilab.org
package
v
1alpha2
package
v
2alpha1
import
(
"k8s.io/apimachinery/pkg/runtime/schema"
...
...
@@ -28,11 +28,8 @@ var (
// KindDIJob is kind of DIJob
KindDIJob
=
"DIJob"
// KindAGConfig is kind of AGConfig
KindAGConfig
=
"AggregatorConfig"
// GroupVersion is group version used to register these objects
GroupVersion
=
schema
.
GroupVersion
{
Group
:
"diengine.opendilab.org"
,
Version
:
"v
1alpha2
"
}
GroupVersion
=
schema
.
GroupVersion
{
Group
:
"diengine.opendilab.org"
,
Version
:
"v
2alpha1
"
}
// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder
=
&
scheme
.
Builder
{
GroupVersion
:
GroupVersion
}
...
...
pkg/api/v
1alpha2
/zz_generated.deepcopy.go
→
pkg/api/v
2alpha1
/zz_generated.deepcopy.go
浏览文件 @
cb748239
...
...
@@ -18,7 +18,7 @@ limitations under the License.
// Code generated by controller-gen. DO NOT EDIT.
package
v
1alpha2
package
v
2alpha1
import
(
runtime
"k8s.io/apimachinery/pkg/runtime"
...
...
pkg/context/job.go
浏览文件 @
cb748239
...
...
@@ -9,12 +9,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicommon
"opendilab.org/di-orchestrator/pkg/common"
diutil
"opendilab.org/di-orchestrator/pkg/utils"
)
func
(
c
*
Context
)
CleanUpJob
(
job
*
div
1alpha2
.
DIJob
)
error
{
func
(
c
*
Context
)
CleanUpJob
(
job
*
div
2alpha1
.
DIJob
)
error
{
err
:=
c
.
Delete
(
c
.
ctx
,
job
,
&
client
.
DeleteOptions
{})
if
err
!=
nil
{
return
err
...
...
@@ -45,7 +45,7 @@ func (c *Context) CleanUpJob(job *div1alpha2.DIJob) error {
return
nil
}
func
(
c
*
Context
)
WaitForAllReplicas
(
job
*
div
1alpha2
.
DIJob
,
phase
corev1
.
PodPhase
)
error
{
func
(
c
*
Context
)
WaitForAllReplicas
(
job
*
div
2alpha1
.
DIJob
,
phase
corev1
.
PodPhase
)
error
{
if
err
:=
wait
.
Poll
(
100
*
time
.
Millisecond
,
5
*
time
.
Minute
,
func
()
(
bool
,
error
)
{
pods
,
err
:=
c
.
ListJobPods
(
job
)
if
err
!=
nil
{
...
...
@@ -78,7 +78,7 @@ var (
}
)
func
OnTopologyHandler
(
job
*
div
1alpha2
.
DIJob
,
rank
int
,
pod
*
corev1
.
Pod
)
{
func
OnTopologyHandler
(
job
*
div
2alpha1
.
DIJob
,
rank
int
,
pod
*
corev1
.
Pod
)
{
envs
:=
make
(
map
[
string
]
string
)
subdomain
:=
job
.
Name
pworkers
:=
int
(
job
.
Spec
.
EngineFields
.
ParallelWorkers
)
...
...
@@ -98,12 +98,12 @@ func OnTopologyHandler(job *div1alpha2.DIJob, rank int, pod *corev1.Pod) {
attachedNodesArgValue
:=
""
switch
job
.
Spec
.
EngineFields
.
Topology
{
case
div
1alpha2
.
TopologyAlone
:
case
div
2alpha1
.
TopologyAlone
:
// do nothing
case
div
1alpha2
.
TopologyStar
:
case
div
2alpha1
.
TopologyStar
:
worker0
:=
diutil
.
ReplicaName
(
job
.
Name
,
int
(
job
.
Status
.
Generation
),
0
)
attachedNodesArgValue
=
buildArg
(
dicommon
.
DIArgAttachedNodes
,
buildURL
(
dicommon
.
DINodeURLPrefix
,
worker0
,
subdomain
,
dicommon
.
DefaultPort
))
case
div
1alpha2
.
TopologyMesh
:
case
div
2alpha1
.
TopologyMesh
:
var
nodes
[]
string
for
i
:=
0
;
i
<
rank
;
i
++
{
workerName
:=
diutil
.
ReplicaName
(
job
.
Name
,
int
(
job
.
Status
.
Generation
),
i
)
...
...
pkg/context/pod.go
浏览文件 @
cb748239
...
...
@@ -9,12 +9,12 @@ import (
metav1
"k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicommon
"opendilab.org/di-orchestrator/pkg/common"
diutil
"opendilab.org/di-orchestrator/pkg/utils"
)
func
(
c
*
Context
)
BuildPod
(
job
*
div
1alpha2
.
DIJob
,
rank
int
,
allocation
[]
string
)
*
corev1
.
Pod
{
func
(
c
*
Context
)
BuildPod
(
job
*
div
2alpha1
.
DIJob
,
rank
int
,
allocation
[]
string
)
*
corev1
.
Pod
{
generation
:=
int
(
job
.
Status
.
Generation
)
replicas
:=
int
(
job
.
Status
.
Replicas
)
nodeName
:=
""
...
...
@@ -62,7 +62,7 @@ func (c *Context) BuildPod(job *div1alpha2.DIJob, rank int, allocation []string)
return
pod
}
func
(
c
*
Context
)
BuildService
(
job
*
div
1alpha2
.
DIJob
)
*
corev1
.
Service
{
func
(
c
*
Context
)
BuildService
(
job
*
div
2alpha1
.
DIJob
)
*
corev1
.
Service
{
labels
:=
diutil
.
GenLabels
(
*
job
)
return
&
corev1
.
Service
{
ObjectMeta
:
metav1
.
ObjectMeta
{
...
...
@@ -86,7 +86,7 @@ func (c *Context) BuildService(job *div1alpha2.DIJob) *corev1.Service {
}
}
func
(
c
*
Context
)
CreatePod
(
job
*
div
1alpha2
.
DIJob
,
pod
*
corev1
.
Pod
)
error
{
func
(
c
*
Context
)
CreatePod
(
job
*
div
2alpha1
.
DIJob
,
pod
*
corev1
.
Pod
)
error
{
log
:=
c
.
Log
.
WithName
(
"CreatePod"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
if
err
:=
c
.
Create
(
c
.
ctx
,
pod
,
&
client
.
CreateOptions
{});
err
!=
nil
&&
!
errors
.
IsAlreadyExists
(
err
)
{
msg
:=
fmt
.
Sprintf
(
"failed to create pod: %s error: %v"
,
pod
.
Name
,
err
)
...
...
@@ -99,7 +99,7 @@ func (c *Context) CreatePod(job *div1alpha2.DIJob, pod *corev1.Pod) error {
return
nil
}
func
(
c
*
Context
)
CreateService
(
job
*
div
1alpha2
.
DIJob
,
service
*
corev1
.
Service
)
error
{
func
(
c
*
Context
)
CreateService
(
job
*
div
2alpha1
.
DIJob
,
service
*
corev1
.
Service
)
error
{
log
:=
c
.
Log
.
WithName
(
"CreateService"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
if
err
:=
c
.
Create
(
c
.
ctx
,
service
,
&
client
.
CreateOptions
{});
err
!=
nil
&&
!
errors
.
IsAlreadyExists
(
err
)
{
msg
:=
fmt
.
Sprintf
(
"failed to create service: %s error: %v"
,
service
.
Name
,
err
)
...
...
@@ -112,7 +112,7 @@ func (c *Context) CreateService(job *div1alpha2.DIJob, service *corev1.Service)
return
nil
}
func
(
c
*
Context
)
DeletePodsAndServices
(
job
*
div
1alpha2
.
DIJob
,
pods
[]
*
corev1
.
Pod
,
services
[]
*
corev1
.
Service
)
error
{
func
(
c
*
Context
)
DeletePodsAndServices
(
job
*
div
2alpha1
.
DIJob
,
pods
[]
*
corev1
.
Pod
,
services
[]
*
corev1
.
Service
)
error
{
log
:=
c
.
Log
.
WithName
(
"DeletePodsAndServices"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
if
len
(
pods
)
==
0
{
return
nil
...
...
@@ -122,15 +122,15 @@ func (c *Context) DeletePodsAndServices(job *div1alpha2.DIJob, pods []*corev1.Po
return
err
}
}
if
job
.
Spec
.
CleanPodPolicy
!=
div
1alpha2
.
CleanPodPolicyAll
&&
job
.
Spec
.
CleanPodPolicy
!=
div
1alpha2
.
CleanPodPolicyRunning
{
if
job
.
Spec
.
CleanPodPolicy
!=
div
2alpha1
.
CleanPodPolicyAll
&&
job
.
Spec
.
CleanPodPolicy
!=
div
2alpha1
.
CleanPodPolicyRunning
{
return
nil
}
for
_
,
pod
:=
range
pods
{
// Just delete running pod when the cleanPodPolicy is Running
needsDelete
:=
true
if
job
.
Spec
.
CleanPodPolicy
==
div
1alpha2
.
CleanPodPolicyRunning
{
if
job
.
Spec
.
CleanPodPolicy
==
div
2alpha1
.
CleanPodPolicyRunning
{
if
pod
.
Status
.
Phase
!=
corev1
.
PodRunning
&&
pod
.
Status
.
Phase
!=
corev1
.
PodPending
{
continue
}
...
...
@@ -161,7 +161,7 @@ func (c *Context) DeletePodsAndServices(job *div1alpha2.DIJob, pods []*corev1.Po
return
nil
}
func
(
c
*
Context
)
DeletePods
(
job
*
div
1alpha2
.
DIJob
,
pods
[]
*
corev1
.
Pod
)
error
{
func
(
c
*
Context
)
DeletePods
(
job
*
div
2alpha1
.
DIJob
,
pods
[]
*
corev1
.
Pod
)
error
{
var
err
error
for
_
,
pod
:=
range
pods
{
if
err1
:=
c
.
DeletePod
(
job
,
pod
);
err
!=
nil
{
...
...
@@ -171,7 +171,7 @@ func (c *Context) DeletePods(job *div1alpha2.DIJob, pods []*corev1.Pod) error {
return
err
}
func
(
c
*
Context
)
DeletePod
(
job
*
div
1alpha2
.
DIJob
,
pod
*
corev1
.
Pod
)
error
{
func
(
c
*
Context
)
DeletePod
(
job
*
div
2alpha1
.
DIJob
,
pod
*
corev1
.
Pod
)
error
{
log
:=
c
.
Log
.
WithName
(
"DeletePod"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
if
err
:=
c
.
Delete
(
c
.
ctx
,
pod
,
&
client
.
DeleteOptions
{
GracePeriodSeconds
:
func
(
a
int64
)
*
int64
{
return
&
a
}(
0
)});
err
!=
nil
&&
!
errors
.
IsNotFound
(
err
)
{
...
...
@@ -185,7 +185,7 @@ func (c *Context) DeletePod(job *div1alpha2.DIJob, pod *corev1.Pod) error {
return
nil
}
func
(
c
*
Context
)
DeleteService
(
job
*
div
1alpha2
.
DIJob
,
service
*
corev1
.
Service
)
error
{
func
(
c
*
Context
)
DeleteService
(
job
*
div
2alpha1
.
DIJob
,
service
*
corev1
.
Service
)
error
{
log
:=
c
.
Log
.
WithName
(
"DeleteService"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
if
err
:=
c
.
Delete
(
c
.
ctx
,
service
,
&
client
.
DeleteOptions
{
GracePeriodSeconds
:
func
(
a
int64
)
*
int64
{
return
&
a
}(
0
)});
err
!=
nil
&&
!
errors
.
IsNotFound
(
err
)
{
...
...
@@ -199,7 +199,7 @@ func (c *Context) DeleteService(job *div1alpha2.DIJob, service *corev1.Service)
return
nil
}
func
(
c
*
Context
)
ListJobPods
(
job
*
div
1alpha2
.
DIJob
)
([]
*
corev1
.
Pod
,
error
)
{
func
(
c
*
Context
)
ListJobPods
(
job
*
div
2alpha1
.
DIJob
)
([]
*
corev1
.
Pod
,
error
)
{
labelSelector
,
err
:=
metav1
.
LabelSelectorAsSelector
(
&
metav1
.
LabelSelector
{
MatchLabels
:
diutil
.
GenLabels
(
*
job
),
})
...
...
@@ -215,7 +215,7 @@ func (c *Context) ListJobPods(job *div1alpha2.DIJob) ([]*corev1.Pod, error) {
return
pods
,
nil
}
func
(
c
*
Context
)
ListJobServices
(
job
*
div
1alpha2
.
DIJob
)
([]
*
corev1
.
Service
,
error
)
{
func
(
c
*
Context
)
ListJobServices
(
job
*
div
2alpha1
.
DIJob
)
([]
*
corev1
.
Service
,
error
)
{
labelSelector
,
err
:=
metav1
.
LabelSelectorAsSelector
(
&
metav1
.
LabelSelector
{
MatchLabels
:
diutil
.
GenLabels
(
*
job
),
})
...
...
pkg/context/status.go
浏览文件 @
cb748239
...
...
@@ -16,7 +16,7 @@ import (
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicommon
"opendilab.org/di-orchestrator/pkg/common"
diutil
"opendilab.org/di-orchestrator/pkg/utils"
)
...
...
@@ -38,16 +38,16 @@ const (
statusUpdatedPauseDuration
=
50
*
time
.
Millisecond
)
func
(
c
*
Context
)
CheckJobCompletion
(
job
*
div
1alpha2
.
DIJob
,
pods
[]
*
corev1
.
Pod
)
(
completed
bool
)
{
func
(
c
*
Context
)
CheckJobCompletion
(
job
*
div
2alpha1
.
DIJob
,
pods
[]
*
corev1
.
Pod
)
(
completed
bool
)
{
succeeded
,
failed
:=
c
.
checkPodsCompletion
(
pods
,
job
.
Spec
.
Preemptible
)
completed
=
false
if
succeeded
!=
0
&&
succeeded
==
len
(
pods
)
{
msg
:=
"job succeeded since all the replicas are succeeded."
c
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobSucceeded
,
DIJobSucceededReason
,
msg
)
c
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobSucceeded
,
DIJobSucceededReason
,
msg
)
completed
=
true
}
else
if
failed
!=
0
{
msg
:=
fmt
.
Sprintf
(
"job failed since %d replicas failed."
,
failed
)
c
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobFailed
,
DIJobFailedReason
,
msg
)
c
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobFailed
,
DIJobFailedReason
,
msg
)
completed
=
true
}
return
...
...
@@ -95,7 +95,7 @@ func (c *Context) checkPodsCompletion(pods []*corev1.Pod, preemptable bool) (suc
return
succeeded
,
failed
}
func
(
c
*
Context
)
DetectRestart
(
job
*
div
1alpha2
.
DIJob
,
pods
[]
*
corev1
.
Pod
,
allocation
[]
string
,
replicas
int
)
bool
{
func
(
c
*
Context
)
DetectRestart
(
job
*
div
2alpha1
.
DIJob
,
pods
[]
*
corev1
.
Pod
,
allocation
[]
string
,
replicas
int
)
bool
{
log
:=
c
.
Log
.
WithName
(
"DetectRestart"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
for
_
,
pod
:=
range
pods
{
areplicas
,
err
:=
strconv
.
Atoi
(
pod
.
Annotations
[
dicommon
.
AnnotationReplicas
])
...
...
@@ -126,12 +126,12 @@ func (c *Context) MarkIncorrectJobFailed(obj client.Object) {
}
// build status
failedConvertDIJob
:=
fmt
.
Sprintf
(
"failed to convert type %T to v
1alpha2
.DIJob"
,
obj
)
status
:=
div
1alpha2
.
DIJobStatus
{
Phase
:
div
1alpha2
.
JobFailed
,
Conditions
:
[]
div
1alpha2
.
DIJobCondition
{
failedConvertDIJob
:=
fmt
.
Sprintf
(
"failed to convert type %T to v
2alpha1
.DIJob"
,
obj
)
status
:=
div
2alpha1
.
DIJobStatus
{
Phase
:
div
2alpha1
.
JobFailed
,
Conditions
:
[]
div
2alpha1
.
DIJobCondition
{
{
Type
:
div
1alpha2
.
JobFailed
,
Type
:
div
2alpha1
.
JobFailed
,
Status
:
corev1
.
ConditionTrue
,
Message
:
failedConvertDIJob
,
},
...
...
@@ -145,8 +145,8 @@ func (c *Context) MarkIncorrectJobFailed(obj client.Object) {
// get dijob
dijobRes
:=
schema
.
GroupVersionResource
{
Group
:
div
1alpha2
.
GroupVersion
.
Group
,
Version
:
div
1alpha2
.
GroupVersion
.
Version
,
Group
:
div
2alpha1
.
GroupVersion
.
Group
,
Version
:
div
2alpha1
.
GroupVersion
.
Version
,
Resource
:
"dijobs"
,
}
un
,
err
:=
dclient
.
Resource
(
dijobRes
)
.
Namespace
(
obj
.
GetNamespace
())
.
Get
(
context
.
Background
(),
obj
.
GetName
(),
metav1
.
GetOptions
{})
...
...
@@ -167,10 +167,10 @@ func (c *Context) MarkIncorrectJobFailed(obj client.Object) {
}
}
func
(
c
*
Context
)
UpdateDIJobStatusInCluster
(
job
*
div
1alpha2
.
DIJob
)
error
{
func
(
c
*
Context
)
UpdateDIJobStatusInCluster
(
job
*
div
2alpha1
.
DIJob
)
error
{
var
err
error
for
i
:=
0
;
i
<
statusUpdateRetries
;
i
++
{
newJob
:=
&
div
1alpha2
.
DIJob
{}
newJob
:=
&
div
2alpha1
.
DIJob
{}
err
:=
c
.
Get
(
c
.
ctx
,
types
.
NamespacedName
{
Namespace
:
job
.
Namespace
,
Name
:
job
.
Name
},
newJob
)
if
err
!=
nil
{
break
...
...
@@ -185,19 +185,19 @@ func (c *Context) UpdateDIJobStatusInCluster(job *div1alpha2.DIJob) error {
}
func
(
c
*
Context
)
UpdateJobStatus
(
job
*
div
1alpha2
.
DIJob
,
phase
div1alpha2
.
Phase
,
reason
string
,
msg
string
)
{
job
*
div
2alpha1
.
DIJob
,
phase
div2alpha1
.
Phase
,
reason
string
,
msg
string
)
{
log
:=
c
.
Log
.
WithName
(
"UpdateJobStatus"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
log
.
Info
(
msg
)
updateDIJobConditions
(
job
,
phase
,
reason
,
msg
)
job
.
Status
.
Phase
=
phase
}
func
updateDIJobConditions
(
job
*
div
1alpha2
.
DIJob
,
conditionType
div1alpha2
.
Phase
,
reason
,
msg
string
)
{
func
updateDIJobConditions
(
job
*
div
2alpha1
.
DIJob
,
conditionType
div2alpha1
.
Phase
,
reason
,
msg
string
)
{
newCondition
:=
newCondition
(
conditionType
,
reason
,
msg
)
if
diutil
.
IsSucceeded
(
job
)
||
diutil
.
IsFailed
(
job
)
{
for
i
:=
range
job
.
Status
.
Conditions
{
if
job
.
Status
.
Conditions
[
i
]
.
Type
==
div
1alpha2
.
JobRunning
{
if
job
.
Status
.
Conditions
[
i
]
.
Type
==
div
2alpha1
.
JobRunning
{
job
.
Status
.
Conditions
[
i
]
.
Status
=
corev1
.
ConditionFalse
job
.
Status
.
Conditions
[
i
]
.
LastTransitionTime
=
metav1
.
Now
()
job
.
Status
.
Conditions
[
i
]
.
LastUpdateTime
=
metav1
.
Now
()
...
...
@@ -207,8 +207,8 @@ func updateDIJobConditions(job *div1alpha2.DIJob, conditionType div1alpha2.Phase
setCondition
(
&
job
.
Status
,
newCondition
)
}
func
newCondition
(
conditionType
div
1alpha2
.
Phase
,
reason
,
msg
string
)
*
div1alpha2
.
DIJobCondition
{
return
&
div
1alpha2
.
DIJobCondition
{
func
newCondition
(
conditionType
div
2alpha1
.
Phase
,
reason
,
msg
string
)
*
div2alpha1
.
DIJobCondition
{
return
&
div
2alpha1
.
DIJobCondition
{
Type
:
conditionType
,
Status
:
corev1
.
ConditionTrue
,
Reason
:
reason
,
...
...
@@ -219,7 +219,7 @@ func newCondition(conditionType div1alpha2.Phase, reason, msg string) *div1alpha
}
// setCondition sets the condition for the job, skip if the condition is already exists with the same status and reason
func
setCondition
(
status
*
div
1alpha2
.
DIJobStatus
,
condition
*
div1alpha2
.
DIJobCondition
)
{
func
setCondition
(
status
*
div
2alpha1
.
DIJobStatus
,
condition
*
div2alpha1
.
DIJobCondition
)
{
currentCondition
:=
getCondition
(
status
,
condition
.
Type
)
if
currentCondition
!=
nil
&&
currentCondition
.
Reason
==
condition
.
Reason
&&
currentCondition
.
Status
==
condition
.
Status
{
...
...
@@ -235,7 +235,7 @@ func setCondition(status *div1alpha2.DIJobStatus, condition *div1alpha2.DIJobCon
status
.
Conditions
=
append
(
conditions
,
*
condition
)
}
func
getCondition
(
status
*
div
1alpha2
.
DIJobStatus
,
conditionType
div1alpha2
.
Phase
)
*
div1alpha2
.
DIJobCondition
{
func
getCondition
(
status
*
div
2alpha1
.
DIJobStatus
,
conditionType
div2alpha1
.
Phase
)
*
div2alpha1
.
DIJobCondition
{
for
_
,
condition
:=
range
status
.
Conditions
{
if
condition
.
Type
==
conditionType
{
return
&
condition
...
...
@@ -244,8 +244,8 @@ func getCondition(status *div1alpha2.DIJobStatus, conditionType div1alpha2.Phase
return
nil
}
func
filterOutConditions
(
conditions
[]
div
1alpha2
.
DIJobCondition
,
conditionType
div1alpha2
.
Phase
)
[]
div1alpha2
.
DIJobCondition
{
newConditions
:=
[]
div
1alpha2
.
DIJobCondition
{}
func
filterOutConditions
(
conditions
[]
div
2alpha1
.
DIJobCondition
,
conditionType
div2alpha1
.
Phase
)
[]
div2alpha1
.
DIJobCondition
{
newConditions
:=
[]
div
2alpha1
.
DIJobCondition
{}
for
_
,
condition
:=
range
conditions
{
if
condition
.
Type
==
conditionType
{
...
...
pkg/controllers/dijob.go
浏览文件 @
cb748239
...
...
@@ -7,12 +7,12 @@ import (
corev1
"k8s.io/api/core/v1"
apiequality
"k8s.io/apimachinery/pkg/api/equality"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicontext
"opendilab.org/di-orchestrator/pkg/context"
diutil
"opendilab.org/di-orchestrator/pkg/utils"
)
func
(
r
*
DIJobReconciler
)
reconcileReplicas
(
ctx
context
.
Context
,
job
*
div
1alpha2
.
DIJob
,
func
(
r
*
DIJobReconciler
)
reconcileReplicas
(
ctx
context
.
Context
,
job
*
div
2alpha1
.
DIJob
,
pods
[]
*
corev1
.
Pod
,
services
[]
*
corev1
.
Service
)
error
{
log
:=
r
.
ctx
.
Log
.
WithName
(
"reconcileReplicas"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
oldStatus
:=
job
.
Status
.
DeepCopy
()
...
...
@@ -60,53 +60,53 @@ func (r *DIJobReconciler) reconcileReplicas(ctx context.Context, job *div1alpha2
return
nil
}
func
(
r
*
DIJobReconciler
)
reconcileWithJobStatus
(
job
*
div
1alpha2
.
DIJob
,
func
(
r
*
DIJobReconciler
)
reconcileWithJobStatus
(
job
*
div
2alpha1
.
DIJob
,
pods
[]
*
corev1
.
Pod
)
error
{
allocation
:=
job
.
Status
.
Allocation
replicas
:=
int
(
job
.
Status
.
Replicas
)
switch
job
.
Status
.
Phase
{
case
div
1alpha2
.
JobPending
:
case
div
2alpha1
.
JobPending
:
if
job
.
Spec
.
Preemptible
&&
allocation
!=
nil
&&
len
(
pods
)
==
0
{
if
err
:=
r
.
buildAndCreatePods
(
job
,
len
(
allocation
),
allocation
);
err
!=
nil
{
return
err
}
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobStarting
,
dicontext
.
DIJobStartingReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobStarting
,
dicontext
.
DIJobStartingReason
,
"job is starting since all pods are created."
)
}
else
if
!
job
.
Spec
.
Preemptible
&&
replicas
!=
0
&&
len
(
pods
)
==
0
{
if
err
:=
r
.
buildAndCreatePods
(
job
,
replicas
,
allocation
);
err
!=
nil
{
return
err
}
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobStarting
,
dicontext
.
DIJobStartingReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobStarting
,
dicontext
.
DIJobStartingReason
,
"job is starting since all pods are created."
)
}
case
div
1alpha2
.
JobStarting
:
case
div
2alpha1
.
JobStarting
:
if
diutil
.
CountPodsScheduled
(
pods
)
!=
replicas
&&
r
.
ctx
.
DetectRestart
(
job
,
pods
,
allocation
,
replicas
)
{
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobRestarting
,
dicontext
.
DIJobRestartingReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobRestarting
,
dicontext
.
DIJobRestartingReason
,
"job is restarting since conditions changed."
)
}
else
if
len
(
pods
)
!=
replicas
{
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobRestarting
,
dicontext
.
DIJobRestartingReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobRestarting
,
dicontext
.
DIJobRestartingReason
,
fmt
.
Sprintf
(
"job is restarting since the created pods %d are not matched replicas %d."
,
len
(
pods
),
replicas
))
}
else
if
diutil
.
CountReadyPods
(
pods
)
==
replicas
{
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobRunning
,
dicontext
.
DIJobRunningReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobRunning
,
dicontext
.
DIJobRunningReason
,
"job is running since all pods are ready."
)
}
case
div
1alpha2
.
JobRunning
:
case
div
2alpha1
.
JobRunning
:
if
r
.
ctx
.
DetectRestart
(
job
,
pods
,
allocation
,
replicas
)
||
len
(
pods
)
!=
replicas
{
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobRestarting
,
dicontext
.
DIJobRestartingReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobRestarting
,
dicontext
.
DIJobRestartingReason
,
fmt
.
Sprintf
(
"job is restarting since the created pods %d are not matched replicas %d."
,
len
(
pods
),
replicas
))
}
case
div
1alpha2
.
JobRestarting
:
case
div
2alpha1
.
JobRestarting
:
if
len
(
pods
)
!=
0
{
r
.
ctx
.
DeletePods
(
job
,
pods
)
}
else
{
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobPending
,
dicontext
.
DIJobPendingReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobPending
,
dicontext
.
DIJobPendingReason
,
"job is pending since job restarted."
)
}
}
return
nil
}
func
(
r
*
DIJobReconciler
)
buildAndCreatePods
(
job
*
div
1alpha2
.
DIJob
,
replicas
int
,
allocation
[]
string
)
error
{
func
(
r
*
DIJobReconciler
)
buildAndCreatePods
(
job
*
div
2alpha1
.
DIJob
,
replicas
int
,
allocation
[]
string
)
error
{
log
:=
r
.
ctx
.
Log
.
WithName
(
"buildAndCreatePods"
)
.
WithValues
(
"job"
,
diutil
.
NamespacedName
(
job
.
Namespace
,
job
.
Name
))
builtPods
:=
[]
*
corev1
.
Pod
{}
for
i
:=
0
;
i
<
replicas
;
i
++
{
...
...
@@ -117,7 +117,7 @@ func (r *DIJobReconciler) buildAndCreatePods(job *div1alpha2.DIJob, replicas int
err
:=
r
.
ctx
.
CreatePod
(
job
,
pod
)
if
err
!=
nil
{
log
.
Error
(
err
,
"failed to create pod."
)
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobFailed
,
dicontext
.
DIJobFailedReason
,
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobFailed
,
dicontext
.
DIJobFailedReason
,
fmt
.
Sprintf
(
"job failed since pod %s failed to create."
,
pod
.
Name
))
return
r
.
ctx
.
DeletePods
(
job
,
builtPods
)
}
...
...
@@ -126,7 +126,7 @@ func (r *DIJobReconciler) buildAndCreatePods(job *div1alpha2.DIJob, replicas int
}
// we will utilize pod subdomain to reduce the number of services created.
func
(
r
*
DIJobReconciler
)
reconcileServices
(
job
*
div
1alpha2
.
DIJob
,
svcs
[]
*
corev1
.
Service
)
error
{
func
(
r
*
DIJobReconciler
)
reconcileServices
(
job
*
div
2alpha1
.
DIJob
,
svcs
[]
*
corev1
.
Service
)
error
{
if
len
(
svcs
)
==
0
{
svc
:=
r
.
ctx
.
BuildService
(
job
)
if
err
:=
r
.
ctx
.
CreateService
(
job
,
svc
);
err
!=
nil
{
...
...
pkg/controllers/dijob_controller.go
浏览文件 @
cb748239
...
...
@@ -30,7 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dihandler
"opendilab.org/di-orchestrator/pkg/common/handler"
dicontext
"opendilab.org/di-orchestrator/pkg/context"
diutil
"opendilab.org/di-orchestrator/pkg/utils"
...
...
@@ -68,7 +68,7 @@ func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
log
:=
r
.
ctx
.
Log
.
WithName
(
"Reconcile"
)
.
WithValues
(
"job"
,
req
.
NamespacedName
)
// get DIJob object
job
:=
&
div
1alpha2
.
DIJob
{}
job
:=
&
div
2alpha1
.
DIJob
{}
err
:=
r
.
ctx
.
Get
(
ctx
,
req
.
NamespacedName
,
job
)
if
err
!=
nil
{
if
!
errors
.
IsNotFound
(
err
)
{
...
...
@@ -109,9 +109,9 @@ func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// SetupWithManager sets up the controller with the Manager.
func
(
r
*
DIJobReconciler
)
SetupWithManager
(
mgr
ctrl
.
Manager
)
error
{
return
ctrl
.
NewControllerManagedBy
(
mgr
)
.
For
(
&
div
1alpha2
.
DIJob
{})
.
For
(
&
div
2alpha1
.
DIJob
{})
.
Watches
(
&
source
.
Kind
{
Type
:
&
div
1alpha2
.
DIJob
{}},
&
source
.
Kind
{
Type
:
&
div
2alpha1
.
DIJob
{}},
&
dihandler
.
EventHandler
{
OnCreateHandlers
:
[]
func
(
obj
client
.
Object
){
r
.
onJobAddHandler
,
...
...
@@ -129,7 +129,7 @@ func (r *DIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
&
source
.
Kind
{
Type
:
&
corev1
.
Pod
{}},
&
handler
.
EnqueueRequestForOwner
{
IsController
:
true
,
OwnerType
:
&
div
1alpha2
.
DIJob
{},
OwnerType
:
&
div
2alpha1
.
DIJob
{},
},
builder
.
Predicates
{},
)
.
...
...
@@ -137,7 +137,7 @@ func (r *DIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
&
source
.
Kind
{
Type
:
&
corev1
.
Service
{}},
&
handler
.
EnqueueRequestForOwner
{
IsController
:
true
,
OwnerType
:
&
div
1alpha2
.
DIJob
{},
OwnerType
:
&
div
2alpha1
.
DIJob
{},
},
)
.
Complete
(
r
)
...
...
@@ -147,7 +147,7 @@ func (r *DIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func
(
r
*
DIJobReconciler
)
onJobAddHandler
(
obj
client
.
Object
)
{
jobkey
:=
diutil
.
NamespacedName
(
obj
.
GetNamespace
(),
obj
.
GetName
())
log
:=
r
.
ctx
.
Log
.
WithName
(
"onJobAddHandler"
)
.
WithValues
(
"job"
,
jobkey
)
job
,
ok
:=
obj
.
(
*
div
1alpha2
.
DIJob
)
job
,
ok
:=
obj
.
(
*
div
2alpha1
.
DIJob
)
if
!
ok
{
log
.
Error
(
fmt
.
Errorf
(
"failed to convert object to DIJob"
),
""
)
r
.
ctx
.
MarkIncorrectJobFailed
(
obj
)
...
...
@@ -158,7 +158,7 @@ func (r *DIJobReconciler) onJobAddHandler(obj client.Object) {
// update job status
msg
:=
"job created."
if
job
.
Status
.
Phase
==
""
{
r
.
ctx
.
UpdateJobStatus
(
job
,
div
1alpha2
.
JobPending
,
dicontext
.
DIJobPendingReason
,
msg
)
r
.
ctx
.
UpdateJobStatus
(
job
,
div
2alpha1
.
JobPending
,
dicontext
.
DIJobPendingReason
,
msg
)
r
.
ctx
.
Recorder
.
Eventf
(
job
,
corev1
.
EventTypeNormal
,
dicontext
.
DIJobPendingReason
,
msg
)
}
...
...
@@ -172,12 +172,12 @@ func (r *DIJobReconciler) onJobAddHandler(obj client.Object) {
func
(
r
*
DIJobReconciler
)
onJobUpdateHandler
(
old
,
new
client
.
Object
)
{
jobkey
:=
diutil
.
NamespacedName
(
old
.
GetNamespace
(),
old
.
GetName
())
log
:=
r
.
ctx
.
Log
.
WithName
(
"onJobUpdateHandler"
)
.
WithValues
(
"job"
,
jobkey
)
oldjob
,
ok
:=
old
.
(
*
div
1alpha2
.
DIJob
)
oldjob
,
ok
:=
old
.
(
*
div
2alpha1
.
DIJob
)
if
!
ok
{
log
.
Error
(
fmt
.
Errorf
(
"failed to convert object to DIJob"
),
""
)
return
}
newjob
,
ok
:=
new
.
(
*
div
1alpha2
.
DIJob
)
newjob
,
ok
:=
new
.
(
*
div
2alpha1
.
DIJob
)
if
!
ok
{
log
.
Error
(
fmt
.
Errorf
(
"failed to convert object to DIJob"
),
""
)
return
...
...
pkg/controllers/dijob_controller_test.go
浏览文件 @
cb748239
...
...
@@ -12,7 +12,7 @@ package controllers
// "k8s.io/apimachinery/pkg/types"
// "sigs.k8s.io/controller-runtime/pkg/client"
// div
1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
// div
2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
// dicommon "opendilab.org/di-orchestrator/pkg/common"
// commontypes "opendilab.org/di-orchestrator/pkg/common/types"
// diutil "opendilab.org/di-orchestrator/pkg/utils"
...
...
@@ -31,7 +31,7 @@ package controllers
// checkCoordinatorCreated(ctx, dijob)
// By("Checking the created DIJob is in Created state")
// checkDIJobPhase(ctx, k8sClient, jobKey, div
1alpha2
.JobPending)
// checkDIJobPhase(ctx, k8sClient, jobKey, div
2alpha1
.JobPending)
// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
...
...
@@ -42,26 +42,26 @@ package controllers
// By("Checking the created DIJob has enough coordinator")
// coorStatus := make([]int, 3)
// coorStatus[0] = 1
// replicasStatuses := map[div
1alpha2
.ReplicaType][]int{
// div
1alpha2
.ReplicaTypeCoordinator: coorStatus,
// replicasStatuses := map[div
2alpha1
.ReplicaType][]int{
// div
2alpha1
.ReplicaTypeCoordinator: coorStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
// By("Checking the created DIJob is in Running state")
// checkDIJobPhase(ctx, k8sClient, jobKey, div
1alpha2
.JobRunning)
// checkDIJobPhase(ctx, k8sClient, jobKey, div
2alpha1
.JobRunning)
// By("Update coordinator to Succeeded")
// err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded)
// Expect(err).NotTo(HaveOccurred())
// By("Checking the job is succeeded")
// checkDIJobPhase(ctx, k8sClient, jobKey, div
1alpha2
.JobSucceeded)
// checkDIJobPhase(ctx, k8sClient, jobKey, div
2alpha1
.JobSucceeded)
// By("Checking the coordinator is succeeded")
// coorStatus = make([]int, 3)
// coorStatus[2] = 1
// replicasStatuses = map[div
1alpha2
.ReplicaType][]int{
// div
1alpha2
.ReplicaTypeCoordinator: coorStatus,
// replicasStatuses = map[div
2alpha1
.ReplicaType][]int{
// div
2alpha1
.ReplicaTypeCoordinator: coorStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
...
...
@@ -73,12 +73,12 @@ package controllers
// It("DIJob status changed with components status", func() {
// type testCase struct {
// coorStatus corev1.PodPhase
// expectStatus div
1alpha2
.Phase
// expectStatus div
2alpha1
.Phase
// }
// testCases := []testCase{
// {coorStatus: corev1.PodRunning, expectStatus: div
1alpha2
.JobRunning},
// {coorStatus: corev1.PodFailed, expectStatus: div
1alpha2
.JobFailed},
// {coorStatus: corev1.PodSucceeded, expectStatus: div
1alpha2
.JobSucceeded},
// {coorStatus: corev1.PodRunning, expectStatus: div
2alpha1
.JobRunning},
// {coorStatus: corev1.PodFailed, expectStatus: div
2alpha1
.JobFailed},
// {coorStatus: corev1.PodSucceeded, expectStatus: div
2alpha1
.JobSucceeded},
// }
// for i := range testCases {
// c := testCases[i]
...
...
@@ -107,8 +107,8 @@ package controllers
// case corev1.PodSucceeded:
// coorStatus[2] = 1
// }
// replicasStatuses := map[div
1alpha2
.ReplicaType][]int{
// div
1alpha2
.ReplicaTypeCoordinator: coorStatus,
// replicasStatuses := map[div
2alpha1
.ReplicaType][]int{
// div
2alpha1
.ReplicaTypeCoordinator: coorStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
...
...
@@ -133,7 +133,7 @@ package controllers
// dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
// By("Checking the created DIJob is in Created state")
// checkDIJobPhase(ctx, k8sClient, jobKey, div
1alpha2
.JobPending)
// checkDIJobPhase(ctx, k8sClient, jobKey, div
2alpha1
.JobPending)
// By("Cleaning up")
// err = testutil.CleanUpJob(ctx, k8sClient, &dijob)
...
...
@@ -192,7 +192,7 @@ package controllers
// podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
// // build owner reference
// ownRefer := diutil.NewOwnerReference(div
1alpha2.GroupVersion.String(), div1alpha2
.KindDIJob, dijob.Name, dijob.UID, true)
// ownRefer := diutil.NewOwnerReference(div
2alpha1.GroupVersion.String(), div2alpha1
.KindDIJob, dijob.Name, dijob.UID, true)
// By(fmt.Sprintf("Create replicas for DIJob %s", dijob.Name))
// colStatus := make([]int, 3)
...
...
@@ -206,9 +206,9 @@ package controllers
// }
// By("Checking the ReplicaStatus is as expected")
// replicasStatuses := map[div
1alpha2
.ReplicaType][]int{
// div
1alpha2
.ReplicaTypeCollector: colStatus,
// div
1alpha2
.ReplicaTypeLearner: lrStatus,
// replicasStatuses := map[div
2alpha1
.ReplicaType][]int{
// div
2alpha1
.ReplicaTypeCollector: colStatus,
// div
2alpha1
.ReplicaTypeLearner: lrStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
...
...
@@ -224,7 +224,7 @@ package controllers
// Expect(err).NotTo(HaveOccurred())
// By("Checking the job is successfully succeeded")
// checkDIJobPhase(ctx, k8sClient, jobKey, div
1alpha2
.JobSucceeded)
// checkDIJobPhase(ctx, k8sClient, jobKey, div
2alpha1
.JobSucceeded)
// By("Checking the ReplicaStatus is as expected")
// coorStatus := make([]int, 3)
...
...
@@ -239,10 +239,10 @@ package controllers
// lrFinishedStatus[1] = lrStatus[1]
// lrFinishedStatus[2] = lrStatus[0] + lrStatus[2]
// replicasStatuses = map[div
1alpha2
.ReplicaType][]int{
// div
1alpha2
.ReplicaTypeCoordinator: coorStatus,
// div
1alpha2
.ReplicaTypeCollector: colFinishedStatus,
// div
1alpha2
.ReplicaTypeLearner: lrFinishedStatus,
// replicasStatuses = map[div
2alpha1
.ReplicaType][]int{
// div
2alpha1
.ReplicaTypeCoordinator: coorStatus,
// div
2alpha1
.ReplicaTypeCollector: colFinishedStatus,
// div
2alpha1
.ReplicaTypeLearner: lrFinishedStatus,
// }
// checkReplicasStatuses(ctx, k8sClient, jobKey, replicasStatuses)
...
...
@@ -275,7 +275,7 @@ package controllers
// checkCoordinatorCreated(ctx, dijob)
// // build owner reference
// ownRefer := diutil.NewOwnerReference(div
1alpha2.GroupVersion.String(), div1alpha2
.KindDIJob, dijob.Name, dijob.UID, true)
// ownRefer := diutil.NewOwnerReference(div
2alpha1.GroupVersion.String(), div2alpha1
.KindDIJob, dijob.Name, dijob.UID, true)
// By(fmt.Sprintf("Create replicas for DIJob %s", dijob.Name))
// pod := buildPod(c.name, dijob.Name, dicommon.DDPLearnerName, ownRefer)
...
...
@@ -306,8 +306,8 @@ package controllers
// })
// })
// func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div
1alpha2
.DIJob) (
// div
1alpha2
.DIJob, types.NamespacedName) {
// func createDIJob(ctx context.Context, k8sClient client.Client, dijob *div
2alpha1
.DIJob) (
// div
2alpha1
.DIJob, types.NamespacedName) {
// name := diutil.GenerateName(dijob.Name)
// dijob.SetName(name)
...
...
@@ -316,7 +316,7 @@ package controllers
// By(fmt.Sprintf("Checking the DIJob %s is successfully created", name))
// key := types.NamespacedName{Namespace: dijob.Namespace, Name: dijob.Name}
// createdDIjob := div
1alpha2
.DIJob{}
// createdDIjob := div
2alpha1
.DIJob{}
// Eventually(func() bool {
// err := k8sClient.Get(ctx, key, &createdDIjob)
// return err == nil
...
...
@@ -325,7 +325,7 @@ package controllers
// return createdDIjob, key
// }
// func checkCoordinatorCreated(ctx context.Context, dijob div
1alpha2
.DIJob) {
// func checkCoordinatorCreated(ctx context.Context, dijob div
2alpha1
.DIJob) {
// By("Checking coordinator are created")
// replicaName := diutil.ReplicaPodName(dijob.Name, "coordinator")
// var pod corev1.Pod
...
...
@@ -373,20 +373,20 @@ package controllers
// }
// }
// func checkDIJobPhase(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, phase div
1alpha2
.Phase) {
// var dijob div
1alpha2
.DIJob
// Eventually(func() div
1alpha2
.Phase {
// func checkDIJobPhase(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, phase div
2alpha1
.Phase) {
// var dijob div
2alpha1
.DIJob
// Eventually(func() div
2alpha1
.Phase {
// err := k8sClient.Get(ctx, jobKey, &dijob)
// if err != nil {
// return div
1alpha2
.JobUnknown
// return div
2alpha1
.JobUnknown
// }
// return dijob.Status.Phase
// }, timeout, interval).Should(Equal(phase))
// }
// func checkReplicasStatuses(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, replicasStatuses map[div
1alpha2
.ReplicaType][]int) {
// func checkReplicasStatuses(ctx context.Context, k8sClient client.Client, jobKey types.NamespacedName, replicasStatuses map[div
2alpha1
.ReplicaType][]int) {
// for rtype, status := range replicasStatuses {
// var dijob div
1alpha2
.DIJob
// var dijob div
2alpha1
.DIJob
// Eventually(func() []int {
// err := k8sClient.Get(ctx, jobKey, &dijob)
// if err != nil {
...
...
pkg/controllers/dijob_test.go
浏览文件 @
cb748239
...
...
@@ -9,7 +9,7 @@ package controllers
// corev1 "k8s.io/api/core/v1"
// "k8s.io/apimachinery/pkg/types"
// div
1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
// div
2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
// dicommon "opendilab.org/di-orchestrator/pkg/common"
// diutil "opendilab.org/di-orchestrator/pkg/utils"
// testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
...
...
@@ -19,10 +19,10 @@ package controllers
// Context("When creating a DIJob with different CleanPodPolicy", func() {
// It("Should execute different pods deletion policy with different CleanPodPolicy", func() {
// cleanPodPolicies := []div
1alpha2
.CleanPodPolicy{
// div
1alpha2
.CleanPodPolicyAll,
// div
1alpha2
.CleanPodPolicyRunning,
// div
1alpha2
.CleanPodPolicyNone,
// cleanPodPolicies := []div
2alpha1
.CleanPodPolicy{
// div
2alpha1
.CleanPodPolicyAll,
// div
2alpha1
.CleanPodPolicyRunning,
// div
2alpha1
.CleanPodPolicyNone,
// }
// for _, policy := range cleanPodPolicies {
// type replica struct {
...
...
@@ -77,7 +77,7 @@ package controllers
// checkCoordinatorCreated(ctx, dijob)
// // build owner reference
// ownRefer := diutil.NewOwnerReference(div
1alpha2.GroupVersion.String(), div1alpha2
.KindDIJob, dijob.Name, dijob.UID, true)
// ownRefer := diutil.NewOwnerReference(div
2alpha1.GroupVersion.String(), div2alpha1
.KindDIJob, dijob.Name, dijob.UID, true)
// By(fmt.Sprintf("ownRefer: %s %s", ownRefer.APIVersion, ownRefer.Kind))
// colStatus := make([]int, 3)
// for _, col := range c.collectors {
...
...
@@ -106,18 +106,18 @@ package controllers
// }
// By("Checking the job is succeeded")
// Eventually(func() div
1alpha2
.Phase {
// Eventually(func() div
2alpha1
.Phase {
// err := k8sClient.Get(ctx, jobKey, &dijob)
// if err != nil {
// return div
1alpha2
.JobUnknown
// return div
2alpha1
.JobUnknown
// }
// return dijob.Status.Phase
// }, timeout, interval).Should(Equal(div
1alpha2
.JobSucceeded))
// }, timeout, interval).Should(Equal(div
2alpha1
.JobSucceeded))
// By("Checking all the pods and services are deleted")
// switch policy {
// case div
1alpha2
.CleanPodPolicyAll:
// case div
2alpha1
.CleanPodPolicyAll:
// Eventually(func() int {
// pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
// if err != nil {
...
...
@@ -132,7 +132,7 @@ package controllers
// }
// return len(svcs)
// }, timeout, interval).Should(Equal(0))
// case div
1alpha2
.CleanPodPolicyNone:
// case div
2alpha1
.CleanPodPolicyNone:
// Consistently(func() int {
// pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
// if err != nil {
...
...
@@ -147,7 +147,7 @@ package controllers
// }
// return len(svcs)
// }, duration, interval).Should(Equal(0))
// case div
1alpha2
.CleanPodPolicyRunning:
// case div
2alpha1
.CleanPodPolicyRunning:
// Eventually(func() int {
// pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
// if err != nil {
...
...
pkg/controllers/handler.go
浏览文件 @
cb748239
...
...
@@ -2,107 +2,107 @@ package controllers
import
(
corev1
"k8s.io/api/core/v1"
"opendilab.org/di-orchestrator/pkg/api/v
1alpha2
"
"opendilab.org/di-orchestrator/pkg/api/v
2alpha1
"
dicontext
"opendilab.org/di-orchestrator/pkg/context"
)
var
(
jobStatusHandlers
map
[
v
1alpha2
.
Phase
]
map
[
v1alpha2
.
Phase
][]
func
(
ctx
dicontext
.
Context
,
job
*
v1alpha2
.
DIJob
)
jobStatusHandlers
map
[
v
2alpha1
.
Phase
]
map
[
v2alpha1
.
Phase
][]
func
(
ctx
dicontext
.
Context
,
job
*
v2alpha1
.
DIJob
)
)
func
init
()
{
jobStatusHandlers
=
make
(
map
[
v
1alpha2
.
Phase
]
map
[
v1alpha2
.
Phase
][]
func
(
ctx
dicontext
.
Context
,
job
*
v1alpha2
.
DIJob
))
jobStatusHandlers
=
make
(
map
[
v
2alpha1
.
Phase
]
map
[
v2alpha1
.
Phase
][]
func
(
ctx
dicontext
.
Context
,
job
*
v2alpha1
.
DIJob
))
registerJobStatusHandlers
()
}
func
registerJobStatusHandlers
()
{
registerEach
:=
func
(
old
,
new
v
1alpha2
.
Phase
,
handlers
...
func
(
ctx
dicontext
.
Context
,
job
*
v1alpha2
.
DIJob
))
{
registerEach
:=
func
(
old
,
new
v
2alpha1
.
Phase
,
handlers
...
func
(
ctx
dicontext
.
Context
,
job
*
v2alpha1
.
DIJob
))
{
if
jobStatusHandlers
[
old
]
==
nil
{
jobStatusHandlers
[
old
]
=
make
(
map
[
v
1alpha2
.
Phase
][]
func
(
ctx
dicontext
.
Context
,
job
*
v1alpha2
.
DIJob
))
jobStatusHandlers
[
old
]
=
make
(
map
[
v
2alpha1
.
Phase
][]
func
(
ctx
dicontext
.
Context
,
job
*
v2alpha1
.
DIJob
))
}
jobStatusHandlers
[
old
][
new
]
=
handlers
}
registerEach
(
v
1alpha2
.
JobPending
,
v1alpha2
.
JobPending
,
nothing
)
registerEach
(
v
1alpha2
.
JobPending
,
v1alpha2
.
JobStarting
,
onJobStarting
)
registerEach
(
v
1alpha2
.
JobPending
,
v1alpha2
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
1alpha2
.
JobPending
,
v1alpha2
.
JobRunning
,
onJobRunning
)
registerEach
(
v
1alpha2
.
JobPending
,
v1alpha2
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobPending
,
v1alpha2
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobStarting
,
v1alpha2
.
JobPending
,
nothing
)
registerEach
(
v
1alpha2
.
JobStarting
,
v1alpha2
.
JobStarting
,
nothing
)
registerEach
(
v
1alpha2
.
JobStarting
,
v1alpha2
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
1alpha2
.
JobStarting
,
v1alpha2
.
JobRunning
,
onJobRunning
)
registerEach
(
v
1alpha2
.
JobStarting
,
v1alpha2
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobStarting
,
v1alpha2
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobRestarting
,
v1alpha2
.
JobPending
,
nothing
)
registerEach
(
v
1alpha2
.
JobRestarting
,
v1alpha2
.
JobStarting
,
onJobStarting
)
registerEach
(
v
1alpha2
.
JobRestarting
,
v1alpha2
.
JobRestarting
,
nothing
)
registerEach
(
v
1alpha2
.
JobRestarting
,
v1alpha2
.
JobRunning
,
onJobRunning
)
registerEach
(
v
1alpha2
.
JobRestarting
,
v1alpha2
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobRestarting
,
v1alpha2
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobRunning
,
v1alpha2
.
JobPending
,
nothing
)
registerEach
(
v
1alpha2
.
JobRunning
,
v1alpha2
.
JobStarting
,
onJobStarting
)
registerEach
(
v
1alpha2
.
JobRunning
,
v1alpha2
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
1alpha2
.
JobRunning
,
v1alpha2
.
JobRunning
,
nothing
)
registerEach
(
v
1alpha2
.
JobRunning
,
v1alpha2
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobRunning
,
v1alpha2
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobFailed
,
v1alpha2
.
JobPending
,
nothing
)
registerEach
(
v
1alpha2
.
JobFailed
,
v1alpha2
.
JobStarting
,
onJobStarting
)
registerEach
(
v
1alpha2
.
JobFailed
,
v1alpha2
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
1alpha2
.
JobFailed
,
v1alpha2
.
JobRunning
,
onJobRunning
)
registerEach
(
v
1alpha2
.
JobFailed
,
v1alpha2
.
JobFailed
,
nothing
)
registerEach
(
v
1alpha2
.
JobFailed
,
v1alpha2
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobSucceeded
,
v1alpha2
.
JobPending
,
nothing
)
registerEach
(
v
1alpha2
.
JobSucceeded
,
v1alpha2
.
JobStarting
,
onJobStarting
)
registerEach
(
v
1alpha2
.
JobSucceeded
,
v1alpha2
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
1alpha2
.
JobSucceeded
,
v1alpha2
.
JobRunning
,
onJobRunning
)
registerEach
(
v
1alpha2
.
JobSucceeded
,
v1alpha2
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
1alpha2
.
JobSucceeded
,
v1alpha2
.
JobSucceeded
,
nothing
)
registerEach
(
v
2alpha1
.
JobPending
,
v2alpha1
.
JobPending
,
nothing
)
registerEach
(
v
2alpha1
.
JobPending
,
v2alpha1
.
JobStarting
,
onJobStarting
)
registerEach
(
v
2alpha1
.
JobPending
,
v2alpha1
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
2alpha1
.
JobPending
,
v2alpha1
.
JobRunning
,
onJobRunning
)
registerEach
(
v
2alpha1
.
JobPending
,
v2alpha1
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobPending
,
v2alpha1
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobStarting
,
v2alpha1
.
JobPending
,
nothing
)
registerEach
(
v
2alpha1
.
JobStarting
,
v2alpha1
.
JobStarting
,
nothing
)
registerEach
(
v
2alpha1
.
JobStarting
,
v2alpha1
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
2alpha1
.
JobStarting
,
v2alpha1
.
JobRunning
,
onJobRunning
)
registerEach
(
v
2alpha1
.
JobStarting
,
v2alpha1
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobStarting
,
v2alpha1
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobRestarting
,
v2alpha1
.
JobPending
,
nothing
)
registerEach
(
v
2alpha1
.
JobRestarting
,
v2alpha1
.
JobStarting
,
onJobStarting
)
registerEach
(
v
2alpha1
.
JobRestarting
,
v2alpha1
.
JobRestarting
,
nothing
)
registerEach
(
v
2alpha1
.
JobRestarting
,
v2alpha1
.
JobRunning
,
onJobRunning
)
registerEach
(
v
2alpha1
.
JobRestarting
,
v2alpha1
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobRestarting
,
v2alpha1
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobRunning
,
v2alpha1
.
JobPending
,
nothing
)
registerEach
(
v
2alpha1
.
JobRunning
,
v2alpha1
.
JobStarting
,
onJobStarting
)
registerEach
(
v
2alpha1
.
JobRunning
,
v2alpha1
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
2alpha1
.
JobRunning
,
v2alpha1
.
JobRunning
,
nothing
)
registerEach
(
v
2alpha1
.
JobRunning
,
v2alpha1
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobRunning
,
v2alpha1
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobFailed
,
v2alpha1
.
JobPending
,
nothing
)
registerEach
(
v
2alpha1
.
JobFailed
,
v2alpha1
.
JobStarting
,
onJobStarting
)
registerEach
(
v
2alpha1
.
JobFailed
,
v2alpha1
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
2alpha1
.
JobFailed
,
v2alpha1
.
JobRunning
,
onJobRunning
)
registerEach
(
v
2alpha1
.
JobFailed
,
v2alpha1
.
JobFailed
,
nothing
)
registerEach
(
v
2alpha1
.
JobFailed
,
v2alpha1
.
JobSucceeded
,
onJobSucceeded
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobSucceeded
,
v2alpha1
.
JobPending
,
nothing
)
registerEach
(
v
2alpha1
.
JobSucceeded
,
v2alpha1
.
JobStarting
,
onJobStarting
)
registerEach
(
v
2alpha1
.
JobSucceeded
,
v2alpha1
.
JobRestarting
,
onJobRestarting
,
updateGeneration
)
registerEach
(
v
2alpha1
.
JobSucceeded
,
v2alpha1
.
JobRunning
,
onJobRunning
)
registerEach
(
v
2alpha1
.
JobSucceeded
,
v2alpha1
.
JobFailed
,
onJobFailed
,
updateReadyReplicas
)
registerEach
(
v
2alpha1
.
JobSucceeded
,
v2alpha1
.
JobSucceeded
,
nothing
)
}
func
HandleJobStatus
(
ctx
dicontext
.
Context
,
old
,
new
*
v
1alpha2
.
DIJob
)
{
func
HandleJobStatus
(
ctx
dicontext
.
Context
,
old
,
new
*
v
2alpha1
.
DIJob
)
{
for
_
,
handler
:=
range
jobStatusHandlers
[
old
.
Status
.
Phase
][
new
.
Status
.
Phase
]
{
handler
(
ctx
,
new
)
}
}
func
nothing
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{}
func
nothing
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{}
func
updateReadyReplicas
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{
func
updateReadyReplicas
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{
job
.
Status
.
ReadyReplicas
=
0
}
func
updateGeneration
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{
func
updateGeneration
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{
job
.
Status
.
Generation
++
}
func
onJobStarting
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{
func
onJobStarting
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{
msg
:=
"job is starting since all replicas are created."
ctx
.
Recorder
.
Eventf
(
job
,
corev1
.
EventTypeNormal
,
dicontext
.
DIJobStartingReason
,
msg
)
}
func
onJobRunning
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{
func
onJobRunning
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{
msg
:=
"job is running since all replicas are ready."
ctx
.
Recorder
.
Eventf
(
job
,
corev1
.
EventTypeNormal
,
dicontext
.
DIJobRunningReason
,
msg
)
}
func
onJobRestarting
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{
func
onJobRestarting
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{
msg
:=
"job is restarting since conditions changed."
ctx
.
Recorder
.
Eventf
(
job
,
corev1
.
EventTypeWarning
,
dicontext
.
DIJobRestartingReason
,
msg
)
}
func
onJobFailed
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{
func
onJobFailed
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{
msg
:=
"job is failed since some replicas are failed."
ctx
.
Recorder
.
Eventf
(
job
,
corev1
.
EventTypeWarning
,
dicontext
.
DIJobFailedReason
,
msg
)
}
func
onJobSucceeded
(
ctx
dicontext
.
Context
,
job
*
v
1alpha2
.
DIJob
)
{
func
onJobSucceeded
(
ctx
dicontext
.
Context
,
job
*
v
2alpha1
.
DIJob
)
{
msg
:=
"job is succeeded since all the replicas are succeeded."
ctx
.
Recorder
.
Eventf
(
job
,
corev1
.
EventTypeNormal
,
dicontext
.
DIJobSucceededReason
,
msg
)
}
pkg/controllers/suite_test.go
浏览文件 @
cb748239
...
...
@@ -33,7 +33,7 @@ import (
logf
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
div1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
v2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicontext
"opendilab.org/di-orchestrator/pkg/context"
//+kubebuilder:scaffold:imports
)
...
...
@@ -72,7 +72,7 @@ var _ = BeforeSuite(func() {
Expect
(
err
)
.
NotTo
(
HaveOccurred
())
Expect
(
cfg
)
.
NotTo
(
BeNil
())
err
=
div1alpha2
.
AddToScheme
(
scheme
.
Scheme
)
err
=
v2alpha1
.
AddToScheme
(
scheme
.
Scheme
)
Expect
(
err
)
.
NotTo
(
HaveOccurred
())
//+kubebuilder:scaffold:scheme
...
...
pkg/server/dynamic/handler.go
浏览文件 @
cb748239
...
...
@@ -8,7 +8,7 @@ import (
corev1
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
)
func
GetPodFromObject
(
obj
interface
{})
(
*
corev1
.
Pod
,
error
)
{
...
...
@@ -20,7 +20,7 @@ func GetPodFromObject(obj interface{}) (*corev1.Pod, error) {
}
owners
:=
pod
.
GetOwnerReferences
()
for
_
,
owner
:=
range
owners
{
if
owner
.
Kind
==
div
1alpha2
.
KindDIJob
{
if
owner
.
Kind
==
div
2alpha1
.
KindDIJob
{
return
&
pod
,
nil
}
}
...
...
@@ -36,7 +36,7 @@ func GetServiceFromObject(obj interface{}) (*corev1.Service, error) {
}
owners
:=
service
.
GetOwnerReferences
()
for
_
,
owner
:=
range
owners
{
if
owner
.
Kind
==
div
1alpha2
.
KindDIJob
{
if
owner
.
Kind
==
div
2alpha1
.
KindDIJob
{
return
&
service
,
nil
}
}
...
...
pkg/server/dynamic/informer.go
浏览文件 @
cb748239
...
...
@@ -11,7 +11,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
)
var
(
...
...
@@ -27,8 +27,8 @@ type Informers struct {
func
NewDynamicInformer
(
dif
dynamicinformer
.
DynamicSharedInformerFactory
)
Informers
{
// add DIJob informer
diGVR
:=
schema
.
GroupVersionResource
{
Group
:
div
1alpha2
.
GroupVersion
.
Group
,
Version
:
div
1alpha2
.
GroupVersion
.
Version
,
Group
:
div
2alpha1
.
GroupVersion
.
Group
,
Version
:
div
2alpha1
.
GroupVersion
.
Version
,
Resource
:
"dijobs"
,
}
...
...
pkg/server/http/dijob.go
浏览文件 @
cb748239
...
...
@@ -9,7 +9,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
commontypes
"opendilab.org/di-orchestrator/pkg/common/types"
)
...
...
@@ -18,12 +18,12 @@ var (
statusUpdatedPauseDuration
=
50
*
time
.
Millisecond
)
func
(
s
*
DIServer
)
getDIJob
(
namespace
,
name
string
)
(
*
div
1alpha2
.
DIJob
,
error
)
{
func
(
s
*
DIServer
)
getDIJob
(
namespace
,
name
string
)
(
*
div
2alpha1
.
DIJob
,
error
)
{
diUn
,
err
:=
s
.
DIClient
.
Namespace
(
namespace
)
.
Get
(
context
.
Background
(),
name
,
metav1
.
GetOptions
{})
if
err
!=
nil
{
return
nil
,
err
}
var
job
div
1alpha2
.
DIJob
var
job
div
2alpha1
.
DIJob
err
=
runtime
.
DefaultUnstructuredConverter
.
FromUnstructured
(
diUn
.
UnstructuredContent
(),
&
job
)
if
err
!=
nil
{
errMsg
:=
fmt
.
Sprintf
(
"failed to convert unstructured: %s"
,
diUn
.
UnstructuredContent
())
...
...
@@ -32,7 +32,7 @@ func (s *DIServer) getDIJob(namespace, name string) (*div1alpha2.DIJob, error) {
return
&
job
,
nil
}
func
(
s
*
DIServer
)
getCachedDIJobByKey
(
key
string
)
(
*
div
1alpha2
.
DIJob
,
error
)
{
func
(
s
*
DIServer
)
getCachedDIJobByKey
(
key
string
)
(
*
div
2alpha1
.
DIJob
,
error
)
{
obj
,
exists
,
err
:=
s
.
dyi
.
DIInformer
.
Informer
()
.
GetIndexer
()
.
GetByKey
(
key
)
if
err
!=
nil
{
errMsg
:=
fmt
.
Sprintf
(
"failed to get DIJob: %s"
,
err
)
...
...
@@ -44,7 +44,7 @@ func (s *DIServer) getCachedDIJobByKey(key string) (*div1alpha2.DIJob, error) {
}
diUn
:=
obj
.
(
*
unstructured
.
Unstructured
)
var
diJob
div
1alpha2
.
DIJob
var
diJob
div
2alpha1
.
DIJob
err
=
runtime
.
DefaultUnstructuredConverter
.
FromUnstructured
(
diUn
.
UnstructuredContent
(),
&
diJob
)
if
err
!=
nil
{
errMsg
:=
fmt
.
Sprintf
(
"failed to convert unstructured: %s"
,
diUn
.
UnstructuredContent
())
...
...
@@ -68,10 +68,10 @@ func (s *DIServer) needMultiDDPLearnerPod(resource commontypes.ResourceQuantity)
return
false
,
nil
}
func
(
s
*
DIServer
)
updateDIJobStatusInCluster
(
job
*
div
1alpha2
.
DIJob
)
error
{
func
(
s
*
DIServer
)
updateDIJobStatusInCluster
(
job
*
div
2alpha1
.
DIJob
)
error
{
var
err
error
for
i
:=
0
;
i
<
statusUpdateRetries
;
i
++
{
newJob
:=
&
div
1alpha2
.
DIJob
{}
newJob
:=
&
div
2alpha1
.
DIJob
{}
job
,
err
:=
s
.
getDIJob
(
job
.
Namespace
,
job
.
Name
)
if
err
!=
nil
{
break
...
...
pkg/server/http/server.go
浏览文件 @
cb748239
...
...
@@ -26,7 +26,7 @@ import (
)
var
(
apiVersion
=
"v
1alpha2
"
apiVersion
=
"v
2alpha1
"
replicasAPI
=
"/replicas"
)
...
...
pkg/server/http/server_test.go
浏览文件 @
cb748239
...
...
@@ -16,7 +16,7 @@ package http
// "k8s.io/apimachinery/pkg/api/resource"
// "sigs.k8s.io/controller-runtime/pkg/client"
// div
1alpha2 "opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
// div
2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
// dicommon "opendilab.org/di-orchestrator/pkg/common"
// commontypes "opendilab.org/di-orchestrator/pkg/common/types"
// diutil "opendilab.org/di-orchestrator/pkg/utils"
...
...
@@ -25,7 +25,7 @@ package http
// var _ = Describe("Server Test", func() {
// Context("When send request to server", func() {
// It("Should have correct response when request /v
1alpha2/replicas and /v1alpha2
/replicas/failed", func() {
// It("Should have correct response when request /v
2alpha1/replicas and /v2alpha1
/replicas/failed", func() {
// job := testutil.NewDIJob()
// name := diutil.GenerateName(job.Name)
// job.SetName(name)
...
...
@@ -36,10 +36,10 @@ package http
// err = creatDIJob(ctx, job)
// Expect(err).NotTo(HaveOccurred())
// By("Send request on POST /v
1alpha2
/replicas")
// By("Send request on POST /v
2alpha1
/replicas")
// coorname := diutil.ReplicaPodName(job.Name, "coordinator")
// addr := fmt.Sprintf("%s:%d", localServingHost, localServingPort)
// rurl := fmt.Sprintf("http://%s/v
1alpha2
/replicas", addr)
// rurl := fmt.Sprintf("http://%s/v
2alpha1
/replicas", addr)
// var cn, ln int = 2, 3
// req := commontypes.DIJobRequest{
// Namespace: job.Namespace,
...
...
@@ -60,7 +60,7 @@ package http
// Expect(len(diresp.Collectors)).Should(Equal(cn))
// Expect(len(diresp.Learners)).Should(Equal(ln))
// By("Send request on GET /v
1alpha2
/replicas")
// By("Send request on GET /v
2alpha1
/replicas")
// gurl := fmt.Sprintf("%s?namespace=%s&coordinator=%s", rurl, job.Namespace, coorname)
// resp, err := http.Get(gurl)
// Expect(err).NotTo(HaveOccurred())
...
...
@@ -69,8 +69,8 @@ package http
// Expect(len(gdiresp.Collectors)).Should(Equal(cn))
// Expect(len(gdiresp.Learners)).Should(Equal(ln))
// By("Send request on POST /v
1alpha2
/replicas/failed")
// furl := fmt.Sprintf("http://%s/v
1alpha2
/replicas/failed", addr)
// By("Send request on POST /v
2alpha1
/replicas/failed")
// furl := fmt.Sprintf("http://%s/v
2alpha1
/replicas/failed", addr)
// freq := commontypes.DIJobResponse{
// Namespace: job.Namespace,
// Coordinator: coorname,
...
...
@@ -101,7 +101,7 @@ package http
// return len(pods)
// }, timeout, interval).Should(Equal(totalPods))
// By("Send request on DELETE /v
1alpha2
/replicas")
// By("Send request on DELETE /v
2alpha1
/replicas")
// var dcn, dln int = 1, 1
// dreq := commontypes.DIJobRequest{
// Namespace: job.Namespace,
...
...
@@ -144,9 +144,9 @@ package http
// Expect(err).NotTo(HaveOccurred())
// Expect(hresp.StatusCode).Should(Equal(http.StatusOK))
// By("Send request on POST /v
1alpha2
/replicas")
// By("Send request on POST /v
2alpha1
/replicas")
// coorname := diutil.ReplicaPodName(job.Name, "coordinator")
// rurl := fmt.Sprintf("http://%s/v
1alpha2
/replicas", addr)
// rurl := fmt.Sprintf("http://%s/v
2alpha1
/replicas", addr)
// var cn, ln int = 2, 3
// req := commontypes.DIJobRequest{
// Namespace: job.Namespace,
...
...
@@ -167,16 +167,16 @@ package http
// Expect(len(diresp.Collectors)).Should(Equal(cn))
// Expect(len(diresp.Learners)).Should(Equal(ln))
// By("Send not found resource on POST /v
1alpha2
/replicas")
// By("Send not found resource on POST /v
2alpha1
/replicas")
// req.Coordinator = "not-exists"
// rbody, err = json.Marshal(req)
// Expect(err).NotTo(HaveOccurred())
// By("Send bad request on POST /v
1alpha2
/replicas")
// By("Send bad request on POST /v
2alpha1
/replicas")
// _, err = sendRequest(http.MethodPost, rbody, rurl, http.StatusNotFound, false)
// Expect(err).NotTo(HaveOccurred())
// By("Send not implemented method on /v
1alpha2
/replicas")
// By("Send not implemented method on /v
2alpha1
/replicas")
// _, err = sendRequest(http.MethodPatch, rbody, rurl, http.StatusNotImplemented, false)
// Expect(err).NotTo(HaveOccurred())
...
...
@@ -184,7 +184,7 @@ package http
// _, err = sendRequest(http.MethodPost, rbody, rurl, http.StatusBadRequest, false)
// Expect(err).NotTo(HaveOccurred())
// By("Send request on GET /v
1alpha2
/replicas with namespace and coordinator")
// By("Send request on GET /v
2alpha1
/replicas with namespace and coordinator")
// gurl := fmt.Sprintf("%s?namespace=%s&coordinator=%s", rurl, job.Namespace, coorname)
// resp, err := http.Get(gurl)
// Expect(err).NotTo(HaveOccurred())
...
...
@@ -193,7 +193,7 @@ package http
// Expect(len(gdiresp.Collectors)).Should(Equal(cn))
// Expect(len(gdiresp.Learners)).Should(Equal(ln))
// By("Send request on GET /v
1alpha2
/replicas with namespace")
// By("Send request on GET /v
2alpha1
/replicas with namespace")
// gurl = fmt.Sprintf("%s?namespace=%s", rurl, job.Namespace)
// resp, err = http.Get(gurl)
// Expect(err).NotTo(HaveOccurred())
...
...
@@ -204,7 +204,7 @@ package http
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// Expect(nresp.Success).Should(BeTrue())
// By("Send request on GET /v
1alpha2
/replicas")
// By("Send request on GET /v
2alpha1
/replicas")
// gurl = rurl
// resp, err = http.Get(gurl)
// Expect(err).NotTo(HaveOccurred())
...
...
@@ -214,8 +214,8 @@ package http
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// Expect(nresp.Success).Should(BeTrue())
// By("Send request on POST /v
1alpha2
/replicas/failed")
// furl := fmt.Sprintf("http://%s/v
1alpha2
/replicas/failed", addr)
// By("Send request on POST /v
2alpha1
/replicas/failed")
// furl := fmt.Sprintf("http://%s/v
2alpha1
/replicas/failed", addr)
// freq := commontypes.DIJobResponse{
// Namespace: job.Namespace,
// Coordinator: coorname,
...
...
@@ -246,8 +246,8 @@ package http
// return len(pods)
// }, timeout, interval).Should(Equal(totalPods))
// By("Send request on POST /v
1alpha2
/replicas/failed with duplicate replicas")
// fdurl := fmt.Sprintf("http://%s/v
1alpha2
/replicas/failed", addr)
// By("Send request on POST /v
2alpha1
/replicas/failed with duplicate replicas")
// fdurl := fmt.Sprintf("http://%s/v
2alpha1
/replicas/failed", addr)
// fdreq := commontypes.DIJobResponse{
// Namespace: job.Namespace,
// Coordinator: coorname,
...
...
@@ -280,7 +280,7 @@ package http
// return len(pods)
// }, timeout, interval).Should(Equal(dtotalPods))
// By("Send request on DELETE /v
1alpha2
/replicas")
// By("Send request on DELETE /v
2alpha1
/replicas")
// var dcn, dln int = 1, 1
// dreq := commontypes.DIJobRequest{
// Namespace: job.Namespace,
...
...
@@ -337,9 +337,9 @@ package http
// err = creatDIJob(ctx, job)
// Expect(err).NotTo(HaveOccurred())
// By("Send request on POST /v
1alpha2
/replicas")
// By("Send request on POST /v
2alpha1
/replicas")
// coorname := diutil.ReplicaPodName(job.Name, "coordinator")
// rurl := fmt.Sprintf("http://%s/v
1alpha2
/replicas", addr)
// rurl := fmt.Sprintf("http://%s/v
2alpha1
/replicas", addr)
// var ln int = c.ln
// req := commontypes.DIJobRequest{
// Namespace: job.Namespace,
...
...
@@ -369,7 +369,7 @@ package http
// Expect(err).NotTo(HaveOccurred())
// Expect(len(aggs.Items)).Should(Equal(c.expectedAgg))
// By("Send request on GET /v
1alpha2
/replicas with namespace and coordinator")
// By("Send request on GET /v
2alpha1
/replicas with namespace and coordinator")
// gurl := fmt.Sprintf("%s?namespace=%s&coordinator=%s", rurl, job.Namespace, coorname)
// resp, err := http.Get(gurl)
// Expect(err).NotTo(HaveOccurred())
...
...
@@ -379,7 +379,7 @@ package http
// Expect(len(gdiresp.Learners)).Should(Equal(c.expectedLearner + c.expectedAgg))
// if len(aggs.Items) > 0 {
// By("Send request on GET /v
1alpha2
/replicas with namespace and aggregator")
// By("Send request on GET /v
2alpha1
/replicas with namespace and aggregator")
// agg := aggs.Items[0]
// aurl := fmt.Sprintf("%s?namespace=%s&aggregator=%s", rurl, job.Namespace, agg.Name)
// aresp, err := http.Get(aurl)
...
...
@@ -390,8 +390,8 @@ package http
// expectedDDPLs := c.expectedDDPLPorts / c.ln
// Expect(len(adiresp.Learners)).Should(Equal(expectedDDPLs))
// By("Send request on POST /v
1alpha2
/replicas/failed to report failure of aggregator")
// aurl = fmt.Sprintf("http://%s/v
1alpha2
/replicas/failed", addr)
// By("Send request on POST /v
2alpha1
/replicas/failed to report failure of aggregator")
// aurl = fmt.Sprintf("http://%s/v
2alpha1
/replicas/failed", addr)
// areq := commontypes.DIJobResponse{
// Namespace: job.Namespace,
// Coordinator: coorname,
...
...
@@ -415,7 +415,7 @@ package http
// })
// })
// func creatDIJob(ctx context.Context, job *div
1alpha2
.DIJob) error {
// func creatDIJob(ctx context.Context, job *div
2alpha1
.DIJob) error {
// var err error
// err = k8sClient.Create(ctx, job, &client.CreateOptions{})
// if err != nil {
...
...
@@ -423,7 +423,7 @@ package http
// }
// By("Create coordinator")
// ownRefer := diutil.NewOwnerReference(div
1alpha2.GroupVersion.String(), div1alpha2
.KindDIJob, job.Name, job.UID, true)
// ownRefer := diutil.NewOwnerReference(div
2alpha1.GroupVersion.String(), div2alpha1
.KindDIJob, job.Name, job.UID, true)
// coorname := diutil.ReplicaPodName(job.Name, "coordinator")
// coorpod := testutil.NewPod(coorname, job.Name, ownRefer)
...
...
pkg/server/http/suite_test.go
浏览文件 @
cb748239
...
...
@@ -43,7 +43,7 @@ import (
logf
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
serverdynamic
"opendilab.org/di-orchestrator/pkg/server/dynamic"
//+kubebuilder:scaffold:imports
)
...
...
@@ -90,7 +90,7 @@ var _ = BeforeSuite(func() {
Expect
(
err
)
.
NotTo
(
HaveOccurred
())
Expect
(
cfg
)
.
NotTo
(
BeNil
())
err
=
div
1alpha2
.
AddToScheme
(
scheme
.
Scheme
)
err
=
div
2alpha1
.
AddToScheme
(
scheme
.
Scheme
)
Expect
(
err
)
.
NotTo
(
HaveOccurred
())
//+kubebuilder:scaffold:scheme
...
...
@@ -118,8 +118,8 @@ var _ = BeforeSuite(func() {
kubeClient
=
kubernetes
.
NewForConfigOrDie
(
cfg
)
dynamicClient
:=
dynamic
.
NewForConfigOrDie
(
cfg
)
diGVR
:=
schema
.
GroupVersionResource
{
Group
:
div
1alpha2
.
GroupVersion
.
Group
,
Version
:
div
1alpha2
.
GroupVersion
.
Version
,
Group
:
div
2alpha1
.
GroupVersion
.
Group
,
Version
:
div
2alpha1
.
GroupVersion
.
Version
,
Resource
:
"dijobs"
,
}
diclient
:=
dynamicClient
.
Resource
(
diGVR
)
...
...
pkg/utils/testutils/dijob.go
浏览文件 @
cb748239
...
...
@@ -4,21 +4,21 @@ import (
corev1
"k8s.io/api/core/v1"
metav1
"k8s.io/apimachinery/pkg/apis/meta/v1"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicommon
"opendilab.org/di-orchestrator/pkg/common"
)
func
NewDIJob
()
*
div
1alpha2
.
DIJob
{
return
&
div
1alpha2
.
DIJob
{
func
NewDIJob
()
*
div
2alpha1
.
DIJob
{
return
&
div
2alpha1
.
DIJob
{
TypeMeta
:
metav1
.
TypeMeta
{
Kind
:
div
1alpha2
.
KindDIJob
,
APIVersion
:
div
1alpha2
.
GroupVersion
.
String
(),
Kind
:
div
2alpha1
.
KindDIJob
,
APIVersion
:
div
2alpha1
.
GroupVersion
.
String
(),
},
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
DIJobName
,
Namespace
:
DIJobNamespace
,
},
Spec
:
div
1alpha2
.
DIJobSpec
{
Spec
:
div
2alpha1
.
DIJobSpec
{
Template
:
corev1
.
PodTemplateSpec
{
Spec
:
corev1
.
PodSpec
{
Containers
:
[]
corev1
.
Container
{
...
...
pkg/utils/util.go
浏览文件 @
cb748239
...
...
@@ -13,7 +13,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilrand
"k8s.io/apimachinery/pkg/util/rand"
div
1alpha2
"opendilab.org/di-orchestrator/pkg/api/v1alpha2
"
div
2alpha1
"opendilab.org/di-orchestrator/pkg/api/v2alpha1
"
dicommon
"opendilab.org/di-orchestrator/pkg/common"
)
...
...
@@ -45,12 +45,12 @@ func ReplicaName(jobName string, generation, rank int) string {
return
fmt
.
Sprintf
(
"%s-%d-%d"
,
jobName
,
generation
,
rank
)
}
func
IsSucceeded
(
job
*
div
1alpha2
.
DIJob
)
bool
{
return
job
.
Status
.
Phase
==
div
1alpha2
.
JobSucceeded
func
IsSucceeded
(
job
*
div
2alpha1
.
DIJob
)
bool
{
return
job
.
Status
.
Phase
==
div
2alpha1
.
JobSucceeded
}
func
IsFailed
(
job
*
div
1alpha2
.
DIJob
)
bool
{
return
job
.
Status
.
Phase
==
div
1alpha2
.
JobFailed
func
IsFailed
(
job
*
div
2alpha1
.
DIJob
)
bool
{
return
job
.
Status
.
Phase
==
div
2alpha1
.
JobFailed
}
func
IsTerminating
(
pod
*
corev1
.
Pod
)
bool
{
...
...
@@ -96,7 +96,7 @@ func AddPortToPod(pod *corev1.Pod, port corev1.ContainerPort) {
}
}
func
GenLabels
(
job
div
1alpha2
.
DIJob
)
map
[
string
]
string
{
func
GenLabels
(
job
div
2alpha1
.
DIJob
)
map
[
string
]
string
{
return
map
[
string
]
string
{
dicommon
.
LabelGroup
:
job
.
Spec
.
Group
,
dicommon
.
LabelJob
:
strings
.
Replace
(
job
.
Name
,
"/"
,
"-"
,
-
1
),
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录