未验证 提交 053cf3f0 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20690 from taosdata/fix/cencVer

enh: optimize count performance
......@@ -185,7 +185,7 @@ typedef struct SBlockID {
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rowSize;
int32_t rows; // todo hide this attribute
int64_t rows; // todo hide this attribute
uint32_t capacity;
SBlockID id;
int16_t hasVarCol;
......
......@@ -112,7 +112,7 @@ typedef struct SResultDataInfo {
typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index
int32_t numOfRows; // the number of rows needs to be handled
int64_t numOfRows; // the number of rows needs to be handled
int32_t numOfInputCols; // PTS is not included
bool colDataSMAIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column
......
......@@ -982,7 +982,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);
int64_t p1 = taosGetTimestampUs();
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);
return TSDB_CODE_SUCCESS;
} else { // var data type
......@@ -1189,7 +1189,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
void blockDataEmpty(SSDataBlock* pDataBlock) {
SDataBlockInfo* pInfo = &pDataBlock->info;
if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) {
if (pInfo->capacity == 0) {
return;
}
......@@ -1748,14 +1748,14 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.id.uid;
int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int16_t hasVarCol = pBlock->info.hasVarCol;
int32_t rows = pBlock->info.rows;
int64_t rows = pBlock->info.rows;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, tbUid);
tlen += taosEncodeFixedI16(buf, numOfCols);
tlen += taosEncodeFixedI16(buf, hasVarCol);
tlen += taosEncodeFixedI32(buf, rows);
tlen += taosEncodeFixedI64(buf, rows);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
......@@ -1786,7 +1786,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
buf = taosDecodeFixedI16(buf, &numOfCols);
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) {
......@@ -1990,7 +1990,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
"|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
......
......@@ -178,7 +178,7 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader);
......
......@@ -224,6 +224,8 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
// tsdbFile.c ==============================================================================================
......
......@@ -669,7 +669,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i);
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid,
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRId64, output->info.id.uid,
output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
......
......@@ -340,7 +340,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
continue;
}
ret->fetchType = FETCH_TYPE__DATA;
tqDebug("return data rows %d", ret->data.info.rows);
tqDebug("return data rows %" PRId64, ret->data.info.rows);
return 0;
}
......
......@@ -114,7 +114,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
pRsp->blockNum++;
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId,
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%" PRId64 ", total blocks:%d", vgId, pHandle->consumerId,
pDataBlock->info.rows, pRsp->blockNum);
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
......
......@@ -282,6 +282,40 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return true;
}
int64_t tsdbCountTbDataRows(STbData *pTbData) {
SMemSkipListNode *pNode = pTbData->sl.pHead;
int64_t rowsNum = 0;
while (NULL != pNode) {
pNode = SL_GET_NODE_FORWARD(pNode, 0);
if (pNode == pTbData->sl.pTail) {
return rowsNum;
}
rowsNum++;
}
return rowsNum;
}
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
pTbData = pTbData->next;
continue;
}
*rowsNum += tsdbCountTbDataRows(pTbData);
pTbData = pTbData->next;
}
}
taosRUnLockLatch(&pMemTable->latch);
}
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
int32_t code = 0;
......@@ -787,4 +821,4 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
_exit:
return aTbDataP;
}
\ No newline at end of file
}
......@@ -25,6 +25,11 @@ typedef enum {
EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;
typedef enum {
READ_MODE_COUNT_ONLY = 0x1,
READ_MODE_ALL,
} EReadMode;
typedef struct {
STbDataIter* iter;
int32_t index;
......@@ -168,6 +173,8 @@ struct STsdbReader {
uint64_t suid;
int16_t order;
bool freeBlock;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window; // the primary query time window that applies to all queries
SSDataBlock* pResBlock;
int32_t capacity;
......@@ -1777,7 +1784,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
setComposedBlockFlag(pReader, true);
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlockScanInfo->uid, pReader->idStr);
......@@ -2770,7 +2777,7 @@ _end:
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -3022,7 +3029,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
return TSDB_CODE_SUCCESS;
......@@ -3106,7 +3113,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -3132,6 +3139,151 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
int64_t st = taosGetTimestampUs();
LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) {
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return TSDB_CODE_SUCCESS;
}
SBlockIdx* pBlockIdx = NULL;
int32_t i = 0;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
continue;
}
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
STableBlockScanInfo *pScanInfo = *p;
tMapDataReset(&pScanInfo->mapData);
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
SDataBlk block = {0};
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
pReader->rowsNum += block.nRow;
}
}
_end:
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
pBlockLoadInfo = &pLastBlockReader->pInfo[i];
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
if (code) {
return code;
}
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size >= 1) {
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
// all identical
if (pStart->suid == pEnd->suid) {
if (pStart->suid != pReader->suid) {
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
pReader->rowsNum += p->nRow;
}
} else {
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
uint64_t s = p->suid;
if (s < pReader->suid) {
continue;
}
if (s == pReader->suid) {
pReader->rowsNum += p->nRow;
} else if (s > pReader->suid) {
break;
}
}
}
}
}
return code;
}
static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
while (1) {
bool hasNext = false;
int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
if (code) {
return code;
}
if (!hasNext) { // no data files on disk
break;
}
code = doSumFileBlockRows(pReader, pReader->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doSumSttBlockRows(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pReader->status.loadFromFile = false;
return code;
}
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t memNum = 0, imemNum = 0;
if (pReader->pReadSnap->pMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
}
if (pReader->pReadSnap->pIMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum);
}
pReader->rowsNum += memNum + imemNum;
return code;
}
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList;
......@@ -4072,6 +4224,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false;
} else if (READ_MODE_COUNT_ONLY == pReader->readMode) {
// DO NOTHING
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
}
......@@ -4090,7 +4244,7 @@ static void freeSchemaFunc(void* param) {
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) {
STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1;
......@@ -4190,6 +4344,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pReader->suspended = true;
if (countOnly) {
pReader->readMode = READ_MODE_COUNT_ONLY;
}
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
......@@ -4490,6 +4648,33 @@ _err:
return code;
}
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->pResBlock;
if (pReader->status.loadFromFile == false) {
return false;
}
code = readRowsCountFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
code = readRowsCountFromMem(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
pBlock->info.rows = pReader->rowsNum;
pBlock->info.id.uid = 0;
pBlock->info.dataLoad = 0;
pReader->rowsNum = 0;
return pBlock->info.rows > 0;
}
static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -4504,6 +4689,10 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
return code;
}
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
return tsdbReadRowsCountOnly(pReader);
}
if (pStatus->loadFromFile) {
code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -336,6 +336,7 @@ typedef struct STableScanInfo {
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
bool countOnly;
} STableScanInfo;
typedef struct STableMergeScanInfo {
......
......@@ -1167,7 +1167,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pScanBaseInfo->dataReader == NULL) {
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id);
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id, false);
if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
terrno = code;
......@@ -1218,7 +1218,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
int32_t size = tableListGetSize(pTableListInfo);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
......
......@@ -1201,7 +1201,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s",
qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s",
(pRow->numOfRows+pBlock->info.rows),
pBlock->info.capacity, GET_TASKID(pTaskInfo));
// todo set the pOperator->resultInfo size
......@@ -1214,7 +1214,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
pBlock->info.rows += pRow->numOfRows;
}
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
pBlock->info.id.groupId);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
......
......@@ -271,7 +271,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return NULL;
}
}
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status,
pFinalRes->info.rows);
setOperatorCompleted(pOperator);
break;
......@@ -337,7 +337,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status);
qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
break;
}
} else {
......
......@@ -31,6 +31,9 @@
#include "thash.h"
#include "ttypes.h"
int32_t scanDebug = 0;
#define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
......@@ -308,14 +311,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
taosMemoryFreeClear(pBlock->pBlockAgg);
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d, uid:%" PRIu64, GET_TASKID(pTaskInfo),
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, pBlockInfo->id.uid);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1;
......@@ -326,7 +329,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
loadSMA = true; // mark the operation of load sma;
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
......@@ -346,7 +349,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
size_t size = taosArrayGetSize(pBlock->pDataBlock);
bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
......@@ -363,7 +366,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
// try to filter data block according to current results
doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
......@@ -394,7 +397,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
......@@ -672,9 +675,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
ASSERT(pBlock->info.id.uid != 0);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
......@@ -698,7 +702,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.id.uid != 0);
return pBlock;
}
return NULL;
......@@ -804,7 +807,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -816,7 +819,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) {
ASSERT(result->info.id.uid != 0);
return result;
}
......@@ -936,6 +938,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error;
}
if (scanDebug) {
pInfo->countOnly = true;
}
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
optrDefaultBufFn, getTableScannerExecInfo);
......@@ -1027,7 +1033,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
......@@ -1049,7 +1055,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
tsdbReaderClose(pReader);
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
", suid:%" PRIu64,
pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
......@@ -1639,7 +1645,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows,
qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id);
pTaskInfo->streamInfo.returned = 1;
return pResult;
......@@ -1678,7 +1684,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
setBlockIntoRes(pInfo, &ret.data, true);
if (pInfo->pRes->info.rows > 0) {
pOperator->status = OP_EXEC_RECV;
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
qDebug("queue scan log return %" PRId64 " rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
} else if (ret.fetchType == FETCH_TYPE__META) {
......@@ -1865,7 +1871,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
return pInfo->pCreateTbRes;
}
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
qDebug("stream recover scan get block, rows %" PRId64 , pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes;
}
......@@ -2094,7 +2100,7 @@ FETCH_NEXT_BLOCK:
pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan");
qDebug("scan rows: %d", pBlockInfo->rows);
qDebug("scan rows: %" PRId64 , pBlockInfo->rows);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
}
......@@ -2635,7 +2641,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) {
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2862,7 +2868,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
......
......@@ -698,7 +698,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
pDataBlock->info.dataLoad = 1;
}
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
......
......@@ -2267,7 +2267,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
......
......@@ -494,8 +494,8 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
static int64_t getNumOfElems(SqlFunctionCtx* pCtx) {
int64_t numOfElem = 0;
/*
* 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataSMAIsSet == true;
......@@ -528,7 +528,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
int32_t countFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
int64_t numOfElem = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
......@@ -555,7 +555,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
}
int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumOfElems(pCtx);
int64_t numOfElem = getNumOfElems(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -1929,7 +1929,7 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
qDebug("%s total %d rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto);
qDebug("%s total %" PRId64 " rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto);
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
......
......@@ -199,7 +199,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_ERR_JRET(code);
}
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
QW_TASK_DLOG("data put into sink, rows:%" PRId64 ", continueExecTask:%d", pRes->info.rows, qcontinue);
}
if (numOfResBlock == 0 || (hasMore == false)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册