提交 f09d73cf 编写于 作者: H Hongze Cheng

TD-353

上级 77b4dcf8
...@@ -307,6 +307,7 @@ SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); ...@@ -307,6 +307,7 @@ SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
// ------------------ tsdbFile.c // ------------------ tsdbFile.c
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
......
...@@ -103,14 +103,14 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -103,14 +103,14 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
} }
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
ASSERT(IS_REPO_LOCKED(pRepo)); if (pMemTable == NULL) return 0;
ASSERT(pMemTable != NULL);
T_REF_INC(pMemTable); T_REF_INC(pMemTable);
return 0;
} }
// Need to lock the repository // Need to lock the repository
int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
ASSERT(pMemTable != NULL); if (pMemTable == NULL) return 0;
if (T_REF_DEC(pMemTable) == 0) { if (T_REF_DEC(pMemTable) == 0) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
...@@ -143,6 +143,17 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -143,6 +143,17 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
return 0; return 0;
} }
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
if (tsdbLockRepo(pRepo) < 0) return -1;
*pMem = pRepo->mem;
*pIMem = pRepo->mem;
tsdbRefMemTable(pRepo, *pMem);
tsdbRefMemTable(pRepo, *pIMem);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
}
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL); ASSERT(pRepo != NULL);
...@@ -171,10 +182,16 @@ static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -171,10 +182,16 @@ static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
if (tsdbUnRefMemTable(pRepo, pRepo->imem) < 0) {
tsdbError("vgId:%d failed to unref memtable since %s", REPO_ID(pRepo), tstrerror(terrno))
return NULL;
}
} }
ASSERT(pRepo->commit == 0); ASSERT(pRepo->commit == 0);
SMemTable *pImem = pRepo->imem; SMemTable *pImem = pRepo->imem;
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
if (tsdbLockRepo(pRepo) < 0) return NULL; if (tsdbLockRepo(pRepo) < 0) return NULL;
pRepo->imem = pRepo->mem; pRepo->imem = pRepo->mem;
...@@ -322,20 +339,12 @@ static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } ...@@ -322,20 +339,12 @@ static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); }
static void *tsdbCommitData(void *arg) { static void *tsdbCommitData(void *arg) {
STsdbRepo *pRepo = (STsdbRepo *)arg; STsdbRepo *pRepo = (STsdbRepo *)arg;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
ASSERT(pRepo->imem != NULL); SMemTable *pMem = pRepo->imem;
ASSERT(pRepo->commit == 1); ASSERT(pRepo->commit == 1);
ASSERT(pMem != NULL);
tsdbPrint("vgId:%d start to commit! keyFirst " PRId64 " keyLast " PRId64 " numOfRows " PRId64, REPO_ID(pRepo), tsdbPrint("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pRepo->imem->keyFirst, pRepo->imem->keyLast, pRepo->imem->numOfRows); pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// STsdbMeta * pMeta = pRepo->tsdbMeta;
// STsdbCache *pCache = pRepo->tsdbCache;
// STsdbCfg * pCfg = &(pRepo->config);
// SDataCols * pDataCols = NULL;
// SRWHelper whelper = {{0}};
// if (pCache->imem == NULL) return NULL;
tsdbPrint("vgId:%d, starting to commit....", pRepo->config.tsdbId);
// Create the iterator to read from cache // Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pRepo); SSkipListIterator **iters = tsdbCreateTableIters(pRepo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册