提交 35a0c826 编写于 作者: L liqingping

test: add pod failed test and pod deleted test

上级 778b53ac
......@@ -5,7 +5,6 @@ import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
dicommon "opendilab.org/di-orchestrator/common"
diutil "opendilab.org/di-orchestrator/utils"
......@@ -79,13 +78,7 @@ func buildPodAndServiceForReplica(template *corev1.PodTemplateSpec, job *div1alp
}
// build owner reference
ownRefer := metav1.OwnerReference{
APIVersion: job.APIVersion,
Kind: job.Kind,
Name: job.Name,
UID: job.GetUID(),
Controller: func(c bool) *bool { return &c }(true),
}
ownRefer := diutil.NewOwnerReference(job.APIVersion, job.Kind, job.Name, job.UID, true)
// build pod
pod, svc, port, err := diutil.BuildPodAndService(template, ownRefer, job.Name, job.Namespace, replicaType, volumes)
......
apiVersion: diengine.opendilab.org/v1alpha1
kind: AggregatorConfig
metadata:
name: aggregator-config
namespace: di-system
spec:
aggregator:
template:
spec:
containers:
- name: shell
image: busybox
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
add:
- SYS_PTRACE
stdin: true
tty: true
- name: di-container
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
resources:
requests:
cpu: 3
memory: "10Gi"
limits:
cpu: 3
memory: "10Gi"
command: ["/bin/bash", "-c",]
args:
- |
# if code has been changed in the mount path, we have to reinstall cli
# pip install --no-cache-dir -e .;
# pip install --no-cache-dir -e .[common_env]
ding -m dist --module learner_aggregator
ports:
- name: di-port
containerPort: 22270
\ No newline at end of file
......@@ -19,7 +19,7 @@ spec:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
......@@ -155,7 +155,7 @@ spec:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
......@@ -175,7 +175,7 @@ spec:
spec:
containers:
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
......
apiVersion: diengine.opendilab.org/v1alpha1
kind: DIJob
metadata:
name: dijob-cartpole-dqn
spec:
group: xxx
priorityClassName: ""
cleanPodPolicy: "Running"
volumes:
- name: cache-volume
emptyDir:
medium: Memory
sizeLimit: 128Mi
- name: work-dir
hostPath:
path: /data/nfs/ding/cartpole
coordinator:
template:
spec:
shareProcessNamespace: true
containers:
- name: shell
command: ["sh", "-c"]
args:
- |
sleep 30s;
ps -ef |grep "/opt/conda/bin/ding"| grep -v "ps -ef" | awk 'NR<2';
kill -9 $(ps -ef |grep "/opt/conda/bin/ding"| grep -v "ps -ef"|awk 'NR<2 {print $1}');
exit 0;
image: busybox
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
add:
- SYS_PTRACE
stdin: true
tty: true
- name: di-container
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- |
cat <<EOF > cartpole_dqn_config_k8s.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
encoder_hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
# increase collector task space when get rs from server
collector_task_space=0,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['dizoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['ding.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['ding.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['ding.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['ding.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['ding.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='di-server.di-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"replicas": 2,
},
learners={
"gpus": "1",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='./data',
path_policy='./policy',
communication_mode='auto',
learner_gpu_num=1,
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
ding -m dist --module config -P k8s -c ./cartpole_dqn_config_k8s.py -s 0;
ding -m dist --module coordinator -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -cdp $COORDINATOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: work-dir
mountPath: /ding
collector:
template:
spec:
shareProcessNamespace: true
containers:
- name: shell
command: ["sh", "-c"]
args:
- |
sleep 30s;
ps -ef |grep "/opt/conda/bin/ding"| grep -v "ps -ef" | awk 'NR<2';
kill -9 $(ps -ef |grep "/opt/conda/bin/ding"| grep -v "ps -ef"|awk 'NR<2 {print $1}');
exit 0;
image: busybox
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
add:
- SYS_PTRACE
stdin: true
tty: true
- name: di-container
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- |
ding -m dist --module collector -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -clp $COLLECTOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: work-dir
mountPath: /ding
learner:
template:
spec:
shareProcessNamespace: true
containers:
- name: shell
command: ["sh", "-c"]
args:
- |
sleep 30s;
ps -ef |grep "/opt/conda/bin/ding"| grep -v "ps -ef" | awk 'NR<2';
kill -9 $(ps -ef |grep "/opt/conda/bin/ding"| grep -v "ps -ef"|awk 'NR<2 {print $1}');
exit 0;
image: busybox
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
add:
- SYS_PTRACE
stdin: true
tty: true
- name: di-container
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- |
ding -m dist --module spawn_learner -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -lp $LEARNER_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: cache-volume
mountPath: /dev/shm
- name: work-dir
mountPath: /ding
\ No newline at end of file
apiVersion: diengine.opendilab.org/v1alpha1
kind: DIJob
metadata:
name: dijob-cartpole-dqn
spec:
group: xxx
priorityClassName: ""
cleanPodPolicy: "Running"
volumes:
- name: cache-volume
emptyDir:
medium: Memory
sizeLimit: 128Mi
- name: work-dir
hostPath:
path: /data/nfs/ding/cartpole
coordinator:
template:
spec:
shareProcessNamespace: true
containers:
- name: shell
image: busybox
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
add:
- SYS_PTRACE
stdin: true
tty: true
- name: di-container
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- |
cat <<EOF > cartpole_dqn_config_k8s.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
encoder_hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
# increase collector task space when get rs from server
collector_task_space=0,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['dizoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['ding.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['ding.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['ding.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['ding.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['ding.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='di-server.di-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"replicas": 2,
},
learners={
"gpus": "2",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='./data',
path_policy='./policy',
communication_mode='auto',
learner_gpu_num=2,
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
ding -m dist --module config -P k8s -c ./cartpole_dqn_config_k8s.py -s 0;
ding -m dist --module coordinator -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -cdp $COORDINATOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: work-dir
mountPath: /ding
collector:
template:
spec:
shareProcessNamespace: true
containers:
- name: shell
image: busybox
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
add:
- SYS_PTRACE
stdin: true
tty: true
- name: di-container
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- |
ding -m dist --module collector -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -clp $COLLECTOR_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: work-dir
mountPath: /ding
learner:
template:
spec:
shareProcessNamespace: true
containers:
- name: shell
image: busybox
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
add:
- SYS_PTRACE
stdin: true
tty: true
- name: di-container
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- |
ding -m dist --module spawn_learner -c ./cartpole_dqn_config_k8s.py.pkl -s 0 -lp $LEARNER_PORT
ports:
- name: di-port
containerPort: 22270
volumeMounts:
- name: cache-volume
mountPath: /dev/shm
- name: work-dir
mountPath: /ding
\ No newline at end of file
......@@ -29,7 +29,7 @@ spec:
stdin: true
tty: true
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
......@@ -175,7 +175,7 @@ spec:
stdin: true
tty: true
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
......@@ -205,7 +205,7 @@ spec:
stdin: true
tty: true
- name: di-container
image: diorchestrator/ding:v0.1.0-df39b81c
image: registry.sensetime.com/cloudnative4ai/ding:v0.1.0-75c41277
imagePullPolicy: Always
env:
- name: PYTHONUNBUFFERED
......
......@@ -17,7 +17,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
div1alpha1 "opendilab.org/di-orchestrator/api/v1alpha1"
testutil "opendilab.org/di-orchestrator/utils/testutils"
)
func TestE2E(t *testing.T) {
......@@ -80,8 +79,8 @@ var _ = BeforeSuite(func() {
clientset, err = kubernetes.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())
agconfig := testutil.NewAggregatorConfig()
err = k8sClient.Create(context.Background(), agconfig, &client.CreateOptions{})
agconfig := buildAGConfig("./config/agconfig.yaml")
err = k8sClient.Update(context.Background(), agconfig, &client.UpdateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
Expect(err).NotTo(HaveOccurred())
}
......@@ -89,3 +88,9 @@ var _ = BeforeSuite(func() {
k8sClient.DeleteAllOf(context.Background(), &div1alpha1.DIJob{},
client.InNamespace(namespace), client.MatchingLabels{"stability-test": "dijobs"})
})
var _ = AfterSuite(func() {
agconfig := buildAGConfig("../config/samples/agconfig.yaml")
err := k8sClient.Update(context.Background(), agconfig, &client.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
})
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io/ioutil"
"os/exec"
"path/filepath"
"strings"
"time"
......@@ -34,7 +35,7 @@ const (
var _ = Describe("E2E test for DI-engine", func() {
Context("When DIJob meets network exception", func() {
It("Should reconnect after each module is available again", func() {
It("Should reconnect after each module's service is recreated", func() {
testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"}
for i := range testCases {
tc := testCases[i]
......@@ -63,7 +64,7 @@ var _ = Describe("E2E test for DI-engine", func() {
err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout)
Expect(err).NotTo(HaveOccurred())
By("Delete coordinator service for a while")
By(fmt.Sprintf("Delete %s service for a while", tc))
svcs, err := diutil.ListServices(ctx, k8sClient, job)
Expect(err).NotTo(HaveOccurred())
......@@ -73,20 +74,11 @@ var _ = Describe("E2E test for DI-engine", func() {
svc = isvc
}
}
svc.ResourceVersion = ""
svc.Status = corev1.ServiceStatus{}
By(fmt.Sprintf("Delete %s service", svc.Name))
err = k8sClient.Delete(ctx, svc, &client.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Sleep for %s before rebuilding connection", networkFailedDuration.String()))
time.Sleep(networkFailedDuration)
By("Recreate service to rebuild connection")
err = k8sClient.Create(ctx, svc, &client.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
By("Waiting for job to be succeeded")
jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}
Eventually(func() div1alpha1.Phase {
......@@ -101,7 +93,7 @@ var _ = Describe("E2E test for DI-engine", func() {
}
})
It("Should reconnect after all modules are available again", func() {
It("Should reconnect after all replicas are available again", func() {
By("Create DIJob")
jobPath := filepath.Join(exampleJobsDir, "dijob-multi-gpu.yaml")
sharedVolumePath := filepath.Join(sharedVolumesDir, "cartpole-multi-gpu")
......@@ -134,13 +126,7 @@ var _ = Describe("E2E test for DI-engine", func() {
err = k8sClient.Delete(ctx, svc, &client.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Sleep for %s before rebuilding connection", networkFailedDuration.String()))
time.Sleep(networkFailedDuration)
By("Recreate service to rebuild connection")
err = k8sClient.Create(ctx, svc, &client.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Sleep for %s before delete next module", networkFailedDuration.String()))
time.Sleep(networkFailedDuration)
}
......@@ -157,6 +143,142 @@ var _ = Describe("E2E test for DI-engine", func() {
testutil.CleanUpJob(ctx, k8sClient, job)
})
})
Context("When some of DIJob's replicas are deleted", func() {
It("DIJob should catch the change and request di-server to create new replicas", func() {
testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"}
for i := range testCases {
tc := testCases[i]
By(fmt.Sprintf("Create %dth DIJob", i+1))
jobPath := filepath.Join(exampleJobsDir, "dijob-sidecar.yaml")
if tc == "aggregator" || tc == "ddp-learner" {
jobPath = filepath.Join(exampleJobsDir, "dijob-sidecar-multi-gpu.yaml")
}
sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i))
job := buildDIJob(jobPath, sharedVolumePath)
ctx := context.Background()
var err error
err = k8sClient.Create(ctx, job, &client.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Waiting for replicas to be running"))
err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning)
Expect(err).NotTo(HaveOccurred())
By("Checking coordinator's log to decide to delete replicas")
replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator")
err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Delete %s pod for a while", tc))
pods, err := diutil.ListPods(ctx, k8sClient, job)
Expect(err).NotTo(HaveOccurred())
var pod *corev1.Pod
for _, ipod := range pods {
if strings.Contains(ipod.Name, tc) {
pod = ipod
}
}
By(fmt.Sprintf("Delete %s replica", pod.Name))
err = k8sClient.Delete(ctx, pod, &client.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
By("Waiting for job to be succeeded")
jobKey := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}
Eventually(func() div1alpha1.Phase {
err := k8sClient.Get(ctx, jobKey, job)
if err != nil {
return div1alpha1.JobUnknown
}
return job.Status.Phase
}, timeout, interval).Should(Equal(div1alpha1.JobSucceeded))
testutil.CleanUpJob(ctx, k8sClient, job)
}
})
})
Context("When some of DIJob's replicas are failed", func() {
It("DIJob should catch the change and request di-server to create new replicas", func() {
testCases := []string{"coordinator", "collector", "learner", "aggregator", "ddp-learner"}
for i := range testCases {
tc := testCases[i]
By(fmt.Sprintf("Create %dth DIJob", i+1))
jobPath := filepath.Join(exampleJobsDir, "dijob-sidecar.yaml")
if tc == "aggregator" || tc == "ddp-learner" {
jobPath = filepath.Join(exampleJobsDir, "dijob-sidecar-multi-gpu.yaml")
}
sharedVolumePath := filepath.Join(sharedVolumesDir, fmt.Sprintf("cartpole-%d", i))
job := buildDIJob(jobPath, sharedVolumePath)
ctx := context.Background()
var err error
err = k8sClient.Create(ctx, job, &client.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Waiting for replicas to be running"))
err = testutil.WaitForAllReplicas(ctx, k8sClient, job, corev1.PodRunning)
Expect(err).NotTo(HaveOccurred())
By("Checking coordinator's log to decide to failed replicas")
replicaName := fmt.Sprintf("%s-%s", job.Name, "coordinator")
err = testutil.GetPodLogs(clientset, namespace, replicaName, dicommon.DefaultContainerName, true, messageRepresentJobStartsTraining, logTimeout)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Make %s pod failed", tc))
pods, err := diutil.ListPods(ctx, k8sClient, job)
Expect(err).NotTo(HaveOccurred())
var pod *corev1.Pod
for _, ipod := range pods {
if strings.Contains(ipod.Name, tc) {
pod = ipod
}
}
By(fmt.Sprintf("Make %s replica failed", pod.Name))
cmd := fmt.Sprintf("kubectl exec -n %s %s -c shell -- sh -c ", pod.Namespace, pod.Name)
podCmd := `"kill -9 \$(ps -ef |grep \"/opt/conda/bin/ding\"| grep -v \"ps -ef\"|awk 'NR<2 {print \$1}')"`
cmd = fmt.Sprintf("%s%s", cmd, podCmd)
command := exec.Command("bash", "-c", cmd)
_, err = command.CombinedOutput()
Expect(err).NotTo(HaveOccurred())
By("Waiting for job to be completed")
expectedContainerStatus := "Completed"
if tc == "coordinator" {
expectedContainerStatus = "Error"
}
podKey := types.NamespacedName{Namespace: job.Namespace, Name: fmt.Sprintf("%s-%s", job.Name, tc)}
Eventually(func() string {
var pod *corev1.Pod
err := k8sClient.Get(ctx, podKey, pod)
if err != nil {
return "Unknown"
}
for _, status := range pod.Status.ContainerStatuses {
if status.Name != dicommon.DefaultContainerName {
continue
}
if status.State.Terminated != nil {
return status.State.Terminated.Reason
}
}
return "Unknown"
}, timeout, interval).Should(Equal(expectedContainerStatus))
testutil.CleanUpJob(ctx, k8sClient, job)
}
})
})
})
func buildDIJob(jobPath, sharedVolumePath string) *div1alpha1.DIJob {
......@@ -183,3 +305,14 @@ func buildDIJob(jobPath, sharedVolumePath string) *div1alpha1.DIJob {
job.Labels["stability-test"] = "dijobs"
return &job
}
func buildAGConfig(configPath string) *div1alpha1.AggregatorConfig {
yamlFile, err := ioutil.ReadFile(configPath)
Expect(err).NotTo(HaveOccurred())
var agconfig div1alpha1.AggregatorConfig
err = yaml.Unmarshal(yamlFile, &agconfig)
Expect(err).NotTo(HaveOccurred())
return &agconfig
}
......@@ -249,7 +249,7 @@ func (s *DIServer) createDDPLearnerPods(template *corev1.PodTemplateSpec,
replicaResource.GPU = resource.MustParse(fmt.Sprintf("%d", gpus))
// build ddp learner pod
pod, _, err := buildDDPLearnerPodAndService(template, ownRefer, aggOwnRefer,
pod, _, err := buildDDPLearnerPod(template, ownRefer, aggOwnRefer,
jobName, namespace, *replicaResource, volumes)
if err != nil {
return err
......@@ -284,7 +284,7 @@ func (s *DIServer) createDDPLearnerPods(template *corev1.PodTemplateSpec,
}
} else {
// build ddp learner pod
pod, _, err := buildDDPLearnerPodAndService(template, ownRefer, aggOwnRefer,
pod, _, err := buildDDPLearnerPod(template, ownRefer, aggOwnRefer,
jobName, namespace, resources, volumes)
if err != nil {
return err
......@@ -302,7 +302,7 @@ func (s *DIServer) createDDPLearnerPods(template *corev1.PodTemplateSpec,
return nil
}
func buildDDPLearnerPodAndService(template *corev1.PodTemplateSpec,
func buildDDPLearnerPod(template *corev1.PodTemplateSpec,
ownRefer, aggOwnRefer metav1.OwnerReference,
jobName, namespace string,
resources commontypes.ResourceQuantity, volumes []corev1.Volume) (*corev1.Pod, int32, error) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册