提交 670a90e5 编写于 作者: H Hongze Cheng

feat: tsdb multi-version

上级 81ef708c
......@@ -870,6 +870,7 @@ struct SDelOp {
int64_t version;
TSKEY sKey; // included
TSKEY eKey; // included
SDelOp *pNext;
};
#endif
......
......@@ -26,7 +26,8 @@ struct SMemData {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRows;
SArray *aDelOp; // SArray<SDelOp>
SDelOp *delOpHead;
SDelOp *delOpTail;
};
struct SMemTable {
......@@ -38,7 +39,8 @@ struct SMemTable {
SArray *pArray; // SArray<SMemData>
};
static int32_t tsdbGetOrCreateTbData(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
static int memDataPCmprFn(const void *p1, const void *p2);
// SMemTable ==============================================
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
......@@ -52,8 +54,8 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
}
pMemTable->pTsdb = pTsdb;
pMemTable->nRef = 1;
pMemTable->minKey = (TSDBKEY){.version = -1, .ts = TSKEY_MAX};
pMemTable->maxKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MIN};
pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
pMemTable->nRows = 0;
pMemTable->pArray = taosArrayInit(512, sizeof(SMemData *));
if (pMemTable->pArray == NULL) {
......@@ -76,15 +78,17 @@ void tsdbMemTableDestroy2(SMemTable *pMemTable) {
}
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk) {
int32_t code = 0;
SMemData *pMemData;
int32_t code = 0;
SMemTable *pMemTable = (SMemTable *)pTsdb->mem; // TODO
SMemData *pMemData;
ASSERT(pMemTable);
// check if table exists
{
// TODO
// check if table exists (todo)
}
code = tsdbGetOrCreateTbData(pTsdb, pSubmitBlk->suid, pSubmitBlk->uid, &pMemData);
code = tsdbGetOrCreateMemData(pMemTable, pSubmitBlk->suid, pSubmitBlk->uid, &pMemData);
if (code) {
tsdbError("vgId:%d failed to create/get table data since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
goto _err;
......@@ -99,24 +103,90 @@ _err:
}
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0;
// TODO
int32_t code = 0;
SMemTable *pMemTable = (SMemTable *)pTsdb->mem; // TODO
SMemData *pMemData;
SVBufPool *pPool = pTsdb->pVnode->inUse;
ASSERT(pMemTable);
{
// check if table exists (todo)
}
code = tsdbGetOrCreateMemData(pMemTable, suid, uid, &pMemData);
if (code) {
goto _err;
}
// do delete
SDelOp *pDelOp = (SDelOp *)vnodeBufPoolMalloc(pPool, sizeof(*pDelOp));
if (pDelOp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pDelOp->version = version;
pDelOp->sKey = sKey;
pDelOp->eKey = eKey;
pDelOp->pNext = NULL;
if (pMemData->delOpHead == NULL) {
ASSERT(pMemData->delOpTail == NULL);
pMemData->delOpHead = pMemData->delOpTail = pDelOp;
} else {
pMemData->delOpTail->pNext = pDelOp;
pMemData->delOpTail = pDelOp;
}
{
// update the state of pMemTable, pMemData, last and lastrow (todo)
}
tsdbDebug("vgId:%d delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64
" since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
return code;
_err:
tsdbError("vgId:%d failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64
" since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
return code;
}
static int32_t tsdbGetOrCreateTbData(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) {
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) {
int32_t code = 0;
int32_t idx = 0;
SMemData *pMemDataT = &(SMemData){.suid = suid, .uid = uid};
SMemData *pMemData = NULL;
SMemTable *pMemTable = (SMemTable *)pTsdb->mem;
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
// search
pMemData = (SMemData *)taosbsearch(NULL, pMemTable->pArray->pData, taosArrayGetSize(pMemTable->pArray),
sizeof(SMemData *), NULL, TD_GE);
// get
idx = taosArraySearchIdx(pMemTable->pArray, &pMemDataT, memDataPCmprFn, TD_GE);
if (idx >= 0) {
pMemData = (SMemData *)taosArrayGet(pMemTable->pArray, idx);
if (memDataPCmprFn(&pMemDataT, &pMemData) == 0) goto _exit;
}
// create
pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData));
if (pMemData == NULL) {
// not found, create one
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMemData->suid = suid;
pMemData->uid = uid;
pMemData->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
pMemData->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
pMemData->nRows = 0;
pMemData->delOpHead = pMemData->delOpTail = NULL;
if (idx < 0) idx = 0;
if (taosArrayInsert(pMemTable->pArray, idx, &pMemData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
_exit:
*ppMemData = pMemData;
return code;
......@@ -125,6 +195,25 @@ _err:
return code;
}
static int memDataPCmprFn(const void *p1, const void *p2) {
SMemData *pMemData1 = *(SMemData **)p1;
SMemData *pMemData2 = *(SMemData **)p2;
if (pMemData1->suid < pMemData2->suid) {
return -1;
} else if (pMemData1->suid > pMemData2->suid) {
return 1;
}
if (pMemData1->uid < pMemData2->uid) {
return -1;
} else if (pMemData1->uid > pMemData2->uid) {
return 1;
}
return 0;
}
#if 0 //====================================================================================
#define SL_MAX_LEVEL 5
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册