未验证 提交 1f1ae670 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2253 from taosdata/feature/query

Feature/query
......@@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->numOfBuffer = numOfFlush;
pReducer->numOfVnode = numOfBuffer;
pReducer->pDesc = pDesc;
tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
......@@ -604,7 +604,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
tOrderDescriptor *pOrderDesc = pReducer->pDesc;
SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo;
// no group by columns, all data belongs to one group
int32_t numOfCols = orderInfo->numOfCols;
if (numOfCols <= 0) {
......@@ -627,7 +627,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
// only one row exists
int32_t index = orderInfo->pData[0];
int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
return ret == 0;
}
......@@ -1040,7 +1040,7 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
// the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
......@@ -1182,7 +1182,7 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
*/
bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default
int16_t functionId = pLocalReducer->pCtx[0].functionId;
// todo opt performance
......
......@@ -209,6 +209,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
tscError("%p sql is already released", pSql->signature);
return;
}
if (pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
return;
......@@ -217,10 +218,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
// tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]);
if (pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
if (pObj->signature != pObj || pSql->freed == 1) {
tscTrace("%p sqlObj needs to be released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
pObj, pObj->signature);
tscFreeSqlObj(pSql);
rpcFreeCont(rpcMsg->pCont);
......@@ -375,7 +375,7 @@ int tscProcessSql(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = NULL;
uint16_t type = 0;
uint32_t type = 0;
if (pQueryInfo != NULL) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -424,7 +424,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
//taosStopRpcConn(pSql->pSubs[i]->thandle);
// taosStopRpcConn(pSql->pSubs[i]->);
}
/*
......
......@@ -219,6 +219,11 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
sem_post(&pSql->rspSem);
}
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres;
sem_post(&pSql->rspSem);
}
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
......@@ -369,11 +374,6 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
return (pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows;
}
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres;
sem_post(&pSql->rspSem);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) {
......@@ -476,7 +476,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
// send free message to vnode to free qhandle and corresponding resources in vnode
static void tscFreeQhandleInVnode(SSqlObj* pSql) {
static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -496,10 +496,19 @@ static void tscFreeQhandleInVnode(SSqlObj* pSql) {
tscProcessSql(pSql);
// in case of sync model query, waits for response and then goes on
if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) {
sem_wait(&pSql->rspSem);
}
// if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) {
// sem_wait(&pSql->rspSem);
// tscFreeSqlObj(pSql);
// tscTrace("%p sqlObj is freed by app", pSql);
// } else {
tscTrace("%p sqlObj will be freed while rsp received", pSql);
// }
return true;
}
return false;
}
void taos_free_result(TAOS_RES *res) {
......@@ -527,10 +536,10 @@ void taos_free_result(TAOS_RES *res) {
}
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
tscFreeQhandleInVnode(pSql);
tscFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
if (!tscFreeQhandleInVnode(pSql)) {
tscFreeSqlObj(pSql);
tscTrace("%p sqlObj is freed by app", pSql);
}
}
// todo should not be used in async query
......
......@@ -134,24 +134,6 @@ void tscGetDBInfoFromMeterId(char* tableId, char* db) {
db[0] = 0;
}
//STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
// if (pSidList == NULL) {
// tscError("illegal sidlist");
// return 0;
// }
//
// if (idx < 0 || idx >= pSidList->numOfSids) {
// int32_t sidRange = (pSidList->numOfSids > 0) ? (pSidList->numOfSids - 1) : 0;
//
// tscError("illegal sidIdx:%d, reset to 0, sidIdx range:%d-%d", idx, 0, sidRange);
// idx = 0;
// }
//
// assert(pSidList->pSidExtInfoList[idx] >= 0);
//
// return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
//}
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) {
return false;
......@@ -176,8 +158,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return false;
}
if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) &&
pQueryInfo->command == TSDB_SQL_SELECT) {
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->command == TSDB_SQL_SELECT) {
return UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
}
......
......@@ -112,7 +112,7 @@ enum {
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_QUERY))
#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0)
#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0)
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
typedef struct SArithmeticSupport {
......
......@@ -484,7 +484,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
// set time window for current result
pWindowRes->window = *win;
setWindowResOutputBufInitCtx(pRuntimeEnv, pWindowRes);
return TSDB_CODE_SUCCESS;
}
......@@ -685,14 +685,14 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys,
__block_search_fn_t searchFn) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// tumbling time window query, a special case of sliding time window query
if (pQuery->slidingTime == pQuery->intervalTime) {
// todo opt
}
getNextTimeWindow(pQuery, pNextWin);
// next time window is not in current block
if ((pNextWin->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pNextWin->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
......@@ -720,7 +720,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
*/
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNextWin->ekey) {
TSKEY next = primaryKeys[startPos];
pNextWin->ekey += ((next - pNextWin->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
pNextWin->skey = pNextWin->ekey - pQuery->intervalTime + 1;
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNextWin->skey) {
......@@ -729,7 +729,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
pNextWin->skey -= ((pNextWin->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
pNextWin->ekey = pNextWin->skey + pQuery->intervalTime - 1;
}
return startPos;
}
......@@ -2072,7 +2072,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SQueryCostInfo* summary = &pRuntimeEnv->summary;
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey,
pQuery->order.order);
......@@ -2113,7 +2113,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
......@@ -2502,7 +2502,7 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
SResultInfo *pResultInfo = &pWindowRes->resultInfo[j];
assert(pResultInfo != NULL);
if (pResultInfo->numOfRes > 0) {
return pResultInfo->numOfRes;
}
......@@ -2551,7 +2551,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
assert(pQInfo->numOfGroupResultPages == 0);
return 0;
} else if (numOfTables == 1) { // no need to merge results since only one table in each group
}
SCompSupporter cs = {pTableList, posList, pQInfo};
......@@ -2640,7 +2640,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
#endif
qTrace("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pQInfo, pQInfo->groupIndex, endt - startt);
tfree(pTableList);
tfree(posList);
tfree(pTree);
......@@ -2870,12 +2870,12 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
pRuntimeEnv->pCtx[j].currentStage = 0;
SResultInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
if (pResInfo->initialized) {
continue;
}
aAggs[functionId].init(&pRuntimeEnv->pCtx[j]);
}
}
......@@ -3248,7 +3248,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
......@@ -3268,7 +3268,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult
void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
......@@ -3277,21 +3277,21 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
if (pCtx->resultInfo->complete) {
continue;
}
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
pCtx->currentStage = 0;
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
/*
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT
*/
pCtx->resultInfo->superTableQ = pRuntimeEnv->stableQuery; // set super table query flag
if (!pCtx->resultInfo->initialized) {
aAggs[functionId].init(pCtx);
}
......@@ -4470,7 +4470,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
// query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code);
qTrace("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
return;
}
......@@ -4491,7 +4491,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code);
qTrace("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
return;
}
......@@ -4867,7 +4867,7 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
(pFuncMsg->functionId == TSDB_FUNC_COUNT && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX)) {
continue;
}
return false;
}
}
......@@ -5851,6 +5851,8 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
}
int16_t ref = T_REF_DEC(pQInfo);
qTrace("QInfo:%p dec refCount, value:%d", pQInfo, ref);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
}
......@@ -5994,25 +5996,25 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
assert(numOfGroup == 0 || numOfGroup == 1);
if (numOfGroup == 0) {
return;
}
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
size_t num = taosArrayGetSize(pa);
assert(num == pQInfo->groupInfo.numOfTables);
int32_t count = 0;
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
int32_t rsize = pExprInfo->bytes;
count = 0;
while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pQInfo->tableIndex++;
SGroupItem *item = taosArrayGet(pa, i);
......@@ -6054,12 +6056,12 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
}
}
}
count += 1;
}
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, count);
} else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
*(int64_t*) pQuery->sdata[0]->data = num;
......@@ -6071,7 +6073,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SSchema tbnameSchema = tGetTableNameColumnSchema();
while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pQInfo->tableIndex++;
SExprInfo* pExprInfo = pQuery->pSelectExpr;
SGroupItem* item = taosArrayGet(pa, i);
......@@ -6086,7 +6088,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes);
char* dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (data == NULL) {
setVardataNull(dst, type);
......@@ -6104,7 +6106,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
}
count += 1;
}
qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, count);
}
......
......@@ -393,7 +393,6 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
tTrace("%s, connection is already released, rsp wont be sent", pConn->info);
rpcUnlockConn(pConn);
rpcDecRef(pRpc);
return;
}
......
......@@ -72,7 +72,7 @@ typedef struct STableCheckInfo {
int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator
......@@ -311,14 +311,14 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
rmem = SL_GET_NODE_DATA(node);
}
}
if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = SL_GET_NODE_DATA(node);
}
}
if (rmem != NULL && rimem != NULL) {
if (dataRowKey(rmem) < dataRowKey(rimem)) {
pCheckInfo->chosen = 0;
......@@ -333,17 +333,17 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
return rimem;
}
}
if (rmem != NULL) {
pCheckInfo->chosen = 0;
return rmem;
}
if (rimem != NULL) {
pCheckInfo->chosen = 1;
return rimem;
}
return NULL;
}
......@@ -353,11 +353,11 @@ bool moveToNextRow(STableCheckInfo* pCheckInfo) {
if (pCheckInfo->iter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
}
......@@ -366,17 +366,17 @@ bool moveToNextRow(STableCheckInfo* pCheckInfo) {
if (pCheckInfo->iiter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iiter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iter != NULL) {
return tSkipListIterGet(pCheckInfo->iter) != NULL;
}
}
}
return hasNext;
}
......@@ -395,7 +395,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
if (row == NULL) {
return false;
}
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
tsdbTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
......@@ -581,9 +581,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
bool blockLoaded = false;
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
......@@ -603,13 +603,13 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfRows != 0);
taosArrayDestroy(sa);
tfree(data);
int64_t et = taosGetTimestampUs() - st;
tsdbTrace("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);
return blockLoaded;
}
......@@ -681,7 +681,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
......@@ -1212,7 +1212,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
*numOfAllocBlocks = numOfBlocks;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SBlockOrderSupporter sup = {0};
sup.numOfTables = numOfTables;
sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
......@@ -1256,17 +1256,17 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
}
assert(numOfBlocks == cnt);
// since there is only one table qualified, blocks are not sorted
if (numOfQualTables == 1) {
memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
cleanBlockOrderSupporter(&sup, numOfQualTables);
tsdbTrace("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt,
pQueryHandle->qinfo);
return TSDB_CODE_SUCCESS;
}
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt,
numOfQualTables, pQueryHandle->qinfo);
......@@ -1683,7 +1683,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbTrace("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
elapsedTime, numOfRows, numOfCols);
return numOfRows;
}
......
......@@ -68,6 +68,7 @@ static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId)
killQueryMsg->header.vgId = htonl(vgId);
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle);
rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
}
......@@ -85,10 +86,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle);
return TSDB_CODE_SUCCESS;
qKillQuery((qinfo_t) killQueryMsg->qhandle);
return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code
}
int32_t code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册