提交 4213920f 编写于 作者: H Hongze Cheng

refactor more code

上级 2a4e788d
...@@ -958,7 +958,7 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { ...@@ -958,7 +958,7 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) {
while (true) { while (true) {
TSKEY keyNext = tsdbNextIterKey(pIter->pIter); TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
if (keyNext > maxKey) break; if (keyNext > pReadH->maxKey) break;
void *ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock), void *ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock),
NULL, TD_GE); NULL, TD_GE);
......
...@@ -463,316 +463,11 @@ static void tsdbFreeTableData(STableData *pTableData) { ...@@ -463,316 +463,11 @@ static void tsdbFreeTableData(STableData *pTableData) {
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); }
// static void *tsdbCommitData(void *arg) {
// STsdbRepo * pRepo = (STsdbRepo *)arg;
// SMemTable * pMem = pRepo->imem;
// STsdbCfg * pCfg = &pRepo->config;
// STsdbMeta * pMeta = pRepo->tsdbMeta;
// SCommitHandle commitHandle = {0};
// SCommitHandle *pCommitH = &commitHandle;
// ASSERT(pRepo->commit == 1 && pMem != NULL);
// tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
// pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// pCommitH->pRepo = pRepo;
// if (tsdbInitManifestHandle(pRepo, &(pCommitH->manifest)) < 0) goto _exit;
// // Create the iterator to read from cache
// if (pMem->numOfRows > 0) {
// iters = tsdbCreateCommitIters(pRepo);
// if (iters == NULL) {
// tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _exit;
// }
// if (tsdbInitWriteHelper(&(pCommitH->whelper), pRepo) < 0) {
// tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _exit;
// }
// if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
// REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
// goto _exit;
// }
// int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
// int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// // Loop to commit to each file
// for (int fid = sfid; fid <= efid; fid++) {
// if (tsdbCommitToFile(pRepo, fid, iters, &(pCommitH->whelper), pDataCols) < 0) {
// tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// goto _exit;
// }
// }
// }
// // Commit to update meta file
// if (tsdbCommitMeta(pRepo) < 0) {
// tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _exit;
// }
// tsdbFitRetention(pRepo);
// if (tsdbAppendManifestEnd(&pCommitH->manifest, pCommitH->pRepo) < 0) {
// // TODO
// }
// tsdbApplyManifestAction(&pCommitH->manifest);
// _exit:
// tdFreeDataCols(pDataCols);
// tsdbDestroyCommitIters(iters, pMem->maxTables);
// tsdbCloseManifestHandle(&(pCommitH->manifest));
// tsdbDestroyHelper(&(pCommitH->whelper));
// tsdbEndCommit(pRepo);
// tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
// return NULL;
// }
// static int tsdbCommitMeta(STsdbRepo *pRepo, SManifestHandle *pManifest) {
// SMemTable *pMem = pRepo->imem;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
// SActObj * pAct = NULL;
// SActCont * pCont = NULL;
// if (listNEles(pMem->actList) > 0) {
// pManifest->contSize = tdEncodeCommitAction(pMeta->pStore, &(pManifest->pBuffer));
// if (tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_META_RECORD) < 0) goto _err;
// if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
// tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// SListNode *pNode = NULL;
// while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
// pAct = (SActObj *)pNode->data;
// if (pAct->act == TSDB_UPDATE_META) {
// pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
// if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
// tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
// tstrerror(terrno));
// tdKVStoreEndCommit(pMeta->pStore);
// goto _err;
// }
// } else if (pAct->act == TSDB_DROP_META) {
// if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
// tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
// tstrerror(terrno));
// tdKVStoreEndCommit(pMeta->pStore);
// goto _err;
// }
// } else {
// ASSERT(false);
// }
// }
// if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
// tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// }
// return 0;
// _err:
// return -1;
// }
// static void tsdbEndCommit(STsdbRepo *pRepo) {
// ASSERT(pRepo->commit == 1);
// if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
// }
// static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
// for (int i = 0; i < nIters; i++) {
// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
// if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
// }
// return 0;
// }
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; *minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
} }
// static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
// char * dataDir = NULL;
// STsdbCfg * pCfg = &pRepo->config;
// STsdbFileH *pFileH = pRepo->tsdbFileH;
// SFileGroup *pGroup = NULL;
// SMemTable * pMem = pRepo->imem;
// bool newLast = false;
// TSKEY minKey = 0, maxKey = 0;
// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// // Check if there are data to commit to this file
// int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
// if (!hasDataToCommit) {
// tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
// return 0;
// }
// // Create and open files for commit
// dataDir = tsdbGetDataDirName(pRepo->rootDir);
// if (dataDir == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return -1;
// }
// if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
// tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// goto _err;
// }
// // Open files for write/read
// if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
// tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// newLast = TSDB_NLAST_FILE_OPENED(pHelper);
// if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
// tsdbError("vgId:%d failed to load SBlockIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// // Loop to commit data in each table
// for (int tid = 1; tid < pMem->maxTables; tid++) {
// SCommitIter *pIter = iters + tid;
// if (pIter->pTable == NULL) continue;
// taosRLockLatch(&(pIter->pTable->latch));
// if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
// if (pIter->pIter != NULL) {
// if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _err;
// }
// if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
// taosRUnLockLatch(&(pIter->pTable->latch));
// tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
// TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
// tstrerror(terrno));
// goto _err;
// }
// }
// taosRUnLockLatch(&(pIter->pTable->latch));
// // Move the last block to the new .l file if neccessary
// if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
// tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// // Write the SBlock part
// if (tsdbWriteCompInfo(pHelper) < 0) {
// tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// }
// if (tsdbWriteCompIdx(pHelper) < 0) {
// tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// goto _err;
// }
// taosTFree(dataDir);
// tsdbCloseHelperFile(pHelper, 0, pGroup);
// pthread_rwlock_wrlock(&(pFileH->fhlock));
// (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
// if (newLast) {
// (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
// } else {
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
// }
// pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
// pthread_rwlock_unlock(&(pFileH->fhlock));
// return 0;
// _err:
// taosTFree(dataDir);
// tsdbCloseHelperFile(pHelper, 1, NULL);
// return -1;
// }
// static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
// SMemTable *pMem = pRepo->imem;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
// SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
// if (iters == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return NULL;
// }
// if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// // reference all tables
// for (int i = 0; i < pMem->maxTables; i++) {
// if (pMeta->tables[i] != NULL) {
// tsdbRefTable(pMeta->tables[i]);
// iters[i].pTable = pMeta->tables[i];
// }
// }
// if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
// for (int i = 0; i < pMem->maxTables; i++) {
// if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
// if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _err;
// }
// tSkipListIterNext(iters[i].pIter);
// }
// }
// return iters;
// _err:
// tsdbDestroyCommitIters(iters, pMem->maxTables);
// return NULL;
// }
// static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
// if (iters == NULL) return;
// for (int i = 1; i < maxTables; i++) {
// if (iters[i].pTable != NULL) {
// tsdbUnRefTable(iters[i].pTable);
// tSkipListDestroyIter(iters[i].pIter);
// }
// }
// free(iters);
// }
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
ASSERT(pMemTable->maxTables < maxTables); ASSERT(pMemTable->maxTables < maxTables);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册