job.go 6.6 KB
Newer Older
aaronchen2k2k's avatar
aaronchen2k2k 已提交
1 2 3
package service

import (
Z
zhaoke 已提交
4 5 6 7 8 9 10 11
	"errors"
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

aaronchen2k2k's avatar
aaronchen2k2k 已提交
12
	commConsts "github.com/easysoft/zentaoatf/internal/pkg/consts"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
13
	commDomain "github.com/easysoft/zentaoatf/internal/pkg/domain"
14 15 16 17
	analysisHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/analysis"
	execHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/exec"
	scriptHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/script"
	zentaoHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/zentao"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
18
	serverConfig "github.com/easysoft/zentaoatf/internal/server/config"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
19 20 21
	serverDomain "github.com/easysoft/zentaoatf/internal/server/modules/v1/domain"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/model"
	"github.com/easysoft/zentaoatf/internal/server/modules/v1/repo"
22 23
	channelUtils "github.com/easysoft/zentaoatf/pkg/lib/channel"
	fileUtils "github.com/easysoft/zentaoatf/pkg/lib/file"
aaronchen2k2k's avatar
aaronchen2k2k 已提交
24 25
)

26 27 28 29
var (
	// channelMap holds one entry per started job, keyed by the job's ID. The
	// value is the chan int that Start creates for that run and that stop
	// signals and removes.
	channelMap sync.Map
)

aaronchen2k2k's avatar
aaronchen2k2k 已提交
30
type JobService struct {
31
	JobRepo *repo.JobRepo `inject:""`
aaronchen2k2k's avatar
aaronchen2k2k 已提交
32 33 34 35 36 37
}

// NewJobService returns a pointer to a zero-valued JobService. JobRepo is left
// nil by this constructor and has to be set before any method that uses it is
// called.
func NewJobService() *JobService {
	return new(JobService)
}

38 39 40 41 42 43 44 45 46 47 48 49
func (s *JobService) Add(req serverDomain.ZentaoExecReq) (err error) {
	po := model.Job{
		Workspace: req.Workspace,
		Path:      req.Path,
		Ids:       req.Ids,

		Task:   req.Task,
		Retry:  1,
		Status: commConsts.JobCreated,
	}

	s.JobRepo.Save(&po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
50 51 52 53

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
54
func (s *JobService) Start(po *model.Job) {
55 56
	ch := make(chan int, 1)
	channelMap.Store(po.ID, ch)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
57

aaronchen2k2k's avatar
aaronchen2k2k 已提交
58
	req := s.genExecReq(*po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
59

60
	go func() {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
61
		s.JobRepo.UpdateStatus(po, commConsts.JobInprogress, true, false)
62

Z
zhaoke 已提交
63
		err := execHelper.Exec(nil, req, nil)
64

aaronchen2k2k's avatar
aaronchen2k2k 已提交
65
		s.JobRepo.UpdateStatus(po, commConsts.JobCompleted, false, true)
66

Z
zhaoke 已提交
67
		// s.SubmitJobStatus(*po)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
68

Z
zhaoke 已提交
69
		s.SubmitExecResult(*po, err)
70 71 72 73 74 75

		if ch != nil {
			channelMap.Delete(po.ID)
			close(ch)
		}
	}()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
76 77
}

78 79
func (s *JobService) Cancel(id uint) {
	taskInfo, _ := s.JobRepo.Get(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
80

81 82
	if taskInfo.ID > 0 {
		s.JobRepo.SetCanceled(taskInfo)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
83 84
	}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
85 86 87 88 89 90 91 92 93 94
	s.stop(id)
}

func (s *JobService) Restart(po *model.Job) (ret bool) {
	s.stop(po.ID)
	s.Start(po)

	s.JobRepo.AddRetry(po)

	return
aaronchen2k2k's avatar
aaronchen2k2k 已提交
95 96
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
97
func (s *JobService) stop(id uint) {
98
	chVal, ok := channelMap.Load(id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
99

100 101 102 103 104 105 106 107 108 109 110 111 112 113
	if !ok || chVal == nil {
		return
	}

	channelMap.Delete(id)

	ch := chVal.(chan int)
	if ch != nil {
		if !channelUtils.IsChanClose(ch) {
			ch <- 1
		}

		ch = nil
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
114 115
}

116
func (s *JobService) Check() (err error) {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
117
	taskMap, _ := s.Query()
118 119 120 121 122

	toStartNewJob := false
	if len(taskMap.Inprogress) > 0 {
		runningJob := taskMap.Inprogress[0]

aaronchen2k2k's avatar
aaronchen2k2k 已提交
123 124
		if s.IsError(*runningJob) || s.IsTimeout(*runningJob) || s.isEmpty() {
			if s.NeedRetry(*runningJob) {
125 126
				s.Restart(runningJob)
			} else {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
127 128 129
				s.JobRepo.UpdateStatus(runningJob, commConsts.JobFailed, false, true)
				s.SubmitJobStatus(*runningJob)

130 131 132 133 134 135
				toStartNewJob = true
			}
		}

	} else {
		toStartNewJob = true
aaronchen2k2k's avatar
aaronchen2k2k 已提交
136 137
	}

138 139 140 141 142
	if toStartNewJob && len(taskMap.Created) > 0 {
		newJob := taskMap.Created[0]

		s.Start(newJob)
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
143 144 145 146

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
// List returns the jobs that JobRepo.ListByStatus yields for status, after
// surrounding whitespace has been trimmed from it.
func (s *JobService) List(status string) (jobs []model.Job, err error) {
	return s.JobRepo.ListByStatus(strings.TrimSpace(status))
}

func (s *JobService) Query() (ret serverDomain.JobQueryResp, err error) {
	//ret = serverDomain.JobQueryResp{
	//	Created:    make([]model.Job, 0),
	//	Inprogress: make([]model.Job, 0),
	//	Canceled:   make([]model.Job, 0),
	//	Completed:  make([]model.Job, 0),
	//	Failed:     make([]model.Job, 0),
	//}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
162

163
	pos, _ := s.JobRepo.Query()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
164

165 166 167 168 169
	for _, po := range pos {
		status := po.Status
		if status == commConsts.JobTimeout || status == commConsts.JobError {
			status = commConsts.JobInprogress
		}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
170

171
		if status == commConsts.JobCreated {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
172
			ret.Created = append(ret.Created, &po)
173
		} else if status == commConsts.JobInprogress {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
174
			ret.Inprogress = append(ret.Inprogress, &po)
175
		} else if status == commConsts.JobCanceled {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
176
			ret.Canceled = append(ret.Canceled, &po)
177
		} else if status == commConsts.JobCompleted {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
178
			ret.Completed = append(ret.Completed, &po)
179
		} else if status == commConsts.JobFailed {
aaronchen2k2k's avatar
aaronchen2k2k 已提交
180
			ret.Failed = append(ret.Failed, &po)
181 182
		}
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
183 184 185 186

	return
}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
187 188 189 190
func (s *JobService) SubmitJobStatus(job model.Job) (err error) {
	status := serverDomain.ZentaoJobSubmitReq{
		Task:      job.Task,
		Status:    job.Status,
Z
zhaoke 已提交
191 192
		StartTime: (*job.StartDate).Format("2006-01-02 15:04:05"),
		EndTime:   (*job.EndDate).Format("2006-01-02 15:04:05"),
aaronchen2k2k's avatar
aaronchen2k2k 已提交
193
		RetryTime: job.Retry,
Z
zhaoke 已提交
194 195
		Error:     "",
		Data:      "",
aaronchen2k2k's avatar
aaronchen2k2k 已提交
196 197 198 199 200
	}

	config := commDomain.WorkspaceConf{
		Url: serverConfig.CONFIG.Server,
	}
Z
zhaoke 已提交
201
	err = zentaoHelper.JobCommitResult(status, config)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
202 203 204 205

	return
}

Z
zhaoke 已提交
206
func (s *JobService) SubmitExecResult(job model.Job, execErr error) (err error) {
207 208 209 210
	result := serverDomain.ZentaoResultSubmitReq{
		Task: job.Task,
		Seq:  commConsts.ExecLogDir,
	}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
211

Z
zhaoke 已提交
212 213 214 215 216 217 218 219 220
	reportPth := filepath.Join(result.Seq, commConsts.ResultJson)
	var report commDomain.ZtfReport
	if fileUtils.FileExist(reportPth) {
		report, err = analysisHelper.ReadReportByPath(reportPth)
	} else {
		err = errors.New("case not found")
	}
	if err != nil && execErr == nil {
		execErr = err
aaronchen2k2k's avatar
aaronchen2k2k 已提交
221 222
	}

aaronchen2k2k's avatar
aaronchen2k2k 已提交
223 224 225
	config := commDomain.WorkspaceConf{
		Url: serverConfig.CONFIG.Server,
	}
Z
zhaoke 已提交
226 227 228 229 230 231 232 233

	ret := serverDomain.ZentaoJobSubmitReq{
		Task:      job.Task,
		Status:    job.Status,
		StartTime: (*job.StartDate).Format("2006-01-02 15:04:05"),
		EndTime:   (*job.EndDate).Format("2006-01-02 15:04:05"),
		RetryTime: job.Retry,
		Error:     fmt.Sprintf("%v", execErr),
Z
zhaoke 已提交
234
		Data:      report,
Z
zhaoke 已提交
235 236
	}
	err = zentaoHelper.JobCommitResult(ret, config)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
237

238 239 240 241 242 243 244 245 246
	return
}

func (s *JobService) genExecReq(po model.Job) (req serverDomain.ExecReq) {
	caseIds := make([]int, 0)
	for _, idStr := range strings.Split(po.Ids, ",") {
		id, err := strconv.Atoi(idStr)
		if err == nil {
			caseIds = append(caseIds, id)
aaronchen2k2k's avatar
aaronchen2k2k 已提交
247 248 249
		}
	}

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
	dir := po.Path
	if !fileUtils.IsAbsolutePath(dir) {
		dir = filepath.Join(po.Workspace, dir)
	}

	caseIdMap := map[int]string{}
	scriptHelper.GetScriptByIdsInDir(dir, &caseIdMap)

	cases := scriptHelper.GetCaseByListInMap(caseIds, caseIdMap)

	commConsts.ExecFrom = commConsts.FromZentao
	req.Act = commConsts.ExecCase
	req.ScriptDirParamFromCmdLine = "."
	req.TestSets = append(req.TestSets, serverDomain.TestSet{
		WorkspacePath: po.Workspace,
		Cases:         cases,
	})

aaronchen2k2k's avatar
aaronchen2k2k 已提交
268 269 270
	return
}

271 272 273
// IsError reports whether the job's status is commConsts.JobError.
func (s *JobService) IsError(po model.Job) bool {
	status := po.Status
	return status == commConsts.JobError
}
aaronchen2k2k's avatar
aaronchen2k2k 已提交
274

275 276
func (s *JobService) IsTimeout(po model.Job) bool {
	dur := time.Now().Unix() - po.StartDate.Unix()
aaronchen2k2k's avatar
aaronchen2k2k 已提交
277
	// return dur > 3
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
	return po.Status == commConsts.JobInprogress && dur > commConsts.JobTimeoutTime
}

// NeedRetry reports whether the job's retry count is still below
// commConsts.JobRetryTime.
func (s *JobService) NeedRetry(po model.Job) bool {
	attempts := po.Retry
	return attempts < commConsts.JobRetryTime
}

func (s *JobService) isEmpty() bool {
	length := 0

	channelMap.Range(func(key, value interface{}) bool {
		length++
		return true
	})

	return length == 0
aaronchen2k2k's avatar
aaronchen2k2k 已提交
294
}