提交 f3b680f6 编写于 作者: L Liu Jicong

refactor(tmq): push mode

上级 946acdcd
...@@ -176,8 +176,8 @@ tmq_t* build_consumer() { ...@@ -176,8 +176,8 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
/*tmq_list_append(topic_list, "topic_ctb_column");*/ tmq_list_append(topic_list, "topic_ctb_column");
tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic"); /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return topic_list; return topic_list;
} }
......
...@@ -80,6 +80,37 @@ typedef struct { ...@@ -80,6 +80,37 @@ typedef struct {
int8_t type; int8_t type;
} SStreamCheckpoint; } SStreamCheckpoint;
typedef struct {
STaosQueue* queue;
STaosQall* qall;
void* qItem;
int8_t failed;
} SStreamQ;
static FORCE_INLINE void* streamQCurItem(SStreamQ* queue) {
//
return queue->qItem;
}
static FORCE_INLINE void* streamQNextItem(SStreamQ* queue) {
int8_t failed = atomic_load_8(&queue->failed);
if (failed) {
ASSERT(queue->qItem != NULL);
return streamQCurItem(queue);
} else {
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
}
return streamQCurItem(queue);
}
}
static FORCE_INLINE void streamQSetFail(SStreamQ* queue) { atomic_store_8(&queue->failed, 1); }
static FORCE_INLINE void streamQSetSuccess(SStreamQ* queue) { atomic_store_8(&queue->failed, 0); }
static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pDataSubmit == NULL) return NULL; if (pDataSubmit == NULL) return NULL;
......
...@@ -65,12 +65,6 @@ struct STqReadHandle { ...@@ -65,12 +65,6 @@ struct STqReadHandle {
// tqPush // tqPush
typedef struct {
STaosQueue* queue;
STaosQall* qall;
void* qItem;
} STqInputQ;
typedef struct { typedef struct {
// msg info // msg info
int64_t consumerId; int64_t consumerId;
...@@ -84,10 +78,10 @@ typedef struct { ...@@ -84,10 +78,10 @@ typedef struct {
tmr_h timerId; tmr_h timerId;
int8_t tmrStopped; int8_t tmrStopped;
// exec // exec
int8_t inputStatus; int8_t inputStatus;
int8_t execStatus; int8_t execStatus;
STqInputQ inputQ; SStreamQ inputQ;
SRWLatch lock; SRWLatch lock;
} STqPushHandle; } STqPushHandle;
// tqExec // tqExec
...@@ -155,6 +149,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* ...@@ -155,6 +149,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
// tqExec // tqExec
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId); int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId);
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp);
// tqMeta // tqMeta
int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaOpen(STQ* pTq);
......
...@@ -81,12 +81,41 @@ void tqClose(STQ* pTq) { ...@@ -81,12 +81,41 @@ void tqClose(STQ* pTq) {
// TODO // TODO
} }
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, pRsp);
SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, pRsp->reqOffset, pRsp->rspOffset);
return 0;
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout; int64_t timeout = pReq->timeout;
int32_t reqEpoch = pReq->epoch; int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset; int64_t fetchOffset;
int32_t code = 0;
// get offset to fetch message // get offset to fetch message
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
...@@ -155,7 +184,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -155,7 +184,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId); if (tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId) < 0) {
/*ASSERT(0);*/
}
} else { } else {
// TODO // TODO
ASSERT(0); ASSERT(0);
...@@ -180,31 +211,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -180,31 +211,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {
void* buf = rpcMallocCont(tlen); code = -1;
if (buf == NULL) {
pMsg->code = -1;
return -1;
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
// TODO wrap in destroy func // TODO wrap in destroy func
taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen); taosArrayDestroy(rsp.blockDataLen);
...@@ -217,7 +227,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -217,7 +227,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
} }
return 0; return code;
} }
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
......
...@@ -21,21 +21,74 @@ void tqTmrRspFunc(void* param, void* tmrId) { ...@@ -21,21 +21,74 @@ void tqTmrRspFunc(void* param, void* tmrId) {
} }
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataBlkRsp rsp = {0};
// 1. guard and set status executing // 1. guard and set status executing
// 2. check processedVer int8_t execStatus =
// 2.1. if not missed, get msg from queue atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
// 2.2. if missed, scan wal if (execStatus == TASK_STATUS__IDLE) {
// SStreamDataSubmit* pSubmit = NULL;
// 3. exec, after each success, update processed ver // 2. check processedVer
// first run // 2.1. if not missed, get msg from queue
// set exec status closing // 2.2. if missed, scan wal
// second run pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
// set exec status idle while (pHandle->pushHandle.processedVer <= pSubmit->ver) {
// // read from wal
}
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
streamQSetSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
if (pSubmit == NULL) break;
}
// 3. exec, after each success, update processed ver
// first run
while (pSubmit != NULL) {
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) {
/*ASSERT(0);*/
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQSetSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
if (rsp.blockNum > 0) {
goto SEND_RSP;
} else {
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
}
}
// set exec status closing
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__CLOSING);
// second run
while (pSubmit != NULL) {
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) {
/*ASSERT(0);*/
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQSetSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
if (rsp.blockNum > 0) {
goto SEND_RSP;
} else {
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
}
}
// set exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE);
}
SEND_RSP:
// 4. if get result // 4. if get result
// 4.1 set exec input status blocked and exec status idle // 4.1 set exec input status blocked and exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE);
// 4.2 rpc send // 4.2 rpc send
rsp.rspOffset = pHandle->pushHandle.processedVer;
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
/*return -1;*/
/*}*/
// 4.3 clear rpc info // 4.3 clear rpc info
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册