未验证 提交 d0542f99 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21744 from taosdata/mark/tmq

opti:tmq logic
...@@ -162,6 +162,8 @@ extern char tsSmlTagName[]; ...@@ -162,6 +162,8 @@ extern char tsSmlTagName[];
// extern bool tsSmlDataFormat; // extern bool tsSmlDataFormat;
// extern int32_t tsSmlBatchSize; // extern int32_t tsSmlBatchSize;
extern int32_t tmqMaxTopicNum;
// wal // wal
extern int64_t tsWalFsyncDataSizeLimit; extern int64_t tsWalFsyncDataSizeLimit;
......
...@@ -145,7 +145,7 @@ enum { ...@@ -145,7 +145,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) // TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
......
...@@ -768,6 +768,8 @@ int32_t* taosGetErrno(); ...@@ -768,6 +768,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002) #define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004)
#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005)
// stream // stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
...@@ -290,7 +290,7 @@ static const SSysDbTableSchema subscriptionSchema[] = { ...@@ -290,7 +290,7 @@ static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
}; };
static const SSysDbTableSchema vnodesSchema[] = { static const SSysDbTableSchema vnodesSchema[] = {
...@@ -350,7 +350,7 @@ static const SSysDbTableSchema connectionsSchema[] = { ...@@ -350,7 +350,7 @@ static const SSysDbTableSchema connectionsSchema[] = {
static const SSysDbTableSchema consumerSchema[] = { static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
......
...@@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table ...@@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
// bool tsSmlDataFormat = false; // bool tsSmlDataFormat = false;
// int32_t tsSmlBatchSize = 10000; // int32_t tsSmlBatchSize = 10000;
// tmq
int32_t tmqMaxTopicNum = 20;
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0; int32_t tsQueryRspPolicy = 0;
...@@ -507,6 +509,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -507,6 +509,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1; if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
...@@ -875,6 +879,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -875,6 +879,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tmqMaxTopicNum= cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
......
...@@ -25,14 +25,15 @@ extern "C" { ...@@ -25,14 +25,15 @@ extern "C" {
enum { enum {
MQ_CONSUMER_STATUS_REBALANCE = 1, MQ_CONSUMER_STATUS_REBALANCE = 1,
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore // MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__READY, MQ_CONSUMER_STATUS_READY,
MQ_CONSUMER_STATUS__LOST, MQ_CONSUMER_STATUS_LOST,
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore // MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__LOST_REBD, // MQ_CONSUMER_STATUS__LOST_REBD,
}; };\
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
......
...@@ -137,12 +137,12 @@ typedef enum { ...@@ -137,12 +137,12 @@ typedef enum {
} EDndReason; } EDndReason;
typedef enum { typedef enum {
CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance
CONSUMER_UPDATE__ADD, CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance
CONSUMER_UPDATE__REMOVE, CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance
CONSUMER_UPDATE__LOST, // CONSUMER_UPDATE_TIMER_LOST,
CONSUMER_UPDATE__RECOVER, CONSUMER_UPDATE_RECOVER,
CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req
} ECsmUpdateType; } ECsmUpdateType;
typedef struct { typedef struct {
...@@ -548,13 +548,13 @@ typedef struct { ...@@ -548,13 +548,13 @@ typedef struct {
// data for display // data for display
int32_t pid; int32_t pid;
SEpSet ep; SEpSet ep;
int64_t upTime; int64_t createTime;
int64_t subscribeTime; int64_t subscribeTime;
int64_t rebalanceTime; int64_t rebalanceTime;
} SMqConsumerObj; } SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer);
......
...@@ -218,7 +218,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { ...@@ -218,7 +218,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
return (void *)buf; return (void *)buf;
} }
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) { SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) { if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -249,16 +249,20 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L ...@@ -249,16 +249,20 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
return NULL; return NULL;
} }
pConsumer->upTime = taosGetTimestampMs(); pConsumer->createTime = taosGetTimestampMs();
return pConsumer; return pConsumer;
} }
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) {
if(pConsumer == NULL) return;
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
if(delete){
taosMemoryFree(pConsumer);
}
} }
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
...@@ -273,7 +277,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { ...@@ -273,7 +277,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen += taosEncodeFixedI32(buf, pConsumer->pid); tlen += taosEncodeFixedI32(buf, pConsumer->pid);
tlen += taosEncodeSEpSet(buf, &pConsumer->ep); tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
tlen += taosEncodeFixedI64(buf, pConsumer->upTime); tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime); tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime); tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
...@@ -339,7 +343,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) { ...@@ -339,7 +343,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
buf = taosDecodeFixedI32(buf, &pConsumer->pid); buf = taosDecodeFixedI32(buf, &pConsumer->pid);
buf = taosDecodeSEpSet(buf, &pConsumer->ep); buf = taosDecodeSEpSet(buf, &pConsumer->ep);
buf = taosDecodeFixedI64(buf, &pConsumer->upTime); buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime); buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime); buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
......
...@@ -160,10 +160,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj ...@@ -160,10 +160,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
// return -1; return -1;
// } }
void *buf; void *buf;
int32_t tlen; int32_t tlen;
...@@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc ...@@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
if (pVgObj == NULL) { if (pVgObj == NULL) {
taosMemoryFree(buf); taosMemoryFree(buf);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
return -1; return -1;
} }
...@@ -296,17 +296,17 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { ...@@ -296,17 +296,17 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
} }
} }
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ //static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ // for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); // SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
SMqRebOutputVg outputVg = { // SMqRebOutputVg outputVg = {
.oldConsumerId = pConsumerEp->consumerId, // .oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = pConsumerEp->consumerId, // .newConsumerId = pConsumerEp->consumerId,
.pVgEp = pVgEp, // .pVgEp = pVgEp,
}; // };
taosArrayPush(pOutput->rebVgs, &outputVg); // taosArrayPush(pOutput->rebVgs, &outputVg);
} // }
} //}
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
int32_t imbConsumerNum) { int32_t imbConsumerNum) {
...@@ -357,7 +357,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas ...@@ -357,7 +357,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
} }
} }
} }
putNoTransferToOutput(pOutput, pConsumerEp); // putNoTransferToOutput(pOutput, pConsumerEp);
} }
} }
...@@ -540,50 +540,44 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -540,50 +540,44 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
return -1; return -1;
} }
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
// 3. commit log: consumer to update status and epoch // 3. commit log: consumer to update status and epoch
// 3.1 set touched consumer // 3.1 set touched consumer
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i); int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_NOTOPIC;
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew, true);
taosMemoryFree(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew, true);
taosMemoryFree(pConsumerNew);
} }
// 3.2 set new consumer // 3.2 set new consumer
consumerNum = taosArrayGetSize(pOutput->newConsumers); consumerNum = taosArrayGetSize(pOutput->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_TOPIC;
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); char* topicTmp = taosStrdup(topic);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp);
pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
taosArrayPush(pConsumerNew->rebNewTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew, true);
taosMemoryFree(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew, true);
taosMemoryFree(pConsumerNew);
} }
// 3.3 set removed consumer // 3.3 set removed consumer
...@@ -591,24 +585,19 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -591,24 +585,19 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_REMOVE;
pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char* topicTmp = taosStrdup(topic);
char cgroup[TSDB_CGROUP_LEN]; taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp);
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew, true);
taosMemoryFree(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew, true);
taosMemoryFree(pConsumerNew);
} }
// 4. TODO commit log: modification log // 4. TODO commit log: modification log
...@@ -762,6 +751,20 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { ...@@ -762,6 +751,20 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
return -1; return -1;
} }
void *pIter = NULL;
SMqConsumerObj *pConsumer;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
}
sdbRelease(pMnode->pSdb, pConsumer);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
...@@ -1107,8 +1110,12 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock ...@@ -1107,8 +1110,12 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
// consumer id // consumer id
char consumerIdHex[32] = {0};
sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumerEp->consumerId);
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false);
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId); pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
......
...@@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { ...@@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SCMCreateTopicReq createTopicReq = {0}; SCMCreateTopicReq createTopicReq = {0};
if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){
terrno = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
mError("topic num out of range");
return code;
}
if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) { if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -697,7 +702,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -697,7 +702,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
break; break;
} }
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndReleaseConsumer(pMnode, pConsumer);
continue;
}
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
......
...@@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval ...@@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
// stream // stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册