提交 5d606631 编写于 作者: L Liu Jicong

improvement

上级 f51295a2
...@@ -1649,8 +1649,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { ...@@ -1649,8 +1649,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
typedef struct { typedef struct {
int64_t leftForVer; int64_t leftForVer;
int32_t vgId; int32_t vgId;
int64_t oldConsumerId; int64_t consumerId;
int64_t newConsumerId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql; char* sql;
...@@ -1659,55 +1658,30 @@ typedef struct { ...@@ -1659,55 +1658,30 @@ typedef struct {
char* qmsg; char* qmsg;
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
int32_t tlen = 0;
tlen += taosEncodeFixedU64(buf, pMsg->sId);
tlen += taosEncodeFixedU64(buf, pMsg->queryId);
tlen += taosEncodeFixedU64(buf, pMsg->taskId);
tlen += taosEncodeFixedU32(buf, pMsg->sqlLen);
tlen += taosEncodeFixedU32(buf, pMsg->phyLen);
//tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
return tlen;
}
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
buf = taosDecodeFixedU64(buf, &pMsg->sId);
buf = taosDecodeFixedU64(buf, &pMsg->queryId);
buf = taosDecodeFixedU64(buf, &pMsg->taskId);
buf = taosDecodeFixedU32(buf, &pMsg->sqlLen);
buf = taosDecodeFixedU32(buf, &pMsg->phyLen);
//buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
return buf;
}
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->topicName);
tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->qmsg); tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen; return tlen;
} }
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->topicName);
buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeString(buf, &pReq->qmsg); buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf; return buf;
} }
......
...@@ -671,50 +671,57 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -671,50 +671,57 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
} }
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); int32_t beginVgIdx = pTopic->nextVgIdx;
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); while(1) {
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
if (pReq == NULL) { /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
ASSERT(false); SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg);
usleep(blocking_time * 1000); if (pReq == NULL) {
return NULL; ASSERT(false);
} usleep(blocking_time * 1000);
return NULL;
}
SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam));
if (param == NULL) { if (param == NULL) {
ASSERT(false); ASSERT(false);
usleep(blocking_time * 1000); usleep(blocking_time * 1000);
return NULL; return NULL;
} }
param->tmq = tmq; param->tmq = tmq;
param->retMsg = &tmq_message; param->retMsg = &tmq_message;
param->pVg = pVg; param->pVg = pVg;
tsem_init(&param->rspSem, 0, 0); tsem_init(&param->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = param; sendInfo->param = param;
sendInfo->fp = tmqPollCb; sendInfo->fp = tmqPollCb;
/*printf("req offset: %ld\n", pReq->offset);*/ /*printf("req offset: %ld\n", pReq->offset);*/
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
tmq->pollCnt++; tmq->pollCnt++;
tsem_wait(&param->rspSem); tsem_wait(&param->rspSem);
tsem_destroy(&param->rspSem); tsem_destroy(&param->rspSem);
free(param); free(param);
if (tmq_message == NULL) { if (tmq_message == NULL) {
usleep(blocking_time * 1000); if (beginVgIdx == pTopic->nextVgIdx) {
} usleep(blocking_time * 1000);
} else {
continue;
}
}
return tmq_message; return tmq_message;
}
/*tsem_wait(&pRequest->body.rspSem);*/ /*tsem_wait(&pRequest->body.rspSem);*/
......
...@@ -107,9 +107,9 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj ...@@ -107,9 +107,9 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
SMqMVRebReq req = { SMqMVRebReq req = {
.vgId = pConsumerEp->vgId, .vgId = pConsumerEp->vgId,
.oldConsumerId = pConsumerEp->oldConsumerId, .oldConsumerId = pConsumerEp->oldConsumerId,
.newConsumerId = pConsumerEp->consumerId, .newConsumerId = pConsumerEp->consumerId,
}; };
int32_t tlen = tEncodeSMqMVRebReq(NULL, &req); int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
...@@ -133,7 +133,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume ...@@ -133,7 +133,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
} }
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId != -1); ASSERT(pConsumerEp->oldConsumerId != -1);
int32_t vgId = pConsumerEp->vgId; int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
...@@ -161,8 +160,7 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC ...@@ -161,8 +160,7 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
SMqSetCVgReq req = {0}; SMqSetCVgReq req = {0};
req.oldConsumerId = pConsumerEp->consumerId; req.consumerId = pConsumerEp->consumerId;
req.newConsumerId = -1;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen); void *buf = malloc(sizeof(SMsgHead) + tlen);
...@@ -220,7 +218,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -220,7 +218,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
} }
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
//TODO // TODO
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus); int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
mTrace("try to get sub ep, old val: %d", hbStatus); mTrace("try to get sub ep, old val: %d", hbStatus);
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
...@@ -232,7 +230,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -232,7 +230,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
rsp.consumerId = consumerId; rsp.consumerId = consumerId;
rsp.epoch = pConsumer->epoch; rsp.epoch = pConsumer->epoch;
if (epoch != rsp.epoch) { if (epoch != rsp.epoch) {
mInfo("old epoch %d, new epoch %d", epoch, rsp.epoch); mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch);
SArray *pTopics = pConsumer->currentTopics; SArray *pTopics = pConsumer->currentTopics;
int sz = taosArrayGetSize(pTopics); int sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
...@@ -404,53 +402,57 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -404,53 +402,57 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
// iterate all consumers, set unassignedVgStash // iterate all consumers, set unassignedVgStash
for (int i = 0; i < consumerNum; i++) { for (int i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb; int vgThisConsumerAfterRb;
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; if (i < imbalanceVg)
else vgThisConsumerAfterRb = vgEachConsumer; vgThisConsumerAfterRb = vgEachConsumer + 1;
else
vgThisConsumerAfterRb = vgEachConsumer;
mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
ASSERT(pConsumerEp != NULL); ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
taosArrayPush(pSub->unassignedVg, pConsumerEp); taosArrayPush(pSub->unassignedVg, pConsumerEp);
} }
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
int32_t status = atomic_load_32(&pRebConsumer->status); int32_t status = atomic_load_32(&pRebConsumer->status);
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST) (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
) { pRebConsumer->epoch++;
pRebConsumer->epoch++; if (vgThisConsumerAfterRb != 0) {
if (vgThisConsumerAfterRb != 0) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); } else {
} else { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); }
}
mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw); mndTransAppendRedolog(pTrans, pConsumerRaw);
} }
mndReleaseConsumer(pMnode, pRebConsumer); mndReleaseConsumer(pMnode, pRebConsumer);
} }
//assign to vgroup // assign to vgroup
if (taosArrayGetSize(pSub->unassignedVg) != 0) { if (taosArrayGetSize(pSub->unassignedVg) != 0) {
for (int i = 0; i < consumerNum; i++) { for (int i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb; int vgThisConsumerAfterRb;
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; if (i < imbalanceVg)
else vgThisConsumerAfterRb = vgEachConsumer; vgThisConsumerAfterRb = vgEachConsumer + 1;
else
vgThisConsumerAfterRb = vgEachConsumer;
while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) { while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
SMqConsumerEp* pConsumerEp = taosArrayPop(pSub->unassignedVg); SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
ASSERT(pConsumerEp != NULL); ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId; pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
...@@ -458,19 +460,21 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -458,19 +460,21 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
if (pConsumerEp->oldConsumerId == -1) { if (pConsumerEp->oldConsumerId == -1) {
char* topic; char *topic;
char* cgroup; char *cgroup;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup); mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic, pConsumerEp->consumerId); mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic,
pConsumerEp->consumerId);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
free(topic); free(topic);
free(cgroup); free(cgroup);
} else { } else {
mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId,
pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
} }
...@@ -488,10 +492,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -488,10 +492,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
taosHashCleanup(pReq->rebSubHash);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
taosHashCleanup(pReq->rebSubHash);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
} }
...@@ -738,15 +744,13 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub ...@@ -738,15 +744,13 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp) { const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId == -1); ASSERT(pConsumerEp->oldConsumerId == -1);
int32_t vgId = pConsumerEp->vgId; int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = { SMqSetCVgReq req = {
.vgId = vgId, .vgId = vgId,
.oldConsumerId = -1, .consumerId = pConsumerEp->consumerId,
.newConsumerId = pConsumerEp->consumerId,
.sql = pTopic->sql, .sql = pTopic->sql,
.logicalPlan = pTopic->logicalPlan, .logicalPlan = pTopic->logicalPlan,
.physicalPlan = pTopic->physicalPlan, .physicalPlan = pTopic->physicalPlan,
......
...@@ -134,7 +134,6 @@ const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHan ...@@ -134,7 +134,6 @@ const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHan
return NULL; return NULL;
} }
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont; SMqConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
...@@ -160,7 +159,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -160,7 +159,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
// TODO: support multiple topic in one req // TODO: support multiple topic in one req
if (strcmp(pTopic->topicName, pReq->topic) != 0) { if (strcmp(pTopic->topicName, pReq->topic) != 0) {
ASSERT(false); /*ASSERT(false);*/
continue; continue;
} }
...@@ -174,7 +173,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -174,7 +173,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
} }
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
pTopic->committedOffset = pReq->offset-1; pTopic->committedOffset = pReq->offset - 1;
} }
rsp.committedOffset = pTopic->committedOffset; rsp.committedOffset = pTopic->committedOffset;
...@@ -235,7 +234,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -235,7 +234,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break; break;
} }
} }
//TODO copy // TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
...@@ -270,11 +269,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -270,11 +269,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
void* abuf = buf; void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp); tEncodeSMqConsumeRsp(&abuf, &rsp);
if (rsp.pBlockData) { if (rsp.pBlockData) {
taosArrayDestroyEx(rsp.pBlockData, (void(*)(void*))tDeleteSSDataBlock); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
rsp.pBlockData = NULL; rsp.pBlockData = NULL;
/*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/ /*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/ /*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/
/*tDeleteSSDataBlock(pBlock);*/ /*tDeleteSSDataBlock(pBlock);*/
/*}*/ /*}*/
/*taosArrayDestroy(rsp.pBlockData);*/ /*taosArrayDestroy(rsp.pBlockData);*/
} }
...@@ -301,23 +300,20 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { ...@@ -301,23 +300,20 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq req = {0}; SMqSetCVgReq req = {0};
tDecodeSMqSetCVgReq(msg, &req); tDecodeSMqSetCVgReq(msg, &req);
ASSERT(req.oldConsumerId == -1);
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
/*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/ /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
STqConsumerHandle* pConsumer = calloc(1, sizeof(STqConsumerHandle));
if (pConsumer == NULL) { if (pConsumer == NULL) {
pConsumer = calloc(sizeof(STqConsumerHandle), 1); terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
if (pConsumer == NULL) { return -1;
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
} }
strcpy(pConsumer->cgroup, req.cgroup); strcpy(pConsumer->cgroup, req.cgroup);
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle)); pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
pConsumer->consumerId = req.newConsumerId; pConsumer->consumerId = req.consumerId;
pConsumer->epoch = 0; pConsumer->epoch = 0;
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); STqTopicHandle* pTopic = calloc(1, sizeof(STqTopicHandle));
if (pTopic == NULL) { if (pTopic == NULL) {
free(pConsumer); free(pConsumer);
return -1; return -1;
...@@ -337,13 +333,13 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -337,13 +333,13 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
SReadHandle handle = { .reader = pReadHandle, .meta = pTq->pMeta }; SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
} }
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId); tqHandleCommit(pTq->tqMeta, req.consumerId);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
} }
...@@ -429,7 +425,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -429,7 +425,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t numOfCols = pHandle->pSchema->numOfCols; int32_t numOfCols = pHandle->pSchema->numOfCols;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
//TODO: stable case // TODO: stable case
if (colNumNeed > pSchemaWrapper->nCols) { if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols; colNumNeed = pSchemaWrapper->nCols;
} }
...@@ -445,7 +441,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -445,7 +441,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
j++; j++;
} }
SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes; int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes; colInfo.info.bytes = pColSchema->bytes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册