未验证 提交 da703c8f 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12876 from taosdata/feature/tq

feat(tmq): serializer and deserializer for tq exec
......@@ -82,7 +82,7 @@ typedef struct {
do { \
SEncoder coder = {0}; \
tEncoderInit(&coder, NULL, 0); \
if ((E)(&coder, S) == 0) { \
if ((E)(&coder, S) >= 0) { \
SIZE = coder.pos; \
RET = 0; \
} else { \
......
......@@ -20,9 +20,9 @@
#include "executor.h"
#include "os.h"
#include "tcache.h"
#include "thash.h"
#include "tmsg.h"
#include "tqueue.h"
#include "trpc.h"
#include "ttimer.h"
#include "wal.h"
......@@ -86,6 +86,9 @@ typedef struct {
qTaskInfo_t task[5];
} STqExec;
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec);
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);
struct STQ {
char* path;
SHashObj* pushMgr; // consumerId -> STqExec*
......@@ -93,7 +96,7 @@ struct STQ {
SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal;
// TDB* pTdb;
TDB* pTdb;
};
typedef struct {
......@@ -101,7 +104,7 @@ typedef struct {
tmr_h timer;
} STqMgmt;
static STqMgmt tqMgmt;
static STqMgmt tqMgmt = {0};
// init once
int tqInit();
......
......@@ -14,14 +14,37 @@
*/
#include "tq.h"
#include "tqueue.h"
int32_t tqInit() {
//
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
if (old != 2) break;
}
if (old == 0) {
tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
if (tqMgmt.timer == NULL) {
atomic_store_8(&tqMgmt.inited, 0);
return -1;
}
atomic_store_8(&tqMgmt.inited, 1);
}
return 0;
}
void tqCleanUp() {}
void tqCleanUp() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
if (old != 2) break;
}
if (old == 1) {
taosTmrCleanUp(tqMgmt.timer);
atomic_store_8(&tqMgmt.inited, 0);
}
}
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
......@@ -32,9 +55,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->path = strdup(path);
pTq->pVnode = pVnode;
pTq->pWal = pWal;
/*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/
/*ASSERT(0);*/
/*}*/
if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {
ASSERT(0);
}
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
......@@ -51,11 +74,45 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->execs);
taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr);
tdbClose(pTq->pTdb);
taosMemoryFree(pTq);
}
// TODO
}
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
// TODO encode modified exec
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1;
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
// TODO decode modified exec
}
tEndDecode(pDecoder);
return 0;
}
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL;
while (1) {
......
......@@ -14,17 +14,17 @@
*/
#define _DEFAULT_SOURCE
#include "tcompare.h"
#include "os.h"
#include "taoserror.h"
#include "tcompare.h"
#include "tref.h"
#include "walInt.h"
typedef struct {
int8_t stop;
int8_t inited;
uint32_t seq;
int32_t refSetId;
int8_t stop;
int8_t inited;
uint32_t seq;
int32_t refSetId;
TdThread thread;
} SWalMgmt;
......@@ -36,30 +36,42 @@ static void walFreeObj(void *pWal);
int64_t walGetSeq() { return (int64_t)atomic_load_32(&tsWal.seq); }
int32_t walInit() {
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
if (old == 1) return 0;
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
if (old != 2) break;
}
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
if (old == 0) {
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
int32_t code = walCreateThread();
if (code != 0) {
wError("failed to init wal module since %s", tstrerror(code));
atomic_store_8(&tsWal.inited, 0);
return code;
int32_t code = walCreateThread();
if (code != 0) {
wError("failed to init wal module since %s", tstrerror(code));
atomic_store_8(&tsWal.inited, 0);
return code;
}
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
atomic_store_8(&tsWal.inited, 1);
}
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
return 0;
}
void walCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
if (old == 0) {
return;
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2);
if (old != 2) break;
}
if (old == 1) {
walStopThread();
taosCloseRef(tsWal.refSetId);
wInfo("wal module is cleaned up");
atomic_store_8(&tsWal.inited, 0);
}
walStopThread();
taosCloseRef(tsWal.refSetId);
wInfo("wal module is cleaned up");
}
SWal *walOpen(const char *path, SWalCfg *pCfg) {
......@@ -126,7 +138,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
if (walCheckAndRepairIdx(pWal) < 0) {
}
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册