提交 1a139a07 编写于 作者: H Hongze Cheng

TD-353

上级 f09d73cf
......@@ -191,6 +191,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE, 0, 0x060B, "tsdb times
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP, 0, 0x060C, "tsdb submit message is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb invalid action")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
......
......@@ -295,6 +295,8 @@ int tsdbUpdateTable(STsdbMeta* pMeta, STable* pTable, STableCfg* pCfg);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
void tsdbRefTable(STable* pTable);
void tsdbUnRefTable(STable* pTable);
// ------------------ tsdbBuffer.c
STsdbBufPool* tsdbNewBufPool();
......@@ -317,8 +319,8 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH);
STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH);
// ------------------ tsdbRWHelper.c
#define TSDB_MAX_SUBBLOCKS 8
......
......@@ -325,11 +325,6 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile)
return 0;
}
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
}
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) {
if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId)
......
......@@ -844,35 +844,4 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
return TSDB_GET_TABLE_LAST_KEY(pTable);
}
/**
* Return the next iterator key.
*
* @return the next key if iter has
* -1 if iter not
*/
static TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
if (pIter == NULL) return -1;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) return -1;
SDataRow row = SL_GET_NODE_DATA(node);
return dataRowKey(row);
}
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) {
TSKEY nextKey;
for (int i = 0; i < nIters; i++) {
SSkipListIterator *pIter = iters[i];
nextKey = tsdbNextIterKey(pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
}
#endif
\ No newline at end of file
......@@ -18,6 +18,11 @@
#define TSDB_DATA_SKIPLIST_LEVEL 5
typedef struct {
STable * pTable;
SSkipListIterator *pIter;
} SCommitIter;
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
static void * tsdbAllocBytes(STsdbRepo *pRepo, int bytes);
......@@ -337,9 +342,12 @@ static void tsdbFreeTableData(STableData *pTableData) {
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); }
static void *tsdbCommitData(void *arg) {
STsdbRepo *pRepo = (STsdbRepo *)arg;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SMemTable *pMem = pRepo->imem;
STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
SCommitIter *iters = NULL;
ASSERT(pRepo->commit == 1);
ASSERT(pMem != NULL);
......@@ -347,69 +355,84 @@ static void *tsdbCommitData(void *arg) {
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pRepo);
iters = tsdbCreateTableIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create table iterators since %s", REPO_ID(pRepo), tstrerror(terrno));
// TODO: deal with the error here
return NULL;
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
// TODO
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d since %s", REPO_ID(pRepo),
pMeta->maxRowBytes, pMeta->maxCols, tstrerror(terrno));
// TODO
if ((pDataCols = tdNewDataCols(pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
int sfid = TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision);
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
ASSERT(false);
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
}
// Do retention actions
// TODO: Do retention actions
tsdbFitRetention(pRepo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
// TODO: Commit action meta data
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyTableIters(iters, pCfg->maxTables);
tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo);
tsdbPrint("vgId:%d commit over", pRepo->config.tsdbId);
return NULL;
}
tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool);
tsdbAdjustCacheBlocks(pCache);
tdListFree(pCache->imem->list);
free(pCache->imem);
pCache->imem = NULL;
static void tsdbEndCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->commit == 1);
tsdbLockRepo(pRepo);
pRepo->commit = 0;
for (int i = 1; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) {
tsdbFreeMemTable(pTable->imem);
pTable->imem = NULL;
}
tsdbUnlockRepo(pRepo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
}
static TSKEY tsdbNextIterKey(SCommitIter *pIter) {
if (pIter == NULL) return -1;
SSkipListNode *node = tSkipListIterGet(pIter->pIter);
if (node == NULL) return -1;
SDataRow row = SL_GET_NODE_DATA(node);
return dataRowKey(row);
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey(iters + i);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
tsdbUnLockRepo(arg);
tsdbPrint("vgId:%d, commit over....", pRepo->config.tsdbId);
return 0;
}
return NULL;
static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper,
SDataCols *pDataCols) {
char dataDir[128] = {0};
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
char * dataDir = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
......@@ -420,70 +443,84 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
if (!hasDataToCommit) {
tsdbTrace("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
return 0;
}
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
dataDir = tsdbGetDataDirName(pRepo->rootDir);
if (dataDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid);
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
free(dataDir);
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId);
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pCfg->maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL) continue;
SSkipListIterator *pIter = iters[tid];
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
// Set the helper and the buffer dataCols object to help to write this table
tsdbSetHelperTable(pHelper, pTable, pRepo);
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
// Loop to write the data in the cache to files. If no data to write, just break the loop
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
int nLoop = 0;
while (true) {
int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols);
assert(rowsRead >= 0);
if (pDataCols->numOfRows == 0) break;
nLoop++;
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
ASSERT(rowsWritten != 0);
if (rowsWritten < 0) goto _err;
ASSERT(rowsWritten <= pDataCols->numOfRows);
tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
}
ASSERT(pDataCols->numOfRows == 0);
if (pIter->pIter != NULL) {
tdInitDataCols(pDataCols, tsdbGetTableSchema(pIter->pTable));
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
int nLoop = 0;
while (true) {
int rowsRead = tsdbReadRowsFromCache(pMeta, pIter->pTable, pIter->pIter, maxKey, maxRowsToRead, pDataCols);
ASSERT(rowsRead >= 0);
if (pDataCols->numOfRows == 0) break;
nLoop++;
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
ASSERT(rowsWritten != 0);
if (rowsWritten < 0) {
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
goto _err;
}
ASSERT(rowsWritten <= pDataCols->numOfRows);
tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
}
ASSERT(pDataCols->numOfRows == 0);
}
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId);
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId);
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId);
tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
......@@ -496,43 +533,63 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
return 0;
_err:
ASSERT(false);
// ASSERT(false);
tsdbCloseHelperFile(pHelper, 1);
return -1;
}
static SSkipListIterator **tsdbCreateTableIters(STsdbRepo *pRepo) {
STsdbCfg *pCfg = &(pRepo->config);
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
STsdbCfg * pCfg = &(pRepo->config);
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *));
SCommitIter *iters = (SCommitIter *)calloc(pCfg->maxTables, sizeof(SCommitIter));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
for (int tid = 1; tid < maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL || pTable->imem == NULL || pTable->imem->numOfRows == 0) continue;
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// reference all tables
for (int i = 0; i < pCfg->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
iters[i].pTable = pMeta->tables[i];
}
}
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
if (iters[tid] == NULL) goto _err;
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
if (!tSkipListIterNext(iters[tid])) goto _err;
for (int i = 0; i < pCfg->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TALBE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (!tSkipListIterNext(iters[i].pIter)) {
terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM;
goto _err;
}
}
}
return iters;
_err:
tsdbDestroyTableIters(iters, maxTables);
tsdbDestroyTableIters(iters, pCfg->maxTables);
return NULL;
}
static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) {
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
if (iters == NULL) return;
for (int tid = 1; tid < maxTables; tid++) {
if (iters[tid] == NULL) continue;
tSkipListDestroyIter(iters[tid]);
for (int i = 1; i < maxTables; i++) {
if (iters[i].pTable != NULL) {
tsdbUnRefTable(iters[i].pTable);
tSkipListDestroyIter(iters[i].pIter);
}
}
free(iters);
......
......@@ -516,6 +516,14 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
return 0;
}
void tsdbRefTable(STable *pTable) { T_REF_INC(pTable); }
void tsdbUnRefTable(STable *pTable) {
if (T_REF_DEC(pTable) == 0) {
tsdbFreeTable(pTable);
}
}
// ------------------ LOCAL FUNCTIONS ------------------
static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册