job.go 6.6 KB
Newer Older
aaronchen2k2k's avatar
aaronchen2k2k 已提交
1 2 3
package service

import (
Z
zhaoke 已提交
4 5 6 7 8 9 10 11
	"errors"
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

aaronchen2k2k's avatar
aaronchen2k2k 已提交
12
	commConsts "github.com/easysoft/zentaoatf/internal/pkg/consts"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
13
	commDomain "github.com/easysoft/zentaoatf/internal/pkg/domain"
14 15 16 17
	analysisHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/analysis"
	execHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/exec"
	scriptHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/script"
	zentaoHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/zentao"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
18
	serverConfig "github.com/easysoft/zentaoatf/internal/server/config"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
19 20 21
	serverDomain "github.com/easysoft/zentaoatf/internal/server/modules/v1/domain"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/model"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/repo"
22 23
	channelUtils "github.com/easysoft/zentaoatf/pkg/lib/channel"
	fileUtils "github.com/easysoft/zentaoatf/pkg/lib/file"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
24 25
)

26 27 28 29
// channelMap holds one signalling channel per started job, keyed by job ID.
// Entries are added when a job is started and removed when it finishes or
// is stopped.
var channelMap sync.Map

aaronchen2k2k's avatar
aaronchen2k2k 已提交
30
type JobService struct {
31
	JobRepo *repo.JobRepo `inject:""`
aaronchen2k2k's avatar
aaronchen2k2k 已提交
32 33 34 35 36 37
}

// NewJobService returns a zero-valued JobService; its JobRepo field is left
// nil for the caller (or an injector) to fill in.
func NewJobService() *JobService {
	return new(JobService)
}

38 39 40 41 42
func (s *JobService) Add(req serverDomain.ZentaoExecReq) (err error) {
	po := model.Job{
		Workspace: req.Workspace,
		Path:      req.Path,
		Ids:       req.Ids,
43
		Cmd:       req.Cmd,
44 45 46 47 48 49 50

		Task:   req.Task,
		Retry:  1,
		Status: commConsts.JobCreated,
	}

	s.JobRepo.Save(&po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
51 52 53 54

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
55
func (s *JobService) Start(po *model.Job) {
56 57
	ch := make(chan int, 1)
	channelMap.Store(po.ID, ch)
雨爱无痕 已提交
58
	commConsts.ExecFrom = commConsts.FromZentao
aaronchen2k2k's avatar
aaronchen2k2k 已提交
59

60
	req := s.genExecReqFromJob(*po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
61

62
	go func() {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
63
		s.JobRepo.UpdateStatus(po, commConsts.JobInprogress, true, false)
64

Z
zhaoke 已提交
65
		err := execHelper.Exec(nil, req, nil)
66

aaronchen2k2k's avatar
aaronchen2k2k 已提交
67
		s.JobRepo.UpdateStatus(po, commConsts.JobCompleted, false, true)
68

Z
zhaoke 已提交
69
		// s.SubmitJobStatus(*po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
70

Z
zhaoke 已提交
71
		s.SubmitExecResult(*po, err)
72 73 74 75 76 77

		if ch != nil {
			channelMap.Delete(po.ID)
			close(ch)
		}
	}()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
78 79
}

80 81
func (s *JobService) Cancel(id uint) {
	taskInfo, _ := s.JobRepo.Get(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
82

83 84
	if taskInfo.ID > 0 {
		s.JobRepo.SetCanceled(taskInfo)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
85 86
	}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
87 88 89 90 91 92 93 94 95 96
	s.stop(id)
}

func (s *JobService) Restart(po *model.Job) (ret bool) {
	s.stop(po.ID)
	s.Start(po)

	s.JobRepo.AddRetry(po)

	return
aaronchen2k2k's avatar
aaronchen2k2k 已提交
97 98
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
99
func (s *JobService) stop(id uint) {
100
	chVal, ok := channelMap.Load(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
101

102 103 104 105 106 107 108 109 110 111 112 113 114 115
	if !ok || chVal == nil {
		return
	}

	channelMap.Delete(id)

	ch := chVal.(chan int)
	if ch != nil {
		if !channelUtils.IsChanClose(ch) {
			ch <- 1
		}

		ch = nil
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
116 117
}

118
func (s *JobService) Check() (err error) {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
119
	taskMap, _ := s.Query()
120 121 122 123 124

	toStartNewJob := false
	if len(taskMap.Inprogress) > 0 {
		runningJob := taskMap.Inprogress[0]

aaronchen2k2k's avatar
aaronchen2k2k 已提交
125 126
		if s.IsError(*runningJob) || s.IsTimeout(*runningJob) || s.isEmpty() {
			if s.NeedRetry(*runningJob) {
127 128
				s.Restart(runningJob)
			} else {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
129 130 131
				s.JobRepo.UpdateStatus(runningJob, commConsts.JobFailed, false, true)
				s.SubmitJobStatus(*runningJob)

132 133 134 135 136 137
				toStartNewJob = true
			}
		}

	} else {
		toStartNewJob = true
aaronchen2k2k's avatar
aaronchen2k2k 已提交
138 139
	}

140 141 142 143 144
	if toStartNewJob && len(taskMap.Created) > 0 {
		newJob := taskMap.Created[0]

		s.Start(newJob)
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
145 146 147 148

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
// List returns the jobs whose status matches the given value. Leading and
// trailing whitespace in status is removed before the repository lookup.
func (s *JobService) List(status string) (jobs []model.Job, err error) {
	return s.JobRepo.ListByStatus(strings.TrimSpace(status))
}

func (s *JobService) Query() (ret serverDomain.JobQueryResp, err error) {
	//ret = serverDomain.JobQueryResp{
	//	Created:    make([]model.Job, 0),
	//	Inprogress: make([]model.Job, 0),
	//	Canceled:   make([]model.Job, 0),
	//	Completed:  make([]model.Job, 0),
	//	Failed:     make([]model.Job, 0),
	//}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
164

165
	pos, _ := s.JobRepo.Query()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
166

167 168 169 170 171
	for _, po := range pos {
		status := po.Status
		if status == commConsts.JobTimeout || status == commConsts.JobError {
			status = commConsts.JobInprogress
		}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
172

173
		if status == commConsts.JobCreated {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
174
			ret.Created = append(ret.Created, &po)
175
		} else if status == commConsts.JobInprogress {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
176
			ret.Inprogress = append(ret.Inprogress, &po)
177
		} else if status == commConsts.JobCanceled {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
178
			ret.Canceled = append(ret.Canceled, &po)
179
		} else if status == commConsts.JobCompleted {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
180
			ret.Completed = append(ret.Completed, &po)
181
		} else if status == commConsts.JobFailed {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
182
			ret.Failed = append(ret.Failed, &po)
183 184
		}
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
185 186 187 188

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
189 190 191 192
func (s *JobService) SubmitJobStatus(job model.Job) (err error) {
	status := serverDomain.ZentaoJobSubmitReq{
		Task:      job.Task,
		Status:    job.Status,
Z
zhaoke 已提交
193 194
		StartTime: (*job.StartDate).Format("2006-01-02 15:04:05"),
		EndTime:   (*job.EndDate).Format("2006-01-02 15:04:05"),
aaronchen2k2k's avatar
aaronchen2k2k 已提交
195
		RetryTime: job.Retry,
Z
zhaoke 已提交
196 197
		Error:     "",
		Data:      "",
aaronchen2k2k's avatar
aaronchen2k2k 已提交
198 199 200 201 202
	}

	config := commDomain.WorkspaceConf{
		Url: serverConfig.CONFIG.Server,
	}
Z
zhaoke 已提交
203
	err = zentaoHelper.JobCommitResult(status, config)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
204 205 206 207

	return
}

Z
zhaoke 已提交
208
func (s *JobService) SubmitExecResult(job model.Job, execErr error) (err error) {
209 210 211 212
	result := serverDomain.ZentaoResultSubmitReq{
		Task: job.Task,
		Seq:  commConsts.ExecLogDir,
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
213

Z
zhaoke 已提交
214 215 216 217 218 219 220 221 222
	reportPth := filepath.Join(result.Seq, commConsts.ResultJson)
	var report commDomain.ZtfReport
	if fileUtils.FileExist(reportPth) {
		report, err = analysisHelper.ReadReportByPath(reportPth)
	} else {
		err = errors.New("case not found")
	}
	if err != nil && execErr == nil {
		execErr = err
aaronchen2k2k's avatar
aaronchen2k2k 已提交
223 224
	}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
225 226 227
	config := commDomain.WorkspaceConf{
		Url: serverConfig.CONFIG.Server,
	}
Z
zhaoke 已提交
228 229 230 231 232 233 234 235

	ret := serverDomain.ZentaoJobSubmitReq{
		Task:      job.Task,
		Status:    job.Status,
		StartTime: (*job.StartDate).Format("2006-01-02 15:04:05"),
		EndTime:   (*job.EndDate).Format("2006-01-02 15:04:05"),
		RetryTime: job.Retry,
		Error:     fmt.Sprintf("%v", execErr),
Z
zhaoke 已提交
236
		Data:      report,
Z
zhaoke 已提交
237 238
	}
	err = zentaoHelper.JobCommitResult(ret, config)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
239

240 241 242
	return
}

243
func (s *JobService) genExecReqFromJob(po model.Job) (req serverDomain.ExecReq) {
244 245 246 247 248
	caseIds := make([]int, 0)
	for _, idStr := range strings.Split(po.Ids, ",") {
		id, err := strconv.Atoi(idStr)
		if err == nil {
			caseIds = append(caseIds, id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
249 250 251
		}
	}

252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
	dir := po.Path
	if !fileUtils.IsAbsolutePath(dir) {
		dir = filepath.Join(po.Workspace, dir)
	}

	caseIdMap := map[int]string{}
	scriptHelper.GetScriptByIdsInDir(dir, &caseIdMap)

	cases := scriptHelper.GetCaseByListInMap(caseIds, caseIdMap)

	req.Act = commConsts.ExecCase
	req.ScriptDirParamFromCmdLine = "."
	req.TestSets = append(req.TestSets, serverDomain.TestSet{
		WorkspacePath: po.Workspace,
		Cases:         cases,
267
		Cmd:           po.Cmd,
268 269
	})

aaronchen2k2k's avatar
aaronchen2k2k 已提交
270 271 272
	return
}

273 274 275
// IsError reports whether the job's status is JobError.
func (s *JobService) IsError(po model.Job) bool {
	inError := po.Status == commConsts.JobError
	return inError
}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
276

277 278
func (s *JobService) IsTimeout(po model.Job) bool {
	dur := time.Now().Unix() - po.StartDate.Unix()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
279
	// return dur > 3
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
	return po.Status == commConsts.JobInprogress && dur > commConsts.JobTimeoutTime
}

// NeedRetry reports whether the job's retry counter is still below
// commConsts.JobRetryTime.
func (s *JobService) NeedRetry(po model.Job) bool {
	retriesLeft := po.Retry < commConsts.JobRetryTime
	return retriesLeft
}

func (s *JobService) isEmpty() bool {
	length := 0

	channelMap.Range(func(key, value interface{}) bool {
		length++
		return true
	})

	return length == 0
aaronchen2k2k's avatar
aaronchen2k2k 已提交
296
}