未验证 提交 9a1b5da4 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22072 from taosdata/mark/tmq

fix:modify commit version to next validate version
......@@ -287,11 +287,20 @@ DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t {
......@@ -309,11 +318,6 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
enum tmq_res_t {
......
......@@ -312,6 +312,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
......
......@@ -228,7 +228,7 @@ typedef struct SStoreTqReader {
} SStoreTqReader;
typedef struct SStoreSnapshotFn {
int32_t (*createSnapshot)(SSnapContext* ctx, int64_t uid);
int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid);
int32_t (*destroySnapshot)(SSnapContext* ctx);
SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
......
......@@ -778,6 +778,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4007)
#define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008)
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
......@@ -1327,6 +1327,9 @@ end:
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
int numFields) {
if (!taos || !pData || !tbname) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL;
......@@ -1413,6 +1416,9 @@ end:
}
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
if (!taos || !pData || !tbname) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL;
......@@ -1812,6 +1818,7 @@ end:
}
char* tmq_get_json_meta(TAOS_RES* res) {
if (res == NULL) return NULL;
uDebug("tmq_get_json_meta called");
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) {
return NULL;
......
......@@ -139,8 +139,8 @@ enum {
typedef struct SVgOffsetInfo {
STqOffsetVal committedOffset;
STqOffsetVal currentOffset;
STqOffsetVal seekOffset; // the first version in block for seek operation
STqOffsetVal endOffset; // the last version in TAOS_RES + 1
STqOffsetVal beginOffset; // the first version in TAOS_RES
int64_t walVerBegin;
int64_t walVerEnd;
} SVgOffsetInfo;
......@@ -220,6 +220,12 @@ typedef struct SMqSeekParam {
int32_t code;
} SMqSeekParam;
typedef struct SMqCommittedParam {
tsem_t sem;
int32_t code;
SMqVgOffset vgOffset;
} SMqCommittedParam;
typedef struct SMqVgWalInfoParam {
int32_t vgId;
int32_t epoch;
......@@ -241,7 +247,7 @@ typedef struct {
typedef struct {
SMqCommitCbParamSet* params;
SMqVgOffset* pOffset;
// SMqVgOffset* pOffset;
char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId;
tmq_t* pTmq;
......@@ -255,8 +261,7 @@ typedef struct SSyncCommitInfo {
static int32_t doAskEp(tmq_t* tmq);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups, int32_t type);
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
......@@ -293,6 +298,9 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
}
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (conf == NULL || key == NULL || value == NULL){
return TMQ_CONF_INVALID;
}
if (strcasecmp(key, "group.id") == 0) {
tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
return TMQ_CONF_OK;
......@@ -407,6 +415,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
if(list == NULL) return -1;
SArray* container = &list->container;
if (src == NULL || src[0] == 0) return -1;
char* topic = taosStrdup(src);
......@@ -415,84 +424,28 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
}
void tmq_list_destroy(tmq_list_t* list) {
if(list == NULL) return;
SArray* container = &list->container;
taosArrayDestroyP(container, taosMemoryFree);
}
int32_t tmq_list_get_size(const tmq_list_t* list) {
if(list == NULL) return -1;
const SArray* container = &list->container;
return taosArrayGetSize(container);
}
char** tmq_list_to_c_array(const tmq_list_t* list) {
if(list == NULL) return NULL;
const SArray* container = &list->container;
return container->pData;
}
//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
// int32_t* numOfVgroups) {
// int32_t numOfTopics = taosArrayGetSize(pTopicList);
// *index = -1;
// *numOfVgroups = 0;
//
// for (int32_t i = 0; i < numOfTopics; ++i) {
// SMqClientTopic* pTopic = taosArrayGet(pTopicList, i);
// if (strcmp(pTopic->topicName, pName) != 0) {
// continue;
// }
//
// *numOfVgroups = taosArrayGetSize(pTopic->vgs);
// for (int32_t j = 0; j < (*numOfVgroups); ++j) {
// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
// if (pClientVg->vgId == vgId) {
// *index = j;
// return pClientVg;
// }
// }
// }
//
// return NULL;
//}
// Two problems do not need to be addressed here
// 1. update to of epset. the response of poll request will automatically handle this problem
// 2. commit failure. This one needs to be resolved.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
// taosThreadMutexLock(&pParam->pTmq->lock);
// int32_t numOfVgroups, index;
// SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index,
// &numOfVgroups); if (pVg == NULL) {
// tscDebug("consumer:0x%" PRIx64
// " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry
// ordinal:%d/%d", pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code),
// index + 1, numOfVgroups);
// } else { // let's retry the commit
// int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
// if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
// tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
// " retry failed, ignore this commit. code:%s ordinal:%d/%d",
// pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version,
// tstrerror(terrno), index + 1, numOfVgroups);
// }
// }
//
// taosThreadMutexUnlock(&pParam->pTmq->lock);
//
// taosMemoryFree(pParam->pOffset);
// taosMemoryFree(pBuf->pData);
// taosMemoryFree(pBuf->pEpSet);
//
// commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// return 0;
// }
//
// // todo replace the pTmq with refId
taosMemoryFree(pParam->pOffset);
// taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
......@@ -500,54 +453,48 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
return 0;
}
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups, int32_t type) {
SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
if (pOffset == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) {
SMqVgOffset pOffset = {0};
pOffset->consumerId = tmq->consumerId;
pOffset->offset.val = pVg->offsetInfo.currentOffset;
pOffset.consumerId = tmq->consumerId;
pOffset.offset.val = *offset;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);
memcpy(pOffset.offset.subKey, tmq->groupId, groupLen);
pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset.offset.subKey + groupLen + 1, pTopicName);
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
if (code < 0) {
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
taosMemoryFree(pOffset);
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeMqVgOffset(&encoder, pOffset);
tEncodeMqVgOffset(&encoder, &pOffset);
tEncoderClear(&encoder);
// build param
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
taosMemoryFree(pOffset);
taosMemoryFree(buf);
return TSDB_CODE_OUT_OF_MEMORY;
}
pParam->params = pParamSet;
pParam->pOffset = pOffset;
pParam->vgId = pVg->vgId;
// pParam->pOffset = pOffset;
pParam->vgId = vgId;
pParam->pTmq = tmq;
tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
......@@ -555,7 +502,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
taosMemoryFree(pOffset);
taosMemoryFree(buf);
taosMemoryFree(pParam);
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -568,23 +514,16 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = type;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);
SEp* pEp = GET_ACTIVE_EP(epSet);
char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
totalVgroups, pMsgSendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -604,154 +543,180 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
return NULL;
}
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
char* pTopicName = NULL;
int32_t vgId = 0;
int32_t code = 0;
if (pRes == NULL || tmq == NULL) {
pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
return;
}
if (TD_RES_TMQ(pRes)) {
SMqRspObj* pRspObj = (SMqRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
} else if (TD_RES_TMQ_META(pRes)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
pTopicName = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId;
} else if (TD_RES_TMQ_METADATA(pRes)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
} else {
pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
return;
}
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return;
return NULL;
}
pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch;
pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam;
pParamSet->waitingRspNum = rspNum;
return pParamSet;
}
taosRLockLatch(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){
SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
if (pTopic == NULL) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
pTopicName, numOfTopics);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
taosRUnLockLatch(&tmq->lock);
return;
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
int32_t j = 0;
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == vgId) {
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgId) {
*pVg = pClientVg;
break;
}
}
if (j == numOfVgroups) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
vgId, numOfVgroups, pTopicName);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
taosRUnLockLatch(&tmq->lock);
return;
return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
}
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) {
int32_t code = 0;
tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
taosRLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
code = getClientVg(tmq, pTopicName, vgId, &pVg);
if(code != 0){
goto end;
}
if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type);
char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
// failed to commit, callback user function directly.
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0);
if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
goto end;
}
// update the offset value.
pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
} else { // do not perform commit, callback user function directly.
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
pVg->offsetInfo.committedOffset = *offsetVal;
}
end:
taosRUnLockLatch(&tmq->lock);
return code;
}
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return;
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam){
char* pTopicName = NULL;
int32_t vgId = 0;
STqOffsetVal offsetVal = {0};
int32_t code = 0;
if (pRes == NULL || tmq == NULL) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch;
pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam;
if (TD_RES_TMQ(pRes)) {
SMqRspObj* pRspObj = (SMqRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
offsetVal = pRspObj->rsp.rspOffset;
} else if (TD_RES_TMQ_META(pRes)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
pTopicName = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId;
offsetVal = pMetaRspObj->metaRsp.rspOffset;
} else if (TD_RES_TMQ_METADATA(pRes)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
offsetVal = pRspObj->rsp.rspOffset;
} else {
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
end:
if(code != TSDB_CODE_SUCCESS){
pCommitFp(tmq, code, userParam);
}
}
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
int32_t code = 0;
// init as 1 to prevent concurrency issue
pParamSet->waitingRspNum = 1;
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1);
if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
taosRLockLatch(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
tscInfo("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
numOfVgroups);
tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
if (pVg->offsetInfo.endOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) {
char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset);
char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(terrno),
j + 1, numOfVgroups);
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno), j + 1, numOfVgroups);
continue;
}
// update the offset value.
pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups);
pVg->offsetInfo.committedOffset = pVg->offsetInfo.endOffset;
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1, numOfVgroups);
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
}
}
}
taosRUnLockLatch(&tmq->lock);
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
numOfTopics);
tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics);
// no request is sent
if (pParamSet->totalRspNum == 0) {
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
// request is sent
if (pParamSet->totalRspNum != 0) {
// count down since waiting rsp num init as 1
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
return;
}
// count down since waiting rsp num init as 1
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
end:
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
return;
}
static void generateTimedTask(int64_t refId, int32_t type) {
......@@ -827,7 +792,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.seekOffset;
offRows->offset = pVg->offsetInfo.beginOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
......@@ -999,6 +964,7 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if (*topics == NULL) {
*topics = tmq_list_new();
}
......@@ -1012,6 +978,7 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
}
int32_t tmq_unsubscribe(tmq_t* tmq) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
......@@ -1082,6 +1049,7 @@ static void tmqMgmtInit(void) {
}
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if(conf == NULL) return NULL;
taosThreadOnce(&tmqInit, tmqMgmtInit);
if (tmqInitRes != 0) {
terrno = tmqInitRes;
......@@ -1175,6 +1143,7 @@ _failed:
}
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
......@@ -1299,6 +1268,7 @@ FAIL:
}
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
if(conf == NULL) return;
conf->commitCb = cb;
conf->commitCbUserParam = param;
}
......@@ -1523,9 +1493,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.numOfRows = pInfo ? pInfo->numOfRows : 0,
};
clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew;
clientVg.offsetInfo.endOffset = pInfo ? pInfo->currentOffset : offsetNew;
clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew;
clientVg.offsetInfo.seekOffset = pInfo ? pInfo->seekOffset : offsetNew;
clientVg.offsetInfo.beginOffset = pInfo ? pInfo->seekOffset : offsetNew;
clientVg.offsetInfo.walVerBegin = -1;
clientVg.offsetInfo.walVerEnd = -1;
clientVg.seekUpdated = false;
......@@ -1581,11 +1551,11 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset);
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf);
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .seekOffset = pVgCur->offsetInfo.seekOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
}
}
......@@ -1682,7 +1652,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->consumerId = tmq->consumerId;
pReq->timeout = timeout;
pReq->epoch = tmq->epoch;
pReq->reqOffset = pVg->offsetInfo.currentOffset;
pReq->reqOffset = pVg->offsetInfo.endOffset;
pReq->head.vgId = pVg->vgId;
pReq->useSnapshot = tmq->useSnapshot;
pReq->reqId = generateRequestId();
......@@ -1809,7 +1779,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
......@@ -1890,8 +1860,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.seekOffset = *reqOffset;
pVg->offsetInfo.currentOffset = *rspOffset;
pVg->offsetInfo.beginOffset = *reqOffset;
pVg->offsetInfo.endOffset = *rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
}
......@@ -1901,7 +1871,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
// update the valid wal version range
pVg->offsetInfo.walVerBegin = sver;
pVg->offsetInfo.walVerEnd = ever;
pVg->offsetInfo.walVerEnd = ever + 1;
// pVg->receivedInfoFromVnode = true;
}
......@@ -2053,7 +2023,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
......@@ -2085,6 +2055,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if(tmq == NULL) return NULL;
void* rspObj;
int64_t startTime = taosGetTimestampMs();
......@@ -2164,6 +2136,8 @@ static void displayConsumeStatistics(tmq_t* pTmq) {
}
int32_t tmq_consumer_close(tmq_t* tmq) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq);
......@@ -2209,6 +2183,9 @@ const char* tmq_err2str(int32_t err) {
}
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
if (res == NULL){
return TMQ_RES_INVALID;
}
if (TD_RES_TMQ(res)) {
return TMQ_RES_DATA;
} else if (TD_RES_TMQ_META(res)) {
......@@ -2221,6 +2198,9 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
}
const char* tmq_get_topic_name(TAOS_RES* res) {
if (res == NULL){
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
......@@ -2236,6 +2216,10 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
}
const char* tmq_get_db_name(TAOS_RES* res) {
if (res == NULL){
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
......@@ -2251,6 +2235,9 @@ const char* tmq_get_db_name(TAOS_RES* res) {
}
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL){
return -1;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->vgId;
......@@ -2266,6 +2253,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
}
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (res == NULL){
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*) res;
STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset;
......@@ -2289,10 +2279,13 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
}
// data from tsdb, no valid offset info
return -1;
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
const char* tmq_get_table_name(TAOS_RES* res) {
if (res == NULL){
return NULL;
}
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
......@@ -2312,10 +2305,14 @@ const char* tmq_get_table_name(TAOS_RES* res) {
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
return;
}
if (pRes == NULL) { // here needs to commit all offsets.
asyncCommitAllOffsets(tmq, cb, param);
} else { // only commit one offset
asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
asyncCommitFromResult(tmq, pRes, cb, param);
}
}
......@@ -2326,6 +2323,11 @@ static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
......@@ -2335,7 +2337,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
if (pRes == NULL) {
asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
} else {
asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
}
tsem_wait(&pInfo->sem);
......@@ -2348,6 +2350,106 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
return code;
}
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value){
if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
tscError("Assignment or poll interface need to be called first");
return TSDB_CODE_TMQ_NEED_INITIALIZED;
}
if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, offset->walVerBegin, offset->walVerEnd);
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
}
return 0;
}
int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset){
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t accId = tmq->pTscObj->acctId;
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return code;
}
taosWUnLockLatch(&tmq->lock);
STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
if (pInfo == NULL) {
tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pInfo->sem, 0, 0);
pInfo->code = 0;
asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
tsem_wait(&pInfo->sem);
code = pInfo->code;
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
}
int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t accId = tmq->pTscObj->acctId;
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return code;
}
taosWUnLockLatch(&tmq->lock);
STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
SAskEpInfo* pInfo = param;
pInfo->code = code;
......@@ -2479,7 +2581,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
}
// if no more waiting rsp
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
if(pParamSet->callbackFn != NULL){
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
}
taosMemoryFree(pParamSet);
// tmq->needReportOffsetRows = true;
......@@ -2490,12 +2595,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
if (waitingRspNum == 0) {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
vgId);
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, vgId);
tmqCommitDone(pParamSet);
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
waitingRspNum);
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, waitingRspNum);
}
}
......@@ -2541,7 +2644,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqRspHead* pHead = pMsg->pData;
tmq_topic_assignment assignment = {.begin = pHead->walsver,
.end = pHead->walever,
.end = pHead->walever + 1,
.currentOffset = rsp.rspOffset.version,
.vgId = pParam->vgId};
......@@ -2578,14 +2681,216 @@ static bool isInSnapshotMode(int8_t type, bool useSnapshot){
return false;
}
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqCommittedParam* pParam = param;
if (code != 0){
goto end;
}
if (pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tDecoderClear(&decoder);
}
end:
if(pMsg){
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
pParam->code = code;
tsem_post(&pParam->sem);
return 0;
}
int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* epSet){
int32_t code = 0;
SMqVgOffset pOffset = {0};
pOffset.consumerId = tmq->consumerId;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset.offset.subKey, tmq->groupId, groupLen);
pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset.offset.subKey + groupLen + 1, tname);
int32_t len = 0;
tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
if (code < 0) {
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeMqVgOffset(&encoder, &pOffset);
tEncoderClear(&encoder);
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(buf);
return TSDB_CODE_OUT_OF_MEMORY;
}
SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
if (pParam == NULL) {
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmCommittedCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
tsem_wait(&pParam->sem);
code = pParam->code;
if(code == TSDB_CODE_SUCCESS){
if(pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG){
code = pParam->vgOffset.offset.val.version;
}else{
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
}
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;
}
int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t accId = tmq->pTscObj->acctId;
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
int32_t type = pOffsetInfo->endOffset.type;
if (isInSnapshotMode(type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
code = checkWalRange(pOffsetInfo, -1);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return code;
}
SEpSet epSet = pVg->epSet;
int64_t begin = pVg->offsetInfo.walVerBegin;
int64_t end = pVg->offsetInfo.walVerEnd;
taosWUnLockLatch(&tmq->lock);
int64_t position = 0;
if(type == TMQ_OFFSET__LOG){
position = pOffsetInfo->endOffset.version;
}else if(type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST){
code = getCommittedFromServer(tmq, tname, vgId, &epSet);
if(code == TSDB_CODE_TMQ_NO_COMMITTED){
if(type == TMQ_OFFSET__RESET_EARLIEST){
position = begin;
} else if(type == TMQ_OFFSET__RESET_LATEST){
position = end;
}
}else{
position = code;
}
}else{
tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
}
return position;
}
int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t accId = tmq->pTscObj->acctId;
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->endOffset.type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->committedOffset.type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
int64_t committed = 0;
if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
committed = pOffsetInfo->committedOffset.version;
taosWUnLockLatch(&tmq->lock);
return committed;
}
SEpSet epSet = pVg->epSet;
taosWUnLockLatch(&tmq->lock);
return getCommittedFromServer(tmq, tname, vgId, &epSet);
}
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
int32_t* numOfAssignment) {
if(tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL){
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
*numOfAssignment = 0;
*assignment = NULL;
SMqVgCommon* pCommon = NULL;
int32_t accId = tmq->pTscObj->acctId;
char tname[128] = {0};
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -2600,7 +2905,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
*numOfAssignment = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
int32_t type = pClientVg->offsetInfo.currentOffset.type;
int32_t type = pClientVg->offsetInfo.beginOffset.type;
if (isInSnapshotMode(type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
......@@ -2620,13 +2925,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg->offsetInfo.currentOffset.type != TMQ_OFFSET__LOG) {
if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
needFetch = true;
break;
}
tmq_topic_assignment* pAssignment = &(*assignment)[j];
pAssignment->currentOffset = pClientVg->offsetInfo.seekOffset.version;
pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgId = pClientVg->vgId;
......@@ -2665,7 +2970,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqPollReq req = {0};
tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
req.reqOffset = pClientVg->offsetInfo.seekOffset;
req.reqOffset = pClientVg->offsetInfo.beginOffset;
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
......@@ -2705,7 +3010,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
......@@ -2774,90 +3079,71 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL) {
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
int32_t accId = tmq->pTscObj->acctId;
char tname[128] = {0};
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
SMqClientVg* pVg = NULL;
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgId) {
pVg = pClientVg;
break;
}
}
if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_INVALID_VGID;
return code;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
int32_t type = pOffsetInfo->currentOffset.type;
int32_t type = pOffsetInfo->endOffset.type;
if (isInSnapshotMode(type, tmq->useSnapshot)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
if ((offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
return code;
}
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
// update the offset, and then commit to vnode
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
pOffsetInfo->seekOffset = pOffsetInfo->currentOffset;
// pOffsetInfo->committedOffset.version = INT64_MIN;
pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->endOffset.version = offset;
pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
pVg->seekUpdated = true;
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
SEpSet epSet = pVg->epSet;
taosWUnLockLatch(&tmq->lock);
SMqSeekReq req = {0};
snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, pTopic->topicName);
req.head.vgId = pVg->vgId;
snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
req.head.vgId = vgId;
req.consumerId = tmq->consumerId;
int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
if (msgSize < 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -2865,7 +3151,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pParam == NULL) {
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
......@@ -2879,18 +3164,15 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
tmq->consumerId, pTopic->topicName, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
taosWUnLockLatch(&tmq->lock);
tmq->consumerId, tname, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pParam->sem);
int32_t code = pParam->code;
code = pParam->code;
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId, tstrerror(code));
}
tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));
return code;
}
......@@ -1075,6 +1075,98 @@ TEST(clientCase, sub_db_test) {
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
TEST(clientCase, tmq_commit) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
char topicName[128] = "tp";
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, topicName);
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 2000;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed);
int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId);
printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1);
position = tmq_position(tmq, topicName, pAssign[i].vgId);
printf("after seek 1, position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
}
while (1) {
printf("start to poll\n");
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
printSubResults(pRes, &totalRows);
} else {
break;
}
tmq_commit_sync(tmq, pRes);
for(int i = 0; i < numOfAssign; i++) {
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed);
if(committed > 0){
int32_t code = tmq_commit_offset_sync(tmq, topicName, pAssign[i].vgId, 4);
printf("tmq_commit_offset_sync vgId:%d, offset:4, code:%d\n", pAssign[i].vgId, code);
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("after tmq_commit_offset_sync, committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed);
}
}
if (pRes != NULL) {
taos_free_result(pRes);
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
}
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
TEST(clientCase, td_25129) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
......@@ -1094,9 +1186,10 @@ TEST(clientCase, td_25129) {
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
char topicName[128] = "tp";
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");
tmq_list_append(topicList, topicName);
// 启动订阅
tmq_subscribe(tmq, topicList);
......@@ -1114,7 +1207,7 @@ TEST(clientCase, td_25129) {
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
int32_t code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
......@@ -1131,7 +1224,7 @@ TEST(clientCase, td_25129) {
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
......@@ -1147,7 +1240,7 @@ TEST(clientCase, td_25129) {
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
......@@ -1177,7 +1270,7 @@ TEST(clientCase, td_25129) {
printSubResults(pRes, &totalRows);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
......@@ -1191,10 +1284,11 @@ TEST(clientCase, td_25129) {
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
} else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
for(int i = 0; i < numOfAssign; i++) {
tmq_offset_seek(tmq, topicName, pAssign[i].vgId, pAssign[i].currentOffset);
}
tmq_commit_sync(tmq, pRes);
continue;
break;
}
// tmq_commit_sync(tmq, pRes);
......@@ -1226,6 +1320,7 @@ TEST(clientCase, td_25129) {
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
......
......@@ -732,6 +732,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -232,6 +232,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
......
......@@ -85,9 +85,9 @@ void tqDestroyTqHandle(void* data) {
}
}
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
pLeft->val.version == pRight->val.version;
}
STQ* tqOpen(const char* path, SVnode* pVnode) {
......@@ -302,10 +302,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
} else if (pOffset->val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
pOffset->val.version);
if (pOffset->val.version + 1 == sversion) {
pOffset->val.version += 1;
......@@ -316,8 +316,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) {
tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
return 0; // no need to update the offset value
}
......@@ -578,6 +578,49 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return code;
}
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SMqVgOffset vgOffset = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)data, len);
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tDecoderClear(&decoder);
STqOffset* pOffset = &vgOffset.offset;
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset == NULL) {
return TSDB_CODE_TMQ_NO_COMMITTED;
}
vgOffset.offset = *pSavedOffset;
int32_t code = 0;
tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
if (code < 0) {
return TSDB_CODE_INVALID_PARA;
}
void* buf = rpcMallocCont(len);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, len);
tEncodeMqVgOffset(&encoder, &vgOffset);
tEncoderClear(&encoder);
SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
tmsgSendRsp(&rsp);
return 0;
}
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
......
......@@ -196,7 +196,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
", no more log to return, reqId:0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
*fetchOffset = offset - 1;
*fetchOffset = offset;
code = -1;
goto END;
}
......
......@@ -119,7 +119,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
}
} else {
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
......@@ -127,7 +127,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
......@@ -138,7 +138,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer);
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer + 1);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
tDeleteSTaosxRsp(&taosxRsp);
......@@ -246,7 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) {
walReaderVerifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1;
int64_t fetchVer = offset->version;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -279,14 +279,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
......@@ -309,7 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
......
......@@ -237,7 +237,7 @@ void initCacheFn(SStoreCacheReader* pCache) {
}
void initSnapshotFn(SStoreSnapshotFn* pSnapshot) {
pSnapshot->createSnapshot = setForSnapShot;
pSnapshot->setForSnapShot = setForSnapShot;
pSnapshot->destroySnapshot = destroySnapContext;
pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot;
pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot;
......
......@@ -462,7 +462,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
}
break;
case TDMT_VND_TMQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, len) < 0) {
goto _err;
}
break;
......@@ -638,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_COMMITTEDINFO:
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
......
......@@ -1112,8 +1112,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
walReaderVerifyOffset(pWalReader, pOffset);
if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id) < 0) {
qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
return -1;
}
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
......@@ -1202,7 +1202,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;
if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
......@@ -1239,7 +1239,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
if (pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
......
......@@ -1645,12 +1645,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1;
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer);
if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) {
return NULL;
}
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer);
}
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
......@@ -1661,8 +1662,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
// curVersion move to next, so currentOffset = curVersion - 1
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion - 1);
// curVersion move to next
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
if (hasResult) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
......@@ -2183,7 +2184,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
STqOffsetVal offset = {0};
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal");
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1);
} else {
tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
......
......@@ -135,8 +135,8 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){
int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
taosThreadMutexUnlock(&pWalReader->pWal->mutex);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
if (pOffset->version < firstVer){
pOffset->version = firstVer;
}
}
......
......@@ -631,7 +631,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s
//tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NEED_INITIALIZED, "Assignment or poll interface need to be called first")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_COMMITTED, "No committed info")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_TOPIC, "Topic does not belong to this consumer")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册