未验证 提交 7cc35bd8 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11876 from taosdata/feature/tq

enh(tmq): delayed task
......@@ -48,7 +48,6 @@ int32_t init_env() {
return -1;
}
taos_free_result(pRes);
taosSsleep(1);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
......
......@@ -27,10 +27,7 @@ typedef void TAOS;
typedef void TAOS_STMT;
typedef void TAOS_RES;
typedef void **TAOS_ROW;
#if 0
typedef void TAOS_STREAM;
#endif
typedef void TAOS_SUB;
typedef void TAOS_SUB;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
......@@ -196,12 +193,6 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
#endif
#if 0
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
#endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
......@@ -241,12 +232,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
......@@ -273,7 +260,7 @@ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO
#if 0
DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
#endif
#if 0
......
......@@ -56,7 +56,7 @@ struct tmq_conf_t {
int8_t autoCommit;
int8_t resetOffset;
uint16_t port;
uint16_t autoCommitInterval;
int32_t autoCommitInterval;
char* ip;
char* user;
char* pass;
......@@ -76,17 +76,25 @@ struct tmq_t {
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t autoCommit;
int64_t consumerId;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
int64_t consumerId;
tmq_commit_cb* commit_cb;
// status
int8_t status;
int8_t epStatus;
int32_t epoch;
#if 0
int8_t epStatus;
int32_t epSkipCnt;
#endif
int64_t pollCnt;
// timer
tmr_h hbTimer;
tmr_h reportTimer;
tmr_h commitTimer;
// connection
STscObj* pTscObj;
......@@ -111,6 +119,12 @@ enum {
TMQ_CONSUMER_STATUS__READY,
};
enum {
TMQ_DELAYED_TASK__HB = 1,
TMQ_DELAYED_TASK__REPORT,
TMQ_DELAYED_TASK__COMMIT,
};
typedef struct {
// statistics
int64_t pollCnt;
......@@ -280,6 +294,50 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__HB;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__REPORT;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(tmq->delayedTask, qall);
while (1) {
int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType);
if (pTaskType == NULL) break;
if (*pTaskType == TMQ_DELAYED_TASK__HB) {
tmqAskEp(tmq, false);
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit(tmq, NULL, true);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else {
ASSERT(0);
}
}
taosFreeQall(qall);
return 0;
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
SMqRspWrapper* msg = NULL;
while (1) {
......@@ -408,13 +466,15 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
pTmq->epStatus = 0;
pTmq->epSkipCnt = 0;
/*pTmq->epStatus = 0;*/
/*pTmq->epSkipCnt = 0;*/
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->autoCommit;
/*pTmq->autoCommit = conf->autoCommit;*/
pTmq->autoCommit = 0;
pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset;
......@@ -607,6 +667,14 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
taosMsleep(500);
}
// init hb timer
tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
// init auto commit timer
if (tmq->autoCommit) {
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
}
code = 0;
FAIL:
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
......@@ -909,7 +977,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
END:
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
if (pParam->sync) {
tsem_post(&pParam->rspSem);
}
......@@ -918,6 +986,7 @@ END:
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int32_t code = 0;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
......@@ -925,11 +994,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
if (epSkipCnt < 5000) return 0;
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
if (req == NULL) {
tscError("failed to malloc get subscribe ep buf");
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
req->consumerId = htobe64(tmq->consumerId);
......@@ -940,7 +1010,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
if (pParam == NULL) {
tscError("failed to malloc subscribe param");
taosMemoryFree(req);
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
pParam->tmq = tmq;
......@@ -952,7 +1022,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(req);
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
......@@ -1216,7 +1286,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
}
while (1) {
tmqAskEp(tmq, false);
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, blocking_time);
/*tsem_wait(&tmq->rspSem);*/
......
......@@ -61,7 +61,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ);
// TODO:change to deserialize function
if (pIdxTFile == NULL) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
......@@ -73,7 +72,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return -1;
}
// read idx file and get log file pos
// TODO:change to deserialize function
SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
taosThreadMutexUnlock(&pWal->mutex);
......@@ -167,7 +165,7 @@ int32_t walEndSnapshot(SWal *pWal) {
char fnameStr[WAL_FILE_LEN];
// remove file
for (int i = 0; i < deleteCnt; i++) {
SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i);
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
......
......@@ -39,11 +39,11 @@ void* taosLoadDll(const char* filename) {
#else
void* handle = dlopen(filename, RTLD_LAZY);
if (!handle) {
//printf("load dll:%s failed, error:%s", filename, dlerror());
// printf("load dll:%s failed, error:%s", filename, dlerror());
return NULL;
}
//printf("dll %s loaded", filename);
// printf("dll %s loaded", filename);
return handle;
#endif
......@@ -59,17 +59,17 @@ void* taosLoadSym(void* handle, char* name) {
char* error = NULL;
if ((error = dlerror()) != NULL) {
//printf("load sym:%s failed, error:%s", name, dlerror());
// printf("load sym:%s failed, error:%s", name, dlerror());
return NULL;
}
//printf("sym %s loaded", name);
// printf("sym %s loaded", name);
return sym;
#endif
}
void taosCloseDll(void* handle) {
void taosCloseDll(void* handle) {
#if defined(WINDOWS)
return;
#elif defined(_TD_DARWIN_64)
......@@ -100,7 +100,7 @@ int taosSetConsoleEcho(bool on) {
struct termios term;
if (tcgetattr(STDIN_FILENO, &term) == -1) {
perror("Cannot get the attribution of the terminal");
/*perror("Cannot get the attribution of the terminal");*/
return -1;
}
......@@ -111,7 +111,7 @@ int taosSetConsoleEcho(bool on) {
err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term);
if (err == -1 || err == EINTR) {
printf("Cannot set the attribution of the terminal");
/*printf("Cannot set the attribution of the terminal");*/
return -1;
}
......@@ -154,7 +154,7 @@ void taosSetTerminalMode() {
int32_t taosGetOldTerminalMode() {
#if defined(WINDOWS)
#else
/* Make sure stdin is a terminal. */
if (!isatty(STDIN_FILENO)) {
......@@ -181,7 +181,7 @@ void taosResetTerminalMode() {
#endif
}
TdCmdPtr taosOpenCmd(const char *cmd) {
TdCmdPtr taosOpenCmd(const char* cmd) {
if (cmd == NULL) return NULL;
#ifdef WINDOWS
return (TdCmdPtr)_popen(cmd, "r");
......@@ -190,8 +190,8 @@ TdCmdPtr taosOpenCmd(const char *cmd) {
#endif
}
int64_t taosGetLineCmd(TdCmdPtr pCmd, char ** __restrict ptrBuf) {
if (pCmd == NULL || ptrBuf == NULL ) {
int64_t taosGetLineCmd(TdCmdPtr pCmd, char** __restrict ptrBuf) {
if (pCmd == NULL || ptrBuf == NULL) {
return -1;
}
if (*ptrBuf != NULL) {
......@@ -219,7 +219,7 @@ int32_t taosEOFCmd(TdCmdPtr pCmd) {
return feof((FILE*)pCmd);
}
int64_t taosCloseCmd(TdCmdPtr *ppCmd) {
int64_t taosCloseCmd(TdCmdPtr* ppCmd) {
if (ppCmd == NULL || *ppCmd == NULL) {
return 0;
}
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册