提交 a14b8dcc 编写于 作者: L Liu Jicong

enh(tmq): delayed task

上级 35e044f2
...@@ -48,7 +48,6 @@ int32_t init_env() { ...@@ -48,7 +48,6 @@ int32_t init_env() {
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
sleep(1);
pRes = taos_query(pConn, "use abc1"); pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
......
...@@ -27,10 +27,7 @@ typedef void TAOS; ...@@ -27,10 +27,7 @@ typedef void TAOS;
typedef void TAOS_STMT; typedef void TAOS_STMT;
typedef void TAOS_RES; typedef void TAOS_RES;
typedef void **TAOS_ROW; typedef void **TAOS_ROW;
#if 0 typedef void TAOS_SUB;
typedef void TAOS_STREAM;
#endif
typedef void TAOS_SUB;
// Data type definition // Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes #define TSDB_DATA_TYPE_NULL 0 // 1 bytes
...@@ -192,12 +189,6 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); ...@@ -192,12 +189,6 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
#endif #endif
#if 0
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
#endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
...@@ -237,12 +228,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); ...@@ -237,12 +228,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
#if 0 #if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
...@@ -269,7 +256,7 @@ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); ...@@ -269,7 +256,7 @@ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO // TODO
#if 0 #if 0
DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res); DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
#endif #endif
#if 0 #if 0
......
...@@ -56,7 +56,7 @@ struct tmq_conf_t { ...@@ -56,7 +56,7 @@ struct tmq_conf_t {
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
uint16_t port; uint16_t port;
uint16_t autoCommitInterval; int32_t autoCommitInterval;
char* ip; char* ip;
char* user; char* user;
char* pass; char* pass;
...@@ -76,8 +76,9 @@ struct tmq_t { ...@@ -76,8 +76,9 @@ struct tmq_t {
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[256];
int8_t autoCommit; int8_t autoCommit;
int64_t consumerId; int32_t autoCommitInterval;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
int64_t consumerId;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
// status // status
...@@ -87,6 +88,11 @@ struct tmq_t { ...@@ -87,6 +88,11 @@ struct tmq_t {
int32_t epSkipCnt; int32_t epSkipCnt;
int64_t pollCnt; int64_t pollCnt;
// timer
tmr_h hbTimer;
tmr_h reportTimer;
tmr_h commitTimer;
// connection // connection
STscObj* pTscObj; STscObj* pTscObj;
...@@ -111,6 +117,12 @@ enum { ...@@ -111,6 +117,12 @@ enum {
TMQ_CONSUMER_STATUS__READY, TMQ_CONSUMER_STATUS__READY,
}; };
enum {
TMQ_DELAYED_TASK__HB = 1,
TMQ_DELAYED_TASK__REPORT,
TMQ_DELAYED_TASK__COMMIT,
};
typedef struct { typedef struct {
// statistics // statistics
int64_t pollCnt; int64_t pollCnt;
...@@ -280,6 +292,50 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { ...@@ -280,6 +292,50 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg); return sprintf(dst, "%s:%d", topicName, vg);
} }
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__HB;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__REPORT;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(tmq->delayedTask, qall);
while (1) {
int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType);
if (pTaskType == NULL) break;
if (*pTaskType == TMQ_DELAYED_TASK__HB) {
tmqAskEp(tmq, false);
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit(tmq, NULL, true);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else {
ASSERT(0);
}
}
taosFreeQall(qall);
return 0;
}
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
SMqRspWrapper* msg = NULL; SMqRspWrapper* msg = NULL;
while (1) { while (1) {
...@@ -414,7 +470,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -414,7 +470,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->autoCommit; /*pTmq->autoCommit = conf->autoCommit;*/
pTmq->autoCommit = 0;
pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
...@@ -607,6 +665,14 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -607,6 +665,14 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
taosMsleep(500); taosMsleep(500);
} }
// init hb timer
tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
// init auto commit timer
if (tmq->autoCommit) {
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
}
code = 0; code = 0;
FAIL: FAIL:
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree); if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
...@@ -1216,7 +1282,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1216,7 +1282,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
} }
while (1) { while (1) {
tmqAskEp(tmq, false); tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, blocking_time); tmqPollImpl(tmq, blocking_time);
/*tsem_wait(&tmq->rspSem);*/ /*tsem_wait(&tmq->rspSem);*/
......
...@@ -61,7 +61,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -61,7 +61,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ);
// TODO:change to deserialize function
if (pIdxTFile == NULL) { if (pIdxTFile == NULL) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
...@@ -73,7 +72,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -73,7 +72,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return -1; return -1;
} }
// read idx file and get log file pos // read idx file and get log file pos
// TODO:change to deserialize function
SWalIdxEntry entry; SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -167,7 +165,7 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -167,7 +165,7 @@ int32_t walEndSnapshot(SWal *pWal) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
// remove file // remove file
for (int i = 0; i < deleteCnt; i++) { for (int i = 0; i < deleteCnt; i++) {
SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i); pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr); walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr); taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
......
...@@ -39,11 +39,11 @@ void* taosLoadDll(const char* filename) { ...@@ -39,11 +39,11 @@ void* taosLoadDll(const char* filename) {
#else #else
void* handle = dlopen(filename, RTLD_LAZY); void* handle = dlopen(filename, RTLD_LAZY);
if (!handle) { if (!handle) {
//printf("load dll:%s failed, error:%s", filename, dlerror()); // printf("load dll:%s failed, error:%s", filename, dlerror());
return NULL; return NULL;
} }
//printf("dll %s loaded", filename); // printf("dll %s loaded", filename);
return handle; return handle;
#endif #endif
...@@ -59,17 +59,17 @@ void* taosLoadSym(void* handle, char* name) { ...@@ -59,17 +59,17 @@ void* taosLoadSym(void* handle, char* name) {
char* error = NULL; char* error = NULL;
if ((error = dlerror()) != NULL) { if ((error = dlerror()) != NULL) {
//printf("load sym:%s failed, error:%s", name, dlerror()); // printf("load sym:%s failed, error:%s", name, dlerror());
return NULL; return NULL;
} }
//printf("sym %s loaded", name); // printf("sym %s loaded", name);
return sym; return sym;
#endif #endif
} }
void taosCloseDll(void* handle) { void taosCloseDll(void* handle) {
#if defined(WINDOWS) #if defined(WINDOWS)
return; return;
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
...@@ -100,7 +100,7 @@ int taosSetConsoleEcho(bool on) { ...@@ -100,7 +100,7 @@ int taosSetConsoleEcho(bool on) {
struct termios term; struct termios term;
if (tcgetattr(STDIN_FILENO, &term) == -1) { if (tcgetattr(STDIN_FILENO, &term) == -1) {
perror("Cannot get the attribution of the terminal"); /*perror("Cannot get the attribution of the terminal");*/
return -1; return -1;
} }
...@@ -111,7 +111,7 @@ int taosSetConsoleEcho(bool on) { ...@@ -111,7 +111,7 @@ int taosSetConsoleEcho(bool on) {
err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term); err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term);
if (err == -1 || err == EINTR) { if (err == -1 || err == EINTR) {
printf("Cannot set the attribution of the terminal"); /*printf("Cannot set the attribution of the terminal");*/
return -1; return -1;
} }
...@@ -154,7 +154,7 @@ void taosSetTerminalMode() { ...@@ -154,7 +154,7 @@ void taosSetTerminalMode() {
int32_t taosGetOldTerminalMode() { int32_t taosGetOldTerminalMode() {
#if defined(WINDOWS) #if defined(WINDOWS)
#else #else
/* Make sure stdin is a terminal. */ /* Make sure stdin is a terminal. */
if (!isatty(STDIN_FILENO)) { if (!isatty(STDIN_FILENO)) {
...@@ -181,7 +181,7 @@ void taosResetTerminalMode() { ...@@ -181,7 +181,7 @@ void taosResetTerminalMode() {
#endif #endif
} }
TdCmdPtr taosOpenCmd(const char *cmd) { TdCmdPtr taosOpenCmd(const char* cmd) {
if (cmd == NULL) return NULL; if (cmd == NULL) return NULL;
#ifdef WINDOWS #ifdef WINDOWS
return (TdCmdPtr)_popen(cmd, "r"); return (TdCmdPtr)_popen(cmd, "r");
...@@ -190,8 +190,8 @@ TdCmdPtr taosOpenCmd(const char *cmd) { ...@@ -190,8 +190,8 @@ TdCmdPtr taosOpenCmd(const char *cmd) {
#endif #endif
} }
int64_t taosGetLineCmd(TdCmdPtr pCmd, char ** __restrict ptrBuf) { int64_t taosGetLineCmd(TdCmdPtr pCmd, char** __restrict ptrBuf) {
if (pCmd == NULL || ptrBuf == NULL ) { if (pCmd == NULL || ptrBuf == NULL) {
return -1; return -1;
} }
if (*ptrBuf != NULL) { if (*ptrBuf != NULL) {
...@@ -219,7 +219,7 @@ int32_t taosEOFCmd(TdCmdPtr pCmd) { ...@@ -219,7 +219,7 @@ int32_t taosEOFCmd(TdCmdPtr pCmd) {
return feof((FILE*)pCmd); return feof((FILE*)pCmd);
} }
int64_t taosCloseCmd(TdCmdPtr *ppCmd) { int64_t taosCloseCmd(TdCmdPtr* ppCmd) {
if (ppCmd == NULL || *ppCmd == NULL) { if (ppCmd == NULL || *ppCmd == NULL) {
return 0; return 0;
} }
......
...@@ -25,7 +25,7 @@ $rowsPerCtb = 10 ...@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----# #---- global parameters end ----#
$pullDelay = 5 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
......
...@@ -25,7 +25,7 @@ $rowsPerCtb = 10 ...@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----# #---- global parameters end ----#
$pullDelay = 5 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
......
...@@ -25,7 +25,7 @@ $rowsPerCtb = 10 ...@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----# #---- global parameters end ----#
$pullDelay = 5 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
......
...@@ -25,7 +25,7 @@ $rowsPerCtb = 10 ...@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----# #---- global parameters end ----#
$pullDelay = 5 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册