提交 3f0e89ca 编写于 作者: L Liu Jicong

feat(tmq): refine commit offset api

上级 976ec81a
......@@ -167,7 +167,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
return tmq;
......@@ -176,6 +176,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_ctb_column");
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return topic_list;
}
......@@ -190,7 +191,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
/*printf("get data\n");*/
......
......@@ -233,6 +233,8 @@ DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
......@@ -251,7 +253,7 @@ typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
......
......@@ -182,10 +182,13 @@ typedef struct {
typedef struct {
tmq_t* tmq;
int32_t async;
int8_t async;
int8_t automatic;
tmq_commit_cb* userCb;
tsem_t rspSem;
tmq_resp_err_t rspErr;
SArray* offsets;
void* userParam;
} SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() {
......@@ -314,6 +317,135 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
if (pParam->async) {
if (pParam->automatic && pParam->tmq->commitCb) {
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
pParam->tmq->commitCbUserParam);
} else if (!pParam->automatic && pParam->userCb) {
pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam);
}
if (pParam->offsets) {
taosArrayDestroy(pParam->offsets);
}
taosMemoryFree(pParam);
} else {
tsem_post(&pParam->rspSem);
}
return 0;
}
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) {
SMqCMCommitOffsetReq req;
SArray* pOffsets = NULL;
void* buf = NULL;
SMqCommitCbParam* pParam = NULL;
SMsgSendInfo* sendInfo = NULL;
if (offsets == NULL) {
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqOffset offset;
tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pOffsets, &offset);
}
}
} else {
pOffsets = (SArray*)&offsets->container;
}
req.num = (int32_t)pOffsets->size;
req.offsets = pOffsets->pData;
int32_t code;
int32_t tlen = 0;
tEncodeSize(tEncodeSMqCMCommitOffsetReq, &req, tlen, code);
if (code < 0) {
goto END;
}
code = -1;
buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
goto END;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tEncoderClear(&encoder);
pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
goto END;
}
pParam->tmq = tmq;
pParam->automatic = automatic;
pParam->async = async;
pParam->offsets = pOffsets;
pParam->userCb = userCb;
pParam->userParam = userParam;
if (!async) tsem_init(&pParam->rspSem, 0, 0);
sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto END;
sendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
code = pParam->rspErr;
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
}
// avoid double free if msg is sent
buf = NULL;
code = 0;
END:
if (buf) taosMemoryFree(buf);
if (pParam) taosMemoryFree(pParam);
if (sendInfo) taosMemoryFree(sendInfo);
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
} else {
userCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
}
}
if (offsets == NULL) {
taosArrayDestroy(pOffsets);
}
return code;
}
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
......@@ -350,7 +482,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
tmqAskEp(tmq, true);
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit(tmq, NULL, true);
/*tmq_commit(tmq, NULL, true);*/
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else {
......@@ -385,32 +518,11 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code;
tmq_t* tmq = pParam->tmq;
/*tmq_t* tmq = pParam->tmq;*/
tsem_post(&pParam->rspSem);
return 0;
}
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
if (pParam->tmq->commitCb) {
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, NULL, pParam->tmq->commitCbUserParam);
}
if (!pParam->async)
tsem_post(&pParam->rspSem);
else {
if (pParam->offsets) {
taosArrayDestroy(pParam->offsets);
}
tsem_destroy(&pParam->rspSem);
/*if (pParam->pArray) {*/
/*taosArrayDestroy(pParam->pArray);*/
/*}*/
taosMemoryFree(pParam);
}
return 0;
}
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if (*topics == NULL) {
*topics = tmq_list_new();
......@@ -541,6 +653,8 @@ FAIL:
}
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
#if 0
// TODO: add read write lock
SRequestObj* pRequest = NULL;
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
......@@ -627,6 +741,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
}
return resp;
#endif
}
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
......@@ -723,7 +838,7 @@ FAIL:
return code;
}
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
//
conf->commitCb = cb;
conf->commitCbUserParam = param;
......@@ -1384,3 +1499,10 @@ const char* tmq_get_table_name(TAOS_RES* res) {
}
return NULL;
}
DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
tmqCommitInner(tmq, offsets, 0, 1, cb, param);
}
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
}
......@@ -196,6 +196,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
}
}
tDecoderClear(&decoder);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
......
......@@ -37,10 +37,10 @@ typedef struct {
TdThread thread;
int32_t consumerId;
int32_t ifManualCommit;
//int32_t autoCommitIntervalMs; // 1000 ms
//char autoCommit[8]; // true, false
//char autoOffsetRest[16]; // none, earliest, latest
int32_t ifManualCommit;
// int32_t autoCommitIntervalMs; // 1000 ms
// char autoCommit[8]; // true, false
// char autoOffsetRest[16]; // none, earliest, latest
int32_t ifCheckData;
int64_t expectMsgCnt;
......@@ -99,21 +99,15 @@ static void printHelp() {
}
void initLogFile() {
time_t now;
struct tm curTime;
char filename[256];
time_t now;
struct tm curTime;
char filename[256];
now = taosTime(NULL);
now = taosTime(NULL);
taosLocalTime(&now, &curTime);
sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
configDir,
curTime.tm_year+1900,
curTime.tm_mon+1,
curTime.tm_mday,
curTime.tm_hour,
curTime.tm_min,
curTime.tm_sec);
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900,
curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec);
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", filename);
......@@ -137,9 +131,9 @@ void saveConfigToLogFile() {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
//taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
//taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
//taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
// taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
// taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
// taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, " Topics: ");
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
......@@ -234,17 +228,17 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
}
totalRows++;
}
......@@ -276,9 +270,9 @@ void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
//tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
// tmq_conf_set(conf, "group.id", "cgrp1");
for (int32_t i = 0; i < pInfo->numOfKey; i++) {
......@@ -299,7 +293,7 @@ void build_consumer(SThreadInfo* pInfo) {
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return;
}
......@@ -323,7 +317,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
taosFprintfFile(g_fp, "== save result sql: %s \n", sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
......@@ -354,11 +348,11 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalRows >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
break;
}
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
break;
}
}
......@@ -386,7 +380,7 @@ void* consumeThreadFunc(void* param) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
tmq_list_destroy(pInfo->topicList);
pInfo->topicList = NULL;
......@@ -394,17 +388,17 @@ void* consumeThreadFunc(void* param) {
if (pInfo->ifManualCommit) {
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
pPrint("tmq_commit() manual commit when consume end.\n");
pPrint("tmq_commit() manual commit when consume end.\n");
tmq_commit(pInfo->tmq, NULL, 0);
}
err = tmq_unsubscribe(pInfo->tmq);
if (err) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1;
return NULL;
}
err = tmq_consumer_close(pInfo->tmq);
if (err) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
......@@ -482,9 +476,9 @@ int32_t getConsumeInfo() {
int32_t* lengths = taos_fetch_lengths(pRes);
// set default value
//g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
//memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
//memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
// g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
// memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
// memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册