提交 7b53b814 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 ab52d28c
...@@ -1306,6 +1306,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { ...@@ -1306,6 +1306,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
colDataAssign(pDst, pSrc, src->info.rows, &src->info); colDataAssign(pDst, pSrc, src->info.rows, &src->info);
} }
uint32_t cap = dst->info.capacity; uint32_t cap = dst->info.capacity;
dst->info = src->info; dst->info = src->info;
dst->info.capacity = cap; dst->info.capacity = cap;
......
...@@ -153,14 +153,13 @@ typedef struct { ...@@ -153,14 +153,13 @@ typedef struct {
SSchemaWrapper* qsw; SSchemaWrapper* qsw;
} SSchemaInfo; } SSchemaInfo;
typedef struct SExecTaskInfo { struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
STaskCostInfo cost; STaskCostInfo cost;
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
int64_t version; // used for stream to record wal version int64_t version; // used for stream to record wal version
SStreamTaskInfo streamInfo; SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo; SSchemaInfo schemaInfo;
...@@ -171,7 +170,8 @@ typedef struct SExecTaskInfo { ...@@ -171,7 +170,8 @@ typedef struct SExecTaskInfo {
SSubplan* pSubplan; SSubplan* pSubplan;
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
SLocalFetch localFetch; SLocalFetch localFetch;
} SExecTaskInfo; SArray* pResultBlockList;// result block list
};
enum { enum {
OP_NOT_OPENED = 0x0, OP_NOT_OPENED = 0x0,
......
...@@ -536,7 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo ...@@ -536,7 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
} }
taosArrayClearEx(pResList, freeBlock); taosArrayClear(pResList);
int64_t curOwner = 0; int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
...@@ -574,8 +574,20 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo ...@@ -574,8 +574,20 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t blockIndex = 0;
while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) { while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
SSDataBlock* p = createOneDataBlock(pRes, true); SSDataBlock* p = NULL;
if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
SSDataBlock* p1 = createOneDataBlock(pRes, true);
taosArrayPush(pTaskInfo->pResultBlockList, &p1);
p = p1;
} else {
p = *(SSDataBlock**) taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
copyDataBlock(p, pRes);
}
blockIndex += 1;
current += p->info.rows; current += p->info.rows;
ASSERT(p->info.rows > 0); ASSERT(p->info.rows > 0);
taosArrayPush(pResList, &p); taosArrayPush(pResList, &p);
......
...@@ -2600,6 +2600,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -2600,6 +2600,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model; pTaskInfo->execModel = model;
pTaskInfo->pTableInfoList = tableListCreate(); pTaskInfo->pTableInfoList = tableListCreate();
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
char* p = taosMemoryCalloc(1, 128); char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
...@@ -3201,6 +3202,11 @@ _complete: ...@@ -3201,6 +3202,11 @@ _complete:
return terrno; return terrno;
} }
static void freeBlock(void* pParam) {
SSDataBlock* pBlock = *(SSDataBlock**)pParam;
blockDataDestroy(pBlock);
}
void doDestroyTask(SExecTaskInfo* pTaskInfo) { void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
...@@ -3213,6 +3219,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -3213,6 +3219,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
nodesDestroyNode((SNode*)pTaskInfo->pSubplan); nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
} }
taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo); taosMemoryFreeClear(pTaskInfo);
......
...@@ -18,11 +18,6 @@ SQWorkerMgmt gQwMgmt = { ...@@ -18,11 +18,6 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
static void freeBlock(void *param) {
SSDataBlock *pBlock = *(SSDataBlock **)param;
blockDataDestroy(pBlock);
}
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
...@@ -193,7 +188,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -193,7 +188,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} }
_return: _return:
taosArrayDestroyEx(pResList, freeBlock); taosArrayDestroy(pResList);
QW_RET(code); QW_RET(code);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册