提交 f3c30e33 编写于 作者: S Shengliang Guan

minor changes

上级 d89f396d
...@@ -12,8 +12,8 @@ ...@@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "mndSubscribe.h" #include "mndSubscribe.h"
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
...@@ -54,12 +54,12 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); ...@@ -54,12 +54,12 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp); const SMqConsumerEp *pConsumerEp);
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
int32_t mndInitSubscribe(SMnode *pMnode) { int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE, SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
...@@ -232,22 +232,22 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -232,22 +232,22 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
if (epoch != rsp.epoch) { if (epoch != rsp.epoch) {
mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch);
SArray *pTopics = pConsumer->currentTopics; SArray *pTopics = pConsumer->currentTopics;
int sz = taosArrayGetSize(pTopics); int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
for (int i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topicName = taosArrayGetP(pTopics, i); char *topicName = taosArrayGetP(pTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
ASSERT(pSub); ASSERT(pSub);
int csz = taosArrayGetSize(pSub->consumers); int32_t csz = taosArrayGetSize(pSub->consumers);
// TODO: change to bsearch // TODO: change to bsearch
for (int j = 0; j < csz; j++) { for (int32_t j = 0; j < csz; j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (consumerId == pSubConsumer->consumerId) { if (consumerId == pSubConsumer->consumerId) {
int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
SMqSubTopicEp topicEp; SMqSubTopicEp topicEp;
strcpy(topicEp.topic, topicName); strcpy(topicEp.topic, topicName);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
for (int k = 0; k < vgsz; k++) { for (int32_t k = 0; k < vgsz; k++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId};
...@@ -276,7 +276,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -276,7 +276,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
} }
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
int i = 0; int32_t i = 0;
while (key[i] != ':') { while (key[i] != ':') {
i++; i++;
} }
...@@ -317,8 +317,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -317,8 +317,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
if (old == MQ_CONSUMER_STATUS__ACTIVE) { if (old == MQ_CONSUMER_STATUS__ACTIVE) {
// get all topics of that topic // get all topics of that topic
int sz = taosArrayGetSize(pConsumer->currentTopics); int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i); char *topic = taosArrayGetP(pConsumer->currentTopics, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
...@@ -334,8 +334,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -334,8 +334,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
} else { } else {
rebSubs = pConsumer->recentRemovedTopics; rebSubs = pConsumer->recentRemovedTopics;
} }
int sz = taosArrayGetSize(rebSubs); int32_t sz = taosArrayGetSize(rebSubs);
for (int i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(rebSubs, i); char *topic = taosArrayGetP(rebSubs, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
...@@ -375,12 +375,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -375,12 +375,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
mInfo("mq rebalance subscription: %s", pSub->key); mInfo("mq rebalance subscription: %s", pSub->key);
// remove lost consumer // remove lost consumer
for (int i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i); int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);
mInfo("mq remove lost consumer %ld", lostConsumerId); mInfo("mq remove lost consumer %ld", lostConsumerId);
for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) { for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (pSubConsumer->consumerId == lostConsumerId) { if (pSubConsumer->consumerId == lostConsumerId) {
taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo); taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
...@@ -400,10 +400,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -400,10 +400,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
int32_t imbalanceSolved = 0; int32_t imbalanceSolved = 0;
// iterate all consumers, set unassignedVgStash // iterate all consumers, set unassignedVgStash
for (int i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb; int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg) if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1; vgThisConsumerAfterRb = vgEachConsumer + 1;
else else
...@@ -442,10 +442,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -442,10 +442,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
// assign to vgroup // assign to vgroup
if (taosArrayGetSize(pSub->unassignedVg) != 0) { if (taosArrayGetSize(pSub->unassignedVg) != 0) {
for (int i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb; int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg) if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1; vgThisConsumerAfterRb = vgEachConsumer + 1;
else else
...@@ -602,7 +602,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -602,7 +602,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
#if 0 #if 0
//update consumer status for the subscribption //update consumer status for the subscribption
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId; int64_t consumerId = pCEp->consumerId;
if (pCEp->status != -1) { if (pCEp->status != -1) {
...@@ -619,7 +619,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -619,7 +619,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
// TODO: swap with last one, reduce size and reset i // TODO: swap with last one, reduce size and reset i
taosArrayRemove(pSub->assigned, i); taosArrayRemove(pSub->assigned, i);
// remove from available consumer // remove from available consumer
for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
taosArrayRemove(pSub->availConsumer, j); taosArrayRemove(pSub->availConsumer, j);
break; break;
...@@ -699,7 +699,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { ...@@ -699,7 +699,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
} }
#endif #endif
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
...@@ -742,8 +742,8 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub ...@@ -742,8 +742,8 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub
return 0; return 0;
} }
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp) { const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId == -1); ASSERT(pConsumerEp->oldConsumerId == -1);
int32_t vgId = pConsumerEp->vgId; int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
...@@ -890,7 +890,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) { ...@@ -890,7 +890,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
if (key == NULL) { if (key == NULL) {
return NULL; return NULL;
} }
int tlen = strlen(cgroup); int32_t tlen = strlen(cgroup);
memcpy(key, cgroup, tlen); memcpy(key, cgroup, tlen);
key[tlen] = ':'; key[tlen] = ':';
strcpy(key + tlen + 1, topicName); strcpy(key + tlen + 1, topicName);
...@@ -931,12 +931,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -931,12 +931,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
char *cgroup = subscribe.consumerGroup; char *cgroup = subscribe.consumerGroup;
SArray *newSub = subscribe.topicNames; SArray *newSub = subscribe.topicNames;
int newTopicNum = subscribe.topicNum; int32_t newTopicNum = subscribe.topicNum;
taosArraySortString(newSub, taosArrayCompareString); taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL; SArray *oldSub = NULL;
int oldTopicNum = 0; int32_t oldTopicNum = 0;
bool createConsumer = false; bool createConsumer = false;
// create consumer if not exist // create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
...@@ -960,7 +960,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -960,7 +960,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return -1; return -1;
} }
int i = 0, j = 0; int32_t i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) { while (i < newTopicNum || j < oldTopicNum) {
char *newTopicName = NULL; char *newTopicName = NULL;
char *oldTopicName = NULL; char *oldTopicName = NULL;
...@@ -975,7 +975,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -975,7 +975,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
newTopicName = taosArrayGetP(newSub, i); newTopicName = taosArrayGetP(newSub, i);
oldTopicName = taosArrayGetP(oldSub, j); oldTopicName = taosArrayGetP(oldSub, j);
int comp = compareLenPrefixedStr(newTopicName, oldTopicName); int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) { if (comp == 0) {
// do nothing // do nothing
oldTopicName = newTopicName = NULL; oldTopicName = newTopicName = NULL;
...@@ -997,12 +997,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -997,12 +997,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// cancel subscribe of old topic // cancel subscribe of old topic
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
ASSERT(pSub); ASSERT(pSub);
int csz = taosArrayGetSize(pSub->consumers); int32_t csz = taosArrayGetSize(pSub->consumers);
for (int ci = 0; ci < csz; ci++) { for (int32_t ci = 0; ci < csz; ci++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
if (pSubConsumer->consumerId == consumerId) { if (pSubConsumer->consumerId == consumerId) {
int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
for (int vgi = 0; vgi < vgsz; vgi++) { for (int32_t vgi = 0; vgi < vgsz; vgi++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp); mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
taosArrayPush(pSub->unassignedVg, pConsumerEp); taosArrayPush(pSub->unassignedVg, pConsumerEp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册