diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index a913deea3772437b76391f95d51509bf61a31754..474214be7e48a74c7e4f87bae39245a1c90f69f5 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -46,7 +46,7 @@ extern "C" { typedef struct { void *appH; void *cqH; - int (*notifyStatus)(void *, int status); + int (*notifyStatus)(void *, int status, int eno); int (*eventCallBack)(void *); void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); void (*cqDropFunc)(void *handle); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c new file mode 100644 index 0000000000000000000000000000000000000000..0112f40ffd709170fadb9ae80dc90214cc63332a --- /dev/null +++ b/src/tsdb/src/tsdbCommit.c @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "tsdbMain.h" + +static int tsdbCommitTSData(STsdbRepo *pRepo); +static int tsdbCommitMeta(STsdbRepo *pRepo); +static void tsdbEndCommit(STsdbRepo *pRepo, int eno); +static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); +static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); + +void *tsdbCommitData(STsdbRepo *pRepo) { + SMemTable * pMem = pRepo->imem; + + tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", + REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); + + // Commit to update meta file + if (tsdbCommitMeta(pRepo) < 0) { + tsdbError("vgId:%d error occurs while committing META data since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + // Create the iterator to read from cache + if (tsdbCommitTSData(pRepo) < 0) { + tsdbError("vgId:%d error occurs while committing TS data since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + tsdbFitRetention(pRepo); + + tsdbInfo("vgId:%d commit over, succeed", REPO_ID(pRepo)); + tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); + + return NULL; + +_err: + ASSERT(terrno != TSDB_CODE_SUCCESS); + tsdbInfo("vgId:%d commit over, failed", REPO_ID(pRepo)); + tsdbEndCommit(pRepo, terrno); + + return NULL; +} + +static int tsdbCommitTSData(STsdbRepo *pRepo) { + SMemTable * pMem = pRepo->imem; + SDataCols * pDataCols = NULL; + STsdbMeta * pMeta = pRepo->tsdbMeta; + SCommitIter *iters = NULL; + SRWHelper whelper = {0}; + STsdbCfg * pCfg = &(pRepo->config); + + if (pMem->numOfRows <= 0) return 0; + + iters = tsdbCreateCommitIters(pRepo); + if (iters == NULL) { + tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { + tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", + REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + goto _err; + } + + int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); + int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + + // Loop to commit to each file + for (int fid = sfid; fid <= efid; fid++) { + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _err; + } + } + + tdFreeDataCols(pDataCols); + tsdbDestroyCommitIters(iters, pMem->maxTables); + tsdbDestroyHelper(&whelper); + + return 0; + +_err: + tdFreeDataCols(pDataCols); + tsdbDestroyCommitIters(iters, pMem->maxTables); + tsdbDestroyHelper(&whelper); + + return -1; +} + +static int tsdbCommitMeta(STsdbRepo *pRepo) { + SMemTable *pMem = pRepo->imem; + STsdbMeta *pMeta = pRepo->tsdbMeta; + SActObj * pAct = NULL; + SActCont * pCont = NULL; + + if (listNEles(pMem->actList) <= 0) return 0; + + if (tdKVStoreStartCommit(pMeta->pStore) < 0) { + tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + SListNode *pNode = NULL; + + while ((pNode = tdListPopHead(pMem->actList)) != NULL) { + pAct = (SActObj *)pNode->data; + if (pAct->act == TSDB_UPDATE_META) { + pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); + if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { + tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); + tdKVStoreEndCommit(pMeta->pStore); + goto _err; + } + } else if (pAct->act == TSDB_DROP_META) { + if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { + tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); + tdKVStoreEndCommit(pMeta->pStore); + goto _err; + } + } else { + ASSERT(false); + } + } + + if (tdKVStoreEndCommit(pMeta->pStore) < 0) { + tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + return 0; + +_err: + return -1; +} + +static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); + sem_post(&(pRepo->readyToCommit)); +} + +static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { + for (int i = 0; i < nIters; i++) { + TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); + if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1; + } + return 0; +} + +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { + char * dataDir = NULL; + STsdbCfg * pCfg = &pRepo->config; + STsdbFileH *pFileH = pRepo->tsdbFileH; + SFileGroup *pGroup = NULL; + SMemTable * pMem = pRepo->imem; + bool newLast = false; + + TSKEY minKey = 0, maxKey = 0; + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + + // Check if there are data to commit to this file + int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey); + if (!hasDataToCommit) { + tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid); + return 0; + } + + // Create and open files for commit + dataDir = tsdbGetDataDirName(pRepo->rootDir); + if (dataDir == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { + tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _err; + } + + // Open files for write/read + if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { + tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + newLast = TSDB_NLAST_FILE_OPENED(pHelper); + + if (tsdbLoadCompIdx(pHelper, NULL) < 0) { + tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + // Loop to commit data in each table + for (int tid = 1; tid < pMem->maxTables; tid++) { + SCommitIter *pIter = iters + tid; + if (pIter->pTable == NULL) continue; + + taosRLockLatch(&(pIter->pTable->latch)); + + if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; + + if (pIter->pIter != NULL) { + if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), + tstrerror(terrno)); + goto _err; + } + } + + taosRUnLockLatch(&(pIter->pTable->latch)); + + // Move the last block to the new .l file if neccessary + if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { + tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + // Write the SCompBlock part + if (tsdbWriteCompInfo(pHelper) < 0) { + tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + } + + if (tsdbWriteCompIdx(pHelper) < 0) { + tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _err; + } + + tfree(dataDir); + tsdbCloseHelperFile(pHelper, 0, pGroup); + + pthread_rwlock_wrlock(&(pFileH->fhlock)); + + (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); + pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; + + if (newLast) { + (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); + pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; + } else { + pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; + } + + pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; + + pthread_rwlock_unlock(&(pFileH->fhlock)); + + return 0; + +_err: + tfree(dataDir); + tsdbCloseHelperFile(pHelper, 1, NULL); + return -1; +} + +static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { + SMemTable *pMem = pRepo->imem; + STsdbMeta *pMeta = pRepo->tsdbMeta; + + SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); + if (iters == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; + + // reference all tables + for (int i = 0; i < pMem->maxTables; i++) { + if (pMeta->tables[i] != NULL) { + tsdbRefTable(pMeta->tables[i]); + iters[i].pTable = pMeta->tables[i]; + } + } + + if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; + + for (int i = 0; i < pMem->maxTables; i++) { + if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { + if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + tSkipListIterNext(iters[i].pIter); + } + } + + return iters; + +_err: + tsdbDestroyCommitIters(iters, pMem->maxTables); + return NULL; +} + +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { + if (iters == NULL) return; + + for (int i = 1; i < maxTables; i++) { + if (iters[i].pTable != NULL) { + tsdbUnRefTable(iters[i].pTable); + tSkipListDestroyIter(iters[i].pIter); + } + } + + free(iters); +} diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 58ae2c3ae14c2a7feaee8323ce36e9021df7a91e..03c50d42f7324c1cbe7889138bced9d9d986c424 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -256,7 +256,8 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) { pFileH->pFGroup[pFileH->nFGroups++] = fGroup; qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); pthread_rwlock_unlock(&pFileH->fhlock); - return tsdbSearchFGroup(pFileH, fid, TD_EQ); + pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); + ASSERT(pGroup != NULL); } return pGroup; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 9f9c52222e5dfb24e812690025c9312e5180d151..6d7835438cc04004dc50a23b77cea185ff149697 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -23,12 +23,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); -static int tsdbCommitMeta(STsdbRepo *pRepo); -static void tsdbEndCommit(STsdbRepo *pRepo); -static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); -static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); -static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); @@ -215,7 +209,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { sem_wait(&(pRepo->readyToCommit)); - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS); if (tsdbLockRepo(pRepo) < 0) return -1; pRepo->imem = pRepo->mem; pRepo->mem = NULL; @@ -355,68 +349,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -void *tsdbCommitData(STsdbRepo *pRepo) { - SMemTable * pMem = pRepo->imem; - STsdbCfg * pCfg = &pRepo->config; - SDataCols * pDataCols = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; - SCommitIter *iters = NULL; - SRWHelper whelper = {0}; - ASSERT(pMem != NULL); - - tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), - pMem->keyFirst, pMem->keyLast, pMem->numOfRows); - - // Create the iterator to read from cache - if (pMem->numOfRows > 0) { - iters = tsdbCreateCommitIters(pRepo); - if (iters == NULL) { - tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); - goto _exit; - } - - int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); - int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); - - // Loop to commit to each file - for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { - tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _exit; - } - } - } - - // Commit to update meta file - if (tsdbCommitMeta(pRepo) < 0) { - tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - tsdbFitRetention(pRepo); - -_exit: - tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbDestroyHelper(&whelper); - tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); - tsdbEndCommit(pRepo); - - return NULL; -} - // ---------------- LOCAL FUNCTIONS ---------------- static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -508,240 +440,11 @@ static void tsdbFreeTableData(STableData *pTableData) { static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } - -static int tsdbCommitMeta(STsdbRepo *pRepo) { - SMemTable *pMem = pRepo->imem; - STsdbMeta *pMeta = pRepo->tsdbMeta; - SActObj * pAct = NULL; - SActCont * pCont = NULL; - - if (listNEles(pMem->actList) > 0) { - if (tdKVStoreStartCommit(pMeta->pStore) < 0) { - tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - SListNode *pNode = NULL; - - while ((pNode = tdListPopHead(pMem->actList)) != NULL) { - pAct = (SActObj *)pNode->data; - if (pAct->act == TSDB_UPDATE_META) { - pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); - if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { - tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tdKVStoreEndCommit(pMeta->pStore); - goto _err; - } - } else if (pAct->act == TSDB_DROP_META) { - if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { - tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tdKVStoreEndCommit(pMeta->pStore); - goto _err; - } - } else { - ASSERT(false); - } - } - - if (tdKVStoreEndCommit(pMeta->pStore) < 0) { - tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - } - - return 0; - -_err: - return -1; -} - -static void tsdbEndCommit(STsdbRepo *pRepo) { - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); - sem_post(&(pRepo->readyToCommit)); -} - -static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { - for (int i = 0; i < nIters; i++) { - TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); - if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1; - } - return 0; -} - void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { - char * dataDir = NULL; - STsdbCfg * pCfg = &pRepo->config; - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup *pGroup = NULL; - SMemTable * pMem = pRepo->imem; - bool newLast = false; - - TSKEY minKey = 0, maxKey = 0; - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - - // Check if there are data to commit to this file - int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey); - if (!hasDataToCommit) { - tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid); - return 0; - } - - // Create and open files for commit - dataDir = tsdbGetDataDirName(pRepo->rootDir); - if (dataDir == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { - tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; - } - - // Open files for write/read - if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { - tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - newLast = TSDB_NLAST_FILE_OPENED(pHelper); - - if (tsdbLoadCompIdx(pHelper, NULL) < 0) { - tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - // Loop to commit data in each table - for (int tid = 1; tid < pMem->maxTables; tid++) { - SCommitIter *pIter = iters + tid; - if (pIter->pTable == NULL) continue; - - taosRLockLatch(&(pIter->pTable->latch)); - - if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; - - if (pIter->pIter != NULL) { - if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { - taosRUnLockLatch(&(pIter->pTable->latch)); - tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), - tstrerror(terrno)); - goto _err; - } - } - - taosRUnLockLatch(&(pIter->pTable->latch)); - - // Move the last block to the new .l file if neccessary - if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { - tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - // Write the SCompBlock part - if (tsdbWriteCompInfo(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - } - - if (tsdbWriteCompIdx(pHelper) < 0) { - tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; - } - - tfree(dataDir); - tsdbCloseHelperFile(pHelper, 0, pGroup); - - pthread_rwlock_wrlock(&(pFileH->fhlock)); - - (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); - pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; - - if (newLast) { - (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; - } else { - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; - } - - pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; - - pthread_rwlock_unlock(&(pFileH->fhlock)); - - return 0; - -_err: - tfree(dataDir); - tsdbCloseHelperFile(pHelper, 1, NULL); - return -1; -} - -static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { - SMemTable *pMem = pRepo->imem; - STsdbMeta *pMeta = pRepo->tsdbMeta; - - SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); - if (iters == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; - - // reference all tables - for (int i = 0; i < pMem->maxTables; i++) { - if (pMeta->tables[i] != NULL) { - tsdbRefTable(pMeta->tables[i]); - iters[i].pTable = pMeta->tables[i]; - } - } - - if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; - - for (int i = 0; i < pMem->maxTables; i++) { - if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { - if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - tSkipListIterNext(iters[i].pIter); - } - } - - return iters; - -_err: - tsdbDestroyCommitIters(iters, pMem->maxTables); - return NULL; -} - -static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { - if (iters == NULL) return; - - for (int i = 1; i < maxTables; i++) { - if (iters[i].pTable != NULL) { - tsdbUnRefTable(iters[i].pTable); - tSkipListDestroyIter(iters[i].pIter); - } - } - - free(iters); -} - static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { ASSERT(pMemTable->maxTables < maxTables); diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 101dd4a6f4e726dc2f74f96f4a56db5135af5c24..31641ac9a74d3fc914dcc05443da9384ba20c339 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -236,6 +236,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe rInfo.offset = lseek(pStore->fd, 0, SEEK_CUR); if (rInfo.offset < 0) { uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -254,6 +255,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe if (taosWrite(pStore->fd, cont, contLen) < contLen) { uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3dd1bae37f753c423accdffd53e4780573764bd8..7922476898401f99a292431748e83463297e3f89 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,7 +30,7 @@ static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); -static int vnodeProcessTsdbStatus(void *arg, int status); +static int vnodeProcessTsdbStatus(void *arg, int status, int eno); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId); static void vnodeNotifyRole(void *ahandle, int8_t role); @@ -590,9 +590,13 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // TODO: this is a simple implement -static int vnodeProcessTsdbStatus(void *arg, int status) { +static int vnodeProcessTsdbStatus(void *arg, int status, int eno) { SVnodeObj *pVnode = arg; + if (eno != TSDB_CODE_SUCCESS) { + // TODO: deal with the error here + } + if (status == TSDB_STATUS_COMMIT_START) { pVnode->fversion = pVnode->version; vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);