未验证 提交 fd21406e 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10300 from taosdata/feature/tq

add tmq reset offset
...@@ -10,4 +10,4 @@ ExternalProject_Add(bdb ...@@ -10,4 +10,4 @@ ExternalProject_Add(bdb
BUILD_COMMAND "$(MAKE)" BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND "" INSTALL_COMMAND ""
TEST_COMMAND "" TEST_COMMAND ""
) )
\ No newline at end of file
...@@ -4,4 +4,4 @@ target_sources( ...@@ -4,4 +4,4 @@ target_sources(
"bdbTest.c" "bdbTest.c"
) )
target_link_libraries(bdbTest bdb) target_link_libraries(bdbTest bdb)
\ No newline at end of file
...@@ -185,4 +185,4 @@ static int idx_callback(DB * sdbp, /* secondary db handle */ ...@@ -185,4 +185,4 @@ static int idx_callback(DB * sdbp, /* secondary db handle */
skeyp->data = tmpdbt; skeyp->data = tmpdbt;
return 0; return 0;
} }
\ No newline at end of file
...@@ -28,7 +28,7 @@ int32_t init_env() { ...@@ -28,7 +28,7 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -52,20 +52,20 @@ extern char* tMsgInfo[]; ...@@ -52,20 +52,20 @@ extern char* tMsgInfo[];
extern int tMsgDict[]; extern int tMsgDict[];
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] #define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)]
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
typedef uint16_t tmsg_t; typedef uint16_t tmsg_t;
/* ------------------------ OTHER DEFINITIONS ------------------------ */ /* ------------------------ OTHER DEFINITIONS ------------------------ */
// IE type // IE type
#define TSDB_IE_TYPE_SEC 1 #define TSDB_IE_TYPE_SEC 1
#define TSDB_IE_TYPE_META 2 #define TSDB_IE_TYPE_META 2
#define TSDB_IE_TYPE_MGMT_IP 3 #define TSDB_IE_TYPE_MGMT_IP 3
#define TSDB_IE_TYPE_DNODE_CFG 4 #define TSDB_IE_TYPE_DNODE_CFG 4
#define TSDB_IE_TYPE_NEW_VERSION 5 #define TSDB_IE_TYPE_NEW_VERSION 5
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum { typedef enum {
...@@ -110,53 +110,53 @@ typedef enum _mgmt_table { ...@@ -110,53 +110,53 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
} EShowType; } EShowType;
#define TSDB_ALTER_TABLE_ADD_TAG 1 #define TSDB_ALTER_TABLE_ADD_TAG 1
#define TSDB_ALTER_TABLE_DROP_TAG 2 #define TSDB_ALTER_TABLE_DROP_TAG 2
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3 #define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 #define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
#define TSDB_ALTER_TABLE_ADD_COLUMN 5 #define TSDB_ALTER_TABLE_ADD_COLUMN 5
#define TSDB_ALTER_TABLE_DROP_COLUMN 6 #define TSDB_ALTER_TABLE_DROP_COLUMN 6
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7 #define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 #define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8
#define TSDB_FILL_NONE 0 #define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1 #define TSDB_FILL_NULL 1
#define TSDB_FILL_SET_VALUE 2 #define TSDB_FILL_SET_VALUE 2
#define TSDB_FILL_LINEAR 3 #define TSDB_FILL_LINEAR 3
#define TSDB_FILL_PREV 4 #define TSDB_FILL_PREV 4
#define TSDB_FILL_NEXT 5 #define TSDB_FILL_NEXT 5
#define TSDB_ALTER_USER_PASSWD 0x1 #define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_SUPERUSER 0x2 #define TSDB_ALTER_USER_SUPERUSER 0x2
#define TSDB_ALTER_USER_ADD_READ_DB 0x3 #define TSDB_ALTER_USER_ADD_READ_DB 0x3
#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 #define TSDB_ALTER_USER_REMOVE_READ_DB 0x4
#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 #define TSDB_ALTER_USER_CLEAR_READ_DB 0x5
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 #define TSDB_ALTER_USER_ADD_WRITE_DB 0x6
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7 #define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 #define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8
#define TSDB_ALTER_USER_PRIVILEGES 0x2 #define TSDB_ALTER_USER_PRIVILEGES 0x2
#define TSDB_KILL_MSG_LEN 30 #define TSDB_KILL_MSG_LEN 30
#define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_READ_ACCCESS ((char)0x1)
#define TSDB_VN_WRITE_ACCCESS ((char)0x2) #define TSDB_VN_WRITE_ACCCESS ((char)0x2)
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
#define TSDB_COL_NORMAL 0x0u // the normal column of the table #define TSDB_COL_NORMAL 0x0u // the normal column of the table
#define TSDB_COL_TAG 0x1u // the tag column type #define TSDB_COL_TAG 0x1u // the tag column type
#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column #define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column
#define TSDB_COL_TMP 0x4u // internal column generated by the previous operators #define TSDB_COL_TMP 0x4u // internal column generated by the previous operators
#define TSDB_COL_NULL 0x8u // the column filter NULL or not #define TSDB_COL_NULL 0x8u // the column filter NULL or not
#define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0) #define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0)
#define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
#define TD_SUPER_TABLE TSDB_SUPER_TABLE #define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
typedef struct { typedef struct {
...@@ -1083,11 +1083,11 @@ typedef struct { ...@@ -1083,11 +1083,11 @@ typedef struct {
} STaskDropRsp; } STaskDropRsp;
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
int8_t igExists; int8_t igExists;
char* sql; char* sql;
char* physicalPlan; char* physicalPlan;
char* logicalPlan; char* logicalPlan;
} SMCreateTopicReq; } SMCreateTopicReq;
int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq); int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq);
...@@ -1733,7 +1733,7 @@ typedef struct { ...@@ -1733,7 +1733,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
int64_t consumerId; int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp; } SMqSetCVgRsp;
typedef struct { typedef struct {
...@@ -1741,9 +1741,42 @@ typedef struct { ...@@ -1741,9 +1741,42 @@ typedef struct {
int32_t vgId; int32_t vgId;
int64_t consumerId; int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqMVRebRsp; } SMqMVRebRsp;
typedef struct {
int32_t vgId;
int64_t offset;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqOffset;
typedef struct {
int32_t num;
SMqOffset* offsets;
} SMqCMResetOffsetReq;
typedef struct {
int32_t reserved;
} SMqCMResetOffsetRsp;
typedef struct {
int32_t num;
SMqOffset* offsets;
} SMqMVResetOffsetReq;
typedef struct {
int32_t reserved;
} SMqMVResetOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq);
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq);
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);
typedef struct { typedef struct {
uint32_t nCols; uint32_t nCols;
SSchema* pSchema; SSchema* pSchema;
......
...@@ -142,6 +142,7 @@ enum { ...@@ -142,6 +142,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
......
...@@ -35,9 +35,7 @@ struct tmq_list_t { ...@@ -35,9 +35,7 @@ struct tmq_list_t {
}; };
struct tmq_topic_vgroup_t { struct tmq_topic_vgroup_t {
char* topic; SMqOffset offset;
int32_t vgId;
int64_t offset;
}; };
struct tmq_topic_vgroup_list_t { struct tmq_topic_vgroup_list_t {
...@@ -123,6 +121,12 @@ typedef struct { ...@@ -123,6 +121,12 @@ typedef struct {
tsem_t rspSem; tsem_t rspSem;
} SMqCommitCbParam; } SMqCommitCbParam;
typedef struct {
tmq_t* tmq;
tsem_t rspSem;
tmq_resp_err_t rspErr;
} SMqResetOffsetParam;
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false; conf->auto_commit = false;
...@@ -173,12 +177,6 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { ...@@ -173,12 +177,6 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
return 0; return 0;
} }
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
// build msg
// send to mnode
return TMQ_RESP_ERR__SUCCESS;
}
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code; pParam->rspErr = code;
...@@ -196,6 +194,13 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -196,6 +194,13 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param;
pParam->rspErr = code;
tsem_post(&pParam->rspSem);
return 0;
}
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1); tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
if (pTmq == NULL) { if (pTmq == NULL) {
...@@ -218,6 +223,55 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -218,6 +223,55 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq; return pTmq;
} }
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
SRequestObj* pRequest = NULL;
// build msg
// send to mnode
SMqCMResetOffsetReq req;
req.num = offsets->cnt;
req.offsets = (SMqOffset*)offsets->elems;
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSMqCMResetOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos;
void* buf = malloc(tlen);
if (buf == NULL) {
tCoderClear(&encoder);
return -1;
}
tCoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncodeSMqCMResetOffsetReq(&encoder, &req);
tCoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET);
if (pRequest == NULL) {
tscError("failed to malloc request");
}
SMqResetOffsetParam param = {0};
tsem_init(&param.rspSem, 0, 0);
param.tmq = tmq;
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = &param;
sendInfo->fp = tmqResetOffsetCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
return TMQ_RESP_ERR__SUCCESS;
}
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t sz = topic_list->cnt; int32_t sz = topic_list->cnt;
...@@ -244,6 +298,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -244,6 +298,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN); char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFname == NULL) { if (topicFname == NULL) {
goto _return;
} }
tNameExtractFullName(&name, topicFname); tNameExtractFullName(&name, topicFname);
tscDebug("subscribe topic: %s", topicFname); tscDebug("subscribe topic: %s", topicFname);
...@@ -251,9 +306,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -251,9 +306,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
.nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL}; .nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL};
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
taosArrayPush(tmq->clientTopics, &topic); taosArrayPush(tmq->clientTopics, &topic);
/*SMqClientTopic topic = {*/
/*.*/
/*};*/
taosArrayPush(req.topicNames, &topicFname); taosArrayPush(req.topicNames, &topicFname);
free(dbName); free(dbName);
} }
...@@ -270,7 +322,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -270,7 +322,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE); pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
if (pRequest == NULL) { if (pRequest == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc request");
} }
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq};
......
...@@ -2306,3 +2306,57 @@ int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) { ...@@ -2306,3 +2306,57 @@ int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
tCoderClear(&decoder); tCoderClear(&decoder);
return 0; return 0;
} }
int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) {
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
if (tEncodeCStr(encoder, pOffset->topicName) < 0) return -1;
if (tEncodeCStr(encoder, pOffset->cgroup) < 0) return -1;
return encoder->pos;
}
int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) {
if (tDecodeI32(decoder, &pOffset->vgId) < 0) return -1;
if (tDecodeI64(decoder, &pOffset->offset) < 0) return -1;
if (tDecodeCStrTo(decoder, pOffset->topicName) < 0) return -1;
if (tDecodeCStrTo(decoder, pOffset->cgroup) < 0) return -1;
return 0;
}
int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) {
if (tStartEncode(encoder) < 0) return -1;
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
}
tEndEncode(encoder);
return encoder->pos;
}
int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
}
return 0;
}
int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) {
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
}
return encoder->pos;
}
int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) {
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
}
return 0;
}
...@@ -346,6 +346,14 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -346,6 +346,14 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
} }
if (status == MQ_CONSUMER_STATUS__MODIFY) {
int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
for (int32_t i = 0; i < removeSz; i++) {
char *topicName = taosArrayGet(pConsumer->recentRemovedTopics, i);
free(topicName);
}
taosArrayClear(pConsumer->recentRemovedTopics);
}
} }
} }
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
...@@ -1018,6 +1026,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -1018,6 +1026,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
break; break;
} }
} }
char *oldTopicNameDup = strdup(oldTopicName);
taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY); atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/ /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
} else if (newTopicName != NULL) { } else if (newTopicName != NULL) {
......
...@@ -192,12 +192,12 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -192,12 +192,12 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
if (pTopic->pReadhandle == NULL) { if (pTopic->pReadhandle == NULL) {
ASSERT(false); ASSERT(false);
} }
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册