提交 e6f102d7 编写于 作者: L liqingping

Merge branch 'feat/volumes' into 'develop'

Feat/volumes

See merge request platform/CloudNative4AI/cluster-lifecycle/nervex-operator!22
......@@ -38,6 +38,9 @@ type NerveXJobSpec struct {
// CleanPodPolicy defines the policy for cleaning up pods after the NerveXJob has completed
CleanPodPolicy CleanPodPolicy `json:"cleanPodPolicy,omitempty"`
// Volumes defines the shared volumes for nerveX components
Volumes []corev1.Volume `json:"volumes,omitempty"`
Coordinator CoordinatorSpec `json:"coordinator"`
Collector CollectorSpec `json:"collector,"`
......
......@@ -21,6 +21,7 @@ limitations under the License.
package v1alpha1
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
......@@ -281,6 +282,13 @@ func (in *NerveXJobList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NerveXJobSpec) DeepCopyInto(out *NerveXJobSpec) {
*out = *in
if in.Volumes != nil {
in, out := &in.Volumes, &out.Volumes
*out = make([]v1.Volume, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
in.Coordinator.DeepCopyInto(&out.Coordinator)
in.Collector.DeepCopyInto(&out.Collector)
in.Learner.DeepCopyInto(&out.Learner)
......
......@@ -42,7 +42,7 @@ func (r *NerveXJobReconciler) reconcilePods(ctx context.Context, job *nervexv1al
// build aggregator pod
template := agconfig.Spec.Aggregator.Template.DeepCopy()
agpod, agsvc, agurl, err := buildPodAndServiceForReplica(template, job, nervexutil.AggregatorName,
nervexutil.DefaultAggregatorContainerName, nervexutil.DefaultAggregatorPortName, nervexutil.DefaultAggregatorPort)
nervexutil.DefaultAggregatorContainerName, nervexutil.DefaultAggregatorPortName, nervexutil.DefaultAggregatorPort, nil)
if err != nil {
msg := fmt.Sprintf("build aggregator pod for job %s failed", job.Name)
log.Error(err, msg)
......@@ -50,9 +50,10 @@ func (r *NerveXJobReconciler) reconcilePods(ctx context.Context, job *nervexv1al
}
// build coordinator pod
volumes := job.Spec.Volumes
template = job.Spec.Coordinator.Template.DeepCopy()
coorpod, coorsvc, coorurl, err := buildPodAndServiceForReplica(template, job, nervexutil.CoordinatorName,
nervexutil.DefaultCoordinatorContainerName, nervexutil.DefaultCoordinatorPortName, nervexutil.DefaultCoordinatorPort)
nervexutil.DefaultCoordinatorContainerName, nervexutil.DefaultCoordinatorPortName, nervexutil.DefaultCoordinatorPort, volumes)
if err != nil {
msg := fmt.Sprintf("build coordinator pod for job %s failed", job.Name)
log.Error(err, msg)
......@@ -172,7 +173,7 @@ func (r *NerveXJobReconciler) deleteService(ctx context.Context, job *nervexv1al
}
func buildPodAndServiceForReplica(template *corev1.PodTemplateSpec, job *nervexv1alpha1.NerveXJob,
replicaType, containerName, portName string, defaultPort int32) (*corev1.Pod, *corev1.Service, string, error) {
replicaType, containerName, portName string, defaultPort int32, volumes []corev1.Volume) (*corev1.Pod, *corev1.Service, string, error) {
if string(job.Spec.PriorityClassName) != "" {
template.Spec.PriorityClassName = string(job.Spec.PriorityClassName)
}
......@@ -204,6 +205,9 @@ func buildPodAndServiceForReplica(template *corev1.PodTemplateSpec, job *nervexv
envs[nervexutil.PodNameEnv] = pod.Name
nervexutil.SetPodEnv(pod, envs)
// append the volumes passed in by the caller to the volumes already defined on the pod
pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...)
// build service
svc := nervexutil.BuildService(pod.GetLabels(), port, portName)
svc.SetOwnerReferences([]metav1.OwnerReference{ownRefer})
......
......@@ -5,124 +5,442 @@ metadata:
spec:
group: xxx
priorityClassName: ""
cleanPodPolicy: "Running"
volumes:
- name: data-dir
hostPath:
path: /data/nfs/nervex/cartpole
- name: cache-volume
emptyDir:
medium: Memory
sizeLimit: 128Mi
coordinator:
template:
spec:
containers:
- name: coordinator
image: registry.sensetime.com/cloudnative4ai/nervex-parallel-linklink-migration:v0.10
image: registry.sensetime.com/cloudnative4ai/nervex:v0.0.1-torch1.4-cuda10.1-cudnn7-devel-9472640d
imagePullPolicy: Always
env:
- name: LC_ALL
value: "en_US.utf-8"
- name: LANG
value: "en_US.utf-8"
- name: NCCL_SHM_DISABLE
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- >
cd /nfs/lijianwen/test_atari/test3;
python3 -u -c 'import nervex.entry.parallel_entry as pe; pe.launch_coordinator(filename="atari_impala_default_config.py.pkl")';
while true; do sleep 30; done;
- |
cat <<EOF > cartpole_dqn_config.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
collector_task_space=2,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['app_zoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['nervex.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['nervex.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['nervex.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['nervex.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['nervex.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='http://nervex-server.nervex-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"cpu": "3",
"memory": "4Gi",
"replicas": 2,
},
learners={
"cpu": "3",
"memory": "4Gi",
"gpu": "0",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='/data/nfs/nervex/cartpole/data',
path_policy='/data/nfs/nervex/cartpole/policy',
communication_mode='auto',
learner_multi_gpu=False,
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
cat nervex/entry/dist_entry.py;
nervex -m dist --module config -p k8s -c cartpole_dqn_config.py -s 0;
nervex -m dist --module coordinator -p k8s -c cartpole_dqn_config.py.pkl -s 0 --disable_flask_log 0
ports:
- name: coordinator
containerPort: 22273
resources:
requests:
cpu: 2
memory: 5Gi
limits:
cpu: 2
memory: 5Gi
containerPort: 22270
volumeMounts:
- name: config
mountPath: /data/nervex
volumes:
- name: config
nfs:
path: /data/nfs/nervex
server: 10.152.197.14
- name: data-dir
mountPath: /data/nfs/nervex/cartpole
collector:
template:
spec:
containers:
- name: collector
image: registry.sensetime.com/cloudnative4ai/nervex-parallel-linklink-migration:v0.10
image: registry.sensetime.com/cloudnative4ai/nervex:v0.0.1-torch1.4-cuda10.1-cudnn7-devel-9472640d
imagePullPolicy: Always
env:
- name: LC_ALL
value: "en_US.utf-8"
- name: LANG
value: "en_US.utf-8"
- name: NCCL_SHM_DISABLE
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- >
until ping -c 1 $HOSTNAME &>/dev/null ; do sleep 1 ; done ;
cd $EXECTIONPATH;
python3 -u -c 'import os; import nervex.entry.parallel_entry as pe; pe.launch_actor(filename="$REALFILENAME", name="actor{}".format(os.environ["HOSTNAME"].split("-")[-1]) )';
- |
cat <<EOF > cartpole_dqn_config.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
collector_task_space=2,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['app_zoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['nervex.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['nervex.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['nervex.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['nervex.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['nervex.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='http://nervex-server.nervex-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"cpu": "3",
"memory": "4Gi",
"replicas": 2,
},
learners={
"cpu": "3",
"memory": "4Gi",
"gpu": "0",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='/data/nfs/nervex/cartpole/data',
path_policy='/data/nfs/nervex/cartpole/policy',
communication_mode='auto',
learner_multi_gpu=False,
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
nervex -m dist --module config -p k8s -c cartpole_dqn_config.py -s 0;
nervex -m dist --module collector -c cartpole_dqn_config.py.pkl -s 0
ports:
- name: collector
containerPort: 22270
resources:
requests:
cpu: 2
memory: 5Gi
limits:
cpu: 2
memory: 5Gi
volumeMounts:
- name: config
mountPath: /data/nervex
volumes:
- name: config
nfs:
path: /data/nfs/nervex
server: 10.152.197.14
- name: data-dir
mountPath: /data/nfs/nervex/cartpole
learner:
template:
spec:
containers:
- name: learner
image: registry.sensetime.com/cloudnative4ai/nervex-parallel-linklink-migration:v0.10
image: registry.sensetime.com/cloudnative4ai/nervex:v0.0.1-torch1.4-cuda10.1-cudnn7-devel-9472640d
imagePullPolicy: Always
env:
- name: LC_ALL
value: "en_US.utf-8"
- name: LANG
value: "en_US.utf-8"
- name: NCCL_SHM_DISABLE
- name: PYTHONUNBUFFERED
value: "1"
command: ["/bin/bash", "-c",]
args:
- >
until ping -c 1 $HOSTNAME &>/dev/null ; do sleep 1 ; done ;
cd $EXECTIONPATH;
mpirun -np $REPEATNUM python3 -u -c 'import os; import pickle; import nervex.entry.parallel_entry as pe; pe.launch_learner(filename="$REALFILENAME", name="learner{}".format(os.environ["HOSTNAME"].split("-")[-1]) )';
python3 -u -c 'import os; import pickle; import nervex.entry.parallel_entry as pe; pe.launch_learner(filename="$REALFILENAME", name="learner{}".format(os.environ["HOSTNAME"].split("-")[-1]) )';
- |
cat <<EOF > cartpole_dqn_config.py
from easydict import EasyDict
cartpole_dqn_config = dict(
env=dict(
collector_env_num=8,
collector_episode_num=2,
evaluator_env_num=5,
evaluator_episode_num=1,
stop_value=195,
),
policy=dict(
cuda=False,
model=dict(
obs_shape=4,
action_shape=2,
hidden_size_list=[128, 128, 64],
dueling=True,
),
nstep=3,
discount_factor=0.97,
learn=dict(
batch_size=32,
learning_rate=0.001,
learner=dict(
learner_num=1,
send_policy_freq=1,
),
),
collect=dict(
n_sample=16,
collector=dict(
collector_num=2,
update_policy_second=3,
),
),
eval=dict(evaluator=dict(eval_freq=50, )),
other=dict(
eps=dict(
type='exp',
start=0.95,
end=0.1,
decay=100000,
),
replay_buffer=dict(
replay_buffer_size=100000,
enable_track_used_data=False,
),
commander=dict(
collector_task_space=2,
learner_task_space=1,
eval_interval=5,
),
),
),
)
cartpole_dqn_config = EasyDict(cartpole_dqn_config)
main_config = cartpole_dqn_config
cartpole_dqn_create_config = dict(
env=dict(
type='cartpole',
import_names=['app_zoo.classic_control.cartpole.envs.cartpole_env'],
),
env_manager=dict(type='base'),
policy=dict(type='dqn_command'),
learner=dict(type='base', import_names=['nervex.worker.learner.base_learner']),
collector=dict(
type='zergling',
import_names=['nervex.worker.collector.zergling_collector'],
),
commander=dict(
type='solo',
import_names=['nervex.worker.coordinator.solo_parallel_commander'],
),
comm_learner=dict(
type='flask_fs',
import_names=['nervex.worker.learner.comm.flask_fs_learner'],
),
comm_collector=dict(
type='flask_fs',
import_names=['nervex.worker.collector.comm.flask_fs_collector'],
),
)
cartpole_dqn_create_config = EasyDict(cartpole_dqn_create_config)
create_config = cartpole_dqn_create_config
cartpole_dqn_system_config = dict(
coordinator=dict(
operator_server=dict(
system_addr='http://nervex-server.nervex-system:8080',
api_version='/v1alpha1',
init_replicas_request=dict(
collectors={
"cpu": "3",
"memory": "4Gi",
"replicas": 2,
},
learners={
"cpu": "3",
"memory": "4Gi",
"gpu": "0",
"replicas": 1,
},
),
collector_target_num=2,
learner_target_num=1,
),
),
path_data='/data/nfs/nervex/cartpole/data',
path_policy='/data/nfs/nervex/cartpole/policy',
communication_mode='auto',
learner_multi_gpu=False,
)
cartpole_dqn_system_config = EasyDict(cartpole_dqn_system_config)
system_config = cartpole_dqn_system_config
EOF
nervex -m dist --module config -p k8s -c cartpole_dqn_config.py -s 0;
nervex -m dist --module learner -c cartpole_dqn_config.py.pkl -s 0
ports:
- name: learner
containerPort: 22271
resources:
requests:
cpu: 2
memory: 5Gi
limits:
cpu: 2
memory: 5Gi
containerPort: 22270
volumeMounts:
- name: config
mountPath: /data/nervex
- name: cache-volume
mountPath: /dev/shm
volumes:
- name: config
nfs:
path: /data/nfs/nervex
server: 10.152.197.14
- name: cache-volume
emptyDir:
medium: Memory
sizeLimit: 128Mi
\ No newline at end of file
- name: data-dir
mountPath: /data/nfs/nervex/cartpole
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册