提交 2de2eafa 编写于 作者: L Liu Jicong

feat(tmq): use tdb as persistent storage

上级 984b8315
...@@ -96,7 +96,8 @@ struct STQ { ...@@ -96,7 +96,8 @@ struct STQ {
SHashObj* pStreamTasks; SHashObj* pStreamTasks;
SVnode* pVnode; SVnode* pVnode;
SWal* pWal; SWal* pWal;
TDB* pTdb; TDB* pMetaStore;
TTB* pExecStore;
}; };
typedef struct { typedef struct {
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tq.h" #include "tq.h"
#include "tdbInt.h"
int32_t tqInit() { int32_t tqInit() {
int8_t old; int8_t old;
...@@ -46,6 +47,10 @@ void tqCleanUp() { ...@@ -46,6 +47,10 @@ void tqCleanUp() {
} }
} }
int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) {
return strcmp(pKey1, pKey2);
}
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
STQ* pTq = taosMemoryMalloc(sizeof(STQ)); STQ* pTq = taosMemoryMalloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
...@@ -55,9 +60,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { ...@@ -55,9 +60,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {
ASSERT(0);
}
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
...@@ -65,6 +67,43 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { ...@@ -65,6 +67,43 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (tdbOpen(path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
ASSERT(0);
}
if (tdbTbOpen("exec", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0);
}
TXN txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
}
/*if (tdbBegin(pTq->pMetaStore, &txn) < 0) {*/
/*ASSERT(0);*/
/*}*/
TBC* pCur;
if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
ASSERT(0);
}
void* pKey;
int kLen;
void* pVal;
int vLen;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
// create, put into execsj
}
if (tdbTxnClose(&txn) < 0) {
ASSERT(0);
}
return pTq; return pTq;
} }
...@@ -74,7 +113,7 @@ void tqClose(STQ* pTq) { ...@@ -74,7 +113,7 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->execs); taosHashCleanup(pTq->execs);
taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pushMgr);
tdbClose(pTq->pTdb); tdbClose(pTq->pMetaStore);
taosMemoryFree(pTq); taosMemoryFree(pTq);
} }
// TODO // TODO
...@@ -91,7 +130,6 @@ int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) { ...@@ -91,7 +130,6 @@ int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1; if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
// TODO encode modified exec
} }
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
...@@ -108,7 +146,6 @@ int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) { ...@@ -108,7 +146,6 @@ int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1; if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
// TODO decode modified exec
} }
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
...@@ -556,6 +593,23 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -556,6 +593,23 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0); ASSERT(code == 0);
TXN txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
ASSERT(0);
}
tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn);
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
ASSERT(0);
}
return 0; return 0;
} }
...@@ -604,6 +658,45 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -604,6 +658,45 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} }
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
int32_t code;
int32_t vlen;
tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
ASSERT(code == 0);
void* buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) {
ASSERT(0);
}
SEncoder encoder;
tEncoderInit(&encoder, buf, vlen);
if (tEncodeSTqExec(&encoder, pExec) < 0) {
ASSERT(0);
}
TXN txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
ASSERT(0);
}
if (tdbTbUpsert(pTq->pExecStore, req.subKey, (int)strlen(req.subKey), buf, vlen, &txn) < 0) {
ASSERT(0);
}
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
ASSERT(0);
}
tEncoderClear(&encoder);
taosMemoryFree(buf);
return 0; return 0;
} else { } else {
/*if (req.newConsumerId != -1) {*/ /*if (req.newConsumerId != -1) {*/
......
...@@ -83,11 +83,11 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) { ...@@ -83,11 +83,11 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid,
int32_t* pNumOfRows, int16_t* pNumOfCols) { int32_t* pNumOfRows, int16_t* pNumOfCols) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
*pUid = 0; *pUid = 0;
int32_t sversion = 1; // TODO set to real sversion
/*int32_t sversion = 1;*/
int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
if (pHandle->pSchema == NULL) { if (pHandle->pSchema == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册