提交 21e5ddbb 编写于 作者: L Liu Jicong

refactor tmq msg handle

上级 906b8d75
......@@ -224,10 +224,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics);
#endif
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
......
......@@ -1380,8 +1380,6 @@ typedef struct SMqCMGetSubEpReq {
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCMGetSubEpReq;
#pragma pack(pop)
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pMsg->contLen);
......@@ -1851,6 +1849,12 @@ typedef struct {
SMqTbData* tbData;
} SMqTopicData;
typedef struct {
int8_t mqMsgType;
int32_t code;
int32_t epoch;
} SMqRspHead;
typedef struct {
int64_t consumerId;
SSchemaWrapper* schemas;
......@@ -1867,6 +1871,7 @@ typedef struct {
int64_t consumerId;
int64_t blockingTime;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t currentOffset;
......@@ -1886,11 +1891,18 @@ typedef struct {
typedef struct {
int64_t consumerId;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
struct tmq_message_t {
SMqRspHead head;
union {
SMqConsumeRsp consumeRsp;
SMqCMGetSubEpRsp getEpRsp;
};
};
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
......@@ -1972,6 +1984,8 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
return buf;
}
#pragma pack(pop)
#ifdef __cplusplus
}
#endif
......
......@@ -26,6 +26,7 @@
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedbuf.h"
#include "tqueue.h"
#include "tref.h"
struct tmq_list_t {
......@@ -59,22 +60,34 @@ struct tmq_t {
char groupId[256];
char clientId[256];
int8_t autoCommit;
SRWLatch lock;
int64_t consumerId;
int32_t epoch;
int32_t resetOffsetCfg;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
SRWLatch pollLock;
// stat
int64_t pollCnt;
};
struct tmq_message_t {
SMqConsumeRsp rsp;
enum {
TMQ_MSG_TYPE__POLL_RSP = 0,
TMQ_MSG_TYPE__EP_RSP,
};
enum {
TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT,
};
enum {
TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY,
};
typedef struct {
......@@ -84,6 +97,7 @@ typedef struct {
int64_t currentOffset;
// connection info
int32_t vgId;
int32_t vgStatus;
SEpSet epSet;
} SMqClientVg;
......@@ -105,15 +119,16 @@ typedef struct {
typedef struct {
tmq_t* tmq;
int32_t wait;
int32_t sync;
tsem_t rspSem;
} SMqAskEpCbParam;
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
tmq_message_t** retMsg;
tsem_t rspSem;
} SMqConsumeCbParam;
tmq_t* tmq;
SMqClientVg* pVg;
tmq_message_t* rspMsg;
tsem_t rspSem;
} SMqPollCbParam;
typedef struct {
tmq_t* tmq;
......@@ -210,6 +225,22 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if (*topics == NULL) {
*topics = tmq_list_new();
}
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i);
tmq_list_append(*topics, strdup(topic->topicName));
}
return TMQ_RESP_ERR__SUCCESS;
}
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
tmq_list_t* lst = tmq_list_new();
return tmq_subscribe(tmq, lst);
}
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
if (pTmq == NULL) {
......@@ -219,7 +250,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
taosInitRWLatch(&pTmq->lock);
taosInitRWLatch(&pTmq->pollLock);
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
......@@ -227,9 +258,11 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
pTmq->mqueue = taosOpenQueue();
pTmq->qall = taosAllocateQall();
return pTmq;
}
......@@ -291,7 +324,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = pParam;
......@@ -366,10 +403,17 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tscError("failed to malloc request");
}
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq};
SMqSubscribeCbParam param = {
.rspErr = TMQ_RESP_ERR__SUCCESS,
.tmq = tmq,
};
tsem_init(&param.rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = &param;
......@@ -392,36 +436,6 @@ _return:
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
tmq_t* pTmq = (void*)param;
SArray* pArray = taosArrayInit(0, sizeof(SKv));
if (pArray == NULL) {
return NULL;
}
SKv kv = {0};
kv.key = HEARTBEAT_KEY_MQ_TMP;
SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
if (pMqHb == NULL) {
return pArray;
}
pMqHb->consumerId = connKey.connId;
SArray* clientTopics = pTmq->clientTopics;
int sz = taosArrayGetSize(clientTopics);
for (int i = 0; i < sz; i++) {
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
/*if (pCTopic->vgId == -1) {*/
/*pMqHb->status = 1;*/
/*break;*/
/*}*/
}
kv.value = pMqHb;
kv.valueLen = sizeof(SMqHbMsg);
taosArrayPush(pArray, &kv);
return pArray;
}
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
......@@ -579,7 +593,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
static bool noPrintSchema;
char pBuf[128];
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp;
int32_t colNum = pRsp->schemas->nCols;
if (!noPrintSchema) {
printf("|");
......@@ -619,17 +633,16 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
}
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
if (code != 0) {
printf("msg discard\n");
tsem_post(&pParam->rspSem);
return 0;
}
SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));
if (pRsp == NULL) {
tsem_post(&pParam->rspSem);
taosWUnLockLatch(&pParam->tmq->pollLock);
return -1;
}
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
......@@ -637,76 +650,80 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (pRsp->numOfTopics == 0) {
/*printf("no data\n");*/
free(pRsp);
tsem_post(&pParam->rspSem);
taosWUnLockLatch(&pParam->tmq->pollLock);
return 0;
}
*pParam->retMsg = (tmq_message_t*)pRsp;
pParam->rspMsg = (tmq_message_t*)pRsp;
pVg->currentOffset = pRsp->rspOffset;
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/
tsem_post(&pParam->rspSem);
taosWUnLockLatch(&pParam->tmq->pollLock);
/*printf("\n-----msg end------\n");*/
return 0;
}
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
bool set = false;
int32_t sz = taosArrayGetSize(pRsp->topics);
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
topic.topicName = strdup(pTopicEp->topic);
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = pVgEp->offset,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
};
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(tmq->clientTopics, &topic);
}
atomic_store_32(&tmq->epoch, epoch);
return set;
}
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait);
if (pParam->wait) {
tsem_post(&tmq->rspSem);
printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
if (pParam->sync) {
tsem_post(&pParam->rspSem);
}
return 0;
}
tscDebug("tmq ask ep cb called");
bool set = false;
SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
int32_t sz = taosArrayGetSize(rsp.topics);
// TODO: lock
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
if (rsp.epoch != tmq->epoch) {
// TODO
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i);
topic.topicName = strdup(pTopicEp->topic);
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
// clang-format off
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = pVgEp->offset,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet
};
// clang-format on
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(tmq->clientTopics, &topic);
if (pParam->sync) {
SMqRspHead* head = pMsg->pData;
SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) {
atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
tmq->epoch = rsp.epoch;
}
if (set) {
atomic_store_64(&tmq->status, 1);
}
// unlock
/*tsem_post(&tmq->rspSem);*/
if (pParam->wait) {
tsem_post(&tmq->rspSem);
tsem_post(&pParam->rspSem);
tDeleteSMqCMGetSubEpRsp(&rsp);
} else {
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp);
taosWriteQitem(tmq->mqueue, pRsp);
}
tDeleteSMqCMGetSubEpRsp(&rsp);
return 0;
}
int32_t tmqAskEp(tmq_t* tmq, bool wait) {
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) {
......@@ -723,7 +740,11 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) {
goto END;
}
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
......@@ -731,7 +752,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) {
goto END;
}
pParam->tmq = tmq;
pParam->wait = wait;
pParam->sync = sync;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
......@@ -744,7 +766,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) {
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
END:
if (wait) tsem_wait(&tmq->rspSem);
if (sync) tsem_wait(&pParam->rspSem);
return 0;
}
......@@ -792,6 +814,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch;
pReq->currentOffset = reqOffset;
pReq->head.vgId = htonl(pVg->vgId);
......@@ -799,11 +822,146 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie
return pReq;
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg;
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
continue;
}
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return -1;
}
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
if (param == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return -1;
}
param->tmq = tmq;
param->pVg = pVg;
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqConsumeReq),
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = param;
sendInfo->fp = tmqPollCb;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
}
}
return 0;
}
void tmqFetchLeftRes(tmq_t* tmq, tmq_message_t** pRspMsg) {
taosGetQitem(tmq->qall, (void**)pRspMsg);
if (pRspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)pRspMsg);
}
}
// return
int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) {
tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp);
tmqClearUnhandleMsg(tmq);
*pReset = true;
} else {
*pReset = false;
}
} else {
*pReset = false;
return -1;
}
return 0;
}
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_message = NULL;
tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == 0);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
tmqFetchLeftRes(tmq, &rspMsg);
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
}
while (1) {
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) break;
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
return rspMsg;
}
bool reset = false;
tmqHandleRes(tmq, rspMsg, &reset);
taosFreeQitem(rspMsg);
}
tmqPollImpl(tmq, blocking_time);
while (1) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) break;
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
return rspMsg;
} else {
bool reset = false;
tmqHandleRes(tmq, rspMsg, &reset);
taosFreeQitem(rspMsg);
if (reset) tmqPollImpl(tmq, blocking_time);
}
}
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
return NULL;
}
}
}
#if 0
if (blocking_time <= 0) blocking_time = 1;
if (blocking_time > 1000) blocking_time = 1000;
......@@ -835,7 +993,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
return NULL;
}
SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam));
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
if (param == NULL) {
ASSERT(false);
usleep(blocking_time * 1000);
......@@ -847,7 +1005,11 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tsem_init(&param->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
pRequest->body.requestMsg = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqConsumeReq),
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
......@@ -887,6 +1049,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*return pRequest;*/
}
#endif
#if 0
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
......
......@@ -114,7 +114,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg;
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg;
......
......@@ -72,7 +72,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
......@@ -121,7 +121,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
int32_t len;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
......@@ -206,7 +206,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
SName name = {0};
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TABLE_FNAME_LEN] = {0};
char db[TSDB_TOPIC_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db);
......@@ -223,7 +223,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pTopic->name, TSDB_TABLE_FNAME_LEN);
memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
pDrop->tuid = htobe64(pTopic->uid);
return pDrop;
......@@ -343,6 +343,7 @@ CREATE_TOPIC_OVER:
}
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
// TODO: cannot drop when subscribed
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
......@@ -408,76 +409,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
STableInfoReq infoReq = {0};
if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
mDebug("topic:%s, start to retrieve meta", infoReq.tbName);
#if 0
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname);
if (pTopic == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_TOPIC;
mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pTopic->lock);
int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pTopic->numOfTags);
pMeta->numOfColumns = htonl(pTopic->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pTopic->version);
pMeta->tuid = htonl(pTopic->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchemas[i];
SSchema *pSrcSchema = &pTopic->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic);
pReq->pCont = pMeta;
pReq->contLen = contLen;
mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags);
#endif
return 0;
}
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
......@@ -504,6 +436,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
mndReleaseDb(pMnode, pDb);
return 0;
}
#endif
static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
......@@ -571,7 +504,7 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
if (pTopic->dbUid != pDb->uid) {
if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
}
sdbRelease(pSdb, pTopic);
......@@ -580,8 +513,8 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
cols = 0;
char topicName[TSDB_TABLE_NAME_LEN] = {0};
tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TABLE_NAME_LEN);
char topicName[TSDB_TOPIC_NAME_LEN] = {0};
tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, topicName);
cols++;
......
......@@ -52,7 +52,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
void tqClose(STQ*);
// required by vnode
int tqPushMsg(STQ*, void* msg, int64_t version);
int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -221,7 +221,6 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
// pHandle->tbUid = tbUid;
}
return 0;
}
......
......@@ -79,19 +79,19 @@ extern int32_t tqDebugFlag;
// 4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
#define TQ_ACTION_INTXN 3
#define TQ_SVER 0
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
#define TQ_DUP_INTXN_REJECT 2
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
......@@ -160,7 +160,7 @@ struct STQ {
STqMemRef tqMemRef;
STqMetaStore* tqMeta;
SWal* pWal;
SMeta* pMeta;
SMeta* pVnodeMeta;
};
typedef struct {
......@@ -190,9 +190,6 @@ typedef struct {
char* logicalPlan;
char* physicalPlan;
char* qmsg;
int64_t persistedOffset;
int64_t committedOffset;
int64_t currentOffset;
STqBuffer buffer;
SWalReadHandle* pReadhandle;
} STqTopic;
......@@ -201,7 +198,7 @@ typedef struct {
int64_t consumerId;
int64_t epoch;
char cgroup[TSDB_TOPIC_FNAME_LEN];
SArray* topics; // SArray<STqTopicHandle>
SArray* topics; // SArray<STqTopic>
} STqConsumer;
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
......
......@@ -42,7 +42,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
pTq->path = strdup(path);
pTq->tqConfig = tqConfig;
pTq->pWal = pWal;
pTq->pMeta = pMeta;
pTq->pVnodeMeta = pMeta;
#if 0
pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
......@@ -71,9 +71,11 @@ void tqClose(STQ* pTq) {
// TODO
}
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
// add reference
// judge and launch new query
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
// TODO: add reference
// if handle waiting, launch query and response to consumer
//
// if no waiting handle, return
return 0;
}
......@@ -101,9 +103,9 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic)
/*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/
/*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
tlen += taosEncodeString(buf, pTopic->qmsg);
tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);
tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);
tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);
/*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/
/*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/
/*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/
return tlen;
}
......@@ -113,9 +115,9 @@ static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopi
/*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/
/*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
buf = taosDecodeString(buf, &pTopic->qmsg);
buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);
buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);
buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);
/*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/
/*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/
/*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/
return buf;
}
......@@ -194,8 +196,8 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
}
for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta};
pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
}
......@@ -243,7 +245,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// rsponse to user
// response to user
break;
}
pHead = pTopic->pReadhandle->pHead;
......@@ -268,7 +270,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
taosArrayPush(pRes, pDataBlock);
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
......@@ -312,158 +313,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
#if 0
int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset = pReq->offset;
int64_t blockingTime = pReq->blockingTime;
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
/*printf("vg %d get consume req\n", pReq->head.vgId);*/
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
rpcSendResponse(pMsg);
return 0;
}
int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0; i < sz; i++) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
// TODO: support multiple topic in one req
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
ASSERT(false);
continue;
}
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
pTopic->committedOffset = pReq->offset;
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
pTopic->committedOffset = pReq->offset - 1;
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0;
if (fetchOffset <= pTopic->committedOffset) {
fetchOffset = pTopic->committedOffset + 1;
}
/*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/
int8_t pos;
int8_t skip = 0;
SWalHead* pHead;
while (1) {
pos = fetchOffset % TQ_BUFFER_SIZE;
skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
if (skip == 1) {
// do nothing
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
// check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
}
// read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
}
rsp.skipLogNum++;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
}
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
fetchOffset++;
}
if (skip == 1) continue;
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
printf("current fetch offset %ld\n", fetchOffset);
qSetStreamInput(task, pCont);
// SArray<SSDataBlock>
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
break;
}
if (pDataBlock != NULL) {
taosArrayPush(pRes, pDataBlock);
} else {
break;
}
}
// TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);
fetchOffset++;
continue;
} else {
rsp.numOfTopics++;
}
rsp.pBlockData = pRes;
#if 0
pTopic->buffer.output[pos].dst = pRes;
if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
pTopic->buffer.firstOffset = pReq->offset;
}
if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
pTopic->buffer.lastOffset = pReq->offset;
}
#endif
}
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
if (rsp.pBlockData) {
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
rsp.pBlockData = NULL;
}
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
#endif
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
tDecodeSMqMVRebReq(msg, &req);
......@@ -505,8 +354,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->logicalPlan = req.logicalPlan;
pTopic->physicalPlan = req.physicalPlan;
pTopic->qmsg = req.qmsg;
pTopic->committedOffset = -1;
pTopic->currentOffset = -1;
/*pTopic->committedOffset = -1;*/
/*pTopic->currentOffset = -1;*/
pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1;
......@@ -516,8 +365,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
}
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta};
pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
}
......
......@@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// todo: change the interface here
int64_t ver;
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) {
// TODO: handle error
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册