提交 9f38ff67 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

...@@ -96,6 +96,7 @@ extern char *qtypeStr[]; ...@@ -96,6 +96,7 @@ extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11
#undef TD_DEBUG_PRINT_ROW #undef TD_DEBUG_PRINT_ROW
#undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -252,6 +252,45 @@ static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) { ...@@ -252,6 +252,45 @@ static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) {
pSrc->cols = pCols; pSrc->cols = pCols;
} }
static void printTsdbLoadBlkData(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const char *tag, int32_t ln) {
printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());
if (pBlock) {
SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pHeadf->f.aname);
SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pDFile->f.aname);
}
SDataCol *pDCol = pDCols->cols + 0;
if (TSKEY_MIN == *(int64_t *)pDCol->pData) {
ASSERT(0);
}
int rows = pDCols->numOfRows;
for (int r = 0; r < rows; ++r) {
if (pBlock) {
printf("%s:%d:%" PRIi64 ":%p:%d rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
rows, r);
} else {
printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", rows, r);
}
int nDataCols = pDCols->numOfCols;
int j = 0;
SCellVal sVal = {0};
while (j < nDataCols) {
SDataCol *pDataCol = pDCols->cols + j;
tdGetColDataOfRow(&sVal, pDataCol, r, pDCols->bitmapMode);
tdSCellValPrint(&sVal, pDataCol->type);
++j;
}
printf("\n");
}
fflush(stdout);
}
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
ASSERT(pBlock->numOfSubBlocks > 0); ASSERT(pBlock->numOfSubBlocks > 0);
STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo); STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo);
...@@ -266,15 +305,23 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -266,15 +305,23 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
} }
} }
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0], TSDB_BITMODE_ONE_BIT) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0], TSDB_BITMODE_ONE_BIT) < 0) return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, __func__, __LINE__);
#endif
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1], TSDB_BITMODE_DEFAULT) < 0) return -1; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1], TSDB_BITMODE_DEFAULT) < 0) return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[1], iBlock, __func__, __LINE__);
#endif
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
return -1; return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === MERGE === ", __LINE__);
#endif
} }
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
if (pBlock->numOfSubBlocks == 1) { if (pBlock->numOfSubBlocks == 1) {
...@@ -286,6 +333,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -286,6 +333,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
} }
tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]); tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
ASSERT(pReadh->pDCols[0]->bitmapMode != 0); ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === UPDATE FILTER === ", __LINE__);
#endif
} }
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
...@@ -295,6 +345,53 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ...@@ -295,6 +345,53 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return 0; return 0;
} }
static void printTsdbLoadBlkDataCols(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const int16_t *colIds,
int numOfColsIds, const char *tag, int32_t ln) {
printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());
if (pBlock) {
SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pHeadf->f.aname);
SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
pDFile->f.aname);
}
int rows = pDCols->numOfRows;
for (int r = 0; r < rows; ++r) {
if (pBlock) {
printf("%s:%d:%" PRIi64 ":%p:%d rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
rows, r);
} else {
printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", rows, r);
}
int nDataCols = pDCols->numOfCols;
int j = 0, k = 0;
SCellVal sVal = {0};
while (j < nDataCols) {
if (k >= numOfColsIds) break;
SDataCol *pDataCol = pDCols->cols + j;
int16_t colId1 = pDataCol->colId;
int16_t colId2 = *(colIds + k);
if (colId1 < colId2) {
++j;
} else if (colId1 > colId2) {
++k; // colId2 not exists in SDataCols
printf("NotExists ");
} else {
tdGetColDataOfRow(&sVal, pDataCol, r, pDCols->bitmapMode);
tdSCellValPrint(&sVal, pDataCol->type);
++j;
++k;
}
}
printf("\n");
}
fflush(stdout);
}
// TODO: filter by Multi-Version // TODO: filter by Multi-Version
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
bool mergeBitmap) { bool mergeBitmap) {
...@@ -310,14 +407,25 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, ...@@ -310,14 +407,25 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
} }
} }
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0)
return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], iBlock, colIds, numOfColsIds, __func__, __LINE__);
#endif
for (int i = 1; i < pBlock->numOfSubBlocks; i++) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++; iBlock++;
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) < 0)
return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[1], iBlock, colIds, numOfColsIds, __func__, __LINE__);
#endif
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
return -1; return -1;
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, __func__, __LINE__);
#endif
} }
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
if (pBlock->numOfSubBlocks == 1) { if (pBlock->numOfSubBlocks == 1) {
...@@ -329,18 +437,23 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, ...@@ -329,18 +437,23 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
} }
tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]); tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
ASSERT(pReadh->pDCols[0]->bitmapMode != 0); ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds,
" === update filter === ", __LINE__);
#endif
} }
if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) { if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) {
for (int i = 0; i < numOfColsIds; ++i) { for (int i = 0; i < numOfColsIds; ++i) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
if (pDataCol->len > 0 && pDataCol->bitmap) { if (pDataCol->len > 0 && pDataCol->bitmap) {
ASSERT(pDataCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(pDataCol->pBitmap);
tdMergeBitmap(pDataCol->pBitmap, pReadh->pDCols[0]->numOfRows, pDataCol->pBitmap); tdMergeBitmap(pDataCol->pBitmap, pReadh->pDCols[0]->numOfRows, pDataCol->pBitmap);
tdDataColsSetBitmapI(pReadh->pDCols[0]); tdDataColsSetBitmapI(pReadh->pDCols[0]);
} }
} }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, " === merge bitmap === ", __LINE__);
#endif
} }
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows); ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
...@@ -551,9 +664,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -551,9 +664,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if (tdIsBitmapModeI(bitmapMode)) { pDataCols->bitmapMode = bitmapMode;
tdDataColsSetBitmapI(pDataCols);
}
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1; if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;
...@@ -740,9 +851,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * ...@@ -740,9 +851,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if (tdIsBitmapModeI(bitmapMode)) { pDataCols->bitmapMode = bitmapMode;
tdDataColsSetBitmapI(pDataCols);
}
// If only load timestamp column, no need to load SBlockData part // If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1; if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册