提交 2d2d51b0 编写于 作者: aaronchen2k2k's avatar aaronchen2k2k

testing execution queue

上级 a334d230
......@@ -3,7 +3,6 @@ package main
import (
"flag"
websocketHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/websocket"
"github.com/easysoft/zentaoatf/internal/server/core/cron"
"github.com/easysoft/zentaoatf/internal/server/core/web"
logUtils "github.com/easysoft/zentaoatf/pkg/lib/log"
"os"
......@@ -43,7 +42,6 @@ func main() {
return
}
cron.NewServerCron().Init()
websocketHelper.InitMq()
webServer.Run()
......
......@@ -8,6 +8,8 @@ const (
AppAgent = "agent"
AppCommand = "cmd"
JobTimeout = 60 * 3
ConfigVersion = "3.0"
ConfigDir = "conf"
ConfigFile = "ztf.conf"
......
......@@ -251,3 +251,20 @@ const (
func (e DropPos) Int() int {
return int(e)
}
type ProgressStatus string
const (
ProgressCreated ProgressStatus = "created"
ProgressInProgress ProgressStatus = "in_progress"
ProgressTimeout ProgressStatus = "timeout"
ProgressCompleted ProgressStatus = "completed"
)
type BuildStatus string
const (
StatusCreated BuildStatus = "created"
StatusPass BuildStatus = "pass"
StatusFail BuildStatus = "fail"
)
......@@ -21,6 +21,10 @@ func SendOutputMsg(msg, isRunning string, info iris.Map, wsMsg *websocket.Messag
logUtils.Infof(i118Utils.Sprintf("ws_send_exec_msg", wsMsg.Room,
strings.ReplaceAll(strings.TrimSpace(msg), `%`, `%%`)))
if wsMsg == nil {
return
}
msg = strings.Trim(msg, "\n")
resp := commDomain.WsResp{Msg: msg, Category: commConsts.Output, Info: info}
......@@ -33,6 +37,10 @@ func SendExecMsg(msg, isRunning string, category commConsts.WsMsgCategory, info
logUtils.Infof(i118Utils.Sprintf("ws_send_exec_msg", wsMsg.Room,
strings.ReplaceAll(strings.TrimSpace(msg), `%`, `%%`)))
if wsMsg == nil {
return
}
msg = strings.TrimSpace(msg)
resp := commDomain.WsResp{Msg: msg, IsRunning: isRunning, Category: category, Info: info}
......
package serverConfig
const (
WebCheckInterval = 60 * 60
JobCheckInterval = 15
WsDefaultNameSpace = "default"
WsDefaultRoom = "default"
......
package cache
import (
"github.com/easysoft/zentaoatf/internal/server/modules/v1/model"
"sync"
)
const (
IsRunning = "IsRunning"
LastLoopEndTime = "lastLoopEndTime"
CurrJob = "currJob"
)
var (
SyncMap sync.Map
)
func GetCurrJob() (ret model.Job) {
currJobObj, _ := SyncMap.Load(CurrJob)
if currJobObj != nil {
ret = currJobObj.(model.Job)
}
return
}
......@@ -3,16 +3,18 @@ package cron
import (
"fmt"
serverConfig "github.com/easysoft/zentaoatf/internal/server/config"
"github.com/easysoft/zentaoatf/internal/server/core/cache"
"github.com/easysoft/zentaoatf/internal/server/modules/v1/service"
"github.com/easysoft/zentaoatf/pkg/lib/cron"
"github.com/easysoft/zentaoatf/pkg/lib/date"
"github.com/easysoft/zentaoatf/pkg/lib/log"
"github.com/kataras/iris/v12"
"sync"
"time"
)
type ServerCron struct {
syncMap sync.Map
ExecService *service.ExecService `inject:""`
JobService *service.JobService `inject:""`
}
func NewServerCron() *ServerCron {
......@@ -21,27 +23,29 @@ func NewServerCron() *ServerCron {
}
func (s *ServerCron) Init() {
s.syncMap.Store("isRunning", false)
s.syncMap.Store("lastCompletedTime", int64(0))
cache.SyncMap.Store(cache.IsRunning, false)
cache.SyncMap.Store(cache.LastLoopEndTime, int64(0))
cronUtils.AddTask(
"check",
fmt.Sprintf("@every %ds", serverConfig.WebCheckInterval),
"checkJob", fmt.Sprintf("@every %ds", serverConfig.JobCheckInterval),
func() {
isRunning, _ := s.syncMap.Load("isRunning")
lastCompletedTime, _ := s.syncMap.Load("lastCompletedTime")
isRunning, _ := cache.SyncMap.Load(cache.IsRunning)
lastCompletedTime, _ := cache.SyncMap.Load(cache.LastLoopEndTime)
if isRunning.(bool) || time.Now().Unix()-lastCompletedTime.(int64) < serverConfig.WebCheckInterval {
logUtils.Infof("skip this iteration " + dateUtils.DateTimeStr(time.Now()))
if isRunning.(bool) || time.Now().Unix()-lastCompletedTime.(int64) < serverConfig.JobCheckInterval {
logUtils.Infof("skip iteration" + dateUtils.DateTimeStr(time.Now()))
return
}
s.syncMap.Store("isRunning", true)
cache.SyncMap.Store(isRunning, true)
// do somethings
// start
currJob := cache.GetCurrJob()
s.JobService.Check(currJob)
// end
s.syncMap.Store("isRunning", false)
s.syncMap.Store("lastCompletedTime", time.Now().Unix())
cache.SyncMap.Store(isRunning, false)
cache.SyncMap.Store(lastCompletedTime, time.Now().Unix())
},
)
......
......@@ -7,6 +7,7 @@ import (
langHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/lang"
websocketHelper "github.com/easysoft/zentaoatf/internal/pkg/helper/websocket"
"github.com/easysoft/zentaoatf/internal/server/config"
"github.com/easysoft/zentaoatf/internal/server/core/cron"
"github.com/easysoft/zentaoatf/internal/server/core/dao"
"github.com/easysoft/zentaoatf/internal/server/core/module"
v1 "github.com/easysoft/zentaoatf/internal/server/modules/v1"
......@@ -120,11 +121,14 @@ func injectModule(ws *WebServer) {
var g inject.Graph
g.Logger = logrus.StandardLogger()
cron := cron.NewServerCron()
cron.Init()
indexModule := v1.NewIndexModule()
// inject objects
if err := g.Provide(
&inject.Object{Value: dao.GetDB()},
&inject.Object{Value: cron},
&inject.Object{Value: indexModule},
); err != nil {
logrus.Fatalf("provide usecase objects to the Graph: %v", err)
......
......@@ -7,23 +7,24 @@ import (
"github.com/kataras/iris/v12"
)
type ExecCtrl struct {
type JobCtrl struct {
BaseCtrl
TestExecService *service.TestExecService `inject:""`
JobService *service.JobService `inject:""`
ExecService *service.ExecService `inject:""`
}
func NewExecCtrl() *ExecCtrl {
return &ExecCtrl{}
func NewJobCtrl() *JobCtrl {
return &JobCtrl{}
}
func (c *ExecCtrl) Start(ctx iris.Context) {
req := serverDomain.ExecReq{}
func (c *JobCtrl) Add(ctx iris.Context) {
req := serverDomain.JobReq{}
if err := ctx.ReadQuery(&req); err != nil {
ctx.JSON(c.ErrResp(commConsts.CommErr, err.Error()))
return
}
err := c.TestExecService.Start(req, nil)
err := c.JobService.Add(req)
if err != nil {
ctx.JSON(c.ErrResp(commConsts.CommErr, err.Error()))
......@@ -33,14 +34,30 @@ func (c *ExecCtrl) Start(ctx iris.Context) {
ctx.JSON(c.SuccessResp(nil))
}
func (c *ExecCtrl) Stop(ctx iris.Context) {
req := serverDomain.ExecReq{}
func (c *JobCtrl) Remove(ctx iris.Context) {
req := serverDomain.JobReq{}
if err := ctx.ReadQuery(&req); err != nil {
ctx.JSON(c.ErrResp(commConsts.CommErr, err.Error()))
return
}
err := c.TestExecService.Stop(req, nil)
err := c.JobService.Remove(req)
if err != nil {
ctx.JSON(c.ErrResp(commConsts.CommErr, err.Error()))
return
}
ctx.JSON(c.SuccessResp(nil))
}
func (c *JobCtrl) Stop(ctx iris.Context) {
req := serverDomain.JobReq{}
if err := ctx.ReadQuery(&req); err != nil {
ctx.JSON(c.ErrResp(commConsts.CommErr, err.Error()))
return
}
err := c.JobService.Stop()
if err != nil {
ctx.JSON(c.ErrResp(commConsts.CommErr, err.Error()))
return
......
......@@ -22,7 +22,7 @@ type WebSocketCtrl struct {
*websocket.NSConn `stateless:"true"`
WorkspaceService *service.WorkspaceService `inject:""`
TestExecService *service.TestExecService `inject:""`
TestExecService *service.ExecService `inject:""`
}
func NewWebSocketCtrl() *WebSocketCtrl {
......@@ -79,7 +79,7 @@ func (c *WebSocketCtrl) OnChat(wsMsg websocket.Message) (err error) {
watchHelper.WatchFromReq(req.TestSets, &wsMsg)
} else if req.Act == commConsts.ExecStop {
c.TestExecService.Stop(req, &wsMsg)
c.TestExecService.Stop(&wsMsg)
} else {
c.TestExecService.Start(req, &wsMsg)
......
package serverDomain
type JobReq struct {
JobId uint `json:"jobId"`
Name string `json:"name"`
CaseIds []uint `json:"caseIds"`
ProductId int `json:"productId"`
ModuleId int `json:"moduleId"`
SuiteId int `json:"suiteId"`
TaskId int `json:"taskId"`
}
......@@ -18,7 +18,7 @@ type IndexModule struct {
ZentaoModule *index.ZentaoModule `inject:""`
SiteModule *index.SiteModule `inject:""`
ExecModule *index.ExecModule `inject:""`
JobModule *index.JobModule `inject:""`
InterpreterModule *index.InterpreterModule `inject:""`
WorkspaceModule *index.WorkspaceModule `inject:""`
......@@ -54,7 +54,8 @@ func (m *IndexModule) Party() module.WebModule {
m.ZentaoModule.Party(),
m.SiteModule.Party(),
m.ExecModule.Party(),
m.JobModule.Party(),
m.InterpreterModule.Party(),
m.WorkspaceModule.Party(),
m.ProxyModule.Party(),
......
......@@ -7,21 +7,22 @@ import (
"github.com/kataras/iris/v12"
)
type ExecModule struct {
ExecCtrl *controller.ExecCtrl `inject:""`
type JobModule struct {
JobCtrl *controller.JobCtrl `inject:""`
}
func NewExecModule() *ExecModule {
return &ExecModule{}
func NewJobModule() *JobModule {
return &JobModule{}
}
// Party 执行
func (m *ExecModule) Party() module.WebModule {
func (m *JobModule) Party() module.WebModule {
handler := func(index iris.Party) {
index.Use(middleware.InitCheck())
index.Post("/start", m.ExecCtrl.Start).Name = "执行测试"
index.Post("/stop", m.ExecCtrl.Stop).Name = "终止测试"
index.Post("/add", m.JobCtrl.Add).Name = "添加任务到队列"
index.Post("/remove", m.JobCtrl.Remove).Name = "移除队列中任务"
index.Post("/stop", m.JobCtrl.Stop).Name = "终止当前执行的任务"
}
return module.NewModule("/exec", handler)
return module.NewModule("/jobs", handler)
}
package model
import (
commConsts "github.com/easysoft/zentaoatf/internal/pkg/consts"
"time"
)
type Job struct {
BaseModel
Name string
Priority int
ScmAddress string
ScmAccount string
ScmPassword string
ScmToken string
ProgressStatus commConsts.ProgressStatus
BuildStatus commConsts.BuildStatus
StartTime *time.Time
TimeoutTime *time.Time
TerminateTime *time.Time
ResultTime *time.Time
CaseIds []int `json:"caseIds" gorm:"-"`
ProductId int `json:"productId"`
ModuleId int `json:"moduleId"`
SuiteId int `json:"suiteId"`
TaskId int `json:"taskId"`
}
func NewJob() Job {
task := Job{
ProgressStatus: commConsts.ProgressCreated,
BuildStatus: commConsts.StatusCreated,
}
return task
}
func (Job) TableName() string {
return "biz_job"
}
......@@ -8,5 +8,6 @@ var (
&Proxy{},
&Server{},
&Statistic{},
&Job{},
}
)
package repo
import (
commConsts "github.com/easysoft/zentaoatf/internal/pkg/consts"
"github.com/easysoft/zentaoatf/internal/server/modules/v1/model"
logUtils "github.com/easysoft/zentaoatf/pkg/lib/log"
"github.com/fatih/color"
"gorm.io/gorm"
"time"
)
type JobRepo struct {
DB *gorm.DB `inject:""`
}
func NewJobRepo() *JobRepo {
return &JobRepo{}
}
func (r *JobRepo) ListByProgressStatus(progress commConsts.ProgressStatus) (pos []model.Job, err error) {
err = r.DB.
Where("progress_status = ?", progress).
Where("NOT deleted").
Find(&pos).Error
if err != nil {
logUtils.Errorf(color.RedString("list job failed: %s.", err.Error()))
return
}
return
}
func (r *JobRepo) Get(id uint) (po model.Job, err error) {
err = r.DB.
Where("id = ?", id).
Where("NOT deleted").
First(&po).Error
if err != nil {
logUtils.Errorf(color.RedString("get job by id failed: %s.", err.Error()))
return
}
return
}
func (r *JobRepo) Create(job *model.Job) (err error) {
err = r.DB.Model(&model.Job{}).Create(job).Error
if err != nil {
logUtils.Errorf(color.RedString("create job failed: %s.", err.Error()))
return
}
return
}
func (r *JobRepo) Delete(id uint) (err error) {
err = r.DB.Model(&model.Job{}).Where("id = ?", id).
Updates(map[string]interface{}{"deleted": true}).Error
if err != nil {
logUtils.Errorf("delete job by id error: %s.", err.Error())
return
}
return
}
func (r *JobRepo) SetTimeout(id uint) (err error) {
err = r.DB.Model(&model.Job{}).
Where("id = ?", id).
Updates(map[string]interface{}{
"progress_status": commConsts.ProgressTimeout,
"timeout_time": time.Now(),
}).Error
if err != nil {
logUtils.Errorf("set job timeout error: %s.", err.Error())
return
}
return
}
func (r *JobRepo) QueryForExec() (job model.Job, err error) {
err = r.DB.
Where("progress=?", commConsts.ProgressCreated).
Order("priority ASC").
First(&job).Error
return
}
func (r *JobRepo) UpdateProgressStatus(id uint, progress commConsts.ProgressStatus) (err error) {
err = r.DB.Model(&model.Job{}).
Where("id = ?", id).
Updates(map[string]interface{}{
"progress_status": progress,
"start_time": time.Now(),
}).Error
if err != nil {
logUtils.Errorf("set job progress error: %s.", err.Error())
return
}
return
}
......@@ -16,15 +16,15 @@ var (
ch chan int
)
type TestExecService struct {
type ExecService struct {
WorkspaceService *WorkspaceService `inject:""`
}
func NewTestExecService() *TestExecService {
return &TestExecService{}
func NewTestExecService() *ExecService {
return &ExecService{}
}
func (s *TestExecService) Start(req serverDomain.ExecReq, wsMsg *websocket.Message) (err error) {
func (s *ExecService) Start(req serverDomain.ExecReq, wsMsg *websocket.Message) (err error) {
if execHelper.GetRunning() && req.Act != commConsts.ExecStop {
msg := i118Utils.Sprintf("pls_stop_previous")
websocketHelper.SendExecMsg(msg, "true", commConsts.Run, nil, wsMsg)
......@@ -62,7 +62,7 @@ func (s *TestExecService) Start(req serverDomain.ExecReq, wsMsg *websocket.Messa
return
}
func (s *TestExecService) Stop(req serverDomain.ExecReq, wsMsg *websocket.Message) (err error) {
func (s *ExecService) Stop(wsMsg *websocket.Message) (err error) {
if ch != nil {
if !execHelper.GetRunning() {
ch = nil
......
package service
import (
commConsts "github.com/easysoft/zentaoatf/internal/pkg/consts"
"github.com/easysoft/zentaoatf/internal/server/core/cache"
serverDomain "github.com/easysoft/zentaoatf/internal/server/modules/v1/domain"
"github.com/easysoft/zentaoatf/internal/server/modules/v1/model"
"github.com/easysoft/zentaoatf/internal/server/modules/v1/repo"
"github.com/jinzhu/copier"
"strconv"
"time"
)
type JobService struct {
JobRepo *repo.JobRepo `inject:""`
ExecService *ExecService `inject:""`
}
func NewJobService() *JobService {
return &JobService{}
}
func (s *JobService) Add(req serverDomain.JobReq) (err error) {
job := model.Job{}
copier.CopyWithOption(&job, req, copier.Option{
IgnoreEmpty: true,
DeepCopy: true,
})
s.JobRepo.Create(&job)
return
}
func (s *JobService) Remove(req serverDomain.JobReq) (err error) {
currJob := cache.GetCurrJob()
if currJob.ID != 0 && currJob.ID == req.JobId {
err = s.Stop()
}
s.JobRepo.Delete(req.JobId)
return
}
func (s *JobService) Stop() (err error) {
s.ExecService.Stop(nil)
return
}
func (s *JobService) Check(currJob model.Job) (err error) {
s.CheckJob()
s.CheckTimeout()
return
}
func (s *JobService) CheckJob() (err error) {
job, err := s.JobRepo.QueryForExec()
if err != nil {
return
}
s.Run(job)
return
}
func (s JobService) Run(job model.Job) (err error) {
testSet := serverDomain.TestSet{
Cases: s.convertIntToStrArr(job.CaseIds),
}
req := serverDomain.ExecReq{
ProductId: job.ProductId,
ModuleId: job.ModuleId,
SuiteId: job.SuiteId,
TaskId: job.TaskId,
TestSets: []serverDomain.TestSet{testSet},
}
err = s.ExecService.Start(req, nil)
if err != nil {
return
}
s.JobRepo.UpdateProgressStatus(job.ID, commConsts.ProgressInProgress)
return
}
func (s JobService) CheckTimeout() (err error) {
currJob := cache.GetCurrJob()
pos, err := s.JobRepo.ListByProgressStatus(commConsts.ProgressInProgress)
if err != nil {
return
}
for _, po := range pos {
if time.Now().Unix()-po.StartTime.Unix() > commConsts.JobTimeout {
if currJob.ID != 0 && currJob.ID == po.ID {
err = s.Stop()
}
err = s.JobRepo.SetTimeout(currJob.ID)
}
}
return
}
func (s *JobService) convertIntToStrArr(ids []int) (ret []string) {
for _, item := range ids {
ret = append(ret, strconv.Itoa(item))
}
return
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册