job.go 5.5 KB
Newer Older
aaronchen2k2k's avatar
aaronchen2k2k 已提交
1 2 3 4
package service

import (
	commConsts "github.com/easysoft/zentaoatf/internal/pkg/consts"
5 6 7 8 9
	analysisHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/analysis"
	configHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/config"
	execHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/exec"
	scriptHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/script"
	zentaoHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/zentao"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
10 11 12
	serverDomain "github.com/easysoft/zentaoatf/internal/server/modules/v1/domain"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/model"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/repo"
13 14 15
	channelUtils "github.com/easysoft/zentaoatf/pkg/lib/channel"
	fileUtils "github.com/easysoft/zentaoatf/pkg/lib/file"
	"path/filepath"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
16
	"strconv"
17 18
	"strings"
	"sync"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
19 20 21
	"time"
)

22 23 24 25
var (
	// channelMap associates a job ID with the signalling channel that Start
	// creates for that job. Entries are removed by Stop and by the goroutine
	// launched in Start once the job has finished.
	channelMap sync.Map
)

aaronchen2k2k's avatar
aaronchen2k2k 已提交
26
type JobService struct {
27
	JobRepo *repo.JobRepo `inject:""`
aaronchen2k2k's avatar
aaronchen2k2k 已提交
28 29 30 31 32 33
}

// NewJobService returns a zero-valued JobService. JobRepo is not set here.
func NewJobService() *JobService {
	return new(JobService)
}

34 35 36 37 38 39 40 41 42 43 44 45
func (s *JobService) Add(req serverDomain.ZentaoExecReq) (err error) {
	po := model.Job{
		Workspace: req.Workspace,
		Path:      req.Path,
		Ids:       req.Ids,

		Task:   req.Task,
		Retry:  1,
		Status: commConsts.JobCreated,
	}

	s.JobRepo.Save(&po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
46 47 48 49

	return
}

50 51 52
func (s *JobService) Start(po model.Job) {
	ch := make(chan int, 1)
	channelMap.Store(po.ID, ch)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
53

54
	req := s.genExecReq(po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
55

56 57 58 59 60 61 62 63 64 65 66 67 68 69
	go func() {
		s.JobRepo.UpdateStatus(po.ID, commConsts.JobInprogress, true, false)

		execHelper.Exec(nil, req, nil)

		s.JobRepo.UpdateStatus(po.ID, commConsts.JobCompleted, false, true)

		s.SubmitResult(po)

		if ch != nil {
			channelMap.Delete(po.ID)
			close(ch)
		}
	}()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
70 71
}

72 73
func (s *JobService) Cancel(id uint) {
	taskInfo, _ := s.JobRepo.Get(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
74

75 76
	if taskInfo.ID > 0 {
		s.JobRepo.SetCanceled(taskInfo)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
77 78
	}

79
	s.Stop(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
80 81
}

82 83
func (s *JobService) Stop(id uint) {
	chVal, ok := channelMap.Load(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
84

85 86 87 88 89 90 91 92 93 94 95 96 97 98
	if !ok || chVal == nil {
		return
	}

	channelMap.Delete(id)

	ch := chVal.(chan int)
	if ch != nil {
		if !channelUtils.IsChanClose(ch) {
			ch <- 1
		}

		ch = nil
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
99 100
}

101
func (s *JobService) Restart(po model.Job) (ret bool) {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
102 103
	//s.Cancel(po.ID)
	s.Stop(po.ID)
104 105 106
	s.Start(po)

	s.JobRepo.AddRetry(po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
107 108 109 110

	return
}

111
func (s *JobService) Check() (err error) {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
112
	taskMap, _ := s.Query()
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128

	toStartNewJob := false
	if len(taskMap.Inprogress) > 0 {
		runningJob := taskMap.Inprogress[0]

		if s.IsError(runningJob) || s.IsTimeout(runningJob) || s.isEmpty() {
			if s.NeedRetry(runningJob) {
				s.Restart(runningJob)
			} else {
				s.JobRepo.SetFailed(runningJob)
				toStartNewJob = true
			}
		}

	} else {
		toStartNewJob = true
aaronchen2k2k's avatar
aaronchen2k2k 已提交
129 130
	}

131 132 133 134 135
	if toStartNewJob && len(taskMap.Created) > 0 {
		newJob := taskMap.Created[0]

		s.Start(newJob)
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
136 137 138 139

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
// List returns the jobs the repository reports for the given status.
// Leading and trailing whitespace in status is stripped before the lookup.
func (s *JobService) List(status string) (jobs []model.Job, err error) {
	return s.JobRepo.ListByStatus(strings.TrimSpace(status))
}

func (s *JobService) Query() (ret serverDomain.JobQueryResp, err error) {
	//ret = serverDomain.JobQueryResp{
	//	Created:    make([]model.Job, 0),
	//	Inprogress: make([]model.Job, 0),
	//	Canceled:   make([]model.Job, 0),
	//	Completed:  make([]model.Job, 0),
	//	Failed:     make([]model.Job, 0),
	//}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
155

156
	pos, _ := s.JobRepo.Query()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
157

158 159 160 161 162
	for _, po := range pos {
		status := po.Status
		if status == commConsts.JobTimeout || status == commConsts.JobError {
			status = commConsts.JobInprogress
		}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
163

164 165 166 167 168 169 170 171 172 173 174 175
		if status == commConsts.JobCreated {
			ret.Created = append(ret.Created, po)
		} else if status == commConsts.JobInprogress {
			ret.Inprogress = append(ret.Inprogress, po)
		} else if status == commConsts.JobCanceled {
			ret.Canceled = append(ret.Canceled, po)
		} else if status == commConsts.JobCompleted {
			ret.Completed = append(ret.Completed, po)
		} else if status == commConsts.JobFailed {
			ret.Failed = append(ret.Failed, po)
		}
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
176 177 178 179

	return
}

180 181 182 183 184
func (s *JobService) SubmitResult(job model.Job) (err error) {
	result := serverDomain.ZentaoResultSubmitReq{
		Task: job.Task,
		Seq:  commConsts.ExecLogDir,
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
185

186
	report, err := analysisHelper.ReadReportByPath(filepath.Join(result.Seq, commConsts.ResultJson))
aaronchen2k2k's avatar
aaronchen2k2k 已提交
187 188 189 190
	if err != nil {
		return
	}

191 192
	config := configHelper.LoadByWorkspacePath(commConsts.ZtfDir)
	err = zentaoHelper.CommitResult(report, result.ProductId, result.TaskId, result.Task, config, nil)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
193

194 195 196 197 198 199 200 201 202
	return
}

func (s *JobService) genExecReq(po model.Job) (req serverDomain.ExecReq) {
	caseIds := make([]int, 0)
	for _, idStr := range strings.Split(po.Ids, ",") {
		id, err := strconv.Atoi(idStr)
		if err == nil {
			caseIds = append(caseIds, id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
203 204 205
		}
	}

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	dir := po.Path
	if !fileUtils.IsAbsolutePath(dir) {
		dir = filepath.Join(po.Workspace, dir)
	}

	caseIdMap := map[int]string{}
	scriptHelper.GetScriptByIdsInDir(dir, &caseIdMap)

	cases := scriptHelper.GetCaseByListInMap(caseIds, caseIdMap)

	commConsts.ExecFrom = commConsts.FromZentao
	req.Act = commConsts.ExecCase
	req.ScriptDirParamFromCmdLine = "."
	req.TestSets = append(req.TestSets, serverDomain.TestSet{
		WorkspacePath: po.Workspace,
		Cases:         cases,
	})

aaronchen2k2k's avatar
aaronchen2k2k 已提交
224 225 226
	return
}

227 228 229
// IsError reports whether the job's status is JobError.
func (s *JobService) IsError(po model.Job) bool {
	inError := po.Status == commConsts.JobError
	return inError
}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
230

231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
// IsTimeout reports whether the job is in progress and the number of seconds
// elapsed since po.StartDate exceeds commConsts.JobTimeoutTime.
func (s *JobService) IsTimeout(po model.Job) bool {
	// The elapsed time is computed up front, before the status is examined.
	elapsed := time.Now().Unix() - po.StartDate.Unix()

	if po.Status != commConsts.JobInprogress {
		return false
	}

	return elapsed > commConsts.JobTimeoutTime
}

// NeedRetry reports whether the job's retry counter is still below
// commConsts.JobRetryTime.
func (s *JobService) NeedRetry(po model.Job) bool {
	underLimit := po.Retry < commConsts.JobRetryTime
	return underLimit
}

func (s *JobService) isEmpty() bool {
	length := 0

	channelMap.Range(func(key, value interface{}) bool {
		length++
		return true
	})

	return length == 0
aaronchen2k2k's avatar
aaronchen2k2k 已提交
250
}