提交 e6644d85 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/main' into fix/main_bugfix_wxy

......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 181bcac
GIT_TAG a0234fe
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -215,6 +215,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
......@@ -1112,9 +1113,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%"PRIu64" elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -2186,17 +2187,17 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
} else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
tstrerror(code), pReader->idStr);
return code;
}
}
} else {
tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
}
STbData* di = NULL;
......@@ -2207,17 +2208,17 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
} else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
tstrerror(code), pReader->idStr);
return code;
}
}
} else {
tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
}
initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
......@@ -2526,6 +2527,14 @@ _end:
void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; }
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
if (pDelSkyline == NULL) {
return 0;
}
return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1;
}
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
STbData* piMemTbData) {
if (pBlockScanInfo->delSkyline != NULL) {
......@@ -2543,7 +2552,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (pIdx != NULL) {
code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
}
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -2572,11 +2580,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
}
taosArrayDestroy(pDelData);
pBlockScanInfo->iter.index =
ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);
pBlockScanInfo->iter.index = index;
pBlockScanInfo->iiter.index = index;
pBlockScanInfo->fileDelIndex = index;
pBlockScanInfo->lastBlockDelIndex = index;
return code;
_err:
......@@ -2676,7 +2686,7 @@ static int32_t uidComparFunc(const void* p1, const void* p2) {
}
}
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
int32_t index = 0;
int32_t total = taosHashGetSize(pStatus->pTableMap);
......@@ -2690,7 +2700,21 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
}
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
// reset the last del file index
static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) {
void* p = taosHashIterate(pStatus->pTableMap, NULL);
while (p != NULL) {
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
// reset the last del file index
pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order);
p = taosHashIterate(pStatus->pTableMap, p);
}
}
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
int32_t total = taosHashGetSize(pStatus->pTableMap);
if (total == 0) {
return TSDB_CODE_SUCCESS;
......@@ -2703,7 +2727,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderSt
return TSDB_CODE_OUT_OF_MEMORY;
}
extractOrderedTableUidList(pOrderCheckInfo, pStatus);
extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);
uint64_t uid = pOrderCheckInfo->tableUidList[0];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
} else {
......@@ -2720,7 +2744,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderSt
}
pOrderCheckInfo->tableUidList = p;
extractOrderedTableUidList(pOrderCheckInfo, pStatus);
extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);
uid = pOrderCheckInfo->tableUidList[0];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
......@@ -2740,11 +2764,7 @@ static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
if (pStatus->pTableIter == NULL) {
return false;
}
return true;
return (pStatus->pTableIter != NULL);
}
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
......@@ -2752,7 +2772,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pReader);
if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
return code;
}
......@@ -2817,6 +2837,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
ASSERT(pBlockInfo != NULL);
if (pBlockInfo != NULL) {
pScanInfo =
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
......@@ -2837,7 +2859,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (pBlockInfo == NULL) { // build data block from last data file
/*if (pBlockInfo == NULL) { // build data block from last data file
SBlockData* pBData = &pReader->status.fileBlockData;
tBlockDataReset(pBData);
......@@ -2869,7 +2891,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
} else*/ if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -3040,6 +3062,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// this file does not have data files, let's start check the last block file if exists
if (pBlockIter->numOfBlocks == 0) {
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin;
}
}
......@@ -3071,6 +3094,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
......@@ -3082,6 +3106,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// this file does not have blocks, let's start check the last block file
if (pBlockIter->numOfBlocks == 0) {
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
goto _begin;
}
}
......
......@@ -192,7 +192,7 @@ void streamStateClose(SStreamState* pState) {
}
int32_t streamStateBegin(SStreamState* pState) {
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
return -1;
......@@ -208,7 +208,7 @@ int32_t streamStateCommit(SStreamState* pState) {
return -1;
}
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
......@@ -220,7 +220,7 @@ int32_t streamStateAbort(SStreamState* pState) {
return -1;
}
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
......
......@@ -28,14 +28,14 @@ static void median(void *src, int64_t size, int64_t s, int64_t e, const void *pa
void *buf) {
int32_t mid = ((int32_t)(e - s) >> 1u) + (int32_t)s;
if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) == 1) {
if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) > 0) {
doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf);
}
if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, e), param) == 1) {
if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, e), param) > 0) {
doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf);
doswap(elePtrAt(src, size, mid), elePtrAt(src, size, e), size, buf);
} else if (comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) == 1) {
} else if (comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) > 0) {
doswap(elePtrAt(src, size, s), elePtrAt(src, size, e), size, buf);
}
......@@ -47,7 +47,7 @@ static void tInsertSort(void *src, int64_t size, int32_t s, int32_t e, const voi
void *buf) {
for (int32_t i = s + 1; i <= e; ++i) {
for (int32_t j = i; j > s; --j) {
if (comparFn(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), param) == -1) {
if (comparFn(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), param) < 0) {
doswap(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), size, buf);
} else {
break;
......
......@@ -63,4 +63,38 @@ if $rows != 8198 then
return -1
endi
print ===========================> TD-22077 && TD-21877
sql drop database if exists $db -x step1
sql create database $db vgroups 1;
sql use $db
sql create stable st1 (ts timestamp, c int) tags(a int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);
$i = 0
$ts = 1674977959000
$rowNum = 200
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
sql insert into t1 values ( $ts , $x )
sql insert into t2 values ( $ts + 1000a, $x )
$x = $x + 1
$ts = $ts + 1000
endw
sql flush database $db
sql insert into t1 values('2018-09-17 09:00:26', 26);
sql insert into t2 values('2018-09-17 09:00:25', 25);
sql insert into t2 values('2018-09-17 09:00:30', 30);
sql flush database reg_db0;
sql delete from st1 where ts<='2018-9-17 09:00:26';
sql select * from st1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册