提交 d75ab9b3 编写于 作者: L Liu Jicong


上级 19bbd680
......@@ -22,6 +22,7 @@
static int running = 1;
static void msg_process(TAOS_RES* msg) {
char buf[1024];
memset(buf, 0, 1024);
printf("topic: %s\n", tmq_get_topic_name(msg));
printf("vg:%d\n", tmq_get_vgroup_id(msg));
while (1) {
......@@ -220,7 +221,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
......@@ -2396,6 +2396,7 @@ typedef struct {
int64_t consumerId;
} SMqRspHead;
#if 0
typedef struct {
SMsgHead head;
......@@ -2409,6 +2410,17 @@ typedef struct {
uint64_t reqId;
} SMqPollReq;
typedef struct {
SMsgHead head;
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t blockingTime;
int64_t currentOffset;
} SMqPollReqV2;
typedef struct {
int32_t vgId;
......@@ -2482,6 +2494,71 @@ static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRs
return (void*)buf;
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
SArray* blockDataLen; // SArray<int32_t>
SArray* blockData; // SArray<SRetrieveTableRsp*>
SArray* blockTbName; // SArray<char*>
SArray* blockSchema; // SArray<SSchemaWrapper>
SArray* blockTags; // SArray<kvrow>
SArray* blockTagSchema; // SArray<kvrow>
} SMqDataBlkRsp;
static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->blockNum);
if (pRsp->blockNum != 0) {
tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
tlen += taosEncodeFixedI8(buf, pRsp->withTag);
tlen += taosEncodeFixedI8(buf, pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
void* data = taosArrayGetP(pRsp->blockData, i);
tlen += taosEncodeFixedI32(buf, bLen);
tlen += taosEncodeBinary(buf, data, bLen);
return tlen;
static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
buf = taosDecodeFixedI8(buf, &pRsp->withTagSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0;
void* data = NULL;
buf = taosDecodeFixedI32(buf, &bLen);
buf = taosDecodeBinary(buf, &data, bLen);
taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data);
return (void*)buf;
typedef struct {
SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN];
......@@ -2489,7 +2566,7 @@ typedef struct {
} SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema);
......@@ -44,7 +44,7 @@ extern "C" {
} while (0)
#define HEARTBEAT_INTERVAL 1500 // ms
#define HEARTBEAT_INTERVAL 1500 // ms
enum {
......@@ -187,11 +187,13 @@ typedef struct SRequestSendRecvBody {
} SRequestSendRecvBody;
typedef struct {
int8_t resType;
char* topic;
SArray* res; // SArray<SReqResultInfo>
int32_t resIter;
int32_t vgId;
int8_t resType;
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
SMqDataBlkRsp rsp;
SReqResultInfo resInfo;
} SMqRspObj;
typedef struct SRequestObj {
......@@ -211,16 +213,24 @@ typedef struct SRequestObj {
SRequestSendRecvBody body;
} SRequestObj;
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter;
return (SReqResultInfo*)taosArrayGet(msg->res, resIter);
return (SReqResultInfo*)&msg->resInfo;
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) {
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* msg = (SMqRspObj*)res;
if (++msg->resIter < taosArrayGetSize(msg->res)) {
return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter);
if (msg->resIter < msg->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
return &msg->resInfo;
return NULL;
......@@ -238,25 +248,25 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int taos_init();
int taos_init();
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj);
STscObj *acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);
uint64_t generateRequestId();
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj *acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
void resetConnectDB(STscObj* pTscObj);
int taos_options_imp(TSDB_OPTION option, const char* str);
int taos_options_imp(TSDB_OPTION option, const char* str);
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
......@@ -273,12 +283,6 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
......@@ -290,7 +294,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void);
// conn level
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
......@@ -14,12 +14,12 @@
#include "catalog.h"
#include "scheduler.h"
#include "clientInt.h"
#include "clientStmt.h"
#include "clientLog.h"
#include "clientStmt.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tref.h"
......@@ -177,25 +177,24 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return doFetchRows(pRequest, true, true);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *msg = ((SMqRspObj *)res);
if (msg->resIter == -1) msg->resIter++;
SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter);
SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
} else {
pResultInfo = tmqGetCurResInfo(res);
if (pResultInfo->current < pResultInfo->numOfRows) {
pResultInfo->current += 1;
return pResultInfo->row;
} else {
if (msg->resIter < taosArrayGetSize(msg->res)) {
pResultInfo = taosArrayGet(msg->res, msg->resIter);
pResultInfo->current += 1;
return pResultInfo->row;
} else {
return NULL;
pResultInfo = tmqGetNextResInfo(res, true);
if (pResultInfo == NULL) return NULL;
pResultInfo->current += 1;
return pResultInfo->row;
} else {
// assert to avoid un-initialization error
......@@ -455,7 +454,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
(*numOfRows) = pResultInfo->numOfRows;
return pRequest->code;
} else if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true);
if (pResultInfo == NULL) return -1;
pResultInfo->current = pResultInfo->numOfRows;
......@@ -474,7 +473,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false);
if (pResultInfo == NULL) {
(*numOfRows) = 0;
return 0;
......@@ -710,10 +709,8 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return stmtBindBatch(stmt, bind);
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
return NULL;
......@@ -72,25 +72,25 @@ struct tmq_conf_t {
struct tmq_t {
// conf
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t autoCommit;
int8_t inWaiting;
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t autoCommit;
/*int8_t inWaiting;*/
int64_t consumerId;
int32_t epoch;
int32_t resetOffsetCfg;
int64_t status;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
int8_t epStatus;
int32_t epSkipCnt;
int32_t waitingRequest;
int32_t readyRequest;
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
tsem_t rspSem;
/*int32_t nextTopicIdx;*/
int8_t epStatus;
int32_t epSkipCnt;
/*int32_t waitingRequest;*/
/*int32_t readyRequest;*/
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
tsem_t rspSem;
// stat
int64_t pollCnt;
......@@ -134,7 +134,7 @@ typedef struct {
int32_t epoch;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
SMqPollRspV2 msg;
SMqDataBlkRsp msg;
} SMqPollRspWrapper;
typedef struct {
......@@ -145,6 +145,7 @@ typedef struct {
typedef struct {
tmq_t* tmq;
int32_t code;
int32_t sync;
tsem_t rspSem;
} SMqAskEpCbParam;
......@@ -327,12 +328,12 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return NULL;
pTmq->pTscObj = (STscObj*)conn;
pTmq->inWaiting = 0;
/*pTmq->inWaiting = 0;*/
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
pTmq->waitingRequest = 0;
pTmq->readyRequest = 0;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq->epStatus = 0;
pTmq->epSkipCnt = 0;
// set conf
......@@ -372,12 +373,12 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL;
pTmq->inWaiting = 0;
/*pTmq->inWaiting = 0;*/
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
pTmq->waitingRequest = 0;
pTmq->readyRequest = 0;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq->epStatus = 0;
pTmq->epSkipCnt = 0;
// set conf
......@@ -862,7 +863,6 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("recv poll\n");*/
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
......@@ -875,17 +875,15 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
// do not write into queue since updating epoch reset
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
return 0;
if (msgEpoch != tmqEpoch) {
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
} else {
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
#if 0
......@@ -907,45 +905,33 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
/*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
if (pRspWrapper == NULL) {
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
/*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
// TODO: alloc mem
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
#if 0
if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
taosWriteQitem(tmq->mqueue, pRspWrapper);
atomic_add_fetch_32(&tmq->readyRequest, 1);
return 0;
return 0;
if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
return code;
return -1;
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
......@@ -1028,6 +1014,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
pParam->code = code;
if (code != 0) {
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
goto END;
......@@ -1067,6 +1054,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
taosWriteQitem(tmq->mqueue, pWrapper);
......@@ -1078,7 +1066,8 @@ END:
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
int32_t code = 0;
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
......@@ -1135,8 +1124,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (sync) tsem_wait(&pParam->rspSem);
return 0;
if (sync) {
code = pParam->code;
return code;
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
......@@ -1162,7 +1155,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset;
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
......@@ -1174,13 +1167,18 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
reqOffset = tmq->resetOffsetCfg;
SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq));
SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2));
if (pReq == NULL) {
return NULL;
strcpy(pReq->topic, pTopic->topicName);
strcpy(pReq->cgroup, tmq->groupId);
/*strcpy(pReq->topic, pTopic->topicName);*/
/*strcpy(pReq->cgroup, tmq->groupId);*/
int32_t tlen = strlen(tmq->groupId);
memcpy(pReq->subKey, tmq->groupId, tlen);
pReq->subKey[tlen] = TMQ_SEPARATOR;
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
pReq->blockingTime = blockingTime;
pReq->consumerId = tmq->consumerId;
......@@ -1189,101 +1187,26 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
pReq->reqId = generateRequestId();
pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqPollReq));
pReq->head.contLen = htonl(sizeof(SMqPollReqV2));
return pReq;
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
pRspObj->topic = strdup(pWrapper->topicHandle->topicName);
pRspObj->resIter = -1;
strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
SMqPollRspV2* pRsp = &pWrapper->msg;
int32_t blockNum = taosArrayGetSize(pRsp->blockPos);
pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo));
for (int32_t i = 0; i < blockNum; i++) {
int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos);
SReqResultInfo resInfo = {0};
resInfo.totalRows = 0;
resInfo.precision = TSDB_TIME_PRECISION_MILLI;
setResSchemaInfo(&resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
setQueryResultFromRsp(&resInfo, pRetrieve, true);
taosArrayPush(pRspObj->res, &resInfo);
return pRspObj;
#if 0
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL;
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
/*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return NULL;
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return NULL;
pParam->tmq = tmq;
pParam->pVg = pVg;
pParam->epoch = tmq->epoch;
pParam->sync = 1;
pParam->msg = &msg;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
return NULL;
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqPollReq),
.handle = NULL,
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_CONSUME;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
/*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
tmq_message_t* nmsg = NULL;
while (1) {
taosReadQitem(tmq->mqueue, (void**)&nmsg);
if (nmsg == NULL) continue;
while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {
taosReadQitem(tmq->mqueue, (void**)&nmsg);
return nmsg;
return NULL;
return pRspObj;
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
/*printf("call poll\n");*/
......@@ -1306,7 +1229,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
......@@ -1337,7 +1260,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqPollReq),
.len = sizeof(SMqPollReqV2),
.handle = NULL,
sendInfo->requestId = pReq->reqId;
......@@ -1348,7 +1271,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
/*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
......@@ -1390,7 +1313,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
atomic_sub_fetch_32(&tmq->readyRequest, 1);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/
......@@ -1398,7 +1321,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->msg.dataLen == 0) {
if (pollRspWrapper->msg.blockNum == 0) {
rspWrapper = NULL;
......@@ -1454,7 +1377,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) {
tscDebug("not ready, retry\n");
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspObj) {
......@@ -186,6 +186,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
......@@ -321,7 +321,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
/*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
......@@ -334,10 +334,11 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
/*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/
/*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/
/*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/
/*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);*/
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
......@@ -127,7 +127,6 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
......@@ -143,6 +142,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
// do nothing
} else if (status == MQ_CONSUMER_STATUS__LOST) {
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) {
......@@ -151,7 +151,9 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < newTopicNum; i++) {
......@@ -169,11 +171,11 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} else {
// do nothing
mndReleaseConsumer(pMnode, pConsumer);
......@@ -188,7 +190,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
} else {
mInfo("mq rebalance finished, no modification");
mTrace("mq rebalance finished, no modification");
atomic_store_8(&mqInRebFlag, 0);
return 0;
......@@ -213,12 +215,12 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST) {
// recover consumer
// TODO: recover consumer
if (status != MQ_CONSUMER_STATUS__READY) {
mndReleaseConsumer(pMnode, pConsumer);
return -1;
......@@ -228,11 +230,13 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
// 2. check epoch, only send ep info when epoches do not match
if (epoch != serverEpoch) {
mInfo("process ask ep, consumer %ld(epoch %d), server epoch %d", consumerId, epoch, serverEpoch);
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
if (rsp.topics == NULL) {
goto FAIL;
......@@ -265,6 +269,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
if (topicEp.vgs == NULL) {
goto FAIL;
......@@ -250,13 +250,28 @@ void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub *pEpInSub) {
int32_t tEncodeSMqConsumerEpInSub(void **buf, const SMqConsumerEpInSub *pEpInSub) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pEpInSub->consumerId);
tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);
int32_t sz = taosArrayGetSize(pEpInSub->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, i);
tlen += tEncodeSMqVgEp(buf, pVgEp);
/*tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);*/
return tlen;
void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) {
buf = taosDecodeFixedI64(buf, &pEpInSub->consumerId);
buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));
/*buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));*/
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pEpInSub->vgs = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
buf = tDecodeSMqVgEp(buf, pVgEp);
taosArrayPush(pEpInSub->vgs, &pVgEp);
return (void *)buf;
......@@ -268,10 +283,12 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
pSubNew->vgNum = -1;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set free fp
SMqConsumerEpInSub *pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));
pEpInSub->vgs = taosArrayInit(0, sizeof(SMqVgEp));
SMqConsumerEpInSub epInSub = {
.consumerId = -1,
.vgs = taosArrayInit(0, sizeof(void *)),
int64_t unexistKey = -1;
taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), pEpInSub, sizeof(SMqConsumerEpInSub));
taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
return pSubNew;
......@@ -287,7 +304,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
void *pIter = NULL;
SMqConsumerEpInSub *pEpInSub = NULL;
while (1) {
pIter = taosHashIterate(pSubNew->consumerHash, pIter);
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
pEpInSub = (SMqConsumerEpInSub *)pIter;
SMqConsumerEpInSub newEp = {
......@@ -434,7 +434,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return -1;
ASSERT(pSub->vgNum == 0);
ASSERT(pSub->vgNum == -1);
pSub->vgNum = 0;
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) {
......@@ -455,6 +457,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int64_t unexistKey = -1;
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
void* pIter = NULL;
while (1) {
......@@ -492,10 +497,18 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
taosArrayPush(pEpInSub->vgs, &pVgEp);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
taosHashRelease(pSub->consumerHash, pEpInSub);
ASSERT(pEpInSub->vgs->size > 0);
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub->vgs->size > 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
return 0;
......@@ -109,6 +109,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
return NULL;
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
......@@ -116,6 +117,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
return NULL;
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
return pSub;
......@@ -255,7 +258,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subK
req.oldConsumerId = pRebVg->oldConsumerId;
req.newConsumerId = pRebVg->newConsumerId;
req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = req.qmsg;
req.qmsg = pRebVg->pVgEp->qmsg;
strncpy(req.subKey, subKey, TSDB_SUBSCRIBE_KEY_LEN);
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
......@@ -663,14 +666,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (pInput->pTopic != NULL) {
// create subscribe
pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 1);
} else {
pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub);
int32_t totalVgNum = pOutput->pSub->vgNum;
mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0);
// 2. check and get actual removed consumers, put their vg into hash
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0;
......@@ -692,18 +699,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
taosHashRelease(pOutput->pSub->consumerHash, pEpInSub);
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// put into removed
taosArrayPush(pOutput->removedConsumers, &consumerId);
ASSERT(removedNum == actualRemoved);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0);
// if previously no consumer, there are vgs not assigned
int64_t key = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &key, sizeof(int64_t));
int64_t unexistKey = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t));
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
for (int32_t i = 0; i < consumerVgNum; i++) {
......@@ -715,18 +722,22 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
taosHashRelease(pOutput->pSub->consumerHash, pEpInSub);
// 3. calc vg number of each consumer
int32_t actualConsumerNum = taosHashGetSize(pInput->pOldSub->consumerHash) - 1 +
taosArrayGetSize(pInput->pRebInfo->newConsumers) -
int32_t afterRebConsumerNum = taosHashGetSize(pOutput->pSub->consumerHash) - 1;
ASSERT(afterRebConsumerNum == actualConsumerNum);
int32_t oldSz = 0;
if (pInput->pOldSub) {
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash) - 1;
int32_t afterRebConsumerNum =
oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t minVgCnt = 0;
int32_t imbConsumerNum = 0;
// calc num
int32_t minVgCnt = totalVgNum / actualConsumerNum;
int32_t imbConsumerNum = totalVgNum % actualConsumerNum;
if (afterRebConsumerNum) {
minVgCnt = totalVgNum / afterRebConsumerNum;
imbConsumerNum = totalVgNum % afterRebConsumerNum;
// 4. first scan: remove consumer more than wanted, put to remove hash
int32_t imbCnt = 0;
......@@ -735,6 +746,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
if (pEpInSub->consumerId == -1) continue;
ASSERT(pEpInSub->consumerId > 0);
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
// all old consumers still existing are touched
......@@ -783,6 +795,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp,
/*SMqConsumerEpInSub *pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
/*ASSERT(pTestNew->consumerId == consumerId);*/
/*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/
taosArrayPush(pOutput->newConsumers, &consumerId);
......@@ -796,6 +811,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
if (pEpInSub->consumerId == -1) continue;
ASSERT(pEpInSub->consumerId > 0);
/*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/
if (imbCnt < imbConsumerNum) {
......@@ -833,8 +849,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
ASSERT(pRemovedIter == NULL);
} else {
// if all consumer is removed, put all vg into unassigned
int64_t key = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &key, sizeof(int64_t));
int64_t unexistKey = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub->consumerId == -1);
......@@ -845,7 +861,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (pIter == NULL) break;
pRebOutput = (SMqRebOutputVg *)pIter;
ASSERT(pRebOutput->newConsumerId == -1);
taosArrayPush(pEpInSub->vgs, pRebOutput->pVgEp);
taosArrayPush(pEpInSub->vgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput);
......@@ -895,7 +912,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
// 3.2 set new consumer
consumerNum = taosArrayGetSize(pOutput->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
ASSERT(consumerId > 0);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
......@@ -948,17 +965,22 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
while (1) {
pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) break;
SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0};
SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.removedConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
if (pSub == NULL) {
// split sub key and extract topic
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pSub->key, topic, cgroup);
mndSplitSubscribeKey(pRebSub->key, topic, cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
......@@ -968,24 +990,16 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
rebInput.pRebInfo = pRebSub;
rebInput.pOldSub = pSub;
int32_t unassignedVgNum = 0;
int64_t key = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pSub->consumerHash, &key, sizeof(int64_t));
if (pEpInSub != NULL) {
ASSERT(pEpInSub->consumerId == key);
unassignedVgNum = taosArrayGetSize(pEpInSub->vgs);
mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, unassignedVgNum);
// TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0);
if (rebInput.pTopic) {
SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic;
mndReleaseTopic(pMnode, pTopic);
} else {
mndReleaseSubscribe(pMnode, pSub);
......@@ -22,12 +22,14 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndGrant.h"
#include "mndInfoSchema.h"
#include "mndPerfSchema.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndPerfSchema.h"
#include "mndProfile.h"
#include "mndQnode.h"
#include "mndQuery.h"
#include "mndShow.h"
#include "mndSma.h"
#include "mndSnode.h"
......@@ -40,8 +42,6 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndQuery.h"
#include "mndGrant.h"
#define MQ_TIMER_MS 3000
#define TRNAS_TIMER_MS 6000
......@@ -16,6 +16,13 @@
#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_
#include "executor.h"
#include "os.h"
#include "thash.h"
#include "tmsg.h"
#include "ttimer.h"
#include "wal.h"
#ifdef __cplusplus
extern "C" {
......@@ -30,12 +37,6 @@ extern "C" {
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
enum {
#define TQ_BUFFER_SIZE 4
......@@ -151,22 +152,27 @@ typedef struct {
} STqMetaStore;
typedef struct {
SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator;
} STqMemRef;
int64_t consumerId;
int32_t epoch;
char* qmsg;
// SRWLatch lock;
SWalReadHandle* pReadHandle;
// number should be identical to fetch thread num
qTaskInfo_t task[4];
} STqExec;
struct STQ {
// the collection of groups
// the handle of meta kvstore
bool writeTrigger;
char* path;
STqMemRef tqMemRef;
STqMetaStore* tqMeta;
// STqPushMgr* tqPushMgr;
SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal;
SMeta* pVnodeMeta;
SHashObj* tqMetaNew; // subKey -> tqExec
SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal;
SMeta* pVnodeMeta;
typedef struct {
......@@ -230,10 +236,6 @@ typedef struct {
// TODO sync function
} STqStreamPusher;
typedef struct {
int8_t type; // mq or stream
} STqPusher;
typedef struct {
SHashObj* pHash; // <id, STqPush*>
} STqPushMgr;
......@@ -257,9 +259,10 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
// int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
// int32_t tqProcessRebReq(STQ* pTq, char* msg);
// int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
......@@ -314,4 +317,4 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet
#endif /*_TD_VNODE_TQ_H_*/
\ No newline at end of file
#endif /*_TD_VNODE_TQ_H_*/
......@@ -29,31 +29,14 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe
pTq->pVnode = pVnode;
pTq->pWal = pWal;
pTq->pVnodeMeta = pVnodeMeta;
#if 0
pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) {
// TODO: error code of buffer pool
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) {
#if 0
allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
return NULL;
#if 0
pTq->tqPushMgr = tqPushMgrOpen();
if (pTq->tqPushMgr == NULL) {
// free store
return NULL;
pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
......@@ -248,16 +231,15 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return 0;
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch;
SMqPollReqV2* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0;
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
......@@ -267,65 +249,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
SMqPollRsp rsp = {
/*.consumerId = consumerId,*/
.numOfTopics = 0,
.pBlockData = NULL,
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
return 0;
STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey));
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
STqTopic* pTopic = NULL;
int32_t sz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < sz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
ASSERT(pConsumer->consumerId == consumerId);
if (strcmp(topic->topicName, pReq->topic) == 0) {
pTopic = topic;
if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
return 0;
consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch);
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.skipLogNum = 0;
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockData = taosArrayInit(0, sizeof(void*));
while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
consumerEpoch = atomic_load_32(&pConsumer->epoch);
consumerEpoch = atomic_load_32(&pExec->epoch);
if (consumerEpoch > reqEpoch) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
......@@ -333,101 +279,86 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
TD_VID(pTq->pVnode), fetchOffset);
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
qTaskInfo_t task = pExec->task[workerId];
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
if (pDataBlock == NULL) {
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
if (pDataBlock == NULL) break;
taosArrayPush(pRes, pDataBlock);
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = ts;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
// TODO batch optimization
if (rsp.blockNum != 0) break;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
return 0;
} else {
if (rsp.blockNum != 0)
rsp.rspOffset = fetchOffset;
rsp.rspOffset = fetchOffset - 1;
/*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
rsp.rspOffset = fetchOffset - 1;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
rsp.pBlockData = NULL;
tEncodeSMqDataBlkRsp(&abuf, &rsp);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
// TODO destroy
return 0;
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
......@@ -436,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t reqEpoch = pReq->epoch;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = 0;
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
......@@ -635,7 +566,56 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
// TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req;
tDecodeSMqRebVgReq(msg, &req);
// todo lock
STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey));
if (pExec == NULL) {
ASSERT(req.oldConsumerId == -1);
ASSERT(req.newConsumerId != -1);
STqExec exec = {0};
pExec = &exec;
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pExec->consumerId = req.newConsumerId;
pExec->epoch = -1;
pExec->qmsg = req.qmsg;
req.qmsg = NULL;
pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 4; i++) {
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {
.reader = pReadHandle,
.meta = pTq->pVnodeMeta,
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0;
} else {
if (req.newConsumerId != -1) {
ASSERT(pExec->consumerId == req.oldConsumerId);
// TODO handle qmsg and exec modification
atomic_store_64(&pExec->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pExec->epoch, 1);
return 0;
} else {
/*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/
return 0;
#if 0
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
......@@ -754,6 +734,7 @@ int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) {
return 0;
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType == TASK_EXEC__NONE) return 0;
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "vnodeInt.h"
// TODO:replace by an abstract file layer
// #include <fcntl.h>
// #include <string.h>
// #include <unistd.h>
......@@ -80,6 +80,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
vnodeProcessSubmitReq(pVnode, ptr, pRsp);
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
// TODO: handle error
#if 0
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO: handle error
......@@ -93,6 +100,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
} break;
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
......@@ -307,4 +315,4 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg
pRsp->contLen = sizeof(SSubmitRsp);
return 0;
\ No newline at end of file
......@@ -38,7 +38,7 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */
#include <sys/file.h>
#if !defined(_TD_DARWIN_64)
#include <sys/sendfile.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <unistd.h>
......@@ -58,9 +58,9 @@ typedef int32_t FileFd;
typedef struct TdFile {
TdThreadRwlock rwlock;
int refId;
FileFd fd;
FILE *fp;
int refId;
FileFd fd;
FILE *fp;
} * TdFilePtr, TdFile;
#define FILE_WITH_LOCK 1
......@@ -182,7 +182,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
return 0;
struct stat fileStat;
int32_t code = stat(path, &fileStat);
int32_t code = stat(path, &fileStat);
if (code < 0) {
return code;
......@@ -203,7 +203,7 @@ int32_t taosDevInoFile(const char *path, int64_t *stDev, int64_t *stIno) {
return 0;
struct stat fileStat;
int32_t code = stat(path, &fileStat);
int32_t code = stat(path, &fileStat);
if (code < 0) {
return code;
......@@ -226,7 +226,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return NULL;
int fd = -1;
int fd = -1;
FILE *fp = NULL;
if (tdFileOptions & TD_FILE_STREAM) {
char *mode = NULL;
......@@ -325,7 +325,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t leftbytes = count;
int64_t readbytes;
char *tbuf = (char *)buf;
......@@ -365,7 +365,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t ret = pread(pFile->fd, buf, count, offset);
......@@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
assert(pFile->fd >= 0); // Please check if you have closed the file.
/*assert(pFile->fd >= 0); // Please check if you have closed the file.*/
int64_t nleft = count;
int64_t nwritten = 0;
......@@ -414,7 +414,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t ret = lseek(pFile->fd, (long)offset, whence);
......@@ -429,10 +429,10 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
if (pFile == NULL) {
return 0;
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
struct stat fileStat;
int32_t code = fstat(pFile->fd, &fileStat);
int32_t code = fstat(pFile->fd, &fileStat);
if (code < 0) {
return code;
......@@ -456,7 +456,7 @@ int32_t taosLockFile(TdFilePtr pFile) {
if (pFile == NULL) {
return 0;
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_EX | LOCK_NB);
......@@ -469,7 +469,7 @@ int32_t taosUnLockFile(TdFilePtr pFile) {
if (pFile == NULL) {
return 0;
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_UN | LOCK_NB);
......@@ -529,7 +529,7 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) {
if (pFile == NULL) {
return 0;
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
return ftruncate(pFile->fd, l_size);
......@@ -637,7 +637,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
off_t len = count;
while (len > 0) {
char buf[1024 * 16];
char buf[1024 * 16];
off_t n = sizeof(buf);
if (len < n) n = len;
size_t m = fread(buf, 1, n, in_file);
......@@ -662,7 +662,7 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) {
off_t len = count;
while (len > 0) {
char buf[1024 * 16];
char buf[1024 * 16];
off_t n = sizeof(buf);
if (len < n) n = len;
size_t m = read(sfd, buf, n);
......@@ -750,7 +750,7 @@ void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length) {
if (pFile == NULL) {
return NULL;
assert(pFile->fd >= 0); // Please check if you have closed the file.
assert(pFile->fd >= 0); // Please check if you have closed the file.
void *ptr = mmap(NULL, length, PROT_READ, MAP_SHARED, pFile->fd, 0);
return ptr;
......@@ -811,4 +811,4 @@ bool taosCheckAccessFile(const char *pathname, int32_t tdFileAccessOptions) {
bool taosCheckExistFile(const char *pathname) { return taosCheckAccessFile(pathname, TD_FILE_ACCESS_EXIST_OK); };
#endif // WINDOWS
#endif // WINDOWS
......@@ -76,7 +76,7 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
if (pArray == NULL || pData == NULL) {
if (pData == NULL) {
return NULL;
......@@ -318,7 +318,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray->size = 0;
void* taosArrayDestroy(SArray* pArray) {
if (pArray) {
......@@ -445,7 +444,8 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) {
SArray* pArray = taosArrayInit(pSrc->size, pSrc->elemSize);
ASSERT(pSrc->elemSize == sizeof(void*));
SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*));
for (int32_t i = 0; i < pSrc->size; i++) {
void* clone = deepCopy(taosArrayGetP(pSrc, i));
taosArrayPush(pArray, &clone);
......@@ -471,6 +471,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
for (int32_t i = 0; i < sz; i++) {
void* data = taosMemoryCalloc(1, dataSz);
buf = decode(buf, data);
taosArrayPush(*pArray, &data);
return (void*)buf;
......@@ -33,8 +33,8 @@ endi
sql connect
$dbNamme = d0
print =============== create database , vgroup 1
sql create database $dbNamme vgroups 1
print =============== create database , vgroup 4
sql create database $dbNamme vgroups 4
sql use $dbNamme
print =============== create super table
......@@ -276,7 +276,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
ASSERT(TMQ_CONF_OK == tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName));
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
......@@ -437,12 +437,14 @@ void* threadFunc(void* param) {
pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
#if 0
err = tmq_unsubscribe(tmq);
if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1;
return NULL;
return NULL;
......@@ -485,11 +487,13 @@ int main(int32_t argc, char* argv[]) {
totalMsgs = parallel_consume(tmq, 0);
#if 0
err = tmq_unsubscribe(tmq);
if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
if (g_stConfInfo.numOfTopic1) {
for (int32_t i = 0; i < numOfThreads; i++) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册