提交 f0caae86 编写于 作者: H Hongze Cheng

more work

上级 5d2491de
......@@ -71,6 +71,9 @@ typedef struct STsdbFS STsdbFS;
#define HAS_NULL ((int8_t)0x2)
#define HAS_VALUE ((int8_t)0x4)
#define VERSION_MIN 0
#define VERSION_MAX INT64_MAX
// tsdbUtil.c ==============================================================================================
// TSDBROW
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
......@@ -277,7 +280,10 @@ struct SDelDataInfo {
struct STbData {
tb_uid_t suid;
tb_uid_t uid;
KEYINFO info;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
SDelData *pHead;
SDelData *pTail;
SMemSkipList sl;
......@@ -287,7 +293,10 @@ struct SMemTable {
SRWLatch latch;
STsdb *pTsdb;
int32_t nRef;
KEYINFO info;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t nRow;
int64_t nDel;
SArray *aTbData; // SArray<STbData*>
......
......@@ -830,6 +830,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
memset(pCommitter, 0, sizeof(*pCommitter));
ASSERT(pTsdb->mem && pTsdb->imem == NULL);
// lock();
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
......@@ -841,9 +842,50 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
code = tsdbFSBegin(pTsdb->fs);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
pCommitter->pReader = NULL;
pCommitter->oBlockIdxMap = tMapDataInit();
pCommitter->oBlockMap = tMapDataInit();
pCommitter->oBlock = tBlockInit();
pCommitter->pWriter = NULL;
pCommitter->nBlockIdxMap = tMapDataInit();
pCommitter->nBlockMap = tMapDataInit();
pCommitter->nBlock = tBlockInit();
code = tBlockDataInit(&pCommitter->oBlockData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->nBlockData);
if (code) {
tBlockDataClear(&pCommitter->oBlockData);
goto _exit;
}
_exit:
return code;
}
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
tMapDataClear(&pCommitter->oBlockIdxMap);
tMapDataClear(&pCommitter->oBlockMap);
tBlockClear(&pCommitter->oBlock);
tBlockDataClear(&pCommitter->oBlockData);
tMapDataClear(&pCommitter->nBlockIdxMap);
tMapDataClear(&pCommitter->nBlockMap);
tBlockClear(&pCommitter->nBlock);
tBlockDataClear(&pCommitter->nBlockData);
}
static int32_t tsdbCommitData(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
......@@ -852,8 +894,12 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
// check
if (pMemTable->nRow == 0) goto _exit;
// loop
pCommitter->nextKey = pMemTable->info.minKey.ts;
// start ====================
code = tsdbCommitDataStart(pCommitter);
if (code) return code;
// impl ====================
pCommitter->nextKey = pMemTable->minKey;
while (pCommitter->nextKey < TSKEY_MAX) {
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
......@@ -862,11 +908,15 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
if (code) goto _err;
}
// end ====================
tsdbCommitDataEnd(pCommitter);
_exit:
tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
return code;
_err:
tsdbCommitDataEnd(pCommitter);
tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -19,7 +19,7 @@ static const char *tsdbFileSuffix[] = {".del", ".cache", ".head", ".data", ".las
// SHeadFile ===============================================
void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]) {
// TODO
// snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/v%df%dver%18d.head", );
}
// SDataFile ===============================================
......
......@@ -42,7 +42,10 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosInitRWLatch(&pMemTable->latch);
pMemTable->pTsdb = pTsdb;
pMemTable->nRef = 1;
pMemTable->info = tKEYINFOInit();
pMemTable->minKey = TSKEY_MAX;
pMemTable->maxKey = TSKEY_MIN;
pMemTable->minVersion = VERSION_MAX;
pMemTable->maxVersion = VERSION_MIN;
pMemTable->nRow = 0;
pMemTable->nDel = 0;
pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *));
......@@ -174,7 +177,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++;
if (tsdbKeyCmprFn(&lastKey, &pTbData->info.maxKey) >= 0) {
if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid);
}
......@@ -324,7 +327,10 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
}
pTbData->suid = suid;
pTbData->uid = uid;
pTbData->info = tKEYINFOInit();
pTbData->minKey = TSKEY_MAX;
pTbData->maxKey = TSKEY_MIN;
pTbData->minVersion = VERSION_MAX;
pTbData->maxVersion = VERSION_MIN;
pTbData->pHead = NULL;
pTbData->pTail = NULL;
pTbData->sl.seed = taosRand();
......@@ -515,13 +521,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
goto _err;
}
if (tsdbKeyCmprFn(&key, &pTbData->info.minKey) < 0) {
pTbData->info.minKey = key;
}
if (tsdbKeyCmprFn(&key, &pMemTable->info.minKey) < 0) {
pMemTable->info.minKey = key;
}
if (pTbData->minKey > key.ts) pTbData->minKey = key.ts;
if (pMemTable->minKey > key.ts) pMemTable->minKey = key.ts;
pLastRow = row.pTSRow;
......@@ -546,21 +547,18 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
} while (row.pTSRow);
}
if (tsdbKeyCmprFn(&key, &pTbData->info.maxKey) > 0) {
pTbData->info.maxKey = key;
if (key.ts > pTbData->maxKey) {
pTbData->maxKey = key.ts;
if (pLastRow) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
}
}
if (key.ts > pMemTable->maxKey) pMemTable->maxKey = key.ts;
if (pTbData->minVersion > version) pTbData->minVersion = version;
if (pTbData->maxVersion < version) pTbData->maxVersion = version;
pMemTable->nRow += nRow;
if (tsdbKeyCmprFn(&key, &pMemTable->info.maxKey) > 0) {
pMemTable->info.maxKey = key;
}
if (pTbData->info.minVerion > version) pTbData->info.minVerion = version;
if (pTbData->info.maxVersion < version) pTbData->info.maxVersion = version;
pMemTable->nRef++;
pRsp->numOfRows = nRow;
pRsp->affectedRows = nRow;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册