提交 e92aa57e 编写于 作者: L Liu Jicong

feat(tmq): support commit one msg

上级 abc7d063
...@@ -146,8 +146,8 @@ int32_t create_topic() { ...@@ -146,8 +146,8 @@ int32_t create_topic() {
return 0; return 0;
} }
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p offsets %p param %p\n", resp, tmq, offsets, param); printf("commit %d tmq %p param %p\n", code, tmq, param);
} }
tmq_t* build_consumer() { tmq_t* build_consumer() {
...@@ -183,10 +183,10 @@ tmq_list_t* build_topic_list() { ...@@ -183,10 +183,10 @@ tmq_list_t* build_topic_list() {
} }
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err; int32_t code;
if ((err = tmq_subscribe(tmq, topics))) { if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n"); printf("subscribe err\n");
return; return;
} }
...@@ -205,9 +205,9 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -205,9 +205,9 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
} }
err = tmq_consumer_close(tmq); code = tmq_consumer_close(tmq);
if (err) if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else else
fprintf(stderr, "%% Consumer closed\n"); fprintf(stderr, "%% Consumer closed\n");
} }
...@@ -215,11 +215,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -215,11 +215,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1; static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0; int msg_count = 0;
tmq_resp_err_t err; int32_t code;
if ((err = tmq_subscribe(tmq, topics))) { if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
return; return;
} }
...@@ -245,9 +245,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -245,9 +245,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
} }
err = tmq_consumer_close(tmq); code = tmq_consumer_close(tmq);
if (err) if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else else
fprintf(stderr, "%% Consumer closed\n"); fprintf(stderr, "%% Consumer closed\n");
} }
......
...@@ -209,21 +209,20 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi ...@@ -209,21 +209,20 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
/* --------------------------TMQ INTERFACE------------------------------- */ /* --------------------------TMQ INTERFACE------------------------------- */
#if 0
enum { enum {
TMQ_RESP_ERR__FAIL = -1, TMQ_RESP_ERR__FAIL = -1,
TMQ_RESP_ERR__SUCCESS = 0, TMQ_RESP_ERR__SUCCESS = 0,
}; };
typedef int32_t tmq_resp_err_t; typedef int32_t tmq_resp_err_t;
#endif
typedef struct tmq_t tmq_t; typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
...@@ -233,24 +232,19 @@ DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); ...@@ -233,24 +232,19 @@ DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT const char *tmq_err2str(int32_t code);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ /* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
// timeout: -1 means infinitely waiting // timeout: -1 means infinitely waiting
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
......
...@@ -710,6 +710,8 @@ int32_t* taosGetErrno(); ...@@ -710,6 +710,8 @@ int32_t* taosGetErrno();
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -48,14 +48,6 @@ struct tmq_list_t { ...@@ -48,14 +48,6 @@ struct tmq_list_t {
SArray container; SArray container;
}; };
struct tmq_topic_vgroup_t {
SMqOffset offset;
};
struct tmq_topic_vgroup_list_t {
SArray container; // SArray<tmq_topic_vgroup_t*>
};
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[256];
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
...@@ -161,9 +153,9 @@ typedef struct { ...@@ -161,9 +153,9 @@ typedef struct {
} SMqPollRspWrapper; } SMqPollRspWrapper;
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
tsem_t rspSem; tsem_t rspSem;
tmq_resp_err_t rspErr; int32_t rspErr;
} SMqSubscribeCbParam; } SMqSubscribeCbParam;
typedef struct { typedef struct {
...@@ -189,7 +181,7 @@ typedef struct { ...@@ -189,7 +181,7 @@ typedef struct {
int8_t freeOffsets; int8_t freeOffsets;
tmq_commit_cb* userCb; tmq_commit_cb* userCb;
tsem_t rspSem; tsem_t rspSem;
tmq_resp_err_t rspErr; int32_t rspErr;
SArray* offsets; SArray* offsets;
void* userParam; void* userParam;
} SMqCommitCbParam; } SMqCommitCbParam;
...@@ -201,7 +193,7 @@ typedef struct { ...@@ -201,7 +193,7 @@ typedef struct {
int8_t freeOffsets; int8_t freeOffsets;
int32_t waitingRspNum; int32_t waitingRspNum;
int32_t totalRspNum; int32_t totalRspNum;
tmq_resp_err_t rspErr; int32_t rspErr;
tmq_commit_cb* userCb; tmq_commit_cb* userCb;
/*SArray* successfulOffsets;*/ /*SArray* successfulOffsets;*/
/*SArray* failedOffsets;*/ /*SArray* failedOffsets;*/
...@@ -347,10 +339,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -347,10 +339,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
pParam->rspErr = code; pParam->rspErr = code;
if (pParam->async) { if (pParam->async) {
if (pParam->automatic && pParam->tmq->commitCb) { if (pParam->automatic && pParam->tmq->commitCb) {
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
pParam->tmq->commitCbUserParam);
} else if (!pParam->automatic && pParam->userCb) { } else if (!pParam->automatic && pParam->userCb) {
pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam); pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
} }
if (pParam->freeOffsets) { if (pParam->freeOffsets) {
...@@ -388,10 +379,10 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { ...@@ -388,10 +379,10 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
if (pParamSet->async) { if (pParamSet->async) {
// call async cb func // call async cb func
if (pParamSet->automatic && pParamSet->tmq->commitCb) { if (pParamSet->automatic && pParamSet->tmq->commitCb) {
pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->tmq->commitCbUserParam); pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) { } else if (!pParamSet->automatic && pParamSet->userCb) {
// sem post // sem post
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam); pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
} }
} else { } else {
tsem_post(&pParamSet->rspSem); tsem_post(&pParamSet->rspSem);
...@@ -405,10 +396,132 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { ...@@ -405,10 +396,132 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
return 0; return 0;
} }
int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
tmq_commit_cb* userCb, void* userParam) { void* userParam) {
int32_t code = -1; int32_t code = -1;
if (msg != NULL) {
SMqRspObj* pRspObj = (SMqRspObj*)msg;
if (!TD_RES_TMQ(pRspObj)) {
return TSDB_CODE_TMQ_INVALID_MSG;
}
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pParamSet->tmq = tmq;
pParamSet->automatic = automatic;
pParamSet->async = async;
pParamSet->freeOffsets = 1;
pParamSet->userCb = userCb;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, pRspObj->topic) == 0) {
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == pRspObj->vgId) {
if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);
return 0;
}
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pOffset->type = TMQ_OFFSET__LOG;
pOffset->version = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, groupLen);
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
ASSERT(0);
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
// TODO
continue;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->version);
// TODO: put into cb
pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
}
}
}
}
if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
return 0;
}
if (!async) {
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
} else {
code = 0;
}
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
} else {
userCb(tmq, code, userParam);
}
}
return 0;
}
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) { if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -521,9 +634,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8 ...@@ -521,9 +634,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
if (code != 0 && async) { if (code != 0 && async) {
if (automatic) { if (automatic) {
tmq->commitCb(tmq, code, NULL, tmq->commitCbUserParam); tmq->commitCb(tmq, code, tmq->commitCbUserParam);
} else { } else {
userCb(tmq, code, NULL, userParam); userCb(tmq, code, userParam);
} }
} }
...@@ -537,7 +650,8 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8 ...@@ -537,7 +650,8 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
return 0; return 0;
} }
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, #if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) { tmq_commit_cb* userCb, void* userParam) {
SMqCMCommitOffsetReq req; SMqCMCommitOffsetReq req;
SArray* pOffsets = NULL; SArray* pOffsets = NULL;
...@@ -547,7 +661,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_ ...@@ -547,7 +661,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
int8_t freeOffsets; int8_t freeOffsets;
int32_t code = -1; int32_t code = -1;
if (offsets == NULL) { if (msg == NULL) {
freeOffsets = 1; freeOffsets = 1;
pOffsets = taosArrayInit(0, sizeof(SMqOffset)); pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
...@@ -564,7 +678,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_ ...@@ -564,7 +678,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
} }
} else { } else {
freeOffsets = 0; freeOffsets = 0;
pOffsets = (SArray*)&offsets->container; pOffsets = (SArray*)&msg->container;
} }
req.num = (int32_t)pOffsets->size; req.num = (int32_t)pOffsets->size;
...@@ -651,6 +765,7 @@ END: ...@@ -651,6 +765,7 @@ END:
} }
return code; return code;
} }
#endif
void tmqAssignDelayedHbTask(void* param, void* tmrId) { void tmqAssignDelayedHbTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param; tmq_t* tmq = (tmq_t*)param;
...@@ -729,7 +844,7 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -729,7 +844,7 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if (*topics == NULL) { if (*topics == NULL) {
*topics = tmq_list_new(); *topics = tmq_list_new();
} }
...@@ -737,12 +852,12 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { ...@@ -737,12 +852,12 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
tmq_list_append(*topics, strchr(topic->topicName, '.') + 1); tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
} }
return TMQ_RESP_ERR__SUCCESS; return 0;
} }
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_unsubscribe(tmq_t* tmq) {
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); int32_t rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst); tmq_list_destroy(lst);
return rsp; return rsp;
} }
...@@ -858,11 +973,13 @@ FAIL: ...@@ -858,11 +973,13 @@ FAIL:
return NULL; return NULL;
} }
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { #if 0
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam); return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
} }
#endif
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
...@@ -902,7 +1019,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -902,7 +1019,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (sendInfo == NULL) goto FAIL; if (sendInfo == NULL) goto FAIL;
SMqSubscribeCbParam param = { SMqSubscribeCbParam param = {
.rspErr = TMQ_RESP_ERR__SUCCESS, .rspErr = 0,
.tmq = tmq, .tmq = tmq,
}; };
...@@ -1337,7 +1454,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1337,7 +1454,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
return code; return code;
} }
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { #if 0
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
const SMqOffset* pOffset = &offset->offset; const SMqOffset* pOffset = &offset->offset;
if (strcmp(pOffset->cgroup, tmq->groupId) != 0) { if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
return TMQ_RESP_ERR__FAIL; return TMQ_RESP_ERR__FAIL;
...@@ -1359,6 +1477,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { ...@@ -1359,6 +1477,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
} }
return TMQ_RESP_ERR__FAIL; return TMQ_RESP_ERR__FAIL;
} }
#endif
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset; int64_t reqOffset;
...@@ -1599,10 +1718,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1599,10 +1718,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
} }
} }
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL); int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != TMQ_RESP_ERR__SUCCESS) { if (rsp != 0) {
return rsp; return rsp;
} }
...@@ -1610,18 +1729,18 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { ...@@ -1610,18 +1729,18 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
rsp = tmq_subscribe(tmq, lst); rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst); tmq_list_destroy(lst);
if (rsp != TMQ_RESP_ERR__SUCCESS) { if (rsp != 0) {
return rsp; return rsp;
} }
} }
// TODO: free resources // TODO: free resources
return TMQ_RESP_ERR__SUCCESS; return 0;
} }
const char* tmq_err2str(tmq_resp_err_t err) { const char* tmq_err2str(int32_t err) {
if (err == TMQ_RESP_ERR__SUCCESS) { if (err == 0) {
return "success"; return "success";
} else if (err == TMQ_RESP_ERR__FAIL) { } else if (err == -1) {
return "fail"; return "fail";
} else { } else {
return tstrerror(err); return tstrerror(err);
...@@ -1667,10 +1786,8 @@ const char* tmq_get_table_name(TAOS_RES* res) { ...@@ -1667,10 +1786,8 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner2(tmq, offsets, 0, 1, cb, param); tmqCommitInner2(tmq, msg, 0, 1, cb, param);
} }
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { return tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL); }
return tmqCommitInner2(tmq, offsets, 0, 0, NULL, NULL);
}
...@@ -583,6 +583,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset ...@@ -583,6 +583,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
#endif #endif
......
...@@ -353,8 +353,8 @@ tmq_list_t* build_topic_list() { ...@@ -353,8 +353,8 @@ tmq_list_t* build_topic_list() {
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000; static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0; int msg_count = 0;
tmq_resp_err_t err; int32_t err;
if ((err = tmq_subscribe(tmq, topics))) { if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
...@@ -379,7 +379,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -379,7 +379,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) { void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
tmq_resp_err_t err; int32_t err;
if ((err = tmq_subscribe(tmq, topics))) { if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
......
...@@ -339,8 +339,8 @@ int queryDB(TAOS* taos, char* command) { ...@@ -339,8 +339,8 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
pError("tmq_commit_cb_print() commit %d\n", resp); pError("tmq_commit_cb_print() commit %d\n", code);
} }
void build_consumer(SThreadInfo* pInfo) { void build_consumer(SThreadInfo* pInfo) {
...@@ -443,7 +443,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { ...@@ -443,7 +443,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
} }
void loop_consume(SThreadInfo* pInfo) { void loop_consume(SThreadInfo* pInfo) {
tmq_resp_err_t err; int32_t code;
int64_t totalMsgs = 0; int64_t totalMsgs = 0;
int64_t totalRows = 0; int64_t totalRows = 0;
...@@ -496,8 +496,8 @@ void* consumeThreadFunc(void* param) { ...@@ -496,8 +496,8 @@ void* consumeThreadFunc(void* param) {
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
...@@ -517,14 +517,14 @@ void* consumeThreadFunc(void* param) { ...@@ -517,14 +517,14 @@ void* consumeThreadFunc(void* param) {
} }
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err) { if (err != 0) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
/*pInfo->consumeMsgCnt = -1;*/ /*pInfo->consumeMsgCnt = -1;*/
/*return NULL;*/ /*return NULL;*/
} }
err = tmq_consumer_close(pInfo->tmq); err = tmq_consumer_close(pInfo->tmq);
if (err) { if (err != 0) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
/*exit(-1);*/ /*exit(-1);*/
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册