提交 ef901f10 编写于 作者: B Bomin Zhang

[TD-2309]<feature>: add config option to enable/disable stream (continuous query)

上级 fd0c6ade
...@@ -95,6 +95,7 @@ TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修 ...@@ -95,6 +95,7 @@ TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修
- logKeepDays:日志文件的最长保存时间。大于0时,日志文件会被重命名为taosdlog.xxx,其中xxx为日志文件最后修改的时间戳,单位为秒。默认值:0天。 - logKeepDays:日志文件的最长保存时间。大于0时,日志文件会被重命名为taosdlog.xxx,其中xxx为日志文件最后修改的时间戳,单位为秒。默认值:0天。
- maxSQLLength:单条SQL语句允许最长限制。默认值:65380字节。 - maxSQLLength:单条SQL语句允许最长限制。默认值:65380字节。
- telemetryReporting: 是否允许 TDengine 采集和上报基本使用信息,0表示不允许,1表示允许。 默认值:1。 - telemetryReporting: 是否允许 TDengine 采集和上报基本使用信息,0表示不允许,1表示允许。 默认值:1。
- stream: 是否启用连续查询(流计算功能),0表示不允许,1表示允许。 默认值:1。
**注意:**对于端口,TDengine会使用从serverPort起13个连续的TCP和UDP端口号,请务必在防火墙打开。因此如果是缺省配置,需要打开从6030都6042共13个端口,而且必须TCP和UDP都打开。 **注意:**对于端口,TDengine会使用从serverPort起13个连续的TCP和UDP端口号,请务必在防火墙打开。因此如果是缺省配置,需要打开从6030都6042共13个端口,而且必须TCP和UDP都打开。
......
...@@ -260,4 +260,7 @@ ...@@ -260,4 +260,7 @@
# maxBinaryDisplayWidth 30 # maxBinaryDisplayWidth 30
# enable/disable telemetry reporting # enable/disable telemetry reporting
# telemetryReporting 1 # telemetryReporting 1
\ No newline at end of file
# enable/disable stream (continuous query)
# stream 1
...@@ -125,6 +125,9 @@ extern char tsMonitorDbName[]; ...@@ -125,6 +125,9 @@ extern char tsMonitorDbName[];
extern char tsInternalPass[]; extern char tsInternalPass[];
extern int32_t tsMonitorInterval; extern int32_t tsMonitorInterval;
// stream
extern int32_t tsEnableStream;
// internal // internal
extern int32_t tsPrintAuth; extern int32_t tsPrintAuth;
extern int32_t tscEmbedded; extern int32_t tscEmbedded;
......
...@@ -161,6 +161,9 @@ char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; ...@@ -161,6 +161,9 @@ char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
char tsInternalPass[] = "secretkey"; char tsInternalPass[] = "secretkey";
int32_t tsMonitorInterval = 30; // seconds int32_t tsMonitorInterval = 30; // seconds
// stream
int32_t tsEnableStream = 1;
// internal // internal
int32_t tsPrintAuth = 0; int32_t tsPrintAuth = 0;
int32_t tscEmbedded = 0; int32_t tscEmbedded = 0;
...@@ -1015,6 +1018,16 @@ static void doInitGlobalConfig(void) { ...@@ -1015,6 +1018,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "stream";
cfg.ptr = &tsEnableStream;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "httpEnableRecordSql"; cfg.option = "httpEnableRecordSql";
cfg.ptr = &tsHttpEnableRecordSql; cfg.ptr = &tsHttpEnableRecordSql;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -69,6 +69,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); ...@@ -69,6 +69,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
void *cqOpen(void *ahandle, const SCqCfg *pCfg) { void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
if (tsEnableStream == 0) {
return NULL;
}
SCqContext *pContext = calloc(sizeof(SCqContext), 1); SCqContext *pContext = calloc(sizeof(SCqContext), 1);
if (pContext == NULL) { if (pContext == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -99,6 +102,9 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -99,6 +102,9 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
} }
void cqClose(void *handle) { void cqClose(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqContext *pContext = handle; SCqContext *pContext = handle;
if (handle == NULL) return; if (handle == NULL) return;
...@@ -129,6 +135,9 @@ void cqClose(void *handle) { ...@@ -129,6 +135,9 @@ void cqClose(void *handle) {
} }
void cqStart(void *handle) { void cqStart(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqContext *pContext = handle; SCqContext *pContext = handle;
if (pContext->dbConn || pContext->master) return; if (pContext->dbConn || pContext->master) return;
...@@ -147,6 +156,9 @@ void cqStart(void *handle) { ...@@ -147,6 +156,9 @@ void cqStart(void *handle) {
} }
void cqStop(void *handle) { void cqStop(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqContext *pContext = handle; SCqContext *pContext = handle;
cInfo("vgId:%d, stop all CQs", pContext->vgId); cInfo("vgId:%d, stop all CQs", pContext->vgId);
if (pContext->dbConn == NULL || pContext->master == 0) return; if (pContext->dbConn == NULL || pContext->master == 0) return;
...@@ -174,6 +186,9 @@ void cqStop(void *handle) { ...@@ -174,6 +186,9 @@ void cqStop(void *handle) {
} }
void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) { void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) {
if (tsEnableStream == 0) {
return NULL;
}
SCqContext *pContext = handle; SCqContext *pContext = handle;
SCqObj *pObj = calloc(sizeof(SCqObj), 1); SCqObj *pObj = calloc(sizeof(SCqObj), 1);
...@@ -203,6 +218,9 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema * ...@@ -203,6 +218,9 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *
} }
void cqDrop(void *handle) { void cqDrop(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle; SCqObj *pObj = handle;
SCqContext *pContext = pObj->pContext; SCqContext *pContext = pObj->pContext;
......
...@@ -267,16 +267,18 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -267,16 +267,18 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return terrno; return terrno;
} }
SCqCfg cqCfg = {0}; if (tsEnableStream) {
sprintf(cqCfg.user, "_root"); SCqCfg cqCfg = {0};
strcpy(cqCfg.pass, tsInternalPass); sprintf(cqCfg.user, "_root");
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.pass, tsInternalPass);
cqCfg.vgId = vnode; strcpy(cqCfg.db, pVnode->db);
cqCfg.cqWrite = vnodeWriteToCache; cqCfg.vgId = vnode;
pVnode->cq = cqOpen(pVnode, &cqCfg); cqCfg.cqWrite = vnodeWriteToCache;
if (pVnode->cq == NULL) { pVnode->cq = cqOpen(pVnode, &cqCfg);
vnodeCleanUp(pVnode); if (pVnode->cq == NULL) {
return terrno; vnodeCleanUp(pVnode);
return terrno;
}
} }
STsdbAppH appH = {0}; STsdbAppH appH = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册