work_pool.go 2.2 KB
Newer Older
Y
yangrui07 已提交
1 2 3 4 5 6
package agent

import (
	"errors"
	"fmt"
	"sync"
Y
yangrui07 已提交
7
	"sync/atomic"
Y
yangrui07 已提交
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
)

// PoolWorker is a unit of work that can be submitted to a WorkPool.
type PoolWorker interface {
	// Token returns the key under which the work is tracked in the pool's
	// duplicate filter (see PostWorkWithToken and doWork).
	Token() string

	// DoWork runs the work. It is invoked by one of the pool's worker
	// goroutines.
	DoWork()
}

// workType is one submission travelling from PostWork to the queue routine:
// the work itself plus the channel on which the submitter waits for the
// admission result.
type workType struct {
	poolWorker    PoolWorker
	resultChannel chan error
}

// WorkPool runs PoolWorker values on a set of worker goroutines, with a
// bounded number of works waiting to be picked up.
type WorkPool struct {
	// queueChannel carries submissions from PostWork to the queue routine.
	queueChannel chan workType

	// workChannel carries admitted works from the queue routine to the
	// worker goroutines.
	workChannel chan PoolWorker

	// queuedWorkNum counts works that were admitted but have not started
	// yet. It is only accessed through sync/atomic.
	queuedWorkNum int32

	// activeWorkerNum counts works that are currently executing. It is only
	// accessed through sync/atomic.
	activeWorkerNum int32

	// queueCapacity is the limit the queue routine compares queuedWorkNum
	// against before admitting a new work.
	queueCapacity int32

	// workFilter holds the tokens stored by PostWorkWithToken; doWork
	// removes a token once its work has finished.
	workFilter sync.Map
}

// NewWorkPool creates a WorkPool and starts its goroutines.
//
// workerNum is the number of worker goroutines that execute works.
// queueCapacity is both the buffer size of the internal work channel and the
// limit on works that may wait for a free worker.
//
// The started goroutines run for the lifetime of the process; this function
// provides no way to stop them.
func NewWorkPool(workerNum int, queueCapacity int32) *WorkPool {
	// The counters and the token filter are left at their zero values.
	pool := &WorkPool{
		queueChannel:  make(chan workType),
		workChannel:   make(chan PoolWorker, queueCapacity),
		queueCapacity: queueCapacity,
	}

	for remaining := workerNum; remaining > 0; remaining-- {
		go pool.startWorkRoutine()
	}
	go pool.startQueueRoutine()

	return pool
}

// startWorkRoutine is the body of a single worker goroutine: it loops
// forever, taking the next admitted work from workChannel and executing it.
func (workPool *WorkPool) startWorkRoutine() {
	for {
		next := <-workPool.workChannel
		workPool.doWork(next)
	}
}

// startQueueRoutine is the admission loop of the pool. It receives every
// submission made through PostWork and either rejects it, when the number of
// queued works has reached queueCapacity, or hands it to the worker
// goroutines. In both cases the outcome is reported on the submission's
// resultChannel: an error on rejection, nil on admission.
//
// The loop ends if queueChannel is ever closed.
func (workPool *WorkPool) startQueueRoutine() {
	for queueItem := range workPool.queueChannel {
		// Plain atomic read of the counter; >= rather than == so the pool
		// still rejects if the counter were ever to overshoot the limit.
		if atomic.LoadInt32(&workPool.queuedWorkNum) >= workPool.queueCapacity {
			// Report this pool's own capacity, not a package-level value.
			queueItem.resultChannel <- fmt.Errorf("work pool fulled with %v pending works", workPool.queueCapacity)
			continue
		}

		// Count the work as queued before it becomes visible to workers;
		// doWork decrements the counter when a worker picks it up.
		atomic.AddInt32(&workPool.queuedWorkNum, 1)

		workPool.workChannel <- queueItem.poolWorker

		queueItem.resultChannel <- nil
	}
}

// doWork executes one work on the calling worker goroutine and keeps the
// pool's bookkeeping in step: the work moves from "queued" to "active" before
// it runs, and once it returns its token is removed from workFilter and the
// active counter is decremented.
func (workPool *WorkPool) doWork(poolWorker PoolWorker) {
	// Deferred calls run in reverse order: the token is released first,
	// then the active counter drops.
	defer atomic.AddInt32(&workPool.activeWorkerNum, -1)

	// The token is read before the work runs, so the key removed afterwards
	// is the one the work had when it started.
	token := poolWorker.Token()
	defer workPool.workFilter.Delete(token)

	atomic.AddInt32(&workPool.queuedWorkNum, -1)
	atomic.AddInt32(&workPool.activeWorkerNum, 1)

	poolWorker.DoWork()
}

// PostWorkWithToken submits poolWorker unless another work with the same
// Token is already registered in the pool.
//
// It returns an error when the token is already registered, or the error
// from PostWork when the pool refuses the work. On success it returns nil and
// the token stays registered until the work has finished (doWork removes it).
func (workPool *WorkPool) PostWorkWithToken(poolWorker PoolWorker) (err error) {
	token := poolWorker.Token()

	// LoadOrStore checks and claims the token in one atomic step, so two
	// concurrent callers with the same token cannot both get through.
	if _, loaded := workPool.workFilter.LoadOrStore(token, true); loaded {
		return errors.New("another work with same key is doing.")
	}

	if err = workPool.PostWork(poolWorker); err != nil {
		// The work was never admitted, so doWork will never release the
		// token; release it here or the token would be blocked forever.
		workPool.workFilter.Delete(token)
	}
	return err
}

// PostWork submits poolWorker to the pool and blocks until the queue routine
// has decided on it. It returns nil when the work was admitted and the queue
// routine's error when it was rejected.
func (workPool *WorkPool) PostWork(poolWorker PoolWorker) (err error) {
	// Per-call reply channel; closed once the single answer has been read.
	reply := make(chan error)
	defer close(reply)

	workPool.queueChannel <- workType{poolWorker: poolWorker, resultChannel: reply}

	return <-reply
}