diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 401ab1ed246fcf38e2bb8d6c96d2e327e06a04f2..1cd212b216d73c9824323c2c7e0e62a8188bd548 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -32,8 +32,8 @@ static void tsdbEndCommit(STsdbRepo *pRepo); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); -static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo); -static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables); +static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { @@ -430,7 +430,7 @@ static void *tsdbCommitData(void *arg) { // Create the iterator to read from cache if (pMem->numOfRows > 0) { - iters = tsdbCreateTableIters(pRepo); + iters = tsdbCreateCommitIters(pRepo); if (iters == NULL) { tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _exit; @@ -470,7 +470,7 @@ static void *tsdbCommitData(void *arg) { _exit: tdFreeDataCols(pDataCols); - tsdbDestroyTableIters(iters, pCfg->maxTables); + tsdbDestroyCommitIters(iters, pCfg->maxTables); tsdbDestroyHelper(&whelper); tsdbEndCommit(pRepo); tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); @@ -636,7 +636,7 @@ _err: return -1; } -static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) { +static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { STsdbCfg * pCfg = &(pRepo->config); SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -666,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) { goto _err; } - if (!tSkipListIterNext(iters[i].pIter)) { - terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM; - goto _err; - } + tSkipListIterNext(iters[i].pIter); } } return iters; _err: - tsdbDestroyTableIters(iters, pCfg->maxTables); + tsdbDestroyCommitIters(iters, pCfg->maxTables); return NULL; } -static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) { +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { if (iters == NULL) return; for (int i = 1; i < maxTables; i++) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 000302af57c73d8071734478d23e5cda6f845d7f..4edb29e712afa28e346a51334040f0d08b8bfee0 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -230,8 +230,12 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { tdInitDataCols(pHelper->pDataCols[1], pSchema); SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid; - if (pIdx->offset > 0 && pIdx->hasLast) { - pHelper->hasOldLastBlock = true; + if (pIdx->offset > 0) { + if (pIdx->uid != TABLE_UID(pTable)) { + memset((void *)pIdx, 0, sizeof(SCompIdx)); + } else { + if (pIdx->hasLast) pHelper->hasOldLastBlock = true; + } } helperSetState(pHelper, TSDB_HELPER_TABLE_SET); @@ -244,11 +248,11 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]); int blkIdx = 0; - ASSERT(TABLE_TID(pCommitIter->pTable) == pHelper->tableInfo.tid); - if (TABLE_UID(pCommitIter->pTable) != pIdx->uid) memset((void *)pIdx, 0, sizeof(*pIdx)); + ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable)); if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; while (true) { + ASSERT(blkIdx <= pIdx->numOfBlocks); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); if (keyFirst < 0 || keyFirst > maxKey) break; // iter over @@ -268,28 +272,42 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - SCompBlock compBlock; - if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { + SCompBlock compBlock = {0}; + if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) { if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; - SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1; + SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); ASSERT(pCompBlock->last); if (pCompBlock->numOfSubBlocks > 1) { - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && + pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; - } else { - if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1; + if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); - if (pCompBlock->offset < 0) return -1; + if (pCompBlock->offset < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) + if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) { + tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo), + pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; + } } pHelper->hasOldLastBlock = false; @@ -354,7 +372,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) return -1; + if (offset < 0) { + tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } SFile *pFile = &(pHelper->files.nHeadF); pFile->info.offset = offset; @@ -366,6 +389,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { int drift = POINTER_DISTANCE(buf, pHelper->pBuffer); if (tsizeof(pHelper->pBuffer) - drift < 128) { pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2); + if (pHelper->pBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } } buf = POINTER_SHIFT(pHelper->pBuffer, drift); taosEncodeVariantU32(&buf, i); @@ -376,7 +403,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); - if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1; + if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, + pHelper->files.nHeadF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } pFile->info.len = tsize; return 0; } @@ -1327,7 +1359,10 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; } + + if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; } else { + ASSERT(!pHelper->hasOldLastBlock); tdResetDataCols(pDataCols); int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); @@ -1359,7 +1394,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock); if (pCompBlock->last) { - ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); + ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); int16_t colId = 0; slIter = *(pCommitIter->pIter); if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; @@ -1378,36 +1413,41 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; } else { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; int round = 0; int dIter = 0; while (true) { + tdResetDataCols(pDataCols); int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); if (rowsRead == 0) break; if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } tblkIdx++; round++; } } + if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; } } else { TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); - if (keyFirst < pCompBlock->keyFirst) { + TSKEY blkKeyFirst = pCompBlock->keyFirst; + TSKEY blkKeyLast = pCompBlock->keyLast; + + if (keyFirst < blkKeyFirst) { while (true) { tdResetDataCols(pDataCols); int rowsRead = - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0); + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); if (rowsRead == 0) break; ASSERT(rowsRead == pDataCols->numOfRows); @@ -1416,14 +1456,14 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, tblkIdx++; } } else { - ASSERT(keyFirst <= pCompBlock->keyLast); + ASSERT(keyFirst <= blkKeyLast); int16_t colId = 0; if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); slIter = *(pCommitIter->pIter); int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); - int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); if (rows2 == 0) { // all filtered out