提交 e81b344b 编写于 作者: J jiacy-jcy

Merge branch 'main' into test/cyjia-main

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 94d6895 GIT_TAG 5aa25e9
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -273,7 +273,7 @@ if [ "$osType" != "Darwin" ]; then ...@@ -273,7 +273,7 @@ if [ "$osType" != "Darwin" ]; then
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
${csudo}./make-taos-tools-deb.sh ${top_dir} \ ${csudo}./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${verNumberComp}
fi fi
fi fi
else else
...@@ -298,7 +298,7 @@ if [ "$osType" != "Darwin" ]; then ...@@ -298,7 +298,7 @@ if [ "$osType" != "Darwin" ]; then
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
${csudo}./make-taos-tools-rpm.sh ${top_dir} \ ${csudo}./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${verNumberComp}
fi fi
fi fi
else else
......
...@@ -308,7 +308,8 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) { ...@@ -308,7 +308,8 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); ESdbStatus objStatus = 0;
pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
if (pIter == NULL) break; if (pIter == NULL) break;
SDnodeEp dnodeEp = {0}; SDnodeEp dnodeEp = {0};
......
...@@ -2333,32 +2333,33 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2333,32 +2333,33 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL;
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
} else { }
TSDBROW *pRow = NULL, *piRow = NULL;
if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
}
if (pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iiter.hasVal) {
piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
} }
// imem + file + last block // two levels of mem-table does contain the valid rows
if (pBlockScanInfo->iiter.hasVal) { if (pRow != NULL && piRow != NULL) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
} }
// mem + file + last block // imem + file + last block
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
} }
// files data blocks + last block // mem + file + last block
return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData); if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
} }
// files data blocks + last block
return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
} }
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
......
...@@ -682,6 +682,18 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo ...@@ -682,6 +682,18 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
} }
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -720,12 +732,35 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -720,12 +732,35 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
if (key.version > pMerger->version) { if (key.version > pMerger->version) {
if (!COL_VAL_IS_NONE(pColVal)) { if (!COL_VAL_IS_NONE(pColVal)) {
taosArraySet(pMerger->pArray, iCol, pColVal); if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
if (code) return code;
tColVal->value.nData = pColVal->value.nData;
if (pColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
}
tColVal->flag = 0;
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} }
} else if (key.version < pMerger->version) { } else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
taosArraySet(pMerger->pArray, iCol, pColVal); if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
if (code) return code;
tColVal->value.nData = pColVal->value.nData;
if (pColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
}
tColVal->flag = 0;
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} }
} else { } else {
ASSERT(0 && "dup versions not allowed"); ASSERT(0 && "dup versions not allowed");
...@@ -765,6 +800,18 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -765,6 +800,18 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
// other // other
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -775,7 +822,16 @@ _exit: ...@@ -775,7 +822,16 @@ _exit:
return code; return code;
} }
void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); } void tRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
tFree(pTColVal->value.pData);
}
}
taosArrayDestroy(pMerger->pArray);
}
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
...@@ -789,12 +845,47 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { ...@@ -789,12 +845,47 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
if (key.version > pMerger->version) { if (key.version > pMerger->version) {
if (!COL_VAL_IS_NONE(pColVal)) { if (!COL_VAL_IS_NONE(pColVal)) {
taosArraySet(pMerger->pArray, iCol, pColVal); if (IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
}
pTColVal->flag = 0;
} else {
tFree(pTColVal->value.pData);
pTColVal->value.pData = NULL;
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} }
} else if (key.version < pMerger->version) { } else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
taosArraySet(pMerger->pArray, iCol, pColVal); if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
tColVal->value.nData = pColVal->value.nData;
if (tColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData);
}
tColVal->flag = 0;
} else {
tFree(tColVal->value.pData);
tColVal->value.pData = NULL;
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} }
} else { } else {
ASSERT(0); ASSERT(0);
......
...@@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
if (NULL == *pPage) { if (NULL == *pPage) {
return NULL; return NULL;
} }
return (SResultRow*)((char*)(*pPage) + p1->offset); return (SResultRow*)((char*)(*pPage) + p1->offset);
} }
...@@ -1729,6 +1729,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1729,6 +1729,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*resetTableScanInfo(pTSInfo, pWin);*/ /*resetTableScanInfo(pTSInfo, pWin);*/
tsdbReaderClose(pTSInfo->base.dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
pInfo->pTableScanOp->status = OP_OPENED;
pTSInfo->scanTimes = 0; pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1; pTSInfo->currentGroupId = -1;
......
...@@ -635,6 +635,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in ...@@ -635,6 +635,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
} }
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
taosThreadMutexLock(&pWal->mutex);
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pIdxFile) < 0) { if (taosFsyncFile(pWal->pIdxFile) < 0) {
...@@ -647,4 +648,5 @@ void walFsync(SWal *pWal, bool forceFsync) { ...@@ -647,4 +648,5 @@ void walFsync(SWal *pWal, bool forceFsync) {
strerror(errno)); strerror(errno));
} }
} }
taosThreadMutexUnlock(&pWal->mutex);
} }
...@@ -600,6 +600,11 @@ class TDTestCase: ...@@ -600,6 +600,11 @@ class TDTestCase:
tdLog.printNoPrefix("==========step4:after wal, all check again ") tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test() self.all_test()
# add for TS-2440
for i in range(self.rows):
tdSql.execute("drop database if exists db3 ")
tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册