提交 550d276d 编写于 作者: H Haojun Liao

[td-11818] Fix bug in select *

上级 971b1d28
......@@ -461,7 +461,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
}
qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId);
setQueryKilled(pQInfo);
setTaskKilled(pQInfo);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
......@@ -634,7 +634,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
setTaskKilled(pQInfo);
// wait query stop
int32_t loop = 0;
......
......@@ -250,9 +250,8 @@ typedef struct SExecTaskInfo {
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads
char *sql; // query sql string
jmp_buf env; //
struct SOperatorInfo *pRoot;
......@@ -622,8 +621,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t nu
int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters);
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableReq *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger);
......@@ -645,20 +642,18 @@ void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(STaskAttr* pQueryAttr);
//void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool isValidQInfo(void *param);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void setTaskKilled(SExecTaskInfo *pTaskInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SExecTaskInfo * pTaskInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void queryCostStatis(SExecTaskInfo *pTaskInfo);
void doDestroyTask(SQInfo *pQInfo);
void doDestroyTask(SExecTaskInfo *pTaskInfo);
void freeQueryAttr(STaskAttr *pQuery);
int32_t getMaximumIdleDurationSec();
......
......@@ -121,11 +121,19 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
}
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueSize(pDispatcher->pDataBlocks));
return false;
}
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
}
return NULL != pBuf->pData;
}
......
......@@ -149,7 +149,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
qError("QID:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
(void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
return pTaskInfo->code;
......@@ -160,7 +160,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
}
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
qDebug("QID:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS;
}
......@@ -169,12 +169,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret;
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
qDebug("QID:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
tstrerror(pTaskInfo->code));
return pTaskInfo->code;
}
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
qDebug("QID:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
bool newgroup = false;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -190,8 +190,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
*useconds = pTaskInfo->cost.elapsedTime;
}
qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
GET_TASKID(pTaskInfo), 0, 0L, 0);
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
pTaskInfo->totalRows += current;
qDebug("QID:0x%" PRIx64 " task paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0);
atomic_store_64(&pTaskInfo->owner, 0);
return pTaskInfo->code;
......@@ -200,14 +203,14 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
if (pQInfo == NULL) {
qError("QInfo invalid qhandle");
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
*buildRes = false;
if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code);
qDebug("QID:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code);
return pQInfo->code;
}
......@@ -227,11 +230,11 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize,
qDebug("QID:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize,
GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code));
} else {
*buildRes = false;
qDebug("QInfo:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId);
qDebug("QID:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
......@@ -251,18 +254,18 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
}
int32_t qKillTask(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId);
setQueryKilled(pQInfo);
qDebug("QID:0x%"PRIx64" execTask killed", pTaskInfo->id.queryId);
setTaskKilled(pTaskInfo);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while (pQInfo->owner != 0) {
while (pTaskInfo->owner != 0) {
taosMsleep(100);
}
......@@ -270,14 +273,14 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
}
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query async killed", pQInfo->qId);
setQueryKilled(pQInfo);
qDebug("QID:0x%"PRIx64" query async killed", pTaskInfo->id.queryId);
setTaskKilled(pTaskInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -292,15 +295,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER);
}
void qDestroyTask(qTaskInfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) {
return;
}
void qDestroyTask(qTaskInfo_t qTaskHandle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
qDebug("QID:0x%"PRIx64" execTask completed, numOfRows:%"PRId64, pTaskInfo->id.queryId, pTaskInfo->totalRows);
qDebug("QInfo:0x%"PRIx64" query completed", pQInfo->qId);
queryCostStatis(pQInfo); // print the query cost summary
doDestroyTask(pQInfo);
queryCostStatis(pTaskInfo); // print the query cost summary
doDestroyTask(pTaskInfo);
}
void* qOpenTaskMgmt(int32_t vgId) {
......@@ -381,7 +381,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
STaskMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo);
qError("QID:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
......@@ -389,7 +389,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock);
qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo);
qError("QID:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
} else {
......@@ -445,7 +445,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
setTaskKilled(pQInfo);
// wait query stop
int32_t loop = 0;
......
......@@ -2432,9 +2432,9 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
}
bool isTaskKilled(SExecTaskInfo *pTaskInfo) {
if (IS_QUERY_KILLED(pTaskInfo)) {
return true;
}
// if (IS_QUERY_KILLED(pTaskInfo)) {
// return true;
// }
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
......@@ -2444,13 +2444,13 @@ bool isTaskKilled(SExecTaskInfo *pTaskInfo) {
assert(pTaskInfo->cost.start != 0);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
return true;
// return true;
}
return false;
}
void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
void setTaskKilled(SExecTaskInfo *pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
//static bool isFixedOutputQuery(STaskAttr* pQueryAttr) {
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
......@@ -4420,33 +4420,32 @@ void calculateOperatorProfResults(SQInfo* pQInfo) {
taosArrayDestroy(opStack);
}
void queryCostStatis(SQInfo *pQInfo) {
STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STaskCostInfo *pSummary = &pQInfo->summary;
void queryCostStatis(SExecTaskInfo *pTaskInfo) {
STaskCostInfo *pSummary = &pTaskInfo->cost;
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
pSummary->hashSize = hashSize;
// uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
// pSummary->hashSize = hashSize;
// add the merge time
pSummary->elapsedTime += pSummary->firstStageMergeTime;
SResultRowPool* p = pQInfo->runtimeEnv.pool;
if (p != NULL) {
pSummary->winInfoSize = getResultRowPoolMemSize(p);
pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
} else {
pSummary->winInfoSize = 0;
pSummary->numOfTimeWindows = 0;
}
calculateOperatorProfResults(pQInfo);
//qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
// "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
// pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
// pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
// SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
// pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
// } else {
// pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0;
// }
//
// calculateOperatorProfResults(pQInfo);
qDebug("QID:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pTaskInfo->id.queryId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
//
//qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
......@@ -7733,7 +7732,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
SExecTaskInfo* pTaskInfo = calloc(1, sizeof(SExecTaskInfo));
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pthread_mutex_init(&pTaskInfo->lock, NULL);
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId;
return pTaskInfo;
......@@ -8673,229 +8671,6 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
SQInfo* createQInfoImpl(SQueryTableReq* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId,
char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo) {
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
goto _cleanup_qinfo;
}
pQInfo->qId = qId;
pQInfo->startExecTs = 0;
pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
STaskAttr* pQueryAttr = &pQInfo->query;
pQInfo->runtimeEnv.pQueryAttr = pQueryAttr;
pQueryAttr->tableGroupInfo = *pTableGroupInfo;
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
pQueryAttr->limit.limit = pQueryMsg->limit;
pQueryAttr->limit.offset = pQueryMsg->offset;
pQueryAttr->order.order = pQueryMsg->order;
pQueryAttr->order.col.info.colId = pQueryMsg->orderColId;
pQueryAttr->pExpr1 = pExprs;
pQueryAttr->pExpr2 = pSecExprs;
pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput;
pQueryAttr->pGroupbyExpr = pGroupbyExpr;
memcpy(&pQueryAttr->interval, &pQueryMsg->interval, sizeof(pQueryAttr->interval));
pQueryAttr->fillType = pQueryMsg->fillType;
pQueryAttr->numOfTags = pQueryMsg->numOfTags;
pQueryAttr->tagColList = pTagCols;
pQueryAttr->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQueryAttr->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
// pQueryAttr->sw = pQueryMsg->sw;
pQueryAttr->vgId = vgId;
pQueryAttr->stableQuery = pQueryMsg->stableQuery;
pQueryAttr->topBotQuery = pQueryMsg->topBotQuery;
pQueryAttr->groupbyColumn = pQueryMsg->groupbyColumn;
pQueryAttr->hasTagResults = pQueryMsg->hasTagResults;
pQueryAttr->timeWindowInterpo = pQueryMsg->timeWindowInterpo;
pQueryAttr->queryBlockDist = pQueryMsg->queryBlockDist;
pQueryAttr->stabledev = pQueryMsg->stabledev;
pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery;
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
pQueryAttr->stateWindow = pQueryMsg->stateWindow;
pQueryAttr->vgId = vgId;
// pQueryAttr->pFilters = pFilters;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQueryAttr->tableCols == NULL) {
goto _cleanup;
}
pQueryAttr->srcRowSize = 0;
pQueryAttr->maxTableColumnWidth = 0;
for (int16_t i = 0; i < numOfCols; ++i) {
pQueryAttr->tableCols[i] = pQueryMsg->tableCols[i];
// pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters);
pQueryAttr->srcRowSize += pQueryAttr->tableCols[i].bytes;
if (pQueryAttr->maxTableColumnWidth < pQueryAttr->tableCols[i].bytes) {
pQueryAttr->maxTableColumnWidth = pQueryAttr->tableCols[i].bytes;
}
}
for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].base.resSchema.bytes > 0);
pQueryAttr->resultRowSize += pExprs[col].base.resSchema.bytes;
// keep the tag length
if (TSDB_COL_IS_TAG(pExprs[col].base.pColumns->flag)) {
pQueryAttr->tagLen += pExprs[col].base.resSchema.bytes;
}
// if (pExprs[col].base.flist.filterInfo) {
// ++pQueryAttr->havingNum;
// }
}
doUpdateExprColumnIndex(pQueryAttr);
if (pSecExprs != NULL) {
int32_t resultRowSize = 0;
// calculate the result row size
for (int16_t col = 0; col < pQueryAttr->numOfExpr2; ++col) {
assert(pSecExprs[col].base.resSchema.bytes > 0);
resultRowSize += pSecExprs[col].base.resSchema.bytes;
}
if (resultRowSize > pQueryAttr->resultRowSize) {
pQueryAttr->resultRowSize = resultRowSize;
}
}
if (pQueryAttr->fillType != TSDB_FILL_NONE) {
pQueryAttr->fillVal = malloc(sizeof(int64_t) * pQueryAttr->numOfOutput);
if (pQueryAttr->fillVal == NULL) {
goto _cleanup;
}
// the first column is the timestamp
memcpy(pQueryAttr->fillVal, (char *)pQueryMsg->fillVal, pQueryAttr->numOfOutput * sizeof(int64_t));
}
size_t numOfGroups = 0;
if (pTableGroupInfo->pGroupList != NULL) {
numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
STableGroupInfo* pTableqinfo = &pQInfo->runtimeEnv.tableqinfoGroupInfo;
pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pTableqinfo->numOfTables = pTableGroupInfo->numOfTables;
pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pQInfo->pBuf == NULL) {
goto _cleanup;
}
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pQInfo->rspContext = NULL;
pQInfo->sql = sql;
pthread_mutex_init(&pQInfo->lock, NULL);
tsem_init(&pQInfo->ready, 0, 0);
pQueryAttr->window = pQueryMsg->window;
updateDataCheckOrder(pQInfo, pQueryMsg, pQueryAttr->stableQuery);
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STimeWindow window = pQueryAttr->window;
int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
size_t s = taosArrayGetSize(pa);
SArray* p1 = taosArrayInit(s, POINTER_BYTES);
if (p1 == NULL) {
goto _cleanup;
}
taosArrayPush(pRuntimeEnv->tableqinfoGroupInfo.pGroupList, &p1);
for(int32_t j = 0; j < s; ++j) {
// STableKeyInfo* info = taosArrayGet(pa, j);
// window.skey = info->lastKey;
//
// void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
// STableQueryInfo* item = createTableQueryInfo(pQueryAttr, info->pTable, pQueryAttr->groupbyColumn, window, buf);
// if (item == NULL) {
// goto _cleanup;
// }
//
// item->groupIndex = i;
// taosArrayPush(p1, &item);
// STableId* id = TSDB_TABLEID(info->pTable);
// taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
// index += 1;
}
}
colIdCheck(pQueryAttr, pQInfo->qId);
// int32_t functionId = getExprFunctionId(&pExpr[0]);
// pQInfo->query.queryBlockDist = (functionId == FUNCTION_BLKINFO);
//qDebug("qmsg:%p vgId:%d, QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->query.vgId, pQInfo->qId, pQInfo);
return pQInfo;
_cleanup_qinfo:
// tsdbDestroyTableGroup(pTableGroupInfo);
if (pGroupbyExpr != NULL) {
taosArrayDestroy(pGroupbyExpr->columnInfo);
free(pGroupbyExpr);
}
tfree(pTagCols);
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExprs[i];
if (pExprInfo->pExpr != NULL) {
tExprTreeDestroy(pExprInfo->pExpr, NULL);
pExprInfo->pExpr = NULL;
}
// if (pExprInfo->base.flist.filterInfo) {
// freeColumnFilterInfo(pExprInfo->base.flist.filterInfo, pExprInfo->base.flist.numOfFilters);
// }
}
tfree(pExprs);
// filterFreeInfo(pFilters);
_cleanup:
doDestroyTask(pQInfo);
return NULL;
}
bool isValidQInfo(void *param) {
SQInfo *pQInfo = (SQInfo *)param;
if (pQInfo == NULL) {
return false;
}
/*
* pQInfo->signature may be changed by another thread, so we assign value of signature
* into local variable, then compare by using local variable
*/
uint64_t sig = (uint64_t)pQInfo->signature;
return (sig == (uint64_t)pQInfo);
}
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -8957,7 +8732,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
_error:
// table query ref will be decrease during error handling
doDestroyTask(pQInfo);
// doDestroyTask(pQInfo);
return code;
}
......@@ -9038,36 +8813,14 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
return NULL;
}
void doDestroyTask(SQInfo *pQInfo) {
if (!isValidQInfo(pQInfo)) {
return;
}
//qDebug("QInfo:0x%"PRIx64" start to free QInfo", pQInfo->qId);
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
freeQueryAttr(pQueryAttr);
// tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo);
tfree(pQInfo->pBuf);
tfree(pQInfo->sql);
taosArrayDestroy(pQInfo->summary.queryProfEvents);
taosHashCleanup(pQInfo->summary.operatorProfResults);
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0;
//qDebug("QInfo:0x%"PRIx64" QInfo is freed", pQInfo->qId);
void doDestroyTask(SExecTaskInfo *pTaskInfo) {
qDebug("QID:0x%"PRIx64" start to free execTask", pTaskInfo->id.queryId);
doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
tfree(pQInfo);
qDebug("QID:0x%"PRIx64" execTask is freed", pTaskInfo->id.queryId);
tfree(pTaskInfo);
}
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) {
......
......@@ -413,7 +413,6 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree(pQueryTableInfo->tableName);
}
printf("----------->Free:%p\n", pQueryNode->pExpr);
taosArrayDestroy(pQueryNode->pExpr);
tfree(pQueryNode->pExtInfo);
......
......@@ -492,7 +492,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code);
QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code));
QW_ERR_JRET(code);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册