task_scheduler.go 7.0 KB
Newer Older
Z
zhenshan.cao 已提交
1 2 3 4
package proxy

import (
	"container/list"
5
	"context"
Z
zhenshan.cao 已提交
6 7
	"log"
	"sync"
Z
zhenshan.cao 已提交
8 9

	"github.com/zilliztech/milvus-distributed/internal/allocator"
Z
zhenshan.cao 已提交
10 11
)

12
type BaseTaskQueue struct {
Z
zhenshan.cao 已提交
13
	unissuedTasks *list.List
B
bigsheeper 已提交
14
	activeTasks   map[Timestamp]task
Z
zhenshan.cao 已提交
15 16 17 18
	utLock        sync.Mutex
	atLock        sync.Mutex
}

19
func (queue *BaseTaskQueue) Empty() bool {
Z
zhenshan.cao 已提交
20 21 22 23 24 25 26
	queue.utLock.Lock()
	defer queue.utLock.Unlock()
	queue.atLock.Lock()
	defer queue.atLock.Unlock()
	return queue.unissuedTasks.Len() <= 0 && len(queue.activeTasks) <= 0
}

B
bigsheeper 已提交
27
func (queue *BaseTaskQueue) AddUnissuedTask(t task) {
Z
zhenshan.cao 已提交
28 29 30 31 32
	queue.utLock.Lock()
	defer queue.utLock.Unlock()
	queue.unissuedTasks.PushBack(t)
}

B
bigsheeper 已提交
33
func (queue *BaseTaskQueue) FrontUnissuedTask() task {
Z
zhenshan.cao 已提交
34 35 36 37 38 39
	queue.utLock.Lock()
	defer queue.utLock.Unlock()
	if queue.unissuedTasks.Len() <= 0 {
		log.Fatal("sorry, but the unissued task list is empty!")
		return nil
	}
B
bigsheeper 已提交
40
	return queue.unissuedTasks.Front().Value.(task)
Z
zhenshan.cao 已提交
41 42
}

B
bigsheeper 已提交
43
func (queue *BaseTaskQueue) PopUnissuedTask() task {
Z
zhenshan.cao 已提交
44 45 46 47 48 49 50
	queue.utLock.Lock()
	defer queue.utLock.Unlock()
	if queue.unissuedTasks.Len() <= 0 {
		log.Fatal("sorry, but the unissued task list is empty!")
		return nil
	}
	ft := queue.unissuedTasks.Front()
B
bigsheeper 已提交
51
	return queue.unissuedTasks.Remove(ft).(task)
Z
zhenshan.cao 已提交
52 53
}

B
bigsheeper 已提交
54
func (queue *BaseTaskQueue) AddActiveTask(t task) {
Z
zhenshan.cao 已提交
55 56
	queue.atLock.Lock()
	defer queue.atLock.Lock()
B
bigsheeper 已提交
57
	ts := t.EndTs()
Z
zhenshan.cao 已提交
58 59
	_, ok := queue.activeTasks[ts]
	if ok {
60
		log.Fatalf("task with timestamp %v already in active task list!", ts)
Z
zhenshan.cao 已提交
61 62 63 64
	}
	queue.activeTasks[ts] = t
}

B
bigsheeper 已提交
65
func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task {
Z
zhenshan.cao 已提交
66 67 68 69 70 71 72 73 74 75 76
	queue.atLock.Lock()
	defer queue.atLock.Lock()
	t, ok := queue.activeTasks[ts]
	if ok {
		delete(queue.activeTasks, ts)
		return t
	}
	log.Fatalf("sorry, but the timestamp %d was not found in the active task list!", ts)
	return nil
}

C
cai.zhang 已提交
77
func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task {
78 79 80
	queue.utLock.Lock()
	defer queue.utLock.Lock()
	for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
C
cai.zhang 已提交
81
		if e.Value.(task).ID() == reqID {
B
bigsheeper 已提交
82
			return e.Value.(task)
83 84 85 86 87 88
		}
	}

	queue.atLock.Lock()
	defer queue.atLock.Unlock()
	for ats := range queue.activeTasks {
C
cai.zhang 已提交
89
		if queue.activeTasks[ats].ID() == reqID {
90 91 92 93 94 95 96 97
			return queue.activeTasks[ats]
		}
	}

	return nil
}

func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
Z
zhenshan.cao 已提交
98 99 100
	queue.utLock.Lock()
	defer queue.utLock.Unlock()
	for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
B
bigsheeper 已提交
101
		if e.Value.(task).EndTs() >= ts {
Z
zhenshan.cao 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
			return false
		}
	}

	queue.atLock.Lock()
	defer queue.atLock.Unlock()
	for ats := range queue.activeTasks {
		if ats >= ts {
			return false
		}
	}

	return true
}

B
bigsheeper 已提交
117
type DdTaskQueue struct {
118 119 120 121
	BaseTaskQueue
	lock sync.Mutex
}

B
bigsheeper 已提交
122
type DmTaskQueue struct {
123 124 125
	BaseTaskQueue
}

B
bigsheeper 已提交
126
type DqTaskQueue struct {
127 128 129
	BaseTaskQueue
}

B
bigsheeper 已提交
130
func (queue *DdTaskQueue) Enqueue(t task) error {
Z
zhenshan.cao 已提交
131 132
	queue.lock.Lock()
	defer queue.lock.Unlock()
133
	// TODO: set Ts, ReqID, ProxyID
Z
zhenshan.cao 已提交
134 135 136 137
	queue.AddUnissuedTask(t)
	return nil
}

B
bigsheeper 已提交
138
func (queue *DmTaskQueue) Enqueue(t task) error {
139
	// TODO: set Ts, ReqID, ProxyID
Z
zhenshan.cao 已提交
140 141 142 143
	queue.AddUnissuedTask(t)
	return nil
}

B
bigsheeper 已提交
144
func (queue *DqTaskQueue) Enqueue(t task) error {
145
	// TODO: set Ts, ReqID, ProxyID
Z
zhenshan.cao 已提交
146 147 148 149
	queue.AddUnissuedTask(t)
	return nil
}

B
bigsheeper 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
// NewDdTaskQueue returns a DdTaskQueue with an empty unissued list and an
// empty active task map.
func NewDdTaskQueue() *DdTaskQueue {
	queue := new(DdTaskQueue)
	queue.unissuedTasks = list.New()
	queue.activeTasks = make(map[Timestamp]task)
	return queue
}

// NewDmTaskQueue returns a DmTaskQueue with an empty unissued list and an
// empty active task map.
func NewDmTaskQueue() *DmTaskQueue {
	queue := new(DmTaskQueue)
	queue.unissuedTasks = list.New()
	queue.activeTasks = make(map[Timestamp]task)
	return queue
}

// NewDqTaskQueue returns a DqTaskQueue with an empty unissued list and an
// empty active task map.
func NewDqTaskQueue() *DqTaskQueue {
	queue := new(DqTaskQueue)
	queue.unissuedTasks = list.New()
	queue.activeTasks = make(map[Timestamp]task)
	return queue
}

177
type TaskScheduler struct {
B
bigsheeper 已提交
178 179 180
	DdQueue *DdTaskQueue
	DmQueue *DmTaskQueue
	DqQueue *DqTaskQueue
Z
zhenshan.cao 已提交
181

C
cai.zhang 已提交
182
	idAllocator  *allocator.IDAllocator
Z
zhenshan.cao 已提交
183 184
	tsoAllocator *allocator.TimestampAllocator

185 186 187
	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc
Z
zhenshan.cao 已提交
188 189
}

Z
zhenshan.cao 已提交
190
func NewTaskScheduler(ctx context.Context,
C
cai.zhang 已提交
191
	idAllocator *allocator.IDAllocator,
Z
zhenshan.cao 已提交
192 193 194
	tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) {
	ctx1, cancel := context.WithCancel(ctx)
	s := &TaskScheduler{
B
bigsheeper 已提交
195 196 197
		DdQueue:      NewDdTaskQueue(),
		DmQueue:      NewDmTaskQueue(),
		DqQueue:      NewDqTaskQueue(),
Z
zhenshan.cao 已提交
198 199 200 201 202 203 204 205 206
		idAllocator:  idAllocator,
		tsoAllocator: tsoAllocator,
		ctx:          ctx1,
		cancel:       cancel,
	}

	return s, nil
}

B
bigsheeper 已提交
207
func (sched *TaskScheduler) scheduleDdTask() task {
Z
zhenshan.cao 已提交
208 209 210
	return sched.DdQueue.PopUnissuedTask()
}

B
bigsheeper 已提交
211
func (sched *TaskScheduler) scheduleDmTask() task {
Z
zhenshan.cao 已提交
212 213 214
	return sched.DmQueue.PopUnissuedTask()
}

B
bigsheeper 已提交
215
func (sched *TaskScheduler) scheduleDqTask() task {
Z
zhenshan.cao 已提交
216 217 218
	return sched.DqQueue.PopUnissuedTask()
}

C
cai.zhang 已提交
219 220
func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task {
	if t := sched.DdQueue.getTaskByReqID(collMeta); t != nil {
221 222
		return t
	}
C
cai.zhang 已提交
223
	if t := sched.DmQueue.getTaskByReqID(collMeta); t != nil {
224 225
		return t
	}
C
cai.zhang 已提交
226
	if t := sched.DqQueue.getTaskByReqID(collMeta); t != nil {
227 228 229 230 231
		return t
	}
	return nil
}

232 233 234
func (sched *TaskScheduler) definitionLoop() {
	defer sched.wg.Done()
	defer sched.cancel()
235 236 237 238 239 240 241 242 243

	for {
		if sched.DdQueue.Empty() {
			continue
		}

		//sched.DdQueue.atLock.Lock()
		t := sched.scheduleDdTask()

B
bigsheeper 已提交
244
		err := t.PreExecute()
245 246 247
		if err != nil {
			return
		}
B
bigsheeper 已提交
248
		err = t.Execute()
249 250 251
		if err != nil {
			log.Printf("execute definition task failed, error = %v", err)
		}
B
bigsheeper 已提交
252
		t.Notify(err)
253 254 255

		sched.DdQueue.AddActiveTask(t)

B
bigsheeper 已提交
256 257
		t.WaitToFinish()
		t.PostExecute()
258

B
bigsheeper 已提交
259
		sched.DdQueue.PopActiveTask(t.EndTs())
260
	}
261 262 263 264 265 266 267 268 269
}

func (sched *TaskScheduler) manipulationLoop() {
	defer sched.wg.Done()
	defer sched.cancel()

	for {
		if sched.DmQueue.Empty() {
			continue
Z
zhenshan.cao 已提交
270
		}
271 272 273 274

		sched.DmQueue.atLock.Lock()
		t := sched.scheduleDmTask()

B
bigsheeper 已提交
275
		if err := t.PreExecute(); err != nil {
276
			return
Z
zhenshan.cao 已提交
277
		}
278 279

		go func() {
B
bigsheeper 已提交
280
			err := t.Execute()
281 282
			if err != nil {
				log.Printf("execute manipulation task failed, error = %v", err)
Z
zhenshan.cao 已提交
283
			}
B
bigsheeper 已提交
284
			t.Notify(err)
285 286 287 288 289 290
		}()

		sched.DmQueue.AddActiveTask(t)
		sched.DmQueue.atLock.Unlock()

		go func() {
B
bigsheeper 已提交
291 292
			t.WaitToFinish()
			t.PostExecute()
293 294

			// remove from active list
B
bigsheeper 已提交
295
			sched.DmQueue.PopActiveTask(t.EndTs())
296 297 298 299 300 301 302
		}()
	}
}

func (sched *TaskScheduler) queryLoop() {
	defer sched.wg.Done()
	defer sched.cancel()
303 304 305 306 307 308 309 310 311

	for {
		if sched.DqQueue.Empty() {
			continue
		}

		sched.DqQueue.atLock.Lock()
		t := sched.scheduleDqTask()

B
bigsheeper 已提交
312
		if err := t.PreExecute(); err != nil {
313 314 315 316
			return
		}

		go func() {
B
bigsheeper 已提交
317
			err := t.Execute()
318 319 320
			if err != nil {
				log.Printf("execute query task failed, error = %v", err)
			}
B
bigsheeper 已提交
321
			t.Notify(err)
322 323 324 325 326 327
		}()

		sched.DqQueue.AddActiveTask(t)
		sched.DqQueue.atLock.Unlock()

		go func() {
B
bigsheeper 已提交
328 329
			t.WaitToFinish()
			t.PostExecute()
330 331

			// remove from active list
B
bigsheeper 已提交
332
			sched.DqQueue.PopActiveTask(t.EndTs())
333 334
		}()
	}
335 336
}

Z
zhenshan.cao 已提交
337
func (sched *TaskScheduler) Start() error {
338 339 340 341 342 343
	sched.wg.Add(3)

	go sched.definitionLoop()
	go sched.manipulationLoop()
	go sched.queryLoop()

Z
zhenshan.cao 已提交
344 345 346
	return nil
}

347 348 349 350 351 352
// Close cancels the scheduler's context and then blocks until wg reaches
// zero, i.e. until every loop registered by Start has called wg.Done.
func (sched *TaskScheduler) Close() {
	sched.cancel()
	sched.wg.Wait()
}

func (sched *TaskScheduler) TaskDoneTest(ts Timestamp) bool {
Z
zhenshan.cao 已提交
353 354 355 356 357
	ddTaskDone := sched.DdQueue.TaskDoneTest(ts)
	dmTaskDone := sched.DmQueue.TaskDoneTest(ts)
	dqTaskDone := sched.DqQueue.TaskDoneTest(ts)
	return ddTaskDone && dmTaskDone && dqTaskDone
}