diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index bb9d1e44af784c661a13d5f483b257b242b2a15c..a9a84c186033f8b9574f3ef3f22fed10e6794207 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { pool->qset = taosOpenQset(); - pool->workers = taosArrayInit(2, sizeof(SQWorker)); + pool->workers = taosArrayInit(2, sizeof(SQWorker *)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -153,20 +153,21 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { int32_t size = taosArrayGetSize(pool->workers); for (int32_t i = 0; i < size; ++i) { - SQWorker *worker = taosArrayGet(pool->workers, i); + SQWorker *worker = taosArrayGetP(pool->workers, i); if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } } for (int32_t i = 0; i < size; ++i) { - SQWorker *worker = taosArrayGet(pool->workers, i); + SQWorker *worker = taosArrayGetP(pool->workers, i); if (taosCheckPthreadValid(worker->thread)) { uInfo("worker:%s:%d is stopping", pool->name, worker->id); taosThreadJoin(worker->thread, NULL); taosThreadClear(&worker->thread); uInfo("worker:%s:%d is stopped", pool->name, worker->id); } + taosMemoryFree(worker); } taosArrayDestroy(pool->workers); @@ -218,27 +219,28 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem int32_t curWorkerNum = taosArrayGetSize(pool->workers); int32_t dstWorkerNum = ceil(queueNum * pool->ratio); if (dstWorkerNum < 1) dstWorkerNum = 1; - // spawn a thread to process queue + // spawn a thread to process queue while (curWorkerNum < dstWorkerNum) { - SQWorker wobj = { - .id = (int32_t)taosArrayGetSize(pool->workers), - .pool = pool, - }; - SQWorker *worker = taosArrayPush(pool->workers, &wobj); - if (worker == NULL) { - uError("worker:%s:%d failed to create, total:%d", pool->name, wobj.id, (int32_t)taosArrayGetSize(pool->workers)); + SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker)); + if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) { + uError("worker:%s:%d failed to create", pool->name, curWorkerNum); + taosMemoryFree(worker); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + worker->id = curWorkerNum; + worker->pool = pool; + TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) { - uError("worker:%s:%d failed to create thread, total:%d", pool->name, wobj.id, - (int32_t)taosArrayGetSize(pool->workers)); + uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum); + (void)taosArrayPop(pool->workers); + taosMemoryFree(worker); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL;