diff --git a/src/modules/transfer/backend/query.go b/src/modules/transfer/backend/query.go index 25f689f381233d46c79654cb730f01fb3a5460dc..70185037c6449b5c6804200930e2c580ec3c9096 100644 --- a/src/modules/transfer/backend/query.go +++ b/src/modules/transfer/backend/query.go @@ -20,11 +20,19 @@ import ( ) func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse { - resp := []*dataobj.TsdbQueryResponse{} workerNum := 100 - worker := make(chan struct{}, workerNum) //控制goroutine并发数 + worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数 dataChan := make(chan *dataobj.TsdbQueryResponse, 20000) + done := make(chan struct{}, 1) + resp := make([]*dataobj.TsdbQueryResponse, 0) + go func() { + defer func() { done <- struct{}{} }() + for d := range dataChan { + resp = append(resp, d) + } + }() + for _, input := range inputs { for _, endpoint := range input.Endpoints { for _, counter := range input.Counters { @@ -34,29 +42,32 @@ func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse { } } - //等待所有goroutine执行完成 + // 等待所有 goroutine 执行完成 for i := 0; i < workerNum; i++ { worker <- struct{}{} } - close(dataChan) - for { - d, ok := <-dataChan - if !ok { - break - } - resp = append(resp, d) - } + + // 等待所有 dataChan 被消费完 + <-done return resp } func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse { - resp := []*dataobj.TsdbQueryResponse{} workerNum := 100 - worker := make(chan struct{}, workerNum) //控制goroutine并发数 + worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数 dataChan := make(chan *dataobj.TsdbQueryResponse, 20000) + done := make(chan struct{}, 1) + resp := make([]*dataobj.TsdbQueryResponse, 0) + go func() { + defer func() { done <- struct{}{} }() + for d := range dataChan { + resp = append(resp, d) + } + }() + for _, endpoint := range input.Endpoints { if len(input.Tags) == 0 { counter, err := GetCounter(input.Metric, "", nil) @@ -85,16 +96,10 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse { } close(dataChan) - for { - d, ok := <-dataChan - if !ok { - break - } - resp = append(resp, d) - } + <-done //进行数据计算 - aggrDatas := []*dataobj.TsdbQueryResponse{} + aggrDatas := make([]*dataobj.TsdbQueryResponse, 0) if input.AggrFunc != "" && len(resp) > 1 { aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse)