提交 5cb35f2f 编写于 作者: wmmhello's avatar wmmhello

feat:add committed & position & commite_offset interface

上级 5c344b01
......@@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
......
......@@ -312,6 +312,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
......
......@@ -779,6 +779,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008)
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
......@@ -1327,6 +1327,9 @@ end:
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
int numFields) {
if (!taos || !pData || !tbname) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL;
......@@ -1413,6 +1416,9 @@ end:
}
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
if (!taos || !pData || !tbname) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL;
......@@ -1812,6 +1818,7 @@ end:
}
char* tmq_get_json_meta(TAOS_RES* res) {
if (res == NULL) return NULL;
uDebug("tmq_get_json_meta called");
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) {
return NULL;
......
......@@ -220,6 +220,12 @@ typedef struct SMqSeekParam {
int32_t code;
} SMqSeekParam;
typedef struct SMqCommittedParam {
tsem_t sem;
int32_t code;
SMqVgOffset vgOffset;
} SMqCommittedParam;
typedef struct SMqVgWalInfoParam {
int32_t vgId;
int32_t epoch;
......@@ -241,7 +247,7 @@ typedef struct {
typedef struct {
SMqCommitCbParamSet* params;
SMqVgOffset* pOffset;
// SMqVgOffset* pOffset;
char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId;
tmq_t* pTmq;
......@@ -292,6 +298,9 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
}
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (conf == NULL || key == NULL || value == NULL){
return TMQ_CONF_INVALID;
}
if (strcasecmp(key, "group.id") == 0) {
tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
return TMQ_CONF_OK;
......@@ -406,6 +415,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
if(list == NULL) return -1;
SArray* container = &list->container;
if (src == NULL || src[0] == 0) return -1;
char* topic = taosStrdup(src);
......@@ -414,16 +424,19 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
}
void tmq_list_destroy(tmq_list_t* list) {
if(list == NULL) return;
SArray* container = &list->container;
taosArrayDestroyP(container, taosMemoryFree);
}
int32_t tmq_list_get_size(const tmq_list_t* list) {
if(list == NULL) return -1;
const SArray* container = &list->container;
return taosArrayGetSize(container);
}
char** tmq_list_to_c_array(const tmq_list_t* list) {
if(list == NULL) return NULL;
const SArray* container = &list->container;
return container->pData;
}
......@@ -432,7 +445,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
taosMemoryFree(pParam->pOffset);
// taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
......@@ -441,30 +454,25 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
}
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) {
SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
if (pOffset == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SMqVgOffset pOffset = {0};
pOffset->consumerId = tmq->consumerId;
pOffset->offset.val = *offset;
pOffset.consumerId = tmq->consumerId;
pOffset.offset.val = *offset;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);
memcpy(pOffset.offset.subKey, tmq->groupId, groupLen);
pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset.offset.subKey + groupLen + 1, pTopicName);
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
if (code < 0) {
taosMemoryFree(pOffset);
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
taosMemoryFree(pOffset);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -474,19 +482,18 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeMqVgOffset(&encoder, pOffset);
tEncodeMqVgOffset(&encoder, &pOffset);
tEncoderClear(&encoder);
// build param
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
taosMemoryFree(pOffset);
taosMemoryFree(buf);
return TSDB_CODE_OUT_OF_MEMORY;
}
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// pParam->pOffset = pOffset;
pParam->vgId = vgId;
pParam->pTmq = tmq;
......@@ -495,7 +502,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
taosMemoryFree(pOffset);
taosMemoryFree(buf);
taosMemoryFree(pParam);
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -553,40 +559,34 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p
return pParamSet;
}
static SMqClientVg* getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId){
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){
SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
if (pTopic == NULL) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s", tmq->consumerId, pTopicName);
return NULL;
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
int32_t j = 0;
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == vgId) {
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgId) {
*pVg = pClientVg;
break;
}
}
if (j == numOfVgroups) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
vgId, numOfVgroups, pTopicName);
return NULL;
}
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
return pVg;
return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
}
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) {
int32_t code = 0;
tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
taosRLockLatch(&tmq->lock);
SMqClientVg* pVg = getClientVg(tmq, pTopicName, vgId);
if(pVg == NULL){
code = TSDB_CODE_TMQ_INVALID_VGID;
SMqClientVg* pVg = NULL;
code = getClientVg(tmq, pTopicName, vgId, &pVg);
if(code != 0){
goto end;
}
if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
......@@ -601,7 +601,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopicName, pParamSet);
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
......@@ -964,6 +964,7 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if (*topics == NULL) {
*topics = tmq_list_new();
}
......@@ -977,6 +978,7 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
}
int32_t tmq_unsubscribe(tmq_t* tmq) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
......@@ -1047,6 +1049,7 @@ static void tmqMgmtInit(void) {
}
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if(conf == NULL) return NULL;
taosThreadOnce(&tmqInit, tmqMgmtInit);
if (tmqInitRes != 0) {
terrno = tmqInitRes;
......@@ -1140,6 +1143,7 @@ _failed:
}
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
......@@ -1264,6 +1268,7 @@ FAIL:
}
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
if(conf == NULL) return;
conf->commitCb = cb;
conf->commitCbUserParam = param;
}
......@@ -2050,6 +2055,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if(tmq == NULL) return NULL;
void* rspObj;
int64_t startTime = taosGetTimestampMs();
......@@ -2129,6 +2136,8 @@ static void displayConsumeStatistics(tmq_t* pTmq) {
}
int32_t tmq_consumer_close(tmq_t* tmq) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq);
......@@ -2174,6 +2183,9 @@ const char* tmq_err2str(int32_t err) {
}
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
if (res == NULL){
return TMQ_RES_INVALID;
}
if (TD_RES_TMQ(res)) {
return TMQ_RES_DATA;
} else if (TD_RES_TMQ_META(res)) {
......@@ -2186,6 +2198,9 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
}
const char* tmq_get_topic_name(TAOS_RES* res) {
if (res == NULL){
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
......@@ -2201,6 +2216,10 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
}
const char* tmq_get_db_name(TAOS_RES* res) {
if (res == NULL){
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
......@@ -2216,6 +2235,9 @@ const char* tmq_get_db_name(TAOS_RES* res) {
}
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL){
return -1;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->vgId;
......@@ -2231,6 +2253,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
}
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (res == NULL){
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*) res;
STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset;
......@@ -2254,10 +2279,13 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
}
// data from tsdb, no valid offset info
return -1;
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
const char* tmq_get_table_name(TAOS_RES* res) {
if (res == NULL){
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
......@@ -2277,6 +2305,10 @@ const char* tmq_get_table_name(TAOS_RES* res) {
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
return;
}
if (pRes == NULL) { // here needs to commit all offsets.
asyncCommitAllOffsets(tmq, cb, param);
} else { // only commit one offset
......@@ -2291,6 +2323,11 @@ static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
......@@ -2314,11 +2351,18 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
}
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
static bool isWalRangeOk(SVgOffsetInfo* offset){
if (offset->walVerBegin != -1 && offset->walVerEnd != -1) {
return true;
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value){
if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
tscError("Assignment or poll interface need to be called first");
return TSDB_CODE_TMQ_NEED_INITIALIZED;
}
return false;
if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, offset->walVerBegin, offset->walVerEnd);
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
}
return 0;
}
int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset){
......@@ -2332,41 +2376,18 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
SMqClientVg* pVg = NULL;
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgId) {
pVg = pClientVg;
break;
}
}
if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_VGID;
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
if (!isWalRangeOk(pOffsetInfo)) {
tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_NEED_INITIALIZED;
}
if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) {
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
return code;
}
taosWUnLockLatch(&tmq->lock);
......@@ -2384,7 +2405,7 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
tsem_wait(&pInfo->sem);
int32_t code = pInfo->code;
code = pInfo->code;
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
......@@ -2394,6 +2415,41 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
return code;
}
int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t accId = tmq->pTscObj->acctId;
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return code;
}
taosWUnLockLatch(&tmq->lock);
STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
SAskEpInfo* pInfo = param;
pInfo->code = code;
......@@ -2525,7 +2581,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
}
// if no more waiting rsp
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
if(pParamSet->callbackFn != NULL){
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
}
taosMemoryFree(pParamSet);
// tmq->needReportOffsetRows = true;
......@@ -2622,8 +2681,104 @@ static bool isInSnapshotMode(int8_t type, bool useSnapshot){
return false;
}
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqCommittedParam* pParam = param;
if (code != 0){
goto end;
}
if (pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tDecoderClear(&decoder);
}
end:
if(pMsg){
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
pParam->code = code;
tsem_post(&pParam->sem);
return 0;
}
int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* epSet){
int32_t code = 0;
SMqVgOffset pOffset = {0};
pOffset.consumerId = tmq->consumerId;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset.offset.subKey, tmq->groupId, groupLen);
pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset.offset.subKey + groupLen + 1, tname);
int32_t len = 0;
tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
if (code < 0) {
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeMqVgOffset(&encoder, &pOffset);
tEncoderClear(&encoder);
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(buf);
return TSDB_CODE_OUT_OF_MEMORY;
}
SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
if (pParam == NULL) {
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmCommittedCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
tsem_wait(&pParam->sem);
code = pParam->code;
if(code == TSDB_CODE_SUCCESS){
if(pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG){
code = pParam->vgOffset.offset.val.version;
}else{
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
}
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;
}
int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
if (tmq == NULL) {
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
......@@ -2633,60 +2788,103 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
SMqClientVg* pVg = NULL;
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgId) {
pVg = pClientVg;
break;
}
}
if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_VGID;
return code;
}
int32_t type = pVg->offsetInfo.endOffset.type;
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
int32_t type = pOffsetInfo->endOffset.type;
if (isInSnapshotMode(type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
if (!isWalRangeOk(&pVg->offsetInfo)) {
tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId);
code = checkWalRange(pOffsetInfo, -1);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_NEED_INITIALIZED;
return code;
}
SEpSet epSet = pVg->epSet;
int64_t begin = pVg->offsetInfo.walVerBegin;
int64_t end = pVg->offsetInfo.walVerEnd;
taosWUnLockLatch(&tmq->lock);
int64_t position = 0;
STqOffsetVal* pOffsetInfo = &pVg->offsetInfo.endOffset;
if(type == TMQ_OFFSET__LOG){
position = pOffsetInfo->version;
}else if(type == TMQ_OFFSET__RESET_EARLIEST){
position = pVg->offsetInfo.walVerBegin;
}else if(type == TMQ_OFFSET__RESET_LATEST){
position = pVg->offsetInfo.walVerEnd;
position = pOffsetInfo->endOffset.version;
}else if(type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST){
code = getCommittedFromServer(tmq, tname, vgId, &epSet);
if(code == TSDB_CODE_TMQ_NO_COMMITTED){
if(type == TMQ_OFFSET__RESET_EARLIEST){
position = begin;
} else if(type == TMQ_OFFSET__RESET_LATEST){
position = end;
}
}else{
position = code;
}
}else{
tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
}
taosWUnLockLatch(&tmq->lock);
return position;
}
int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t accId = tmq->pTscObj->acctId;
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->endOffset.type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->committedOffset.type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
int64_t committed = 0;
if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
committed = pOffsetInfo->committedOffset.version;
taosWUnLockLatch(&tmq->lock);
return committed;
}
SEpSet epSet = pVg->epSet;
taosWUnLockLatch(&tmq->lock);
return getCommittedFromServer(tmq, tname, vgId, &epSet);
}
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
int32_t* numOfAssignment) {
if(tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL){
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
*numOfAssignment = 0;
*assignment = NULL;
SMqVgCommon* pCommon = NULL;
......@@ -2881,7 +3079,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL) {
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
......@@ -2891,27 +3089,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
SMqClientVg* pVg = NULL;
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgId) {
pVg = pClientVg;
break;
}
}
if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_VGID;
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
......@@ -2923,53 +3106,44 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
if (!isWalRangeOk(&pVg->offsetInfo)) {
tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_NEED_INITIALIZED;
}
if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) {
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
code = checkWalRange(pOffsetInfo, -1);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
return code;
}
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
// update the offset, and then commit to vnode
pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->endOffset.version = offset;
pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
pVg->seekUpdated = true;
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
SEpSet epSet = pVg->epSet;
taosWUnLockLatch(&tmq->lock);
SMqSeekReq req = {0};
snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, pTopic->topicName);
req.head.vgId = pVg->vgId;
snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
req.head.vgId = vgId;
req.consumerId = tmq->consumerId;
int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
if (msgSize < 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -2977,7 +3151,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pParam == NULL) {
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
......@@ -2991,18 +3164,15 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
tmq->consumerId, pTopic->topicName, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
taosWUnLockLatch(&tmq->lock);
tmq->consumerId, tname, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pParam->sem);
int32_t code = pParam->code;
code = pParam->code;
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId, tstrerror(code));
}
tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
return code;
}
......@@ -1075,6 +1075,89 @@ TEST(clientCase, sub_db_test) {
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
TEST(clientCase, tmq_commit) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
char topicName[128] = "tp";
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, topicName);
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 2000;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId);
printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1);
position = tmq_position(tmq, topicName, pAssign[i].vgId);
printf("after seek 100, position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
}
while (1) {
printf("start to poll\n");
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
printSubResults(pRes, &totalRows);
} else {
break;
}
tmq_commit_sync(tmq, pRes);
for(int i = 0; i < numOfAssign; i++) {
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed);
}
if (pRes != NULL) {
taos_free_result(pRes);
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
}
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
TEST(clientCase, td_25129) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
......
......@@ -732,6 +732,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -232,6 +232,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
......
......@@ -578,6 +578,49 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return code;
}
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SMqVgOffset vgOffset = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)data, len);
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tDecoderClear(&decoder);
STqOffset* pOffset = &vgOffset.offset;
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset == NULL) {
return TSDB_CODE_TMQ_NO_COMMITTED;
}
vgOffset.offset = *pSavedOffset;
int32_t code = 0;
tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
if (code < 0) {
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, len);
tEncodeMqVgOffset(&encoder, &vgOffset);
tEncoderClear(&encoder);
SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
tmsgSendRsp(&rsp);
return 0;
}
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
......
......@@ -462,7 +462,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
}
break;
case TDMT_VND_TMQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, len) < 0) {
goto _err;
}
break;
......@@ -638,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_COMMITTEDINFO:
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
......
......@@ -633,6 +633,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NEED_INITIALIZED, "Assignment or poll interface need to be called first")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_COMMITTED, "No committed info")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_TOPIC, "Topic does not belong to this consumer")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册