提交 77e03bfd 编写于 作者: wmmhello's avatar wmmhello

opti:change push mgr to consume msg for subscribe

上级 6efbcfec
......@@ -449,7 +449,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
#if 1
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
......@@ -463,7 +462,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
if (status != MQ_CONSUMER_STATUS__READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
......@@ -100,6 +100,7 @@ typedef struct {
SWalRef* pRef;
STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
SRpcMsg* msg;
} STqHandle;
typedef struct {
......@@ -114,6 +115,7 @@ struct STQ {
int64_t walLogLastVer;
SRWLatch lock;
SHashObj* pPushMgr; // consumerId -> STqPushEntry
SArray * pPushArray;
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore;
......@@ -214,6 +214,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit);
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
......@@ -71,6 +71,11 @@ static void destroyTqHandle(void* data) {
if(pData->msg != NULL) {
pData->msg = NULL;
static void tqPushEntryFree(void* data) {
......@@ -104,6 +109,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
pTq->pPushArray = taosArrayInit(8, POINTER_BYTES);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
......@@ -152,6 +159,7 @@ void tqClose(STQ* pTq) {
void tqNotifyClose(STQ* pTq) {
......@@ -350,25 +358,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 2. check re-balance status
// taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
// taosRUnLockLatch(&pTq->lock);
return -1;
// taosRUnLockLatch(&pTq->lock);
// 3. update the epoch value
// taosWLockLatch(&pTq->lock);
int32_t savedEpoch = pHandle->epoch;
if (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
pHandle->epoch = reqEpoch;
// taosWUnLockLatch(&pTq->lock);
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
......@@ -560,8 +558,20 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_store_32(&pHandle->epoch, -1);
// remove if it has been register in the push manager, and return one empty block to consumer
//tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++) {
void* handle = taosArrayGetP(pTq->pPushArray, i);
if(handle == pHandle) {
tqInfo("vgId:%d remove handle when switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
taosArrayRemove(pTq->pPushArray, i);
if(pHandle->msg != NULL) {
pHandle->msg = NULL;
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
......@@ -1067,6 +1077,28 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
return 0;
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d start set submit for subscribe", vgId);
for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){
STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i);
if(pHandle->msg == NULL){
tqError("pHandle->msg should not be null");
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
pHandle->msg = NULL;
// unlock
return 0;
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
#if 0
void* pIter = NULL;
......@@ -268,59 +268,61 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
int32_t vgId = TD_VID(pTq->pVnode);
// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
// int32_t len = msgLen - sizeof(SSubmitReq2Msg);
// int32_t vgId = TD_VID(pTq->pVnode);
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) {
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
return -1;
memcpy(data, pReq, len);
SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
if (pIter == NULL) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
if (pHandle == NULL) {
tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId,
STqExecHandle* pExec = &pHandle->execHandle;
doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
doRemovePushedEntry(cachedKey, pTq);
taosArrayDestroyEx(cachedKey, freeItem);
// unlock
// taosWLockLatch(&pTq->lock);
// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
// if (numOfRegisteredPush > 0) {
// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
// void* data = taosMemoryMalloc(len);
// if (data == NULL) {
// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
// taosWUnLockLatch(&pTq->lock);
// return -1;
// }
// memcpy(data, pReq, len);
// SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
// void* pIter = NULL;
// while (1) {
// pIter = taosHashIterate(pTq->pPushMgr, pIter);
// if (pIter == NULL) {
// break;
// }
// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
// if (pHandle == NULL) {
// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId,
// pPushEntry->subKey);
// continue;
// }
// STqExecHandle* pExec = &pHandle->execHandle;
// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
// }
// doRemovePushedEntry(cachedKey, pTq);
// taosArrayDestroyEx(cachedKey, freeItem);
// taosMemoryFree(data);
// }
// // unlock
// taosWUnLockLatch(&pTq->lock);
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks));
......@@ -169,22 +169,29 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
// taosWLockLatch(&pTq->lock);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
goto end;
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// taosWUnLockLatch(&pTq->lock);
// return code;
// }
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// lock
if(pHandle->msg != NULL){
tqError("pHandle->msg should be null");
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
taosArrayPush(pTq->pPushArray, &pHandle);
return code;
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
......@@ -447,11 +447,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
walApplyVer(pVnode->pWal, version);
//if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
//vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
//return -1;
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
// commit if need
if (needCommit) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册