OpenDILab Open-Source Decision Intelligence Platform / DI-orchestrator
Commit 3e0aa88b
Authored Mar 02, 2022 by liqingping
Develop
Parent: 294519a3
Showing 12 changed files with 688 additions and 839 deletions (+688 −839)
.gitlab-ci.yml                                  +1    −0
Makefile                                        +4    −3
cmd/server/server.go                            +1    −1
pkg/common/config_test.go                       +27   −9
pkg/common/gpuallocator/gpu_allocator.go        +0    −132
pkg/common/gpuallocator/gpu_allocator_test.go   +0    −110
pkg/controllers/dijob_controller_test.go        +426  −404
pkg/controllers/dijob_test.go                   +209  −160
pkg/controllers/suite_test.go                   +9    −13
pkg/server/suite_test.go                        +1    −1
pkg/utils/testutils/dijob.go                    +3    −0
pkg/utils/testutils/pod.go                      +7    −6
.gitlab-ci.yml @ 3e0aa88b

@@ -97,6 +97,7 @@ tag:
    - cloudnative4ai-group-runner-phoenix
  services:
    - registry.sensetime.com/cloudnative4ai/docker:19.03.8-dind
  when: manual
  allow_failure: false
  dependencies:
    - build-release
Makefile @ 3e0aa88b

@@ -85,10 +85,11 @@ lint:

.PHONY: test
test: ginkgo  ## Run tests.
    $(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./pkg/...
    go tool cover -func=./pkg/controllers/coverage.out
    # $(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./pkg/...
    $(GINKGO) -cover -coverprofile=coverage.out ./pkg/...
    go tool cover -func=./pkg/server/coverage.out
    go tool cover -func=./pkg/common/gpuallocator/coverage.out
    go tool cover -func=./pkg/common/coverage.out
    go tool cover -func=./pkg/controllers/coverage.out

##@ Build
cmd/server/server.go @ 3e0aa88b

@@ -68,7 +68,7 @@ func NewCmdServer(genFlags cmdcommon.GenericFlags) *cobra.Command {

Examples:
    # Start di-server with gpu allocation policy and bind address specified.
    di-orchestrator server -p simple -b :8080
    di-orchestrator server -p :8080 -s :8081
`,
        Run: func(cmd *cobra.Command, args []string) {
            cobra.CheckErr(runCommand(cmd, o))
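For readers unfamiliar with the pattern in this hunk, here is a minimal stand-alone sketch of a cobra command whose Run handler wraps its worker with cobra.CheckErr. The runCommand below is a simplified stand-in for the project's real runCommand(cmd, o), which this diff does not show.

package main

import (
    "fmt"

    "github.com/spf13/cobra"
)

func main() {
    cmd := &cobra.Command{
        Use: "server",
        Run: func(cmd *cobra.Command, args []string) {
            // cobra.CheckErr prints the error and exits non-zero if runCommand fails.
            cobra.CheckErr(runCommand(cmd, args))
        },
    }
    cobra.CheckErr(cmd.Execute())
}

// runCommand is a simplified, hypothetical stand-in for the server start-up logic.
func runCommand(cmd *cobra.Command, args []string) error {
    fmt.Println("di-server would start here")
    return nil
}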
pkg/common/config_test.go @ 3e0aa88b

package common

import (
    "fmt"
    "os"
    "testing"

@@ -14,16 +15,33 @@ func TestConfig(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecsWithDefaultAndCustomReporters(t,
        "Config Suite",
        "CommonConfig Suite",
        []Reporter{printer.NewlineReporter{}})
}

var _ = Describe("GetDIJobDefaultResources", func() {
    defaultResource := `{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}`
    err := os.Setenv("DI_JOB_DEFAULT_RESOURCES", defaultResource)

var _ = Describe("Test common config", func() {
    Context("Get DIJob default resources", func() {
        It("returns the default resources", func() {
            type testCase struct {
                resource  string
                expectCPU string
                expectMem string
            }
            testCases := []testCase{
                {resource: `{"resources": {"requests": {"cpu": 1, "memory": "2Gi"}}}`, expectCPU: "1", expectMem: "2Gi"},
                {resource: `{"resources": {"requests": {"cpu": 2, "memory": "3Gi"}}}`, expectCPU: "2", expectMem: "3Gi"},
                {resource: "", expectCPU: "1", expectMem: "2Gi"},
            }
            for i := range testCases {
                c := testCases[i]
                By(fmt.Sprintf("Create the %dth DIJob", i+1))
                err := os.Setenv("DI_JOB_DEFAULT_RESOURCES", c.resource)
                Expect(err).NotTo(HaveOccurred())
                r, err := GetDIJobDefaultResources()
                Expect(err).NotTo(HaveOccurred())
                Expect(r.Requests.Cpu().Equal(resource.MustParse("1"))).Should(BeTrue())
                Expect(r.Requests.Memory().Equal(resource.MustParse("2Gi"))).Should(BeTrue())
                Expect(r.Requests.Cpu().Equal(resource.MustParse(c.expectCPU))).Should(BeTrue())
                Expect(r.Requests.Memory().Equal(resource.MustParse(c.expectMem))).Should(BeTrue())
            }
        })
    })
})
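As a usage sketch (an illustration inferred from the test above, not code from this commit): the test implies that GetDIJobDefaultResources reads the DI_JOB_DEFAULT_RESOURCES environment variable and returns a requirements value whose Requests expose Cpu() and Memory() quantities, so a caller outside the test might look roughly like this.

package main

import (
    "fmt"
    "os"

    dicommon "opendilab.org/di-orchestrator/pkg/common"
)

func main() {
    // The JSON fragment mirrors the test cases above; per the test, an empty
    // value falls back to the defaults (cpu: 1, memory: 2Gi).
    os.Setenv("DI_JOB_DEFAULT_RESOURCES", `{"resources": {"requests": {"cpu": 2, "memory": "3Gi"}}}`)

    r, err := dicommon.GetDIJobDefaultResources()
    if err != nil {
        panic(err)
    }
    fmt.Println("cpu:", r.Requests.Cpu().String(), "memory:", r.Requests.Memory().String())
}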
pkg/common/gpuallocator/gpu_allocator.go (deleted, 100644 → 0) @ 294519a3

package gpuallocator

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

type GPUAllocator struct {
    Nodes []*corev1.Node

    policy Policy
}

func NewSimpleGPUAllocator(nodes []*corev1.Node) *GPUAllocator {
    return &GPUAllocator{Nodes: nodes, policy: NewSimplePolicy()}
}

func (g *GPUAllocator) Allocate(gpus int) []int {
    return g.policy.Allocate(g.Nodes, gpus)
}

func (g *GPUAllocator) NumGPUsOfMajorityNodeType() int {
    return GetGPUsMajority(g.Nodes)
}

const (
    SimpleGPUAllocPolicy = "simple"
)

type Policy interface {
    Allocate(nodes []*corev1.Node, gpus int) []int
}

type SimplePolicy struct{}

func NewSimplePolicy() *SimplePolicy {
    return &SimplePolicy{}
}

func (s *SimplePolicy) Allocate(nodes []*corev1.Node, gpus int) []int {
    // gpusMajority is the node gpus with most frequent occurrence.
    // maxGPUCount is the number of nodes with gpus equal to gpusMajority
    gpusMajority := GetGPUsMajority(nodes)
    if gpusMajority <= 0 {
        return nil
    }
    perNodeGPUs := Max(gpusMajority, 1)

    if gpus < perNodeGPUs {
        return []int{gpus}
    }

    var result []int
    nResults := gpus / perNodeGPUs
    for i := 0; i < nResults; i++ {
        result = append(result, perNodeGPUs)
    }
    remainGPUs := gpus - nResults*perNodeGPUs
    if remainGPUs > 0 {
        result = append(result, remainGPUs)
    }
    return result
}

func Max(x, y int) int {
    if x < y {
        return y
    }
    return x
}

func MaxInArray(v []int) (int, error) {
    if len(v) == 0 {
        return 0, fmt.Errorf("empty list")
    }
    max := v[0]
    for _, i := range v {
        if i > max {
            max = i
        }
    }
    return max, nil
}

func GetGPUsMajority(nodes []*corev1.Node) int {
    var nodeGPUCounts []int
    for _, node := range nodes {
        allocGPUs := node.Status.Allocatable[corev1.ResourceName("nvidia.com/gpu")]
        nodeGPUCounts = append(nodeGPUCounts, int(allocGPUs.Value()))
    }

    // gpusMajority is the number of gpus of majority nodes.
    // majorityNodes is the number of nodes with gpus equal to gpusMajority
    gpusMajority, _ := ValueOccursMostFrequentInList(nodeGPUCounts)
    if gpusMajority == 0 {
        max, _ := MaxInArray(nodeGPUCounts)
        return max
    }
    return gpusMajority
}

// ValueOccursMostFrequentInList returns value that occurs most frequently in list,
// and the count of occurrences.
func ValueOccursMostFrequentInList(list []int) (int, int) {
    if len(list) == 0 {
        return 0, 0
    }

    // map the occurrence frequency of each value
    maxCount := 0
    maxCountValue := 0
    valuesMap := make(map[int]int)
    for _, v := range list {
        if valuesMap[v] != 0 {
            valuesMap[v]++
        } else {
            valuesMap[v] = 1
        }
        if maxCount < valuesMap[v] {
            maxCount = valuesMap[v]
            maxCountValue = v
        } else if maxCount == valuesMap[v] && maxCountValue < v {
            maxCountValue = v
        }
    }
    return maxCountValue, maxCount
}
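A quick usage sketch of the allocator this commit removes (it only builds against the parent revision 294519a3, since the package is deleted here). The node counts and the expected [8 4] split mirror the "Only one max value with 12 gpus request" case in the test file below; nothing beyond that is taken from the source.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"

    "opendilab.org/di-orchestrator/pkg/common/gpuallocator"
)

// newNode builds a Node advertising the given number of allocatable GPUs,
// mirroring the helper in gpu_allocator_test.go.
func newNode(gpus int) *corev1.Node {
    return &corev1.Node{
        Status: corev1.NodeStatus{
            Allocatable: corev1.ResourceList{
                "nvidia.com/gpu": *resource.NewQuantity(int64(gpus), resource.DecimalExponent),
            },
        },
    }
}

func main() {
    // Four 8-GPU nodes, three 10-GPU nodes, three 6-GPU nodes: the majority
    // node type has 8 GPUs, so a request for 12 GPUs is split into [8, 4].
    var nodes []*corev1.Node
    for i := 0; i < 4; i++ {
        nodes = append(nodes, newNode(8))
    }
    for i := 0; i < 3; i++ {
        nodes = append(nodes, newNode(10), newNode(6))
    }

    alloc := gpuallocator.NewSimpleGPUAllocator(nodes)
    fmt.Println(alloc.Allocate(12)) // [8 4]
}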
pkg/common/gpuallocator/gpu_allocator_test.go (deleted, 100644 → 0) @ 294519a3

package gpuallocator

import (
    "testing"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "sigs.k8s.io/controller-runtime/pkg/envtest/printer"
)

func TestAllocators(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecsWithDefaultAndCustomReporters(t,
        "Allocator Suite",
        []Reporter{printer.NewlineReporter{}})
}

var _ = Describe("Test SimpleGPUAllocator", func() {
    It("ValueOccursMostFrequentInList function", func() {
        testCases := map[string]struct {
            list          []int
            expectedValue int
            expectedCount int
        }{
            "Only one max value": {
                []int{1, 2, 3, 4, 5, 2, 2, 4, 6, 2, 3, 3, 1},
                2,
                4,
            },
            "Multi max value": {
                []int{1, 2, 3, 4, 5, 2, 2, 4, 6, 2, 3, 3, 1, 3},
                3,
                4,
            },
            "Multi max value second": {
                []int{1, 3, 3, 4, 5, 2, 2, 4, 6, 2, 3, 2, 1, 3},
                3,
                4,
            },
        }
        for _, test := range testCases {
            maxValue, maxCount := ValueOccursMostFrequentInList(test.list)
            Expect(maxValue).To(Equal(test.expectedValue))
            Expect(maxCount).To(Equal(test.expectedCount))
        }
    })
    It("Allocate function", func() {
        testCases := map[string]struct {
            nodeGPUs map[int]int
            gpus     int
            result   []int
        }{
            "Only one max value with 12 gpus request": {
                map[int]int{8: 4, 10: 3, 6: 3},
                12,
                []int{8, 4},
            },
            "Only one max value with 16 gpus request": {
                map[int]int{8: 4, 10: 3, 6: 3},
                16,
                []int{8, 8},
            },
            "Multi max value with 16 gpus request": {
                map[int]int{8: 4, 10: 4, 6: 3},
                16,
                []int{10, 6},
            },
            "Multi max value with 8 gpus request": {
                map[int]int{8: 4, 10: 4, 6: 3},
                8,
                []int{8},
            },
        }
        for _, test := range testCases {
            var nodes []*corev1.Node
            for nodeSpec, nodeGPUs := range test.nodeGPUs {
                for i := 0; i < nodeGPUs; i++ {
                    nodes = append(nodes, newNode(nodeSpec))
                }
            }
            alloc := NewSimpleGPUAllocator(nodes)
            result := alloc.Allocate(test.gpus)
            Expect(result).To(Equal(test.result))
        }
    })
})

func newNode(gpus int) *corev1.Node {
    return &corev1.Node{
        Status: corev1.NodeStatus{
            Allocatable: corev1.ResourceList{
                "nvidia.com/gpu": *resource.NewQuantity(int64(gpus), resource.DecimalExponent),
            },
        },
    }
}
pkg/controllers/dijob_controller_test.go @ 3e0aa88b

This diff is collapsed in the original view; its contents are not shown here.
pkg/controllers/dijob_test.go @ 3e0aa88b

package controllers

// import (
// 	"context"
// 	"fmt"

import (
    "fmt"
    "strings"

// 	. "github.com/onsi/ginkgo"
// 	. "github.com/onsi/gomega"
// 	corev1 "k8s.io/api/core/v1"
// 	"k8s.io/apimachinery/pkg/types"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"

// 	div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
// 	dicommon "opendilab.org/di-orchestrator/pkg/common"
// 	diutil "opendilab.org/di-orchestrator/pkg/utils"
// 	testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
// )

    div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
    dicommon "opendilab.org/di-orchestrator/pkg/common"
    diutil "opendilab.org/di-orchestrator/pkg/utils"
    testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
)

// var _ = Describe("DIJob Specification", func() {

var _ = Describe("DIJob Specification", func() {
    // Context("When creating a DIJob with different CleanPodPolicy", func() {
    // It("Should execute different pods deletion policy with different CleanPodPolicy", func() {
    // cleanPodPolicies := []div2alpha1.CleanPodPolicy{
    // div2alpha1.CleanPodPolicyAll,
    // div2alpha1.CleanPodPolicyRunning,
    // div2alpha1.CleanPodPolicyNone,
    // }
    // for _, policy := range cleanPodPolicies {
    // type replica struct {
    // name string
    // status corev1.PodPhase
    // }
    // type testCase struct {
    // runnings int
    // collectors []replica
    // learners []replica
    // }
    // testCases := []testCase{
    // {
    // runnings: 2,
    // collectors: []replica{
    // {name: "job-collector-sdf", status: corev1.PodRunning},
    // },
    // learners: []replica{
    // {name: "job-learner-sdf", status: corev1.PodRunning},
    // },
    // },
    // {
    // runnings: 2,
    // collectors: []replica{
    // {name: "job-collector-sdf", status: corev1.PodRunning},
    // {name: "job-collector-4tf", status: corev1.PodFailed},
    // },
    // learners: []replica{
    // {name: "job-learner-sdf", status: corev1.PodRunning},
    // },
    // },
    // {
    // runnings: 2,
    // collectors: []replica{
    // {name: "job-collector-sdf", status: corev1.PodRunning},
    // {name: "job-collector-4tf", status: corev1.PodFailed},
    // },
    // learners: []replica{
    // {name: "job-learner-sdf", status: corev1.PodSucceeded},
    // {name: "job-learner-s4t", status: corev1.PodRunning},
    // },
    // },
    // }
    // for i := range testCases {
    // c := testCases[i]
    // By(fmt.Sprintf("Create %dth DIJob", i+1))
    // var err error
    // ctx := context.Background()
    // jobTmpl := testutil.NewDIJob()
    // jobTmpl.Spec.CleanPodPolicy = policy
    // dijob, jobKey := createDIJob(ctx, k8sClient, jobTmpl)
    // checkCoordinatorCreated(ctx, dijob)

    Context("Test DIJob Specification", func() {
        It("Should execute different pods deletion policy with different CleanPodPolicy", func() {
            cleanPodPolicies := []div2alpha1.CleanPodPolicy{
                div2alpha1.CleanPodPolicyAll,
                div2alpha1.CleanPodPolicyRunning,
                div2alpha1.CleanPodPolicyNone,
            }
            for _, policy := range cleanPodPolicies {
                type testCase struct {
                    runnings       int // pending pods are also considered as running pods
                    replicaStatues []corev1.PodPhase
                }
                testCases := []testCase{
                    {
                        runnings: 2,
                        replicaStatues: []corev1.PodPhase{
                            corev1.PodRunning, corev1.PodRunning, corev1.PodFailed, corev1.PodSucceeded,
                        },
                    },
                    {
                        runnings: 0,
                        replicaStatues: []corev1.PodPhase{
                            corev1.PodFailed, corev1.PodSucceeded, corev1.PodFailed,
                        },
                    },
                    {
                        runnings: 3,
                        replicaStatues: []corev1.PodPhase{
                            corev1.PodPending, corev1.PodRunning, corev1.PodFailed, corev1.PodRunning,
                        },
                    },
                    {
                        runnings: 0,
                        replicaStatues: []corev1.PodPhase{
                            corev1.PodFailed, corev1.PodFailed, corev1.PodFailed,
                        },
                    },
                    {
                        runnings: 0,
                        replicaStatues: []corev1.PodPhase{
                            corev1.PodSucceeded, corev1.PodSucceeded, corev1.PodSucceeded,
                        },
                    },
                }
                for i := range testCases {
                    c := testCases[i]
                    By(fmt.Sprintf("Create %dth DIJob", i+1))
                    var err error
                    jobTmpl := testutil.NewDIJob()
                    jobTmpl.Spec.MinReplicas = int32(len(c.replicaStatues))
                    jobTmpl.Spec.CleanPodPolicy = policy
                    job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)

                    // // build owner reference
                    // ownRefer := diutil.NewOwnerReference(div2alpha1.GroupVersion.String(), div2alpha1.KindDIJob, dijob.Name, dijob.UID, true)
                    // By(fmt.Sprintf("ownRefer: %s %s", ownRefer.APIVersion, ownRefer.Kind))
                    // colStatus := make([]int, 3)
                    // for _, col := range c.collectors {
                    // By(fmt.Sprintf("Create pod %s", col.name))
                    // createAndUpdatePodPhase(ctx, k8sClient, col.name, dijob.Name, col.status, dicommon.CollectorName, ownRefer, colStatus)
                    // }

                    By("Check the created DIJob is in Starting state")
                    checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting)

                    // lrStatus := make([]int, 3)
                    // for _, lr := range c.learners {
                    // By(fmt.Sprintf("Create pod %s", lr.name))
                    // createAndUpdatePodPhase(ctx, k8sClient, lr.name, dijob.Name, lr.status, dicommon.LearnerName, ownRefer, lrStatus)
                    // }

                    By("Update workers status")
                    for rank := 0; rank < len(c.replicaStatues); rank++ {
                        replicaName := diutil.ReplicaName(job.Name, int(job.Status.Generation), rank)
                        podKey := types.NamespacedName{Namespace: job.Namespace, Name: replicaName}
                        err = testutil.UpdatePodPhase(ctx, podKey, c.replicaStatues[rank])
                        Expect(err).NotTo(HaveOccurred())
                    }

                    // By("Get the number of pods")
                    // pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
                    // Expect(err).NotTo(HaveOccurred())
                    // npods := len(pods)

                    By("Get the number of pods")
                    pods, err := ctx.ListJobPods(&job)
                    Expect(err).NotTo(HaveOccurred())
                    npods := len(pods)

                    // By("Update coordinator to Succeeded")
                    // for _, replicaName := range []string{
                    // diutil.ReplicaPodName(dijob.Name, "coordinator"),
                    // } {
                    // podKey := types.NamespacedName{Namespace: dijob.Namespace, Name: replicaName}
                    // err = testutil.UpdatePodPhase(ctx, k8sClient, podKey, corev1.PodSucceeded)
                    // Expect(err).NotTo(HaveOccurred())
                    // }

                    By("Checking all the pods and services are deleted")

                    // By("Checking the job is succeeded")
                    // Eventually(func() div2alpha1.Phase {
                    // err := k8sClient.Get(ctx, jobKey, &dijob)
                    // if err != nil {
                    // return div2alpha1.JobUnknown
                    // }
                    // return dijob.Status.Phase
                    // }, timeout, interval).Should(Equal(div2alpha1.JobSucceeded))

                    switch policy {
                    case div2alpha1.CleanPodPolicyAll:
                        Eventually(func() int {
                            pods, err := ctx.ListJobPods(&job)
                            if err != nil {
                                return -1
                            }
                            return len(pods)
                        }, timeout, interval).Should(Equal(0))
                        Eventually(func() int {
                            svcs, err := ctx.ListJobServices(&job)
                            if err != nil {
                                return -1
                            }
                            return len(svcs)
                        }, timeout, interval).Should(Equal(0))
                    case div2alpha1.CleanPodPolicyNone:
                        Consistently(func() int {
                            pods, err := ctx.ListJobPods(&job)
                            if err != nil {
                                return -1
                            }
                            return len(pods)
                        }, duration, interval).Should(Equal(npods))
                        Eventually(func() int {
                            svcs, err := ctx.ListJobServices(&job)
                            if err != nil {
                                return -1
                            }
                            return len(svcs)
                        }, timeout, interval).Should(Equal(0))
                    case div2alpha1.CleanPodPolicyRunning:
                        Eventually(func() int {
                            pods, err := ctx.ListJobPods(&job)
                            if err != nil {
                                return -1
                            }
                            return len(pods)
                        }, timeout, interval).Should(Equal(npods - c.runnings))
                        Eventually(func() int {
                            svcs, err := ctx.ListJobServices(&job)
                            if err != nil {
                                return -1
                            }
                            return len(svcs)
                        }, timeout, interval).Should(Equal(0))
                    }

                    // By("Checking all the pods and services are deleted")

                    By("Clean up pods")
                    err = ctx.CleanUpJob(&job)
                    Expect(err).NotTo(HaveOccurred())
                }
            }
        })

        It("Should create replicas with different connections relying on topology and parallel workers", func() {
            type testCase struct {
                topology            div2alpha1.Topology
                replicas            int
                paralleWorkers      int
                expectAttachedNodes int
            }
            testCases := []testCase{
                {topology: div2alpha1.TopologyAlone, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0},
                {topology: div2alpha1.TopologyAlone, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 0},
                {topology: div2alpha1.TopologyStar, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0},
                {topology: div2alpha1.TopologyStar, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 1},
                {topology: div2alpha1.TopologyStar, replicas: 3, paralleWorkers: 3, expectAttachedNodes: 2},
                {topology: div2alpha1.TopologyStar, replicas: 3, paralleWorkers: 4, expectAttachedNodes: 2},
                {topology: div2alpha1.TopologyMesh, replicas: 1, paralleWorkers: 1, expectAttachedNodes: 0},
                {topology: div2alpha1.TopologyMesh, replicas: 2, paralleWorkers: 3, expectAttachedNodes: 3},
                {topology: div2alpha1.TopologyMesh, replicas: 3, paralleWorkers: 3, expectAttachedNodes: 9},
                {topology: div2alpha1.TopologyMesh, replicas: 3, paralleWorkers: 4, expectAttachedNodes: 12},
            }
            for i := range testCases {
                c := testCases[i]
                By(fmt.Sprintf("Create %dth DIJob", i+1))
                var err error
                jobTmpl := testutil.NewDIJob()
                jobTmpl.Spec.MinReplicas = int32(c.replicas)
                jobTmpl.Spec.EngineFields.ParallelWorkers = int32(c.paralleWorkers)
                jobTmpl.Spec.EngineFields.Topology = c.topology
                job, jobKey := createAndUpdateReplicas(ctx, jobTmpl)

                // switch policy {
                // case div2alpha1.CleanPodPolicyAll:
                // Eventually(func() int {
                // pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
                // if err != nil {
                // return -1
                // }
                // return len(pods)
                // }, timeout, interval).Should(Equal(0))
                // Eventually(func() int {
                // svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
                // if err != nil {
                // return -1
                // }
                // return len(svcs)
                // }, timeout, interval).Should(Equal(0))
                // case div2alpha1.CleanPodPolicyNone:
                // Consistently(func() int {
                // pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
                // if err != nil {
                // return -1
                // }
                // return len(pods)
                // }, duration, interval).Should(Equal(npods))
                // Eventually(func() int {
                // svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
                // if err != nil {
                // return -1
                // }
                // return len(svcs)
                // }, duration, interval).Should(Equal(0))
                // case div2alpha1.CleanPodPolicyRunning:
                // Eventually(func() int {
                // pods, err := diutil.ListPods(ctx, k8sClient, &dijob)
                // if err != nil {
                // return -1
                // }
                // return len(pods)
                // }, duration, interval).Should(Equal(npods - c.runnings))
                // Eventually(func() int {
                // svcs, err := diutil.ListServices(ctx, k8sClient, &dijob)
                // if err != nil {
                // return -1
                // }
                // return len(svcs)
                // }, duration, interval).Should(Equal(0))
                // }

                By("Check the created DIJob is in Starting state")
                checkDIJobPhase(ctx, jobKey, div2alpha1.JobStarting)

                // By("Clean up pods")
                // err = testutil.CleanUpJob(ctx, k8sClient, dijob.DeepCopy())
                // Expect(err).NotTo(HaveOccurred())
                // }
                // }
                // })
                // })
                // })

                By("Check workers' attached nodes are as expected")
                Eventually(func() int {
                    pods, err := ctx.ListJobPods(&job)
                    if err != nil {
                        return -1
                    }
                    attachedNodes := 0
                    for _, pod := range pods {
                        for _, env := range pod.Spec.Containers[0].Env {
                            if env.Name == dicommon.ENVAttachedNodesArg {
                                if env.Value == "" {
                                    continue
                                }
                                attachedNodes += len(strings.Split(env.Value, ","))
                            }
                        }
                    }
                    return attachedNodes
                }, timeout, interval).Should(Equal(c.expectAttachedNodes))

                By("Clean up pods")
                err = ctx.CleanUpJob(&job)
                Expect(err).NotTo(HaveOccurred())
            }
        })
    })
})
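A pattern worth noting in the expectAttachedNodes table above (an observation from the numbers, not something the commit states): the alone topology always expects 0 attached nodes; the star topology expects replicas − 1 regardless of parallel workers (2 replicas → 1, 3 replicas → 2); and the mesh topology expects one attachment per replica pair per parallel worker, i.e. C(replicas, 2) × parallelWorkers (2 replicas with 3 workers: 1 pair × 3 = 3; 3 replicas with 3 workers: 3 pairs × 3 = 9; 3 replicas with 4 workers: 3 pairs × 4 = 12).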
pkg/controllers/suite_test.go @ 3e0aa88b

@@ -21,13 +21,13 @@ import (
    "fmt"
    "path/filepath"
    "testing"
    "time"

    . "github.com/onsi/ginkgo"
    "github.com/onsi/ginkgo/config"
    . "github.com/onsi/gomega"
    "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/envtest"
    "sigs.k8s.io/controller-runtime/pkg/envtest/printer"
    logf "sigs.k8s.io/controller-runtime/pkg/log"

@@ -42,20 +42,20 @@ import (
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

const (
    // timeout = 5 * time.Second
    // interval = 250 * time.Millisecond
    // duration = 200 * time.Millisecond
    timeout  = 5 * time.Second
    interval = 250 * time.Millisecond
    duration = 200 * time.Millisecond
)

// var cfg *rest.Config
var k8sClient client.Client
var ctx dicontext.Context
var testEnv *envtest.Environment

func TestControllers(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecsWithDefaultAndCustomReporters(t,
        "Controller Suite",
        "DI-Controller Suite",
        []Reporter{printer.NewlineReporter{}})
}

@@ -77,10 +77,6 @@ var _ = BeforeSuite(func() {
    //+kubebuilder:scaffold:scheme

    k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
    Expect(err).NotTo(HaveOccurred())
    Expect(k8sClient).NotTo(BeNil())

    // create controller manager
    metricPort := config.GinkgoConfig.ParallelNode + 8200
    metricAddress := fmt.Sprintf(":%d", metricPort)

@@ -90,11 +86,11 @@ var _ = BeforeSuite(func() {
    })
    Expect(err).NotTo(HaveOccurred())

    ctx := dicontext.NewContext(context.Background(),
    ctx = dicontext.NewContext(context.Background(),
        cfg,
        k8sManager.GetClient(),
        k8sManager.GetEventRecorderFor("di-operator"),
        ctrl.Log.WithName("di-operator"))
        k8sManager.GetEventRecorderFor("controller"),
        ctrl.Log.WithName("controller"))
    reconciler := NewDIJobReconciler(k8sManager.GetScheme(), ctx)
    if err = reconciler.SetupWithManager(k8sManager); err != nil {
        Expect(err).NotTo(HaveOccurred())
pkg/server/suite_test.go @ 3e0aa88b

@@ -68,7 +68,7 @@ func TestServer(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecsWithDefaultAndCustomReporters(t,
        "Server Suite",
        "DI-Server Suite",
        []Reporter{printer.NewlineReporter{}})
}
pkg/utils/testutils/dijob.go @ 3e0aa88b

@@ -19,6 +19,9 @@ func NewDIJob() *div2alpha1.DIJob {
            Namespace: DIJobNamespace,
        },
        Spec: div2alpha1.DIJobSpec{
            MinReplicas: 1,
            MaxReplicas: 4,
            Preemptible: false,
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
pkg/utils/testutils/pod.go @ 3e0aa88b

@@ -15,9 +15,10 @@ import (
    "sigs.k8s.io/controller-runtime/pkg/client"

    dicommon "opendilab.org/di-orchestrator/pkg/common"
    dicontext "opendilab.org/di-orchestrator/pkg/context"
)

func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod {
func NewPod(name, namespace string, ownRefer metav1.OwnerReference) *corev1.Pod {
    pod := &corev1.Pod{
        TypeMeta: metav1.TypeMeta{
            APIVersion: "v1",

@@ -25,7 +26,7 @@ func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod {
        },
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: DIJobNamespace,
            Namespace: namespace,
        },
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{

@@ -41,9 +42,9 @@ func NewPod(name, jobName string, ownRefer metav1.OwnerReference) *corev1.Pod {
    return pod
}

func UpdatePodPhase(ctx context.Context, k8sClient client.Client, podKey types.NamespacedName, phase corev1.PodPhase) error {
func UpdatePodPhase(ctx dicontext.Context, podKey types.NamespacedName, phase corev1.PodPhase) error {
    var pod corev1.Pod
    err := k8sClient.Get(ctx, podKey, &pod)
    err := ctx.Get(context.TODO(), podKey, &pod)
    if err != nil {
        return err
    }

@@ -54,11 +55,11 @@ func UpdatePodPhase(ctx context.Context, k8sClient client.Client, podKey types.N
    state := corev1.ContainerStateRunning{}
    cstatus := corev1.ContainerStatus{
        Name: containerName,
        State: corev1.ContainerState{
            Running: &state,
        }}
        },
        Ready: true}
    pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, cstatus)
}

    err = k8sClient.Status().Update(ctx, &pod, &client.UpdateOptions{})
    err = ctx.Status().Update(context.TODO(), &pod, &client.UpdateOptions{})
    if err != nil {
        return err
    }
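A minimal sketch of calling the updated helper after this change, mirroring the call site in pkg/controllers/dijob_test.go above. It is assumed to live in a test file inside the repository module, and the dicontext.Context value is assumed to come from suite setup like the one shown in pkg/controllers/suite_test.go.

package example

import (
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"

    dicontext "opendilab.org/di-orchestrator/pkg/context"
    testutil "opendilab.org/di-orchestrator/pkg/utils/testutils"
)

// markPodRunning flips a pod's phase using the new helper signature: the
// dicontext.Context now carries the Kubernetes client, so no separate
// client.Client argument is passed.
func markPodRunning(ctx dicontext.Context, namespace, name string) error {
    podKey := types.NamespacedName{Namespace: namespace, Name: name}
    return testutil.UpdatePodPhase(ctx, podKey, corev1.PodRunning)
}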