From fa5e2698fec39a6a62397886e93dd91776b74aad Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 7 Apr 2020 12:55:31 +0800 Subject: [PATCH] [td-98] fix bugs in query when data in both cache and files --- src/client/src/tscServer.c | 4 ++-- src/query/src/queryExecutor.c | 36 ++++++++++++----------------------- src/util/src/tcache.c | 4 ++-- src/vnode/tsdb/src/tsdbMain.c | 16 ++++++++++------ src/vnode/tsdb/src/tsdbRead.c | 18 ++++++++++++++---- 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 327120e0ed..1e7ad937ac 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2399,8 +2399,8 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); if (pTableMetaInfo->pTableMeta != NULL) { STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns, - tinfo.numOfTags); + tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, + tinfo.numOfTags, pTableMetaInfo->pTableMeta); return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index d7477d6c85..edb0e0aaed 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2239,24 +2239,6 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { - if (pQInfo == NULL) { - return; - } - - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - - if (pQInfo->pTableDataInfo != NULL) { - // size_t num = taosHashGetSize(pQInfo->pTableIdList); - for (int32_t j = 0; j < 0; ++j) { - destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); - } - } - - tfree(pQInfo->pTableDataInfo); -} - int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { @@ -2264,14 +2246,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pQuery->window.ekey, pQuery->order.order); sem_post(&pQInfo->dataReady); - // pQInfo->over = 1; - return TSDB_CODE_SUCCESS; } pQuery->status = 0; - - pQuery->rec = (SResultRec){0}; pQuery->rec = (SResultRec){0}; changeExecuteScanOrder(pQuery, true); @@ -5925,7 +5903,7 @@ static void freeQInfo(SQInfo *pQInfo) { SQuery* pQuery = pQInfo->runtimeEnv.pQuery; setQueryKilled(pQInfo); - dTrace("QInfo:%p start to free SQInfo", pQInfo); + dTrace("QInfo:%p start to free QInfo", pQInfo); for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { tfree(pQuery->sdata[col]); } @@ -5939,7 +5917,16 @@ static void freeQInfo(SQInfo *pQInfo) { // } sem_destroy(&(pQInfo->dataReady)); - vnodeQueryFreeQInfoEx(pQInfo); + teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); + + if (pQInfo->pTableDataInfo != NULL) { + // size_t num = taosHashGetSize(pQInfo->pTableIdList); + for (int32_t j = 0; j < 0; ++j) { + destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); + } + } + + tfree(pQInfo->pTableDataInfo); for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; @@ -6121,6 +6108,7 @@ _query_over: } void qDestroyQueryInfo(SQInfo* pQInfo) { + dTrace("QInfo:%p query completed", pQInfo); freeQInfo(pQInfo); } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 8bfec97032..47b358c052 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { } int32_t ref = T_REF_INC(ptNode); - pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref) + pTrace("%p acquired by data in cache, refcnt:%d", ptNode, ref) // the data if referenced by at least one object, so the reference count must be greater than the value of 2. assert(ref >= 2); @@ -516,7 +516,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { *data = NULL; int16_t ref = T_REF_DEC(pNode); - pTrace("%p is released, refcnt:%d", pNode, ref); + pTrace("%p data released, refcnt:%d", pNode, ref); if (_remove) { __cache_wr_lock(pCacheObj); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 343e332574..61f4995e43 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -367,14 +367,16 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { SSubmitMsgIter msgIter; tsdbInitSubmitMsgIter(pMsg, &msgIter); - SSubmitBlk *pBlock; + SSubmitBlk *pBlock = NULL; + int32_t code = TSDB_CODE_SUCCESS; + while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { - if (tsdbInsertDataToTable(repo, pBlock) < 0) { - return -1; + if ((code = tsdbInsertDataToTable(repo, pBlock)) != TSDB_CODE_SUCCESS) { + return code; } } - return 0; + return code; } /** @@ -735,7 +737,9 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); - if (pTable == NULL) return -1; + if (pTable == NULL) { + return TSDB_CODE_INVALID_TABLE_ID; + } SSubmitBlkIter blkIter; SDataRow row; @@ -747,7 +751,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } } - return 0; + return TSDB_CODE_SUCCESS; } static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index e1377f8a34..49cbd10042 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -438,10 +438,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { return true; } } else { // check data in cache + pQueryHandle->cur.fid = -1; return hasMoreDataInCacheForSingleModel(pQueryHandle); } - } else { - // next block in the same file + } else { // next block in the same file cur->slot += step; SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; @@ -526,9 +526,11 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { endPos = blockInfo.size - 1; pQueryHandle->realNumOfRows = endPos - cur->pos + 1; + pCheckInfo->lastKey = blockInfo.window.ekey + 1; } else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { endPos = 0; pQueryHandle->realNumOfRows = cur->pos + 1; + pCheckInfo->lastKey = blockInfo.window.ekey - 1; } else { endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order); @@ -539,6 +541,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo } else { pQueryHandle->realNumOfRows = endPos - cur->pos; } + + pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1; } else { if (endPos > cur->pos) { pQueryHandle->realNumOfRows = 0; @@ -546,6 +550,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo } else { pQueryHandle->realNumOfRows = cur->pos - endPos; } + + assert(0); } } @@ -888,7 +894,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { rows = pHandle->realNumOfRows; skey = *(TSKEY*) pColInfoEx->pData; - ekey = *(TSKEY*) pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1); + ekey = *(TSKEY*) ((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1)); } } else { if (pTable->mem != NULL) { @@ -926,6 +932,10 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; if (pHandle->cur.fid < 0) { + + + + return pHandle->pColumns; } else { STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); @@ -945,6 +955,7 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList } else { doLoadDataFromFileBlock(pHandle); filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa); + return pHandle->pColumns; } } @@ -1297,7 +1308,6 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle; - size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); for(int32_t i = 0; i < size; ++i) { STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); -- GitLab