job.go 6.0 KB
Newer Older
aaronchen2k2k's avatar
aaronchen2k2k 已提交
1 2 3 4
package service

import (
	commConsts "github.com/easysoft/zentaoatf/internal/pkg/consts"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
5
	commDomain "github.com/easysoft/zentaoatf/internal/pkg/domain"
6 7 8 9
	analysisHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/analysis"
	execHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/exec"
	scriptHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/script"
	zentaoHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/zentao"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
10
	serverConfig "github.com/easysoft/zentaoatf/internal/server/config"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
11 12 13
	serverDomain "github.com/easysoft/zentaoatf/internal/server/modules/v1/domain"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/model"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/repo"
14 15 16
	channelUtils "github.com/easysoft/zentaoatf/pkg/lib/channel"
	fileUtils "github.com/easysoft/zentaoatf/pkg/lib/file"
	"path/filepath"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
17
	"strconv"
18 19
	"strings"
	"sync"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
20 21 22
	"time"
)

23 24 25 26
// channelMap holds the stop channel registered for each started job.
// Keys are job IDs and values are the `chan int` created in Start.
var channelMap sync.Map

aaronchen2k2k's avatar
aaronchen2k2k 已提交
27
type JobService struct {
28
	JobRepo *repo.JobRepo `inject:""`
aaronchen2k2k's avatar
aaronchen2k2k 已提交
29 30 31 32 33 34
}

// NewJobService returns an empty JobService; its JobRepo field is not set here.
func NewJobService() *JobService {
	return new(JobService)
}

35 36 37 38 39 40 41 42 43 44 45 46
func (s *JobService) Add(req serverDomain.ZentaoExecReq) (err error) {
	po := model.Job{
		Workspace: req.Workspace,
		Path:      req.Path,
		Ids:       req.Ids,

		Task:   req.Task,
		Retry:  1,
		Status: commConsts.JobCreated,
	}

	s.JobRepo.Save(&po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
47 48 49 50

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
51
func (s *JobService) Start(po *model.Job) {
52 53
	ch := make(chan int, 1)
	channelMap.Store(po.ID, ch)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
54

aaronchen2k2k's avatar
aaronchen2k2k 已提交
55
	req := s.genExecReq(*po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
56

57
	go func() {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
58
		s.JobRepo.UpdateStatus(po, commConsts.JobInprogress, true, false)
59 60 61

		execHelper.Exec(nil, req, nil)

aaronchen2k2k's avatar
aaronchen2k2k 已提交
62
		s.JobRepo.UpdateStatus(po, commConsts.JobCompleted, false, true)
63

aaronchen2k2k's avatar
aaronchen2k2k 已提交
64 65 66
		s.SubmitJobStatus(*po)

		s.SubmitExecResult(*po)
67 68 69 70 71 72

		if ch != nil {
			channelMap.Delete(po.ID)
			close(ch)
		}
	}()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
73 74
}

75 76
func (s *JobService) Cancel(id uint) {
	taskInfo, _ := s.JobRepo.Get(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
77

78 79
	if taskInfo.ID > 0 {
		s.JobRepo.SetCanceled(taskInfo)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
80 81
	}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
82 83 84 85 86 87 88 89 90 91
	s.stop(id)
}

func (s *JobService) Restart(po *model.Job) (ret bool) {
	s.stop(po.ID)
	s.Start(po)

	s.JobRepo.AddRetry(po)

	return
aaronchen2k2k's avatar
aaronchen2k2k 已提交
92 93
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
94
func (s *JobService) stop(id uint) {
95
	chVal, ok := channelMap.Load(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
96

97 98 99 100 101 102 103 104 105 106 107 108 109 110
	if !ok || chVal == nil {
		return
	}

	channelMap.Delete(id)

	ch := chVal.(chan int)
	if ch != nil {
		if !channelUtils.IsChanClose(ch) {
			ch <- 1
		}

		ch = nil
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
111 112
}

113
func (s *JobService) Check() (err error) {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
114
	taskMap, _ := s.Query()
115 116 117 118 119

	toStartNewJob := false
	if len(taskMap.Inprogress) > 0 {
		runningJob := taskMap.Inprogress[0]

aaronchen2k2k's avatar
aaronchen2k2k 已提交
120 121
		if s.IsError(*runningJob) || s.IsTimeout(*runningJob) || s.isEmpty() {
			if s.NeedRetry(*runningJob) {
122 123
				s.Restart(runningJob)
			} else {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
124 125 126
				s.JobRepo.UpdateStatus(runningJob, commConsts.JobFailed, false, true)
				s.SubmitJobStatus(*runningJob)

127 128 129 130 131 132
				toStartNewJob = true
			}
		}

	} else {
		toStartNewJob = true
aaronchen2k2k's avatar
aaronchen2k2k 已提交
133 134
	}

135 136 137 138 139
	if toStartNewJob && len(taskMap.Created) > 0 {
		newJob := taskMap.Created[0]

		s.Start(newJob)
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
140 141 142 143

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
// List returns the jobs the repository reports for the given status.
// Leading and trailing whitespace in status is removed before the lookup.
func (s *JobService) List(status string) (jobs []model.Job, err error) {
	trimmed := strings.TrimSpace(status)

	return s.JobRepo.ListByStatus(trimmed)
}

func (s *JobService) Query() (ret serverDomain.JobQueryResp, err error) {
	//ret = serverDomain.JobQueryResp{
	//	Created:    make([]model.Job, 0),
	//	Inprogress: make([]model.Job, 0),
	//	Canceled:   make([]model.Job, 0),
	//	Completed:  make([]model.Job, 0),
	//	Failed:     make([]model.Job, 0),
	//}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
159

160
	pos, _ := s.JobRepo.Query()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
161

162 163 164 165 166
	for _, po := range pos {
		status := po.Status
		if status == commConsts.JobTimeout || status == commConsts.JobError {
			status = commConsts.JobInprogress
		}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
167

168
		if status == commConsts.JobCreated {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
169
			ret.Created = append(ret.Created, &po)
170
		} else if status == commConsts.JobInprogress {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
171
			ret.Inprogress = append(ret.Inprogress, &po)
172
		} else if status == commConsts.JobCanceled {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
173
			ret.Canceled = append(ret.Canceled, &po)
174
		} else if status == commConsts.JobCompleted {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
175
			ret.Completed = append(ret.Completed, &po)
176
		} else if status == commConsts.JobFailed {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
177
			ret.Failed = append(ret.Failed, &po)
178 179
		}
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
180 181 182 183

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
// SubmitJobStatus sends the job's task, status, start/end time and retry
// count to the server configured in serverConfig.CONFIG.Server.
//
// StartDate and EndDate are pointers on the job; when either is nil the
// corresponding field of the request keeps its zero value instead of the
// dereference panicking.
//
// Returns the error from zentaoHelper.CommitStatus.
func (s *JobService) SubmitJobStatus(job model.Job) (err error) {
	status := serverDomain.ZentaoJobSubmitReq{
		Task:      job.Task,
		Status:    job.Status,
		RetryTime: job.Retry,
	}
	if job.StartDate != nil {
		status.StartTime = *job.StartDate
	}
	if job.EndDate != nil {
		status.EndTime = *job.EndDate
	}

	config := commDomain.WorkspaceConf{
		Url: serverConfig.CONFIG.Server,
	}
	err = zentaoHelper.CommitStatus(status, config)

	return err
}

func (s *JobService) SubmitExecResult(job model.Job) (err error) {
202 203 204 205
	result := serverDomain.ZentaoResultSubmitReq{
		Task: job.Task,
		Seq:  commConsts.ExecLogDir,
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
206

207
	report, err := analysisHelper.ReadReportByPath(filepath.Join(result.Seq, commConsts.ResultJson))
aaronchen2k2k's avatar
aaronchen2k2k 已提交
208 209 210 211
	if err != nil {
		return
	}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
212 213 214
	config := commDomain.WorkspaceConf{
		Url: serverConfig.CONFIG.Server,
	}
215
	err = zentaoHelper.CommitResult(report, result.ProductId, result.TaskId, result.Task, config, nil)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
216

217 218 219 220 221 222 223 224 225
	return
}

func (s *JobService) genExecReq(po model.Job) (req serverDomain.ExecReq) {
	caseIds := make([]int, 0)
	for _, idStr := range strings.Split(po.Ids, ",") {
		id, err := strconv.Atoi(idStr)
		if err == nil {
			caseIds = append(caseIds, id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
226 227 228
		}
	}

229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
	dir := po.Path
	if !fileUtils.IsAbsolutePath(dir) {
		dir = filepath.Join(po.Workspace, dir)
	}

	caseIdMap := map[int]string{}
	scriptHelper.GetScriptByIdsInDir(dir, &caseIdMap)

	cases := scriptHelper.GetCaseByListInMap(caseIds, caseIdMap)

	commConsts.ExecFrom = commConsts.FromZentao
	req.Act = commConsts.ExecCase
	req.ScriptDirParamFromCmdLine = "."
	req.TestSets = append(req.TestSets, serverDomain.TestSet{
		WorkspacePath: po.Workspace,
		Cases:         cases,
	})

aaronchen2k2k's avatar
aaronchen2k2k 已提交
247 248 249
	return
}

250 251 252
// IsError reports whether the job's status is JobError.
func (s *JobService) IsError(po model.Job) bool {
	inError := po.Status == commConsts.JobError

	return inError
}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
253

254 255
func (s *JobService) IsTimeout(po model.Job) bool {
	dur := time.Now().Unix() - po.StartDate.Unix()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
256
	// return dur > 3
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
	return po.Status == commConsts.JobInprogress && dur > commConsts.JobTimeoutTime
}

// NeedRetry reports whether the job's retry counter is still below
// commConsts.JobRetryTime.
func (s *JobService) NeedRetry(po model.Job) bool {
	return commConsts.JobRetryTime > po.Retry
}

func (s *JobService) isEmpty() bool {
	length := 0

	channelMap.Range(func(key, value interface{}) bool {
		length++
		return true
	})

	return length == 0
aaronchen2k2k's avatar
aaronchen2k2k 已提交
273
}