提交 b982f133 编写于 作者: R richardxz

update job's "rerun" function

上级 0b648032
......@@ -29,11 +29,16 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"reflect"
"strings"
"kubesphere.io/kubesphere/pkg/client"
)
var k8sClient *kubernetes.Clientset
const retryTimes = 3
func (ctl *JobCtl) generateObject(item v1.Job) *Job {
var status, displayName string
......@@ -134,11 +139,13 @@ func (ctl *JobCtl) initListerAndInformer() {
object := obj.(*v1.Job)
mysqlObject := ctl.generateObject(*object)
ctl.makeRevision(object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.Job)
mysqlObject := ctl.generateObject(*object)
ctl.makeRevision(object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
......@@ -186,41 +193,41 @@ func getRevisions(job v1.Job) (JobRevisions, error) {
err := json.Unmarshal([]byte(revisionsStr), &revisions)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
return nil, fmt.Errorf("failed to rerun job %s", job.Name)
return nil, fmt.Errorf("failed to get job %s's revisions, reason: %s", job.Name, err)
}
}
return revisions, nil
}
func getStatus(item *v1.Job) JobStatus {
var status JobStatus
func getCurrentRevision(item *v1.Job) JobRevision {
var revision JobRevision
for _, condition := range item.Status.Conditions {
if condition.Type == "Failed" && condition.Status == "True" {
status.Status = Failed
status.Reasons = append(status.Reasons, condition.Reason)
status.Messages = append(status.Messages, condition.Message)
revision.Status = Failed
revision.Reasons = append(revision.Reasons, condition.Reason)
revision.Messages = append(revision.Messages, condition.Message)
}
if condition.Type == "Complete" && condition.Status == "True" {
status.Status = Completed
revision.Status = Completed
}
}
if len(status.Status) == 0 {
status.Status = Unfinished
if len(revision.Status) == 0 {
revision.Status = Running
}
status.DesirePodNum = *item.Spec.Completions
status.Succeed = item.Status.Succeeded
status.Failed = item.Status.Failed
status.StartTime = item.Status.StartTime.Time
revision.DesirePodNum = *item.Spec.Completions
revision.Succeed = item.Status.Succeeded
revision.Failed = item.Status.Failed
revision.StartTime = item.CreationTimestamp.Time
revision.Uid = string(item.UID)
if item.Status.CompletionTime != nil {
status.CompletionTime = item.Status.CompletionTime.Time
revision.CompletionTime = item.Status.CompletionTime.Time
}
return status
return revision
}
func deleteJob(namespace, job string) error {
......@@ -229,46 +236,81 @@ func deleteJob(namespace, job string) error {
return err
}
func (ctl *JobCtl) makeRevision(job *v1.Job) {
revisionIndex := -1
revisions, err := getRevisions(*job)
if err != nil {
glog.Error(err)
return
}
uid := job.UID
for index, revision := range revisions {
if revision.Uid == string(uid) {
currentRevision := getCurrentRevision(job)
if reflect.DeepEqual(currentRevision, revision) {
return
} else {
revisionIndex = index
break
}
}
}
if revisionIndex == -1 {
revisionIndex = len(revisions) + 1
}
revisions[revisionIndex] = getCurrentRevision(job)
revisionsByte, err := json.Marshal(revisions)
if err != nil {
glog.Error(err)
}
if job.Annotations == nil {
job.Annotations = make(map[string]string)
}
job.Annotations["revisions"] = string(revisionsByte)
ctl.K8sClient.BatchV1().Jobs(job.Namespace).Update(job)
}
func JobReRun(namespace, jobName string) (string, error) {
k8sClient = client.NewK8sClient()
job, err := k8sClient.BatchV1().Jobs(namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
return "", err
}
newJob := *job
newJob.ResourceVersion = ""
newJob.Status = v1.JobStatus{}
newJob.ObjectMeta.UID = ""
newJob.Annotations["revisions"] = strings.Replace(job.Annotations["revisions"], Running, Unfinished, -1)
delete(newJob.Spec.Selector.MatchLabels, "controller-uid")
delete(newJob.Spec.Template.ObjectMeta.Labels, "controller-uid")
revisions, err := getRevisions(*job)
if err != nil {
return "", err
}
index := len(revisions) + 1
value := getStatus(job)
revisions[index] = value
revisionsByte, err := json.Marshal(revisions)
err = deleteJob(namespace, jobName)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
glog.Errorf("failed to rerun job %s, reason: %s", jobName, err)
return "", fmt.Errorf("failed to rerun job %s", jobName)
}
newJob.Annotations["revisions"] = string(revisionsByte)
err = deleteJob(job.Namespace, job.Name)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
return "", fmt.Errorf("failed to rerun job %s", jobName)
for i := 0; i < retryTimes; i++ {
_, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob)
if err != nil {
time.Sleep(time.Second)
continue
}
break
}
_, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
glog.Errorf("failed to rerun job %s, reason: %s", jobName, err)
return "", fmt.Errorf("failed to rerun job %s", jobName)
}
......
......@@ -284,15 +284,16 @@ type StorageClass struct {
Provisioner string `json:"provisioner"`
}
type JobRevisions map[int]JobStatus
type JobRevisions map[int]JobRevision
type JobStatus struct {
type JobRevision struct {
Status string `json:"status"`
Reasons []string `json:"reasons"`
Messages []string `json:"messages"`
Succeed int32 `json:"succeed"`
DesirePodNum int32 `json:"desire"`
Failed int32 `json:"failed"`
Uid string `json:"uid"`
StartTime time.Time `json:"start-time"`
CompletionTime time.Time `json:"completion-time"`
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册