未验证 提交 4cad46a8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4243 from taosdata/feature/TD-2066

[TD-2066]<feature>: pass error code when commit failed
......@@ -46,7 +46,7 @@ extern "C" {
typedef struct {
void *appH;
void *cqH;
int (*notifyStatus)(void *, int status);
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMain.h"
static int tsdbCommitTSData(STsdbRepo *pRepo);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
void *tsdbCommitData(STsdbRepo *pRepo) {
SMemTable * pMem = pRepo->imem;
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d error occurs while committing META data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Create the iterator to read from cache
if (tsdbCommitTSData(pRepo) < 0) {
tsdbError("vgId:%d error occurs while committing TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
tsdbFitRetention(pRepo);
tsdbInfo("vgId:%d commit over, succeed", REPO_ID(pRepo));
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS);
return NULL;
_err:
ASSERT(terrno != TSDB_CODE_SUCCESS);
tsdbInfo("vgId:%d commit over, failed", REPO_ID(pRepo));
tsdbEndCommit(pRepo, terrno);
return NULL;
}
static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable * pMem = pRepo->imem;
SDataCols * pDataCols = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
STsdbCfg * pCfg = &(pRepo->config);
if (pMem->numOfRows <= 0) return 0;
iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _err;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
}
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
return 0;
_err:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
return -1;
}
static int tsdbCommitMeta(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SActObj * pAct = NULL;
SActCont * pCont = NULL;
if (listNEles(pMem->actList) <= 0) return 0;
if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
pAct = (SActObj *)pNode->data;
if (pAct->act == TSDB_UPDATE_META) {
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
}
} else if (pAct->act == TSDB_DROP_META) {
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
}
} else {
ASSERT(false);
}
}
if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
return 0;
_err:
return -1;
}
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
sem_post(&(pRepo->readyToCommit));
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
char * dataDir = NULL;
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = NULL;
SMemTable * pMem = pRepo->imem;
bool newLast = false;
TSKEY minKey = 0, maxKey = 0;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
if (!hasDataToCommit) {
tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
return 0;
}
// Create and open files for commit
dataDir = tsdbGetDataDirName(pRepo->rootDir);
if (dataDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
newLast = TSDB_NLAST_FILE_OPENED(pHelper);
if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch));
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
if (pIter->pIter != NULL) {
if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
goto _err;
}
}
taosRUnLockLatch(&(pIter->pTable->latch));
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
tfree(dataDir);
tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock));
(void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
if (newLast) {
(void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
} else {
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
}
pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
pthread_rwlock_unlock(&(pFileH->fhlock));
return 0;
_err:
tfree(dataDir);
tsdbCloseHelperFile(pHelper, 1, NULL);
return -1;
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// reference all tables
for (int i = 0; i < pMem->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
iters[i].pTable = pMeta->tables[i];
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
for (int i = 0; i < pMem->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
tSkipListIterNext(iters[i].pIter);
}
}
return iters;
_err:
tsdbDestroyCommitIters(iters, pMem->maxTables);
return NULL;
}
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
if (iters == NULL) return;
for (int i = 1; i < maxTables; i++) {
if (iters[i].pTable != NULL) {
tsdbUnRefTable(iters[i].pTable);
tSkipListDestroyIter(iters[i].pIter);
}
}
free(iters);
}
......@@ -256,7 +256,8 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
pthread_rwlock_unlock(&pFileH->fhlock);
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
ASSERT(pGroup != NULL);
}
return pGroup;
......
......@@ -23,12 +23,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
......@@ -215,7 +209,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
sem_wait(&(pRepo->readyToCommit));
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS);
if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
......@@ -355,68 +349,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0;
}
void *tsdbCommitData(STsdbRepo *pRepo) {
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
ASSERT(pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
}
}
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
tsdbFitRetention(pRepo);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
tsdbEndCommit(pRepo);
return NULL;
}
// ---------------- LOCAL FUNCTIONS ----------------
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -508,240 +440,11 @@ static void tsdbFreeTableData(STableData *pTableData) {
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
static int tsdbCommitMeta(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SActObj * pAct = NULL;
SActCont * pCont = NULL;
if (listNEles(pMem->actList) > 0) {
if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
pAct = (SActObj *)pNode->data;
if (pAct->act == TSDB_UPDATE_META) {
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
}
} else if (pAct->act == TSDB_DROP_META) {
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
}
} else {
ASSERT(false);
}
}
if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
return 0;
_err:
return -1;
}
static void tsdbEndCommit(STsdbRepo *pRepo) {
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
sem_post(&(pRepo->readyToCommit));
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
}
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
char * dataDir = NULL;
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = NULL;
SMemTable * pMem = pRepo->imem;
bool newLast = false;
TSKEY minKey = 0, maxKey = 0;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
if (!hasDataToCommit) {
tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
return 0;
}
// Create and open files for commit
dataDir = tsdbGetDataDirName(pRepo->rootDir);
if (dataDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
newLast = TSDB_NLAST_FILE_OPENED(pHelper);
if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch));
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
if (pIter->pIter != NULL) {
if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
goto _err;
}
}
taosRUnLockLatch(&(pIter->pTable->latch));
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
tfree(dataDir);
tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock));
(void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
if (newLast) {
(void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
} else {
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
}
pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
pthread_rwlock_unlock(&(pFileH->fhlock));
return 0;
_err:
tfree(dataDir);
tsdbCloseHelperFile(pHelper, 1, NULL);
return -1;
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// reference all tables
for (int i = 0; i < pMem->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
iters[i].pTable = pMeta->tables[i];
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
for (int i = 0; i < pMem->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
tSkipListIterNext(iters[i].pIter);
}
}
return iters;
_err:
tsdbDestroyCommitIters(iters, pMem->maxTables);
return NULL;
}
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
if (iters == NULL) return;
for (int i = 1; i < maxTables; i++) {
if (iters[i].pTable != NULL) {
tsdbUnRefTable(iters[i].pTable);
tSkipListDestroyIter(iters[i].pIter);
}
}
free(iters);
}
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
ASSERT(pMemTable->maxTables < maxTables);
......
......@@ -236,6 +236,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
rInfo.offset = lseek(pStore->fd, 0, SEEK_CUR);
if (rInfo.offset < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -254,6 +255,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
if (taosWrite(pStore->fd, cont, contLen) < contLen) {
uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......
......@@ -30,7 +30,7 @@
static SHashObj*tsVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status);
static int vnodeProcessTsdbStatus(void *arg, int status, int eno);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(void *ahandle, int8_t role);
......@@ -590,9 +590,13 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
}
// TODO: this is a simple implement
static int vnodeProcessTsdbStatus(void *arg, int status) {
static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
SVnodeObj *pVnode = arg;
if (eno != TSDB_CODE_SUCCESS) {
// TODO: deal with the error here
}
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册