未验证 提交 671d06ef 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13394 from taosdata/feature/tq

refactor(tmq): push mode
......@@ -176,8 +176,8 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
/*tmq_list_append(topic_list, "topic_ctb_column");*/
tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");
tmq_list_append(topic_list, "topic_ctb_column");
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return topic_list;
}
......
......@@ -80,6 +80,37 @@ typedef struct {
int8_t type;
} SStreamCheckpoint;
typedef struct {
STaosQueue* queue;
STaosQall* qall;
void* qItem;
int8_t failed;
} SStreamQ;
static FORCE_INLINE void* streamQCurItem(SStreamQ* queue) {
//
return queue->qItem;
}
static FORCE_INLINE void* streamQNextItem(SStreamQ* queue) {
int8_t failed = atomic_load_8(&queue->failed);
if (failed) {
ASSERT(queue->qItem != NULL);
return streamQCurItem(queue);
} else {
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
}
return streamQCurItem(queue);
}
}
static FORCE_INLINE void streamQSetFail(SStreamQ* queue) { atomic_store_8(&queue->failed, 1); }
static FORCE_INLINE void streamQSetSuccess(SStreamQ* queue) { atomic_store_8(&queue->failed, 0); }
static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pDataSubmit == NULL) return NULL;
......
......@@ -65,12 +65,6 @@ struct STqReadHandle {
// tqPush
typedef struct {
STaosQueue* queue;
STaosQall* qall;
void* qItem;
} STqInputQ;
typedef struct {
// msg info
int64_t consumerId;
......@@ -84,10 +78,10 @@ typedef struct {
tmr_h timerId;
int8_t tmrStopped;
// exec
int8_t inputStatus;
int8_t execStatus;
STqInputQ inputQ;
SRWLatch lock;
int8_t inputStatus;
int8_t execStatus;
SStreamQ inputQ;
SRWLatch lock;
} STqPushHandle;
// tqExec
......@@ -155,6 +149,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
// tqExec
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId);
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp);
// tqMeta
int32_t tqMetaOpen(STQ* pTq);
......
......@@ -81,12 +81,41 @@ void tqClose(STQ* pTq) {
// TODO
}
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, pRsp);
SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, pRsp->reqOffset, pRsp->rspOffset);
return 0;
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout;
int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset;
int32_t code = 0;
// get offset to fetch message
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
......@@ -155,7 +184,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId);
if (tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId) < 0) {
/*ASSERT(0);*/
}
} else {
// TODO
ASSERT(0);
......@@ -180,31 +211,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.rspOffset = fetchOffset;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {
code = -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
// TODO wrap in destroy func
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
......@@ -217,7 +227,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
}
return 0;
return code;
}
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
......
......@@ -21,21 +21,74 @@ void tqTmrRspFunc(void* param, void* tmrId) {
}
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataBlkRsp rsp = {0};
// 1. guard and set status executing
// 2. check processedVer
// 2.1. if not missed, get msg from queue
// 2.2. if missed, scan wal
//
// 3. exec, after each success, update processed ver
// first run
// set exec status closing
// second run
// set exec status idle
//
int8_t execStatus =
atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (execStatus == TASK_STATUS__IDLE) {
SStreamDataSubmit* pSubmit = NULL;
// 2. check processedVer
// 2.1. if not missed, get msg from queue
// 2.2. if missed, scan wal
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
while (pHandle->pushHandle.processedVer <= pSubmit->ver) {
// read from wal
}
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
streamQSetSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
if (pSubmit == NULL) break;
}
// 3. exec, after each success, update processed ver
// first run
while (pSubmit != NULL) {
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) {
/*ASSERT(0);*/
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQSetSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
if (rsp.blockNum > 0) {
goto SEND_RSP;
} else {
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
}
}
// set exec status closing
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__CLOSING);
// second run
while (pSubmit != NULL) {
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) {
/*ASSERT(0);*/
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQSetSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
if (rsp.blockNum > 0) {
goto SEND_RSP;
} else {
pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ);
}
}
// set exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE);
}
SEND_RSP:
// 4. if get result
// 4.1 set exec input status blocked and exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE);
// 4.2 rpc send
rsp.rspOffset = pHandle->pushHandle.processedVer;
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
/*return -1;*/
/*}*/
// 4.3 clear rpc info
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册