未验证 提交 18cd6c40 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11190 from taosdata/feature/tq

Feature/tq
......@@ -78,6 +78,7 @@ struct tmq_t {
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
int8_t epStatus;
int32_t waitingRequest;
int32_t readyRequest;
SArray* clientTopics; // SArray<SMqClientTopic>
......@@ -311,6 +312,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->epoch = 0;
pTmq->waitingRequest = 0;
pTmq->readyRequest = 0;
pTmq->epStatus = 0;
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
......@@ -833,7 +835,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = pParam->tmq;
if (code != 0) {
tscWarn("msg discard, code:%x", code);
goto WRITE_QUEUE_FAIL;
goto CREATE_MSG_FAIL;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
......@@ -873,7 +875,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) {
goto WRITE_QUEUE_FAIL;
goto CREATE_MSG_FAIL;
}
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg);
......@@ -882,11 +884,13 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
#if 0
if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/
taosFreeQitem(pRsp);
goto WRITE_QUEUE_FAIL;
goto CREATE_MSG_FAIL;
}
#endif
tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset,
pRsp->msg.rspOffset);
......@@ -897,7 +901,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tsem_post(&tmq->rspSem);
return 0;
WRITE_QUEUE_FAIL:
CREATE_MSG_FAIL:
if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
......@@ -938,7 +942,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
/*printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);*/
tscDebug("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
}
break;
......@@ -952,12 +956,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset;
/*printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);*/
tscDebug("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);
if (pOffset != NULL) {
offset = *pOffset;
/*printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);*/
tscDebug("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);
}
/*printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);*/
tscDebug("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = offset,
......@@ -1018,6 +1022,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
END:
atomic_store_8(&tmq->epStatus, 0);
if (pParam->sync) {
tsem_post(&pParam->rspSem);
}
......@@ -1025,6 +1030,10 @@ END:
}
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
return 0;
}
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
if (req == NULL) {
......@@ -1205,7 +1214,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
/*printf("skip vg %d\n", pVg->vgId);*/
tscDebug("skip vg %d", pVg->vgId);
continue;
}
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
......@@ -1249,7 +1258,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
/*tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);*/
tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
......@@ -1260,13 +1269,13 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
}
// return
int32_t tmqHandleRes(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) {
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) {
if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspHead->epoch > atomic_load_32(&tmq->epoch)) {
SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead;
tmqUpdateEp(tmq, rspHead->epoch, rspMsg);
tmqClearUnhandleMsg(tmq);
/*tmqClearUnhandleMsg(tmq);*/
*pReset = true;
} else {
*pReset = false;
......@@ -1297,6 +1306,11 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = rspMsg->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (rspMsg->msg.numOfTopics == 0) {
taosFreeQitem(rspMsg);
rspHead = NULL;
continue;
}
return rspMsg;
} else {
/*printf("epoch mismatch\n");*/
......@@ -1305,10 +1319,10 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
} else {
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool reset = false;
tmqHandleRes(tmq, rspHead, &reset);
tmqHandleNoPollRsp(tmq, rspHead, &reset);
taosFreeQitem(rspHead);
if (pollIfReset && reset) {
printf("reset and repoll\n");
tscDebug("reset and repoll\n");
tmqPollImpl(tmq, blockingTime);
}
}
......
......@@ -620,13 +620,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->consumers) {
taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
//taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
// taosArrayDestroy(pSub->consumers);
pSub->consumers = NULL;
}
if (pSub->unassignedVg) {
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
//taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
// taosArrayDestroy(pSub->unassignedVg);
pSub->unassignedVg = NULL;
}
......
......@@ -160,6 +160,7 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
pOldConsumer->epoch++;
// TODO handle update
/*taosWLockLatch(&pOldConsumer->lock);*/
......
......@@ -237,7 +237,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
for (int32_t j = 0; j < csz; j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (consumerId == pSubConsumer->consumerId) {
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
mInfo("topic %s has %d vg", topicName, pConsumer->epoch);
SMqSubTopicEp topicEp;
strcpy(topicEp.topic, topicName);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
......@@ -419,7 +420,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
int32_t vgNum = pSub->vgNum;
int32_t vgEachConsumer = vgNum / consumerNum;
int32_t imbalanceVg = vgNum % consumerNum;
int32_t imbalanceSolved = 0;
// iterate all consumers, set unassignedVgStash
for (int32_t i = 0; i < consumerNum; i++) {
......@@ -446,9 +446,9 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
pRebConsumer->epoch++;
}
/*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
/*pRebConsumer->epoch++;*/
/*}*/
if (vgThisConsumerAfterRb != 0) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
} else {
......@@ -460,7 +460,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
mndTransAppendCommitlog(pTrans, pConsumerRaw);
}
mndReleaseConsumer(pMnode, pRebConsumer);
}
......@@ -469,7 +469,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if (taosArrayGetSize(pSub->unassignedVg) != 0) {
for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1;
......@@ -508,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
// TODO: log rebalance statistics
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
sdbSetRawStatus(pSubRaw, SDB_STATUS_UPDATING);
mndTransAppendRedolog(pTrans, pSubRaw);
}
mndReleaseSubscribe(pMnode, pSub);
......
......@@ -299,8 +299,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// response to user
break;
}
/*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType,
* pReq->epoch);*/
/*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
......@@ -353,9 +352,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset,
* pHead->msgType,*/
/*pReq->epoch);*/
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
......@@ -377,6 +374,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
rsp.rspOffset = fetchOffset - 1;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
......@@ -452,7 +450,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task);
}
printf("set topic %s to consumer %ld\n", pTopic->topicName, req.consumerId);
/*printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);*/
taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.consumerId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册