未验证 提交 c16bbfad 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20115 from taosdata/feature/3_liaohj

refactor: add some logs.
...@@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) { ...@@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "consumer_group"); tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
......
...@@ -100,8 +100,8 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { ...@@ -100,8 +100,8 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
return 0; return 0;
} }
mInfo("receive consumer lost msg, consumer id %" PRId64 ", status %s", pLostMsg->consumerId, mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); pConsumer->status, mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) { if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
...@@ -114,9 +114,17 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { ...@@ -114,9 +114,17 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
if (pTrans == NULL) goto FAIL; if (pTrans == NULL) {
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; }
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
goto FAIL;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
goto FAIL;
}
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
...@@ -135,8 +143,8 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { ...@@ -135,8 +143,8 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
ASSERT(pConsumer); ASSERT(pConsumer);
mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId, mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); pConsumer->status, mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
...@@ -150,7 +158,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { ...@@ -150,7 +158,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
if (pTrans == NULL) goto FAIL; if (pTrans == NULL) {
goto FAIL;
}
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
...@@ -244,21 +255,24 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -244,21 +255,24 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__READY && hbStatus > MND_CONSUMER_LOST_HB_CNT) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64,
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime);
pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_CONSUMER_LOST,
.pCont = pLostMsg,
.contLen = sizeof(SMqConsumerLostMsg),
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
if (status == MQ_CONSUMER_STATUS__READY) { if (status == MQ_CONSUMER_STATUS__READY) {
// do nothing if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_CONSUMER_LOST,
.pCont = pLostMsg,
.contLen = sizeof(SMqConsumerLostMsg),
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
...@@ -379,11 +393,18 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -379,11 +393,18 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1; return -1;
} }
ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0); int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
if (ret != 0) {
mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
pConsumer->cgroup);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
...@@ -392,7 +413,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -392,7 +413,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
#if 1 #if 1
if (status == MQ_CONSUMER_STATUS__LOST_REBD) { if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer:0x%"PRIx64 "", consumerId); mInfo("try to recover consumer:0x%"PRIx64, consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId; pRecoverMsg->consumerId = consumerId;
...@@ -401,6 +422,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -401,6 +422,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
.pCont = pRecoverMsg, .pCont = pRecoverMsg,
.contLen = sizeof(SMqConsumerRecoverMsg), .contLen = sizeof(SMqConsumerRecoverMsg),
}; };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
#endif #endif
...@@ -416,7 +438,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -416,7 +438,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2. check epoch, only send ep info when epochs do not match // 2. check epoch, only send ep info when epochs do not match
if (epoch != serverEpoch) { if (epoch != serverEpoch) {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch); mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch, serverEpoch);
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
...@@ -426,7 +448,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -426,7 +448,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
goto FAIL; goto FAIL;
} }
// handle all topic subscribed by the consumer // handle all topics subscribed by this consumer
for (int32_t i = 0; i < numOfTopics; i++) { for (int32_t i = 0; i < numOfTopics; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i); char *topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
...@@ -455,6 +477,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -455,6 +477,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
// this customer assigned vgroups
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
if (topicEp.vgs == NULL) { if (topicEp.vgs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -484,6 +507,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -484,6 +507,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} }
// encode rsp // encode rsp
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
...@@ -491,6 +515,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -491,6 +515,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = serverEpoch; ((SMqRspHead *)buf)->epoch = serverEpoch;
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId; ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
...@@ -506,6 +531,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -506,6 +531,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
pMsg->info.rsp = buf; pMsg->info.rsp = buf;
pMsg->info.rspLen = tlen; pMsg->info.rspLen = tlen;
return 0; return 0;
FAIL: FAIL:
tDeleteSMqAskEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
...@@ -547,7 +573,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -547,7 +573,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t newTopicNum = taosArrayGetSize(newSub); int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance // check topic existence
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) { if (pTrans == NULL) {
goto _over; goto _over;
...@@ -718,13 +744,14 @@ CM_ENCODE_OVER: ...@@ -718,13 +744,14 @@ CM_ENCODE_OVER:
} }
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL; SSdbRow *pRow = NULL;
SMqConsumerObj *pConsumer = NULL; SMqConsumerObj *pConsumer = NULL;
void *buf = NULL; void *buf = NULL;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
goto CM_DECODE_OVER;
}
if (sver != MND_CONSUMER_VER_NUMBER) { if (sver != MND_CONSUMER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
...@@ -732,52 +759,62 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { ...@@ -732,52 +759,62 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
} }
pRow = sdbAllocRow(sizeof(SMqConsumerObj)); pRow = sdbAllocRow(sizeof(SMqConsumerObj));
if (pRow == NULL) goto CM_DECODE_OVER; if (pRow == NULL) {
goto CM_DECODE_OVER;
}
pConsumer = sdbGetRowObj(pRow); pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) goto CM_DECODE_OVER; if (pConsumer == NULL) {
goto CM_DECODE_OVER;
}
int32_t dataPos = 0; int32_t dataPos = 0;
int32_t len; int32_t len;
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
buf = taosMemoryMalloc(len); buf = taosMemoryMalloc(len);
if (buf == NULL) goto CM_DECODE_OVER; if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto CM_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) { if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; // TODO set correct error code
goto CM_DECODE_OVER; goto CM_DECODE_OVER;
} }
tmsgUpdateDnodeEpSet(&pConsumer->ep);
terrno = TSDB_CODE_SUCCESS; tmsgUpdateDnodeEpSet(&pConsumer->ep);
CM_DECODE_OVER: CM_DECODE_OVER:
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer == NULL ? 0 : pConsumer->consumerId, mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
pRaw, terrstr()); pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
return NULL;
} }
return pRow; return pRow;
} }
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId); mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action",
pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
pConsumer->epoch);
pConsumer->subscribeTime = pConsumer->upTime; pConsumer->subscribeTime = pConsumer->upTime;
return 0; return 0;
} }
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId); mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId, mndConsumerStatusName(pConsumer->status));
tDeleteSMqConsumerObj(pConsumer); tDeleteSMqConsumerObj(pConsumer);
return 0; return 0;
} }
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId); mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%"PRId64", uptime:%"PRId64,
pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
taosWLockLatch(&pOldConsumer->lock); taosWLockLatch(&pOldConsumer->lock);
...@@ -817,7 +854,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -817,7 +854,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
int32_t status = pOldConsumer->status;
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
...@@ -844,7 +885,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -844,7 +885,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
// not exist in current topic // not exist in current topic
bool existing = false; bool existing = false;
#if 1 #if 1
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) { int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < numOfExistedTopics; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(topic, addedTopic) == 0) { if (strcmp(topic, addedTopic) == 0) {
existing = true; existing = true;
...@@ -869,27 +911,28 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -869,27 +911,28 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
} }
// set status // set status
int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY; pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
} }
} else { } else {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB; pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST || } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
} }
} }
// the re-balance is triggered when the new consumer is launched.
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1); ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
...@@ -928,27 +971,27 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -928,27 +971,27 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
ASSERT(i < sz); ASSERT(i < sz);
// set status // set status
int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY; pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || } else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
} }
} else { } else {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB; pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST || } else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
} }
} }
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
} }
taosWUnLockLatch(&pOldConsumer->lock); taosWUnLockLatch(&pOldConsumer->lock);
...@@ -1036,8 +1079,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -1036,8 +1079,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
if (hasTopic) { if (hasTopic) {
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i)); const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN); STR_TO_VARSTR(topic, topicName);
varDataSetLen(topic, strlen(varDataVal(topic)));
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false); colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
} else { } else {
colDataSetVal(pColInfo, numOfRows, NULL, true); colDataSetVal(pColInfo, numOfRows, NULL, true);
......
...@@ -561,17 +561,17 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { ...@@ -561,17 +561,17 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
mInfo("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); mInfo("topic:%s start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
if (mndCheckCreateTopicReq(&createTopicReq) != 0) { if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); mError("topic:%s failed to create since %s", createTopicReq.name, terrstr());
goto _OVER; goto _OVER;
} }
pTopic = mndAcquireTopic(pMnode, createTopicReq.name); pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
if (pTopic != NULL) { if (pTopic != NULL) {
if (createTopicReq.igExists) { if (createTopicReq.igExists) {
mInfo("topic:%s, already exist, ignore exist is set", createTopicReq.name); mInfo("topic:%s already exist, ignore exist is set", createTopicReq.name);
code = 0; code = 0;
goto _OVER; goto _OVER;
} else { } else {
...@@ -731,8 +731,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -731,8 +731,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (pTopic->ntbUid != 0) { if (pTopic->ntbUid != 0) {
// broadcast to all vnode // broadcast to all vnode
void *pIter = NULL; pIter = NULL;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
......
...@@ -746,7 +746,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -746,7 +746,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter); pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) break; if (pIter == NULL) {
break;
}
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd); int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd);
......
...@@ -279,11 +279,15 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { ...@@ -279,11 +279,15 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) { static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
int32_t numOfUids = taosArrayGetSize(tableIdList);
if (numOfUids == 0) {
return qa;
}
// let's discard the tables those are not created according to the queried super table. // let's discard the tables those are not created according to the queried super table.
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { for (int32_t i = 0; i < numOfUids; ++i) {
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i); uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id); int32_t code = metaGetTableEntryByUid(&mr, *id);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册