提交 69199ae3 编写于 作者: H Hongze Cheng

finish more code

上级 84e2db4d
...@@ -32,8 +32,8 @@ static void tsdbEndCommit(STsdbRepo *pRepo); ...@@ -32,8 +32,8 @@ static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
...@@ -430,7 +430,7 @@ static void *tsdbCommitData(void *arg) { ...@@ -430,7 +430,7 @@ static void *tsdbCommitData(void *arg) {
// Create the iterator to read from cache // Create the iterator to read from cache
if (pMem->numOfRows > 0) { if (pMem->numOfRows > 0) {
iters = tsdbCreateTableIters(pRepo); iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) { if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit; goto _exit;
...@@ -470,7 +470,7 @@ static void *tsdbCommitData(void *arg) { ...@@ -470,7 +470,7 @@ static void *tsdbCommitData(void *arg) {
_exit: _exit:
tdFreeDataCols(pDataCols); tdFreeDataCols(pDataCols);
tsdbDestroyTableIters(iters, pCfg->maxTables); tsdbDestroyCommitIters(iters, pCfg->maxTables);
tsdbDestroyHelper(&whelper); tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo); tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
...@@ -636,7 +636,7 @@ _err: ...@@ -636,7 +636,7 @@ _err:
return -1; return -1;
} }
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) { static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
SMemTable *pMem = pRepo->imem; SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
...@@ -666,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) { ...@@ -666,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
goto _err; goto _err;
} }
if (!tSkipListIterNext(iters[i].pIter)) { tSkipListIterNext(iters[i].pIter);
terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM;
goto _err;
}
} }
} }
return iters; return iters;
_err: _err:
tsdbDestroyTableIters(iters, pCfg->maxTables); tsdbDestroyCommitIters(iters, pCfg->maxTables);
return NULL; return NULL;
} }
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) { static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
if (iters == NULL) return; if (iters == NULL) return;
for (int i = 1; i < maxTables; i++) { for (int i = 1; i < maxTables; i++) {
......
...@@ -230,8 +230,12 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { ...@@ -230,8 +230,12 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
tdInitDataCols(pHelper->pDataCols[1], pSchema); tdInitDataCols(pHelper->pDataCols[1], pSchema);
SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid; SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid;
if (pIdx->offset > 0 && pIdx->hasLast) { if (pIdx->offset > 0) {
pHelper->hasOldLastBlock = true; if (pIdx->uid != TABLE_UID(pTable)) {
memset((void *)pIdx, 0, sizeof(SCompIdx));
} else {
if (pIdx->hasLast) pHelper->hasOldLastBlock = true;
}
} }
helperSetState(pHelper, TSDB_HELPER_TABLE_SET); helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
...@@ -244,11 +248,11 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols ...@@ -244,11 +248,11 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols
SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]); SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]);
int blkIdx = 0; int blkIdx = 0;
ASSERT(TABLE_TID(pCommitIter->pTable) == pHelper->tableInfo.tid); ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable));
if (TABLE_UID(pCommitIter->pTable) != pIdx->uid) memset((void *)pIdx, 0, sizeof(*pIdx));
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
while (true) { while (true) {
ASSERT(blkIdx <= pIdx->numOfBlocks);
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
if (keyFirst < 0 || keyFirst > maxKey) break; // iter over if (keyFirst < 0 || keyFirst > maxKey) break; // iter over
...@@ -268,29 +272,43 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -268,29 +272,43 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock compBlock; SCompBlock compBlock = {0};
if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1; SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last); ASSERT(pCompBlock->last);
if (pCompBlock->numOfSubBlocks > 1) { if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0) if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0)
return -1; return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
} else { } else {
if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1; if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
if (pCompBlock->offset < 0) return -1; if (pCompBlock->offset < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
}
pHelper->hasOldLastBlock = false; pHelper->hasOldLastBlock = false;
} }
...@@ -354,7 +372,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -354,7 +372,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
if (offset < 0) return -1; if (offset < 0) {
tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
SFile *pFile = &(pHelper->files.nHeadF); SFile *pFile = &(pHelper->files.nHeadF);
pFile->info.offset = offset; pFile->info.offset = offset;
...@@ -366,6 +389,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -366,6 +389,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
int drift = POINTER_DISTANCE(buf, pHelper->pBuffer); int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
if (tsizeof(pHelper->pBuffer) - drift < 128) { if (tsizeof(pHelper->pBuffer) - drift < 128) {
pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2); pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2);
if (pHelper->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} }
buf = POINTER_SHIFT(pHelper->pBuffer, drift); buf = POINTER_SHIFT(pHelper->pBuffer, drift);
taosEncodeVariantU32(&buf, i); taosEncodeVariantU32(&buf, i);
...@@ -376,7 +403,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -376,7 +403,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1; if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize,
pHelper->files.nHeadF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pFile->info.len = tsize; pFile->info.len = tsize;
return 0; return 0;
} }
...@@ -1327,7 +1359,10 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1327,7 +1359,10 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
} }
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
} else { } else {
ASSERT(!pHelper->hasOldLastBlock);
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
...@@ -1359,7 +1394,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1359,7 +1394,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock); int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock);
if (pCompBlock->last) { if (pCompBlock->last) {
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1);
int16_t colId = 0; int16_t colId = 0;
slIter = *(pCommitIter->pIter); slIter = *(pCommitIter->pIter);
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
...@@ -1378,36 +1413,41 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1378,36 +1413,41 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++; tblkIdx++;
} else { } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0; int round = 0;
int dIter = 0; int dIter = 0;
while (true) { while (true) {
tdResetDataCols(pDataCols);
int rowsRead = int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
if (rowsRead == 0) break; if (rowsRead == 0) break;
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
if (round == 0) { if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else { } else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} }
tblkIdx++; tblkIdx++;
round++; round++;
} }
} }
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
} }
} else { } else {
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
if (keyFirst < pCompBlock->keyFirst) { TSKEY blkKeyFirst = pCompBlock->keyFirst;
TSKEY blkKeyLast = pCompBlock->keyLast;
if (keyFirst < blkKeyFirst) {
while (true) { while (true) {
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0); tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break; if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == pDataCols->numOfRows);
...@@ -1416,14 +1456,14 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1416,14 +1456,14 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
tblkIdx++; tblkIdx++;
} }
} else { } else {
ASSERT(keyFirst <= pCompBlock->keyLast); ASSERT(keyFirst <= blkKeyLast);
int16_t colId = 0; int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
slIter = *(pCommitIter->pIter); slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows); pDataCols0->numOfRows);
if (rows2 == 0) { // all filtered out if (rows2 == 0) { // all filtered out
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册