未验证 提交 e44f7c0d 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10222 from taosdata/feature/tq

rewrite rebalance
......@@ -44,11 +44,11 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
return -1;
}
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
......@@ -114,19 +114,19 @@ void basic_consume_loop(tmq_t *tmq,
return;
}
int32_t cnt = 0;
clock_t startTime = clock();
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
/*msg_process(tmqmessage);*/
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
} else {
break;
/*} else {*/
/*break;*/
}
}
clock_t endTime = clock();
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err = tmq_consumer_close(tmq);
if (err)
......
......@@ -1096,7 +1096,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeString(buf, &pReq->consumerGroup);
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
for (int i = 0; i < pReq->topicNum; i++) {
char* name = NULL;
char* name;
buf = taosDecodeString(buf, &name);
taosArrayPush(pReq->topicNames, &name);
}
......@@ -1173,9 +1173,60 @@ typedef struct {
} SMqTmrMsg;
typedef struct {
int64_t consumerId;
const char* key;
SArray* lostConsumers; //SArray<int64_t>
SArray* removedConsumers; //SArray<int64_t>
SArray* newConsumers; //SArray<int64_t>
} SMqRebSubscribe;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)calloc(1, sizeof(SMqRebSubscribe));
if (pRebSub == NULL) {
goto _err;
}
pRebSub->key = key;
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->lostConsumers == NULL) {
goto _err;
}
pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->removedConsumers == NULL) {
goto _err;
}
pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->newConsumers == NULL) {
goto _err;
}
return pRebSub;
_err:
taosArrayDestroy(pRebSub->lostConsumers);
taosArrayDestroy(pRebSub->removedConsumers);
taosArrayDestroy(pRebSub->newConsumers);
tfree(pRebSub);
return NULL;
}
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization
typedef struct {
//SArray* rebSubscribes; //SArray<SMqRebSubscribe>
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
#if 0
static FORCE_INLINE SMqDoRebalanceMsg* tNewSMqDoRebalanceMsg() {
SMqDoRebalanceMsg *pMsg = malloc(sizeof(SMqDoRebalanceMsg));
if (pMsg == NULL) {
return NULL;
}
pMsg->rebSubscribes = taosArrayInit(0, sizeof(SMqRebSubscribe));
if (pMsg->rebSubscribes == NULL) {
free(pMsg);
return NULL;
}
return pMsg;
}
#endif
typedef struct {
int64_t status;
} SMVSubscribeRsp;
......
......@@ -222,7 +222,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw);
* @param pKey The key value of the row.
* @return void* The object of the row.
*/
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey);
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey);
/**
* @brief Release a row from sdb.
......
......@@ -22,6 +22,16 @@
extern "C" {
#endif
enum {
MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__IDLE,
MQ_CONSUMER_STATUS__ACTIVE,
MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS__MODIFY
};
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
......
......@@ -391,6 +391,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsum
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
int32_t sz = taosArrayGetSize(pConsumer->vgInfo);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
......@@ -501,9 +502,9 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedVg, &cEp);
SMqConsumerEp consumerEp = {0};
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
taosArrayPush(pSub->unassignedVg, &consumerEp);
}
return buf;
}
......@@ -542,7 +543,8 @@ typedef struct {
int64_t connId;
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<char*>
SArray* currentTopics; // SArray<char*>
SArray* recentRemovedTopics; // SArray<char*>
int64_t epoch;
// stat
int64_t pollCnt;
......@@ -555,16 +557,25 @@ typedef struct {
} SMqConsumerObj;
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
int32_t sz;
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
tlen += taosEncodeString(buf, pConsumer->cgroup);
int32_t sz = taosArrayGetSize(pConsumer->topics);
sz = taosArrayGetSize(pConsumer->currentTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(pConsumer->currentTopics, i);
tlen += taosEncodeString(buf, topic);
}
sz = taosArrayGetSize(pConsumer->recentRemovedTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(pConsumer->topics, i);
char* topic = taosArrayGetP(pConsumer->recentRemovedTopics, i);
tlen += taosEncodeString(buf, topic);
}
return tlen;
......@@ -577,12 +588,21 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->currentTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->currentTopics, &topic);
}
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->topics, &topic);
taosArrayPush(pConsumer->recentRemovedTopics, &topic);
}
return buf;
}
......
......@@ -25,7 +25,8 @@ extern "C" {
int32_t mndInitSubscribe(SMnode *pMnode);
void mndCleanupSubscribe(SMnode *pMnode);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char* key);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
#ifdef __cplusplus
......
......@@ -61,6 +61,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
}
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
pConsumer->status = MQ_CONSUMER_STATUS__INIT;
strcpy(pConsumer->cgroup, cgroup);
taosInitRWLatch(&pConsumer->lock);
return pConsumer;
......
......@@ -107,7 +107,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
return hash;
}
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
int32_t keySize;
EKeyType keyType = pSdb->keyTypes[type];
......@@ -263,7 +263,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
return code;
}
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
terrno = 0;
SHashObj *hash = sdbGetHash(pSdb, type);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册