From cad6e7ec510378f6909ad172d7c311f9eb6de94f Mon Sep 17 00:00:00 2001
From: lichuang
Date: Tue, 18 May 2021 16:29:26 +0800
Subject: [PATCH] [TD-4034]restore last not NULL column

---
 src/tsdb/inc/tsdbMeta.h |   1 +
 src/tsdb/src/tsdbMain.c | 210 ++++++++++++++++++++++++++++------------
 src/tsdb/src/tsdbMeta.c |   1 +
 3 files changed, 149 insertions(+), 63 deletions(-)

diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h
index ff47e0cf39..a8c7a6c358 100644
--- a/src/tsdb/inc/tsdbMeta.h
+++ b/src/tsdb/inc/tsdbMeta.h
@@ -39,6 +39,7 @@ typedef struct STable {
 
   SDataCol *lastCols;
   int32_t lastColNum;
+  int32_t restoreColumnNum;
   T_REF_DECLARE()
 } STable;
 
diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c
index 44e2bffbe2..3be78e21dd 100644
--- a/src/tsdb/src/tsdbMain.c
+++ b/src/tsdb/src/tsdbMain.c
@@ -616,6 +616,138 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
   }
 }
 
+/*
+ * Restore the cached last not-NULL value of each column of pTable from the
+ * blocks reachable through pReadh.
+ *
+ * Blocks are walked from the last one backwards and rows inside a block from
+ * the last one backwards; the first not-NULL value met for a column is stored
+ * in pTable->lastCols[] together with the timestamp of its row. Columns whose
+ * slot is already filled (bytes != 0) are skipped, and the walk stops as soon
+ * as pTable->restoreColumnNum reaches the number of schema columns.
+ *
+ * Returns 0 on success and -1 on failure; terrno is set to
+ * TSDB_CODE_TDB_OUT_OF_MEMORY when an allocation fails.
+ */
+static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
+  SBlock* pBlock;
+  int numColumns;
+  int32_t blockIdx;
+  SDataStatis* pBlockStatis = NULL;
+  SDataRow row = NULL;
+  STSchema *pSchema = tsdbGetTableSchema(pTable);
+  int err = 0;
+
+  numColumns = schemaNCols(pSchema);
+  if (numColumns <= pTable->restoreColumnNum) {
+    return 0;
+  }
+
+  row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
+  if (row == NULL) {
+    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+    err = -1;
+    goto out;
+  }
+  tdInitDataRow(row, pSchema);
+
+  // first load block index info
+  if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
+    err = -1;
+    goto out;
+  }
+
+  // calloc() already zero-fills the array, no extra memset() is needed
+  pBlockStatis = calloc(numColumns, sizeof(SDataStatis));
+  if (pBlockStatis == NULL) {
+    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+    err = -1;
+    goto out;
+  }
+  for (int32_t i = 0; i < numColumns; ++i) {
+    // request statistics by the schema column id, not by the array position
+    pBlockStatis[i].colId = schemaColAt(pSchema, i)->colId;
+  }
+
+  // load block from backward
+  SBlockIdx *pIdx = pReadh->pBlkIdx;
+  blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
+
+  while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
+    bool loadStatisData = false;
+    pBlock = pReadh->pBlkInfo->blocks + blockIdx;
+    blockIdx -= 1;
+
+    // load block data
+    if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
+      err = -1;
+      goto out;
+    }
+
+    // file block with sub-blocks has no statistics data
+    if (pBlock->numOfSubBlocks <= 1) {
+      tsdbLoadBlockStatis(pReadh, pBlock);
+      tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
+      loadStatisData = true;
+    }
+
+    // signed index: numColumns is an int, avoid a signed/unsigned comparison
+    for (int32_t colId = 0; colId < numColumns && numColumns > pTable->restoreColumnNum; ++colId) {
+      // ignore loaded columns
+      if (pTable->lastCols[colId].bytes != 0) {
+        continue;
+      }
+
+      // ignore block which has no not-null colId column
+      if (loadStatisData && pBlockStatis[colId].numOfNull == pBlock->numOfRows) {
+        continue;
+      }
+
+      // OK,let's load row from backward to get not-null column
+      STColumn *pCol = schemaColAt(pSchema, colId);
+      for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
+        SDataCol *pDataCol = pReadh->pDCols[0]->cols + colId;
+        tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
+        void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
+        if (isNullN(value, pCol->type)) {
+          continue;
+        }
+
+        // save not-null column
+        SDataCol *pLastCol = &(pTable->lastCols[colId]);
+        pLastCol->pData = malloc(pCol->bytes);
+        if (pLastCol->pData == NULL) {
+          // bytes is still 0 here, so the slot keeps reading as "not restored"
+          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
+          err = -1;
+          goto out;
+        }
+        pLastCol->bytes = pCol->bytes;
+        pLastCol->offset = pCol->offset;
+        pLastCol->colId = pCol->colId;
+        memcpy(pLastCol->pData, value, pCol->bytes);
+
+        // save row ts(in column 0)
+        pDataCol = pReadh->pDCols[0]->cols + 0;
+        pCol = schemaColAt(pSchema, 0);
+        tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
+        pLastCol->ts = dataRowTKey(row);
+
+        pTable->restoreColumnNum += 1;
+
+        tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %ld", REPO_ID(pRepo), pTable->name->data, colId, pLastCol->ts);
+        break;
+      }
+    }
+  }
+
+out:
+  taosTZfree(row);
+  tfree(pBlockStatis);
+
+  return err;
+}
+
 int tsdbRestoreInfo(STsdbRepo *pRepo) {
   SFSIter fsiter;
   SReadH readh;
@@ -630,6 +762,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
 
   tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
 
+  if (CACHE_LAST_NULL_COLUMN(pCfg)) {
+    for (int i = 1; i < pMeta->maxTables; i++) {
+      STable *pTable = pMeta->tables[i];
+      if (pTable == NULL) continue;
+      pTable->restoreColumnNum = 0;
+    }
+  }
+
   while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
     if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
       tsdbDestroyReadH(&readh);
@@ -688,71 +828,15 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
                            pCol->offset);
           }
         }
-
-        // restore NULL columns
-        if (CACHE_LAST_NULL_COLUMN(pCfg)) {
-          if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
-            tsdbDestroyReadH(&readh);
-            return -1;
-          }
-
-          pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
-
-          if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
-            tsdbDestroyReadH(&readh);
-            return -1;
-          }
-
-          STSchema *pSchema = tsdbGetTableSchema(pTable);
-          int numColumns = schemaNCols(pSchema);
-          pTable->lastCols = (SDataCol*)malloc(numColumns * sizeof(SDataCol));
-          if (pTable->lastCols == NULL) {
-            terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
-            tsdbDestroyReadH(&readh);
-            return -1;
-          }
-          pTable->lastColNum = numColumns;
-
-          SDataRow row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
-          if (row == NULL) {
-            tfree(pTable->lastCols);
-            pTable->lastColNum = 0;
-            tsdbDestroyReadH(&readh);
-            terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
-            return -1;
-          }
-
-          tdInitDataRow(row, pSchema);
-          for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
-            STColumn *pCol = schemaColAt(pSchema, icol);
-            SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
-            tdAppendColVal(row, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
-                           pCol->offset);
-          }
-
-          SDataCol *pLatestCols = pTable->lastCols;
-          for (i = 0; i < pTable->lastColNum; ++i) {
-            STColumn *pTCol = schemaColAt(pSchema, i);
-
-            SDataCol *pDataCol = &(pLatestCols[pTCol->colId]);
-            pDataCol->pData = malloc(pTCol->bytes);
-            pDataCol->bytes = pTCol->bytes;
-
-            void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pTCol->offset);
-            if (isNullN(value, pTCol->type)) {
-              //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d NULL", REPO_ID(pRepo), pTable->name->data, pTCol->colId);
-              continue;
-            }
-
-            memcpy(pDataCol->pData, value, pDataCol->bytes);
-            //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d for %d,%s", REPO_ID(pRepo), pTable->name->data, pTCol->colId, pDataCol->bytes, (char*)pDataCol->pData);
-            pDataCol->ts = dataRowTKey(row);
-          }
-
-          taosTZfree(row);
-        }
       }
 
+      // restore NULL columns
+      if (CACHE_LAST_NULL_COLUMN(pCfg)) {
+        if (restoreLastColumns(pRepo, pTable, &readh) != 0) {
+          tsdbDestroyReadH(&readh);
+          return -1;
+        }
+      }
     }
   }
 
diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c
index bdf0cfdfad..5a108a5d06 100644
--- a/src/tsdb/src/tsdbMeta.c
+++ b/src/tsdb/src/tsdbMeta.c
@@ -678,6 +678,7 @@ static STable *tsdbNewTable() {
     pTable->lastCols[i].bytes = 0;
     pTable->lastCols[i].pData = NULL;
   }
+  pTable->restoreColumnNum = 0;
 
   return pTable;
 }
-- 
GitLab