diff --git a/.gitignore b/.gitignore index d5c7f763cf41939b0e577fc0ce72a2d8bf2436b6..d7fcb019ae14a70ead3f84bbe97e01c3053acd5b 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ mac/ .mypy_cache *.tmp *.swp +*.swo *.orig src/connector/nodejs/node_modules/ src/connector/nodejs/out/ diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9e7aea03ea28e6b2a4c43a39681a6556fd367051..e1aadd448663616e8526b3201a1af13bb7775ed2 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond { int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; SColumnInfo* colList; - int32_t type; // data block load type: - // int32_t numOfTWindows; - STimeWindow twindows; - int64_t startVersion; - int64_t endVersion; + int32_t type; // data block load type: + STimeWindow twindows; + int64_t startVersion; + int64_t endVersion; } SQueryTableDataCond; int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a73808f2ed0e85657ed90789c4deb4b437715b1e..36900e3dfaa32ce5eaf6953b14402924736d39ca 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -213,10 +213,6 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) { memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg)); if (pCfg->level == 0 && pCfg->primary == 1) { tstrncpy(tsDataDir, pCfg->dir, PATH_MAX); - if (taosMulMkDir(tsDataDir) != 0) { - uError("failed to create dataDir:%s since %s", tsDataDir, terrstr()); - return -1; - } } if (taosMulMkDir(pCfg->dir) != 0) { uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr()); @@ -227,12 +223,13 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) { if (tsDataDir[0] == 0) { if (pItem->str != NULL) { - taosAddDataDir(0, pItem->str, 0, 1); + taosAddDataDir(tsDiskCfgNum, pItem->str, 0, 1); tstrncpy(tsDataDir, pItem->str, PATH_MAX); if (taosMulMkDir(tsDataDir) != 0) { - uError("failed to create dataDir:%s since %s", tsDataDir, terrstr()); + uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr()); return -1; } + tsDiskCfgNum++; } else { uError("datadir not set"); return -1; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 5871d56a8f1cd5afa171e317e2d1a53b91f02daf..f6ecd4493d7d7002398f2f3ef3d7f14f4bf75a23 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pBasic->queryDesc = NULL; - mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries); + mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries); taosWUnLockLatch(&pConn->queryLock); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6d976083d2a4f0ee67a83270e9a808c8db5edde4..4d95a9d7a5dde52572773e8b84a579c127c28598 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); // typedef struct STsdb STsdb; typedef struct STsdbReader STsdbReader; -#define BLOCK_LOAD_OFFSET_ORDER 1 -#define BLOCK_LOAD_TABLESEQ_ORDER 2 -#define BLOCK_LOAD_EXTERN_ORDER 3 +#define TIMEWINDOW_RANGE_CONTAINED 1 +#define TIMEWINDOW_RANGE_EXTERNAL 2 #define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index cec714e0ee90a06391c2f6a7b9a238f7c38ee282..ea8ac09429af5018e4b01b3bc40e9a3242295fa9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,6 +16,12 @@ #include "tsdb.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +typedef enum { + EXTERNAL_ROWS_PREV = 0x1, + EXTERNAL_ROWS_MAIN = 0x2, + EXTERNAL_ROWS_NEXT = 0x3, +} EContentData; + typedef struct { STbDataIter* iter; int32_t index; @@ -70,9 +76,9 @@ typedef struct SFilesetIter { } SFilesetIter; typedef struct SFileDataBlockInfo { - int32_t - tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it + // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it uint64_t uid; + int32_t tbBlockIdx; } SFileDataBlockInfo; typedef struct SDataBlockIter { @@ -99,12 +105,11 @@ typedef struct SReaderStatus { SHashObj* pTableMap; // SHash STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. SFileBlockDumpInfo fBlockDumpInfo; - - SDFileSet* pCurrentFileset; // current opened file set - SBlockData fileBlockData; - SFilesetIter fileIter; - SDataBlockIter blockIter; - bool composedDataBlock; // the returned data block is a composed block or not + SDFileSet* pCurrentFileset; // current opened file set + SBlockData fileBlockData; + SFilesetIter fileIter; + SDataBlockIter blockIter; + bool composedDataBlock; // the returned data block is a composed block or not } SReaderStatus; struct STsdbReader { @@ -115,15 +120,17 @@ struct STsdbReader { SSDataBlock* pResBlock; int32_t capacity; SReaderStatus status; - char* idStr; // query info handle, for debug purpose - int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows + char* idStr; // query info handle, for debug purpose + int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; + SIOCostSummary cost; + STSchema* pSchema; + SDataFReader* pFileReader; + SVersionRange verRange; - SIOCostSummary cost; - STSchema* pSchema; - SDataFReader* pFileReader; - SVersionRange verRange; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -200,6 +207,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK pTsdbReader->idStr); } + tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, (sizeof(STableBlockScanInfo)*numOfTables)/1024.0, + pTsdbReader->idStr); + return pTableMap; } @@ -328,7 +338,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { continue; } - tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, fid, pReader->window.skey, + tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey, pReader->window.ekey, pReader->idStr); return true; } @@ -378,7 +388,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) return pResBlock; } -static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) { +static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, const char* idstr) { int32_t code = 0; int8_t level = 0; STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader)); @@ -392,7 +402,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->suid = pCond->suid; pReader->order = pCond->order; - pReader->capacity = 4096; + pReader->capacity = capacity; pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL; pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; @@ -483,95 +493,6 @@ _end: // return res; // } -// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT -// maxVer) { -// TSDBROW row = {0}; -// STSRow *rmem = NULL, *rimem = NULL; - -// if (pCheckInfo->iter) { -// if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) { -// rmem = row.pTSRow; -// } -// } - -// if (pCheckInfo->iiter) { -// if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) { -// rimem = row.pTSRow; -// } -// } - -// if (rmem == NULL && rimem == NULL) { -// return TSKEY_INITIAL_VAL; -// } - -// if (rmem != NULL && rimem == NULL) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; -// return TD_ROW_KEY(rmem); -// } - -// if (rmem == NULL && rimem != NULL) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; -// return TD_ROW_KEY(rimem); -// } - -// TSKEY r1 = TD_ROW_KEY(rmem); -// TSKEY r2 = TD_ROW_KEY(rimem); - -// if (r1 == r2) { -// if (TD_SUPPORT_UPDATE(update)) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; -// } else { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; -// tsdbTbDataIterNext(pCheckInfo->iter); -// } -// return r1; -// } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; -// return r1; -// } else { -// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; -// return r2; -// } -// } - -// static bool moveToNextRowInMem(STableBlockScanInfo* pCheckInfo) { -// bool hasNext = false; -// if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) { -// if (pCheckInfo->iter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iter); -// } - -// if (hasNext) { -// return hasNext; -// } - -// if (pCheckInfo->iiter != NULL) { -// return tsdbTbDataIterGet(pCheckInfo->iiter, NULL); -// } -// } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) { -// if (pCheckInfo->iiter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iiter); -// } - -// if (hasNext) { -// return hasNext; -// } - -// if (pCheckInfo->iter != NULL) { -// return tsdbTbDataIterGet(pCheckInfo->iter, NULL); -// } -// } else { -// if (pCheckInfo->iter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iter); -// } -// if (pCheckInfo->iiter != NULL) { -// hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext; -// } -// } - -// return hasNext; -// } - // static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { // int32_t firstSlot = 0; // int32_t lastSlot = numOfBlocks - 1; @@ -602,18 +523,22 @@ _end: static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { SArray* aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + int64_t st = taosGetTimestampUs(); int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL); if (code != TSDB_CODE_SUCCESS) { goto _end; } - if (taosArrayGetSize(aBlockIdx) == 0) { + size_t num = taosArrayGetSize(aBlockIdx); + if (num == 0) { taosArrayClear(aBlockIdx); return TSDB_CODE_SUCCESS; } - SBlockIdx* pBlockIdx; - for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { + int64_t et1 = taosGetTimestampUs(); + + SBlockIdx* pBlockIdx = NULL; + for (int32_t i = 0; i < num; ++i) { pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); // uid check @@ -627,17 +552,6 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, continue; } - // todo: not valid info in bockIndex - // time range check - // if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) { - // continue; - // } - - // version check - // if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) { - // continue; - // } - STableBlockScanInfo* pScanInfo = p; if (pScanInfo->pBlockList == NULL) { pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock)); @@ -647,6 +561,9 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, taosArrayPush(pIndexList, pBlockIdx); } + int64_t et2 = taosGetTimestampUs(); + tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%d bytes %s", + (int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx), pReader->idStr); _end: taosArrayDestroy(aBlockIdx); return code; @@ -655,9 +572,11 @@ _end: static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables, int32_t* numOfBlocks) { size_t numOfTables = taosArrayGetSize(pIndexList); - *numOfValidTables = 0; + int64_t st = taosGetTimestampUs(); + size_t size = 0; + STableBlockScanInfo* px = NULL; while (1) { px = taosHashIterate(pReader->status.pTableMap, px); @@ -675,6 +594,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ tMapDataReset(&mapData); tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL); + size += mapData.nData; + STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); for (int32_t j = 0; j < mapData.nItem; ++j) { SBlock block = {0}; @@ -706,6 +627,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ } } + int64_t et = taosGetTimestampUs(); + tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s", + numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, (et-st)/1000.0, pReader->idStr); + return TSDB_CODE_SUCCESS; } @@ -816,7 +741,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn return TSDB_CODE_SUCCESS; } -// todo consider the output buffer size static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { int64_t st = taosGetTimestampUs(); @@ -853,346 +777,6 @@ _error: return code; } -// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { -// int firstPos, lastPos, midPos = -1; -// int numOfRows; -// TSKEY* keyList; - -// assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - -// if (num <= 0) return -1; - -// keyList = (TSKEY*)pValue; -// firstPos = 0; -// lastPos = num - 1; - -// if (order == TSDB_ORDER_DESC) { -// // find the first position which is smaller than the key -// while (1) { -// if (key >= keyList[lastPos]) return lastPos; -// if (key == keyList[firstPos]) return firstPos; -// if (key < keyList[firstPos]) return firstPos - 1; - -// numOfRows = lastPos - firstPos + 1; -// midPos = (numOfRows >> 1) + firstPos; - -// if (key < keyList[midPos]) { -// lastPos = midPos - 1; -// } else if (key > keyList[midPos]) { -// firstPos = midPos + 1; -// } else { -// break; -// } -// } - -// } else { -// // find the first position which is bigger than the key -// while (1) { -// if (key <= keyList[firstPos]) return firstPos; -// if (key == keyList[lastPos]) return lastPos; - -// if (key > keyList[lastPos]) { -// lastPos = lastPos + 1; -// if (lastPos >= num) -// return -1; -// else -// return lastPos; -// } - -// numOfRows = lastPos - firstPos + 1; -// midPos = (numOfRows >> 1) + firstPos; - -// if (key < keyList[midPos]) { -// lastPos = midPos - 1; -// } else if (key > keyList[midPos]) { -// firstPos = midPos + 1; -// } else { -// break; -// } -// } -// } - -// return midPos; -// } - -// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) { -// SQueryFilePos* cur = &pTsdbReadHandle->cur; - -// if (cur->rows > 0) { -// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { -// assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey); -// } else { -// assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey); -// } - -// SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0); -// assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && -// cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]); -// } else { -// cur->win = pTsdbReadHandle->window; - -// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; -// cur->lastKey = pTsdbReadHandle->window.ekey + step; -// } -// } - -// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, -// SDataBlockInfo* pBlockInfo, int32_t endPos) { -// SQueryFilePos* cur = &pTsdbReadHandle->cur; - -// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; -// TSKEY* tsArray = pCols->cols[0].pData; - -// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); - -// int32_t step = ascScan ? 1 : -1; - -// int32_t start = cur->pos; -// int32_t end = endPos; - -// if (!ascScan) { -// TSWAP(start, end); -// } - -// assert(pTsdbReadHandle->outputCapacity >= (end - start + 1)); -// int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end); - -// // the time window should always be ascending order: skey <= ekey -// cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]}; -// cur->mixBlock = (numOfRows != pBlockInfo->rows); -// cur->lastKey = tsArray[endPos] + step; -// cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0)); - -// // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases. -// int32_t pos = endPos + step; -// updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); -// doCheckGeneratedBlockRange(pTsdbReadHandle); - -// tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", -// pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, -// pTsdbReadHandle->idStr); -// } - -// // only return the qualified data to client in terms of query time window, data rows in the same block but do not -// // be included in the query time window will be discarded -// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, SBlock* pBlock) { -// SQueryFilePos* cur = &pTsdbReadHandle->cur; -// SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); -// STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); - -// initTableMemIterator(pTsdbReadHandle, pCheckInfo); - -// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; -// assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID && -// cur->pos >= 0 && cur->pos < pBlock->numOfRows); -// // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData -// interface. TSKEY* tsArray = pCols->cols[0].pData; assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == -// pBlock->minKey.ts && -// tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts); - -// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); -// int32_t step = ascScan ? 1 : -1; - -// // for search the endPos, so the order needs to reverse -// int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - -// int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); -// int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); - -// STimeWindow* pWin = &blockInfo.window; -// tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 -// " rows:%d, start:%d, end:%d, %s", -// pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos, -// pTsdbReadHandle->idStr); - -// // compared with the data from in-memory buffer, to generate the correct timestamp array list -// int32_t numOfRows = 0; -// int32_t curRow = 0; - -// int16_t rv1 = -1; -// int16_t rv2 = -1; -// STSchema* pSchema1 = NULL; -// STSchema* pSchema2 = NULL; - -// int32_t pos = cur->pos; -// cur->win = TSWINDOW_INITIALIZER; -// bool adjustPos = false; - -// // no data in buffer, load data from file directly -// if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { -// copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos); -// return; -// } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { -// SSkipListNode* node = NULL; -// TSKEY lastKeyAppend = TSKEY_INITIAL_VAL; - -// do { -// STSRow* row2 = NULL; -// STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX); -// if (row1 == NULL) { -// break; -// } - -// TSKEY key = TD_ROW_KEY(row1); -// if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) { -// break; -// } - -// if (adjustPos) { -// if (key == lastKeyAppend) { -// pos -= step; -// } -// adjustPos = false; -// } - -// if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) || -// ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) { -// break; -// } - -// if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) { -// if (rv1 != TD_ROW_SVER(row1)) { -// // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); -// rv1 = TD_ROW_SVER(row1); -// } -// if (row2 && rv2 != TD_ROW_SVER(row2)) { -// // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); -// rv2 = TD_ROW_SVER(row2); -// } - -// numOfRows += -// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, -// pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = key; -// } - -// cur->win.ekey = key; -// cur->lastKey = key + step; -// cur->mixBlock = true; -// moveToNextRowInMem(pCheckInfo); -// } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it -// if (TD_SUPPORT_UPDATE(pCfg->update)) { -// if (lastKeyAppend != key) { -// if (lastKeyAppend != TSKEY_INITIAL_VAL) { -// ++curRow; -// } -// lastKeyAppend = key; -// } -// // load data from file firstly -// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); - -// if (rv1 != TD_ROW_SVER(row1)) { -// rv1 = TD_ROW_SVER(row1); -// } -// if (row2 && rv2 != TD_ROW_SVER(row2)) { -// rv2 = TD_ROW_SVER(row2); -// } - -// // still assign data into current row -// numOfRows += -// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, -// pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); - -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = key; -// } - -// cur->win.ekey = key; -// cur->lastKey = key + step; -// cur->mixBlock = true; - -// moveToNextRowInMem(pCheckInfo); - -// pos += step; -// adjustPos = true; -// } else { -// // discard the memory record -// moveToNextRowInMem(pCheckInfo); -// } -// } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) { -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = tsArray[pos]; -// } - -// int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); -// assert(end != -1); - -// if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it -// #if 0 -// if (pCfg->update == TD_ROW_DISCARD_UPDATE) { -// moveToNextRowInMem(pCheckInfo); -// } else { -// end -= step; -// } -// #endif -// if (!TD_SUPPORT_UPDATE(pCfg->update)) { -// moveToNextRowInMem(pCheckInfo); -// } else { -// end -= step; -// } -// } - -// int32_t qstart = 0, qend = 0; -// getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); - -// if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) { -// ++curRow; -// } - -// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend); -// pos += (qend - qstart + 1) * step; -// if (numOfRows > 0) { -// curRow = numOfRows - 1; -// } - -// cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart]; -// cur->lastKey = cur->win.ekey + step; -// lastKeyAppend = cur->win.ekey; -// } -// } while (numOfRows < pTsdbReadHandle->outputCapacity); - -// if (numOfRows < pTsdbReadHandle->outputCapacity) { -// /** -// * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT -// * copy them all to result buffer, since it may be overlapped with file data block. -// */ -// if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) -// || -// ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) { -// // no data in cache or data in cache is greater than the ekey of time window, load data from file block -// if (cur->win.skey == TSKEY_INITIAL_VAL) { -// cur->win.skey = tsArray[pos]; -// } - -// int32_t start = -1, end = -1; -// getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end); - -// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end); -// pos += (end - start + 1) * step; - -// cur->win.ekey = ascScan ? tsArray[end] : tsArray[start]; -// cur->lastKey = cur->win.ekey + step; -// cur->mixBlock = true; -// } -// } -// } - -// cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) || -// ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan)); - -// if (!ascScan) { -// TSWAP(cur->win.skey, cur->win.ekey); -// } - -// updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); -// doCheckGeneratedBlockRange(pTsdbReadHandle); - -// tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", -// pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, -// pTsdbReadHandle->idStr); -// } - static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) { taosMemoryFreeClear(pSup->numOfBlocksPerTable); taosMemoryFreeClear(pSup->indexPerTable); @@ -1252,8 +836,9 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); - SBlockOrderSupporter sup = {0}; + int64_t st = taosGetTimestampUs(); + SBlockOrderSupporter sup = {0}; int32_t code = initBlockOrderSupporter(&sup, numOfTables); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1302,11 +887,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i}; taosArrayPush(pBlockIter->blockList, &blockInfo); } - tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", pReader, cnt, - pReader->idStr); - pBlockIter->index = asc ? 0 : (numOfBlocks - 1); + int64_t et = taosGetTimestampUs(); + tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", pReader, cnt, + (et - st)/1000.0, pReader->idStr); + pBlockIter->index = asc ? 0 : (numOfBlocks - 1); cleanupBlockOrderSupporter(&sup); return TSDB_CODE_SUCCESS; } @@ -1340,7 +926,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); } - tsdbDebug("%p %d data blocks sort completed, %s", pReader, cnt, pReader->idStr); + int64_t et = taosGetTimestampUs(); + tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et-st)/1000.0, pReader->idStr); cleanupBlockOrderSupporter(&sup); taosMemoryFree(pTree); @@ -1813,6 +1400,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* SBlockData* pBlockData = &pReader->status.fileBlockData; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + int64_t st = taosGetTimestampUs(); + while (1) { // todo check the validate of row in file block { @@ -1851,10 +1440,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); + int64_t et = taosGetTimestampUs(); - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", pReader, + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, - pReader->idStr); + (et - st)/1000.0, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -2031,7 +1621,9 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { SReaderStatus* pStatus = &pReader->status; - SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); + + size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx)); while (1) { bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader); @@ -2799,24 +2391,57 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { - int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, idstr); - if (code) { + int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr); + if (code != TSDB_CODE_SUCCESS) { goto _err; } - if (pCond->suid != 0) { - (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1); - } else if (taosArrayGetSize(pTableList) > 0) { - STableKeyInfo* pKey = taosArrayGet(pTableList, 0); - (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1); - } - + // check for query time window STsdbReader* pReader = *ppReader; if (isEmptyQueryTimeWindow(&pReader->window)) { tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); return TSDB_CODE_SUCCESS; } + if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { + // update the SQueryTableDataCond to create inner reader + STimeWindow w = pCond->twindows; + int32_t order = pCond->order; + if (order == TSDB_ORDER_ASC) { + pCond->twindows.ekey = pCond->twindows.skey; + pCond->twindows.skey = INT64_MIN; + pCond->order = TSDB_ORDER_DESC; + } else { + pCond->twindows.skey = pCond->twindows.ekey; + pCond->twindows.ekey = INT64_MAX; + pCond->order = TSDB_ORDER_ASC; + } + + code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (order == TSDB_ORDER_ASC) { + pCond->twindows.skey = w.ekey; + pCond->twindows.ekey = INT64_MAX; + } else { + pCond->twindows.skey = INT64_MIN; + pCond->twindows.ekey = w.ekey; + } + code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + } + + if (pCond->suid != 0) { + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); + } else if (taosArrayGetSize(pTableList) > 0) { + STableKeyInfo* pKey = taosArrayGet(pTableList, 0); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1); + } + int32_t numOfTables = taosArrayGetSize(pTableList); pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables); if (pReader->status.pTableMap == NULL) { @@ -2827,21 +2452,41 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - SDataBlockIter* pBlockIter = &pReader->status.blockIter; - code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap); - if (code) goto _err; + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } - initFilesetIterator(&pReader->status.fileIter, (*ppReader)->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { + SDataBlockIter* pBlockIter = &pReader->status.blockIter; - // no data in files, let's try buffer in memory - if (pReader->status.fileIter.numOfFiles == 0) { - pReader->status.loadFromFile = false; + initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + + // no data in files, let's try buffer in memory + if (pReader->status.fileIter.numOfFiles == 0) { + pReader->status.loadFromFile = false; + } else { + code = initForFirstBlockInFile(pReader, pBlockIter); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } } else { - code = initForFirstBlockInFile(pReader, pBlockIter); - if (code != TSDB_CODE_SUCCESS) { - return code; + STsdbReader* pPrevReader = pReader->innerReader[0]; + SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; + + initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr); + resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order); + + // no data in files, let's try buffer in memory + if (pPrevReader->status.fileIter.numOfFiles == 0) { + pPrevReader->status.loadFromFile = false; + } else { + code = initForFirstBlockInFile(pPrevReader, pBlockIter); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } @@ -2881,20 +2526,6 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); } -#if 0 -// if (pReader->status.pTableScanInfo != NULL) { -// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo); -// } - -// tsdbDestroyReadH(&pReader->rhelper); - -// tdFreeDataCols(pReader->pDataCols); -// pReader->pDataCols = NULL; -// -// pReader->prev = doFreeColumnInfoData(pReader->prev); -// pReader->next = doFreeColumnInfoData(pReader->next); -#endif - SIOCostSummary* pCost = &pReader->cost; tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64 @@ -2907,55 +2538,100 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFreeClear(pReader); } -bool tsdbNextDataBlock(STsdbReader* pReader) { - if (isEmptyQueryTimeWindow(&pReader->window)) { - return false; - } - +static bool doTsdbNextDataBlock(STsdbReader* pReader) { // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); int64_t stime = taosGetTimestampUs(); - int64_t elapsedTime = stime; SReaderStatus* pStatus = &pReader->status; - if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { - if (pStatus->loadFromFile) { - int32_t code = buildBlockFromFiles(pReader); - if (code != TSDB_CODE_SUCCESS) { - return false; - } + if (pStatus->loadFromFile) { + int32_t code = buildBlockFromFiles(pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } - if (pBlock->info.rows > 0) { - return true; - } else { - buildBlockFromBufferSequentially(pReader); - return pBlock->info.rows > 0; - } - } else { // no data in files, let's try the buffer + if (pBlock->info.rows > 0) { + return true; + } else { buildBlockFromBufferSequentially(pReader); return pBlock->info.rows > 0; } - } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { - } else if (pReader->type == BLOCK_LOAD_EXTERN_ORDER) { - } else { - ASSERT(0); + } else { // no data in files, let's try the buffer + buildBlockFromBufferSequentially(pReader); + return pBlock->info.rows > 0; } + return false; } -void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { +bool tsdbNextDataBlock(STsdbReader* pReader) { + if (isEmptyQueryTimeWindow(&pReader->window)) { + return false; + } + + if (pReader->innerReader[0] != NULL) { + bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); + if (ret) { + pReader->step = EXTERNAL_ROWS_PREV; + return ret; + } + + tsdbReaderClose(pReader->innerReader[0]); + pReader->innerReader[0] = NULL; + } + + pReader->step = EXTERNAL_ROWS_MAIN; + bool ret = doTsdbNextDataBlock(pReader); + if (ret) { + return ret; + } + + if (pReader->innerReader[1] != NULL) { + bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); + if (ret1) { + pReader->step = EXTERNAL_ROWS_NEXT; + return ret1; + } + + tsdbReaderClose(pReader->innerReader[1]); + pReader->innerReader[1] = NULL; + } + + return false; +} + +static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { ASSERT(pDataBlockInfo != NULL && pReader != NULL); pDataBlockInfo->rows = pReader->pResBlock->info.rows; pDataBlockInfo->uid = pReader->pResBlock->info.uid; pDataBlockInfo->window = pReader->pResBlock->info.window; } +void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { + if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { + if (pReader->step == EXTERNAL_ROWS_MAIN) { + setBlockInfo(pReader, pDataBlockInfo); + } else if (pReader->step == EXTERNAL_ROWS_PREV) { + setBlockInfo(pReader->innerReader[0], pDataBlockInfo); + } else { + setBlockInfo(pReader->innerReader[1], pDataBlockInfo); + } + } else { + setBlockInfo(pReader, pDataBlockInfo); + } +} + int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { int32_t code = 0; *allHave = false; + if(pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { + *pBlockStatis = NULL; + return TSDB_CODE_SUCCESS; + } + // there is no statistics data for composed block if (pReader->status.composedDataBlock) { *pBlockStatis = NULL; @@ -3025,7 +2701,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS return code; } -SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { +static SArray* doRetrieveDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; if (pStatus->composedDataBlock) { @@ -3054,16 +2730,27 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { return pReader->pResBlock->pDataBlock; } +SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { + if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { + if (pReader->step == EXTERNAL_ROWS_PREV) { + return doRetrieveDataBlock(pReader->innerReader[0]); + } else if (pReader->step == EXTERNAL_ROWS_NEXT) { + return doRetrieveDataBlock(pReader->innerReader[1]); + } + } + + return doRetrieveDataBlock(pReader); +} + int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { if (isEmptyQueryTimeWindow(&pReader->window)) { return TSDB_CODE_SUCCESS; } pReader->order = pCond->order; - pReader->type = BLOCK_LOAD_OFFSET_ORDER; + pReader->type = TIMEWINDOW_RANGE_CONTAINED; pReader->status.loadFromFile = true; pReader->status.pTableIter = NULL; - pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); // allocate buffer in order to load data blocks from file @@ -3073,10 +2760,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; tsdbDataFReaderClose(&pReader->pFileReader); - // todo set the correct numOfTables - int32_t numOfTables = 1; - SDataBlockIter* pBlockIter = &pReader->status.blockIter; - + int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); tsdbDataFReaderClose(&pReader->pFileReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr); @@ -3084,18 +2768,23 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { resetDataBlockScanInfo(pReader->status.pTableMap); int32_t code = 0; + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { pReader->status.loadFromFile = false; } else { code = initForFirstBlockInFile(pReader, pBlockIter); if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", + pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); return code; } } tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + return code; } @@ -3186,7 +2875,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { STbData* d = NULL; if (pReader->pTsdb->mem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); + tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d); if (d != NULL) { rows += tsdbGetNRowsInTbData(d); } @@ -3194,7 +2883,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { STbData* di = NULL; if (pReader->pTsdb->imem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); + tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di); if (di != NULL) { rows += tsdbGetNRowsInTbData(di); } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 23732a6f9a627b192a739db94dd068fecbc9fc26..be97b20455349818fdfccb9d893f46d0ea756ccf 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); -void closeAllResultRows(SResultRowInfo* pResultRowInfo); - void initResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow); @@ -96,6 +94,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo return pRow; } +static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { + void* pPage = getBufPage(pBuf, pos->pageId); + setBufPageDirty(pPage, true); +} + void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1ad17bbc76644686a6722f90760035107b830918..f4d0eb3b5ed3c1ef546df87f0e03a4eaea78a75e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -108,7 +108,6 @@ typedef struct STaskCostInfo { SFileBlockLoadRecorder* pRecoder; uint64_t elapsedTime; - uint64_t firstStageMergeTime; uint64_t winInfoSize; uint64_t tableInfoSize; uint64_t hashSize; @@ -549,6 +548,7 @@ typedef struct SProjectOperatorInfo { SLimitInfo limitInfo; bool mergeDataBlocks; SSDataBlock* pFinalRes; + SNode* pCondition; } SProjectOperatorInfo; typedef struct SIndefOperatorInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a76253ab2074f84987a9cd1964c3406823548ea0..ec8e3c4abba40e8c03b26b21217f21739da2d93e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) { } } -void closeAllResultRows(SResultRowInfo* pResultRowInfo) { - // do nothing -} - bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); } void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } @@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { SArray* createSortInfo(SNodeList* pNodeList) { size_t numOfCols = 0; + if (pNodeList != NULL) { numOfCols = LIST_LENGTH(pNodeList); } else { numOfCols = 0; } + SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); if (pList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { for (int32_t i = 0; i < numOfCols; ++i) { SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); - /*if (!pDescNode->output) { // todo disable it temporarily*/ - /*continue;*/ - /*}*/ - SColumnInfoData idata = createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId); idata.info.scale = pDescNode->dataType.scale; @@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu } } -#ifdef BUF_PAGE_DEBUG - qDebug("page_setSelect num:%d", num); -#endif if (p != NULL) { p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; @@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi // TODO: get it from stable scan node pCond->twindows = pTableScanNode->scanRange; pCond->suid = pTableScanNode->scan.suid; - pCond->type = BLOCK_LOAD_OFFSET_ORDER; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = -1; // pCond->type = pTableScanNode->scanFlag; @@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter } // get the correct time window according to the handled timestamp +// todo refactor STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t order) { STimeWindow w = {0}; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7bac828a537755e67eb002e3326688d275fb3cb6..38da9de32c8aad3a1ca366554e81a4d8ebdec60e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); // pSummary->hashSize = hashSize; - // add the merge time - pSummary->elapsedTime += pSummary->firstStageMergeTime; - // SResultRowPool* p = pTaskInfo->pool; // if (p != NULL) { // pSummary->winInfoSize = getResultRowPoolMemSize(p); @@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // pSummary->winInfoSize = 0; // pSummary->numOfTimeWindows = 0; // } - // - // calculateOperatorProfResults(pQInfo); SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; if (pSummary->pRecoder != NULL) { - qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 - " us, total blocks:%d, " - "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, - GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks, - pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows); + qDebug( + "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%" + PRId64 ", check rows:%" PRId64, GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, + pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, + pRecorder->totalCheckedRows); } + // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, // pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); @@ -3031,7 +3027,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } } - closeAllResultRows(&pAggInfo->binfo.resultRowInfo); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); @@ -3328,6 +3323,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.groupId; + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); continue; } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { // now it is the data from a new group @@ -3336,6 +3332,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // ignore data block in current group if (pLimitInfo->remainGroupOffset > 0) { + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); continue; } } @@ -3380,10 +3377,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pLimitInfo->remainOffset >= pInfo->pRes->info.rows) { pLimitInfo->remainOffset -= pInfo->pRes->info.rows; blockDataCleanup(pInfo->pRes); + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); continue; } else if (pLimitInfo->remainOffset < pInfo->pRes->info.rows && pLimitInfo->remainOffset > 0) { blockDataTrimFirstNRows(pInfo->pRes, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); } // check for the limitation in each group @@ -3391,6 +3390,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pLimitInfo->numOfOutputRows + pInfo->pRes->info.rows >= pLimitInfo->limit.limit) { int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pInfo->pRes, keepRows); + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { pOperator->status = OP_EXEC_DONE; } @@ -3400,27 +3400,32 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { break; } - // no results generated - if (pInfo->pRes->info.rows == 0 || (!pProjectInfo->mergeDataBlocks)) { - break; - } - - if (pProjectInfo->mergeDataBlocks) { - pFinalRes->info.groupId = pInfo->pRes->info.groupId; - pFinalRes->info.version = pInfo->pRes->info.version; + if (pProjectInfo->mergeDataBlocks && pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) { + if (pRes->info.rows > 0) { + pFinalRes->info.groupId = pRes->info.groupId; + pFinalRes->info.version = pRes->info.version; - // continue merge data, ignore the group id - blockDataMerge(pFinalRes, pInfo->pRes); - - if (pFinalRes->info.rows + pInfo->pRes->info.rows <= pOperator->resultInfo.threshold) { - continue; + // continue merge data, ignore the group id + blockDataMerge(pFinalRes, pRes); + if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold) { + continue; + } } - } - // do apply filter - SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes; - doFilter(pProjectInfo->pFilterNode, p, NULL); - if (p->info.rows > 0) { + // do apply filter + doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL); + if (pFinalRes->info.rows > 0 || pRes->info.rows == 0) { + break; + } + } else { + // do apply filter + if (pRes->info.rows > 0) { + doFilter(pProjectInfo->pFilterNode, pRes, NULL); + if (pRes->info.rows == 0) { + continue; + } + } + break; } } @@ -3884,8 +3889,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; - pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - + pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFilterNode = pProjPhyNode->node.pConditions; pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; @@ -4416,7 +4420,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; pCond->suid = uid; - pCond->type = BLOCK_LOAD_OFFSET_ORDER; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->startVersion = -1; pCond->endVersion = -1; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0f1272c9642936f7e82be7ffa26295ea1bdd09b1..1e001a29a09ccf08641ea007f498c1fa44b96ed9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResultRow(pResult, tableGroupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } } @@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResultRow(pResult, tableGroupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } ekey = ascScan ? nextWin.ekey : nextWin.skey; @@ -1092,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); } - closeAllResultRows(&pInfo->binfo.resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); OPTR_SET_OPENED(pOperator); @@ -1248,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pBInfo->resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -2043,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pBInfo->resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -2207,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { SSDataBlock* pResBlock = pSliceInfo->pRes; SExprSupp* pSup = &pOperator->exprSupp; - blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); - // if (pOperator->status == OP_RES_TO_RETURN) { // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { @@ -2348,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); - pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pInfo->win = pInterpPhyNode->timeRange; + pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; - pInfo->current = pInfo->win.skey; + pInfo->current = pInfo->win.skey; pOperator->name = "TimeSliceOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; @@ -2542,6 +2539,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr } if (find && pUpdated) { saveResultRow(pCurResult, pWinRes->groupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur); } } } @@ -2662,6 +2660,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { saveResultRow(pResult, tableGroupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ebccb7950cf537d255881a81443e5723873a1ab3..d77e42388b454d6a6d6b3c39ee68d8b3963a7dda 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } -int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { +int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t code = 0; bool qcontinue = true; SSDataBlock *pRes = NULL; @@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); - if (queryEnd) { - *queryEnd = true; + if (queryStop) { + *queryStop = true; } break; @@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue); if (!qcontinue) { + if (queryStop) { + *queryStop = true; + } + break; } @@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWPhaseInput input = {0}; void *rsp = NULL; int32_t dataLen = 0; - bool queryEnd = false; + bool queryStop = false; do { QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); @@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { atomic_store_8((int8_t *)&ctx->queryInQueue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0); - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; @@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { + if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { // Note: query is not running anymore QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 9fd4e483c28d1e6a0c08618e98bd5a30aae93807..f256c9603724bcfdeeaa5d007c55e9cb8a2bcd10 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -300,6 +300,8 @@ int transSendResponse(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int transGetSockDebugInfo(struct sockaddr* sockname, char* dst); + int64_t transAllocHandle(); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index da59dc605fdc0d54e0af31124b575eb8a8a12ad6..70d56dca139972d33aa3f0e633727dadb0b35ea9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -37,9 +37,11 @@ typedef struct SCliConn { uint32_t port; SDelayTask* task; + // debug and log info - struct sockaddr_in addr; - struct sockaddr_in localAddr; + char src[32]; + char dst[32]; + } SCliConn; typedef struct SCliMsg { @@ -95,6 +97,14 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); +static int sockDebugInfo(struct sockaddr* sockname, char* dst) { + struct sockaddr_in addr = *(struct sockaddr_in*)sockname; + + char buf[20] = {0}; + int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf)); + sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); + return r; +} // register timer in each thread to clear expire conn // static void cliTimeoutCb(uv_timer_t* handle); // alloc buf for recv @@ -363,9 +373,9 @@ void cliHandleResp(SCliConn* conn) { } STraceId* trace = &transMsg.info.traceId; - tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn), - conn, TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), - taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); + + tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn), conn, + TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, transMsg.code); if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); @@ -741,9 +751,8 @@ void cliSend(SCliConn* pConn) { uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); STraceId* trace = &pMsg->info.traceId; - tGTrace("%s conn %p %s is sent to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, - TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port)); + tGTrace("%s conn %p %s is sent to %s, local info %s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), + pConn->dst, pConn->src); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -764,11 +773,16 @@ void cliConnCb(uv_connect_t* req, int status) { cliHandleExcept(pConn); return; } - int addrlen = sizeof(pConn->addr); - uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen); + // int addrlen = sizeof(pConn->addr); + struct sockaddr peername, sockname; + int addrlen = sizeof(peername); + + uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); + transGetSockDebugInfo(&peername, pConn->dst); - addrlen = sizeof(pConn->localAddr); - uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->localAddr, &addrlen); + addrlen = sizeof(sockname); + uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen); + transGetSockDebugInfo(&sockname, pConn->src); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); assert(pConn->stream == req->handle); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 6ac75a75e15aa068ce5cbed3aa17c14583a74ad7..155cdd1062afd12093afff9496ae4a36583e93a3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -102,7 +102,14 @@ void transFreeMsg(void* msg) { } taosMemoryFree((char*)msg - sizeof(STransMsgHead)); } +int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) { + struct sockaddr_in addr = *(struct sockaddr_in*)sockname; + char buf[20] = {0}; + int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf)); + sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); + return r; +} int transInitBuffer(SConnBuffer* buf) { transClearBuffer(buf); return 0; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index ac14e22a51e2ff69bf5ec534a13bc735d95b60d4..fe7ab47feebd8e5ac40327f37b9334fa8eb22093 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -43,9 +43,13 @@ typedef struct SSvrConn { SSvrRegArg regArg; bool broken; // conn broken; - ConnStatus status; - struct sockaddr_in addr; - struct sockaddr_in localAddr; + ConnStatus status; + + uint32_t clientIp; + uint16_t port; + + char src[32]; + char dst[32]; int64_t refId; int spi; @@ -248,15 +252,11 @@ static void uvHandleReq(SSvrConn* pConn) { if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d", transLabel(pTransInst), pConn, - TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen); + tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d", transLabel(pTransInst), pConn, + TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen); } else { - tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d, code:%d", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), - transMsg.contLen, pHead->noResp, transMsg.code); - // no ref here + tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, resp:%d, code:%d", transLabel(pTransInst), + pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code); } // pHead->noResp = 1, @@ -278,14 +278,13 @@ static void uvHandleReq(SSvrConn* pConn) { // set up conn info SRpcConnInfo* pConnInfo = &(transMsg.info.conn); - pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr); - pConnInfo->clientPort = ntohs(pConn->addr.sin_port); + pConnInfo->clientIp = pConn->clientIp; + pConnInfo->clientPort = pConn->port; tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); - // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -418,9 +417,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pMsg->info.traceId; - tGTrace("%s conn %p %s is sent to %s:%d, local info:%s:%d, msglen:%d", transLabel(pTransInst), pConn, - TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len); + tGTrace("%s conn %p %s is sent to %s, local info:%s, msglen:%d", transLabel(pTransInst), pConn, + TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len); pHead->msgLen = htonl(len); wb->base = msg; @@ -646,20 +644,26 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tTrace("conn %p created, fd:%d", pConn, fd); - int addrlen = sizeof(pConn->addr); - if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { + struct sockaddr peername, sockname; + int addrlen = sizeof(peername); + if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&peername, &addrlen)) { tError("conn %p failed to get peer info", pConn); transUnrefSrvHandle(pConn); return; } + transGetSockDebugInfo(&peername, pConn->dst); - addrlen = sizeof(pConn->localAddr); - if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) { + addrlen = sizeof(sockname); + if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) { tError("conn %p failed to get local info", pConn); transUnrefSrvHandle(pConn); return; } + transGetSockDebugInfo(&sockname, pConn->src); + struct sockaddr_in addr = *(struct sockaddr_in*)&sockname; + pConn->clientIp = addr.sin_addr.s_addr; + pConn->port = ntohs(addr.sin_port); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb); } else { diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 8990c24305b8676f195a5cc715b35d78a84d0e8d..d5ffc1b7c1db1b86d238dffb1eb3b78724892ffe 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -1327,6 +1327,8 @@ class Task(): # TDengine 3.0 Error Codes: 0x0333, # Object is creating # TODO: this really is NOT an acceptable error + 0x0369, # Tag already exists + 0x0388, # Database not exist 0x03A0, # STable already exists 0x03A1, # STable [does] not exist 0x03AA, # Tag already exists diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1363e0f4a3dace70228de5b1ed907add4ed349b9..a606311f3c3676141a401b891eb142335244a6ee 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -101,7 +101,7 @@ ./test.sh -f tsim/parser/constCol.sim #./test.sh -f tsim/parser/create_db.sim ./test.sh -f tsim/parser/create_mt.sim -# TD-17653 ./test.sh -f tsim/parser/create_tb_with_tag_name.sim +./test.sh -f tsim/parser/create_tb_with_tag_name.sim ./test.sh -f tsim/parser/create_tb.sim ./test.sh -f tsim/parser/dbtbnameValidate.sim ./test.sh -f tsim/parser/distinct.sim diff --git a/tests/script/tsim/parser/create_tb_with_tag_name.sim b/tests/script/tsim/parser/create_tb_with_tag_name.sim index a0e8dab99e5e95e0b3973218194c6b3e7707dfef..c4d9c11648cb03c00c9b596810d690e1111a6349 100644 --- a/tests/script/tsim/parser/create_tb_with_tag_name.sim +++ b/tests/script/tsim/parser/create_tb_with_tag_name.sim @@ -93,7 +93,7 @@ sql_error create table tb11 using st2 (id,t1,) tags (1,1,1); sql create table tb12 using st2 (t1,id) tags (2,1); sql show tags from tb12; -if $rows != 5 then +if $rows != 4 then return -1 endi if $data05 != 1 then @@ -109,9 +109,9 @@ if $data35 != NULL then return -1 endi -sql create table tb13 using st2 ("t1",'id') tags (2,1); +sql create table tb13 using st2 (t1,id) tags (2,1); sql show tags from tb13; -if $rows != 2 then +if $rows != 4 then return -1 endi if $data05 != 1 then diff --git a/tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py b/tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py new file mode 100644 index 0000000000000000000000000000000000000000..22160002145fa5b96e5ab5651deb35a28eee2f32 --- /dev/null +++ b/tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py @@ -0,0 +1,250 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 100 + self.rowsPerTbl = 1000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("flush db to let data falls into the disk") + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 500, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + # after start consume, continue insert some data + paraDict['batchNum'] = 100 + paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl + pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + pInsertThread.join() + + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + + tdLog.info("wait the consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) + if expectRowsList[0] != resultList[0]: + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # tmqCom.checkFileContent(consumerId, queryString) + + tdSql.query("flush database %s"%(paraDict['dbName'])) + + for i in range(len(topicNameList)): + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 500, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + totalRowsInserted = expectRowsList[0] + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 1 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3) + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 0") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + actConsumeRows = resultList[0] + + tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) + if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows): + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # reinit consume info, and start tmq_sim, then check consume result + tmqCom.initConsumerTable() + consumerId = 2 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 1") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + actConsumeRows = resultList[0] + tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) + if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)): + tdLog.exit("%d tmq consume rows error!"%consumerId) + + for i in range(len(topicNameList)): + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def run(self): + tdSql.prepare() + self.prepareTestEnv() + self.tmqCase1() + self.tmqCase2() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/dataFromTsdbNWal.py b/tests/system-test/7-tmq/dataFromTsdbNWal.py index 227ce9d5a5c51e28ccbe009c7d472a18fbb297c6..faa70f482077d66cff9861eaf313a0f3698f5c96 100644 --- a/tests/system-test/7-tmq/dataFromTsdbNWal.py +++ b/tests/system-test/7-tmq/dataFromTsdbNWal.py @@ -17,8 +17,8 @@ from tmqCommon import * class TDTestCase: def __init__(self): - self.vgroups = 1 - self.ctbNum = 100 + self.vgroups = 4 + self.ctbNum = 1 self.rowsPerTbl = 10000 def init(self, conn, logSql): @@ -38,9 +38,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 100, + 'ctbNum': 1, 'rowsPerTbl': 10000, - 'batchNum': 3000, + 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, 'showMsg': 1, @@ -85,7 +85,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, + 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, 'snapshot': 1} @@ -117,17 +117,16 @@ class TDTestCase: keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - # after start consume, continue insert some data paraDict['batchNum'] = 100 paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - # + pInsertThread.join() + tdSql.query(queryString) expectRowsList.append(tdSql.getRows()) @@ -135,15 +134,16 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) if expectRowsList[0] != resultList[0]: tdLog.exit("%d tmq consume rows error!"%consumerId) tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + tdSql.query("flush database %s"%(paraDict['dbName'])) + for i in range(len(topicNameList)): + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 1 end ...... ") @@ -204,13 +204,12 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - - if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): - tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) + actConsumeRows = resultList[0] + + tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) + if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows): tdLog.exit("%d tmq consume rows error!"%consumerId) - - firstConsumeRows = resultList[0] - + # reinit consume info, and start tmq_sim, then check consume result tmqCom.initConsumerTable() consumerId = 2 @@ -224,15 +223,13 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - actConsumeTotalRows = firstConsumeRows + resultList[0] - - if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): - tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) - tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) + actConsumeRows = resultList[0] + tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted)) + if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)): tdLog.exit("%d tmq consume rows error!"%consumerId) - time.sleep(10) for i in range(len(topicNameList)): + tmqCom.waitSubscriptionExit(tdSql,topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 2 end ...... ") @@ -241,7 +238,7 @@ class TDTestCase: tdSql.prepare() self.prepareTestEnv() self.tmqCase1() - # self.tmqCase2() + self.tmqCase2() def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index cec6985a4ea4c10aa8ed82dd672734601393461f..5117ee3d24c289e695eb3113e643e94a6eb01e53 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -151,41 +151,6 @@ class TDTestCase: if not (totalConsumeRows == totalRowsFromQury): tdLog.exit("tmq consume rows error!") - - - - # tdLog.info("****************************************************************************") - # tmqCom.initConsumerTable() - # consumerId = 1 - # expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 - # topicList = topicFromStb1 - # ifcheckdata = 0 - # ifManualCommit = 0 - # keyList = 'group.id:cgrp2,\ - # enable.auto.commit:true,\ - # auto.commit.interval.ms:3000,\ - # auto.offset.reset:earliest' - # tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - # tdLog.info("start consume processor") - # tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - - # expectRows = 1 - # resultList = tmqCom.selectConsumeResult(expectRows) - # totalConsumeRows = 0 - # for i in range(expectRows): - # totalConsumeRows += resultList[i] - - # tdSql.query(queryString) - # totalRowsFromQury = tdSql.getRows() - - # tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury)) - # if not (totalConsumeRows == totalRowsFromQury): - # tdLog.exit("tmq consume rows error!") - - - # tdLog.info("****************************************************************************") - tmqCom.waitSubscriptionExit(tdSql, topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1) @@ -259,7 +224,7 @@ class TDTestCase: tdLog.info("create some new child table and insert data ") paraDict["batchNum"] = 100 paraDict["ctbPrefix"] = 'newCtb' - # tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("insert process end, and start to check consume result") expectRows = 1 diff --git a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py new file mode 100644 index 0000000000000000000000000000000000000000..650d9188285580b649fca67cd1971ed9ae9c7041 --- /dev/null +++ b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py @@ -0,0 +1,225 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.vgroups = 4 + self.ctbNum = 1000 + self.rowsPerTbl = 10 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + # drop some ntbs + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ntb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 100, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'endTs': 0, + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdLog.info("start create database....") + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("start create normal tables....") + tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) + tdLog.info("start insert data into normal tables....") + tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) + + tdLog.info("create topics from database") + topicFromDb = 'topic_dbt' + tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) + + if self.snapshot == 0: + consumerId = 0 + elif self.snapshot == 1: + consumerId = 1 + + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) + topicList = topicFromDb + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tdLog.info("drop some ntables") + # drop 1/4 ctbls from half offset + paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) + paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) + tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + + if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): + tdLog.exit("tmq consume rows error with snapshot = 0!") + + tdLog.info("wait subscriptions exit ....") + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) + + tdSql.query("drop topic %s"%topicFromDb) + tdLog.info("success dorp topic: %s"%topicFromDb) + tdLog.printNoPrefix("======== test case 1 end ...... ") + + + + # drop some ntbs and create some new ntbs + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ntb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 100, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'endTs': 0, + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdLog.info("start create database....") + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("start create normal tables....") + tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) + tdLog.info("start insert data into normal tables....") + tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) + + tdLog.info("create topics from database") + topicFromDb = 'topic_dbt' + tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) + + if self.snapshot == 0: + consumerId = 2 + elif self.snapshot == 1: + consumerId = 3 + + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) + topicList = topicFromDb + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tdLog.info("drop some ntables") + # drop 1/4 ctbls from half offset + paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) + paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) + tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) + + tdLog.info("start create some new normal tables....") + paraDict["ctbPrefix"] = 'newCtb' + paraDict["ctbNum"] = self.ctbNum + tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) + tdLog.info("start insert data into these new normal tables....") + tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + + if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): + tdLog.exit("tmq consume rows error with snapshot = 0!") + + tdLog.info("wait subscriptions exit ....") + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) + + tdSql.query("drop topic %s"%topicFromDb) + tdLog.info("success dorp topic: %s"%topicFromDb) + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def run(self): + tdLog.printNoPrefix("=============================================") + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.snapshot = 0 + self.tmqCase1() + self.tmqCase2() + + # tdLog.printNoPrefix("====================================================================") + # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + # self.snapshot = 1 + # self.tmqCase1() + # self.tmqCase2() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqDropNtb.py b/tests/system-test/7-tmq/tmqDropNtb-snapshot1.py similarity index 93% rename from tests/system-test/7-tmq/tmqDropNtb.py rename to tests/system-test/7-tmq/tmqDropNtb-snapshot1.py index e1f5794ce278d05cf316365fff0a3ffa91f58e27..b23f42258584164b1868eaca4f884cf478bd40b0 100644 --- a/tests/system-test/7-tmq/tmqDropNtb.py +++ b/tests/system-test/7-tmq/tmqDropNtb-snapshot1.py @@ -18,7 +18,7 @@ class TDTestCase: def __init__(self): self.snapshot = 0 self.vgroups = 4 - self.ctbNum = 100 + self.ctbNum = 1000 self.rowsPerTbl = 10 def init(self, conn, logSql): @@ -39,9 +39,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ntb', 'ctbStartIdx': 0, - 'ctbNum': 100, - 'rowsPerTbl': 1000, - 'batchNum': 1000, + 'ctbNum': 1000, + 'rowsPerTbl': 100, + 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'endTs': 0, 'pollDelay': 5, @@ -125,9 +125,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ntb', 'ctbStartIdx': 0, - 'ctbNum': 100, - 'rowsPerTbl': 1000, - 'batchNum': 1000, + 'ctbNum': 1000, + 'rowsPerTbl': 100, + 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'endTs': 0, 'pollDelay': 10, @@ -203,16 +203,16 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 2 end ...... ") def run(self): - tdLog.printNoPrefix("=============================================") - tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") - self.snapshot = 0 + # tdLog.printNoPrefix("=============================================") + # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + # self.snapshot = 0 # self.tmqCase1() - self.tmqCase2() + # self.tmqCase2() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 - # self.tmqCase1() + self.tmqCase1() self.tmqCase2() def stop(self): diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 5cc7aca67512f432693db2985cf46245c5ef3655..f074bd885045553cbe94029bd128dbc2235aea7a 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -210,7 +210,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py -#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py +python3 ./test.py -f 7-tmq/tmqDnodeRestart.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py @@ -219,12 +219,14 @@ python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py python3 ./test.py -f 7-tmq/tmqDropStb.py python3 ./test.py -f 7-tmq/tmqDropStbCtb.py -python3 ./test.py -f 7-tmq/tmqDropNtb.py +python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot0.py +python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot1.py python3 ./test.py -f 7-tmq/tmqUdf.py python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py - +python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py +python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py # python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py #------------querPolicy 2----------- diff --git a/tools/taos-tools b/tools/taos-tools index 9cfa195713d1cae9edf417a8d49bde87dd971016..0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 9cfa195713d1cae9edf417a8d49bde87dd971016 +Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a