提交 b87f9ae0 编写于 作者: B Bomin Zhang

defer cq creation

上级 cd12407c
...@@ -564,8 +564,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -564,8 +564,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) { if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false); tsParseSql(pSql, false);
sem_post(&pSql->rspSem);
} }
(*pSql->fp)(pSql->param, pSql, code);
return; return;
} }
......
...@@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { ...@@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlObj * pSql = pStream->pSql; SSqlObj * pSql = pStream->pSql;
pSql->fp = tscProcessStreamQueryCallback; pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
pSql->param = pStream; pSql->param = pStream;
pSql->res.completed = false; pSql->res.completed = false;
...@@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { ...@@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
} }
} }
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
SSqlCmd* pCmd = &pSql->cmd;
if (code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code);
pStream->fp(pStream->param, NULL, NULL);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
pStream->isProject = isProjectStream(pQueryInfo);
pStream->precision = tinfo.precision;
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey;
tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, pSql->sqlstr);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) { int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
...@@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
} }
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL; return NULL;
} }
pStream->stime = stime;
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pSql->pStream = pStream; pSql->pStream = pStream;
pSql->param = pStream;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
...@@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_SUCCESS) {
sem_wait(&pSql->rspSem); tscCreateStream(pStream, pSql, code);
} } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code));
if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, pRes->code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
free(pStream);
return NULL; return NULL;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
pStream->isProject = isProjectStream(pQueryInfo);
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pStream->precision = tinfo.precision;
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey;
tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime);
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, sqlstr);
return pStream; return pStream;
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "ttimer.h"
#include "tcq.h" #include "tcq.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -45,10 +46,12 @@ typedef struct { ...@@ -45,10 +46,12 @@ typedef struct {
struct SCqObj *pHead; struct SCqObj *pHead;
void *dbConn; void *dbConn;
int master; int master;
void *tmrCtrl;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SCqContext; } SCqContext;
typedef struct SCqObj { typedef struct SCqObj {
tmr_h tmrId;
uint64_t uid; uint64_t uid;
int32_t tid; // table ID int32_t tid; // table ID
int rowSize; // bytes of a row int rowSize; // bytes of a row
...@@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); ...@@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
void *cqOpen(void *ahandle, const SCqCfg *pCfg) { void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
SCqContext *pContext = calloc(sizeof(SCqContext), 1); SCqContext *pContext = calloc(sizeof(SCqContext), 1);
if (pContext == NULL) { if (pContext == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass)); tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass));
const char* db = pCfg->db; const char* db = pCfg->db;
...@@ -99,6 +103,9 @@ void cqClose(void *handle) { ...@@ -99,6 +103,9 @@ void cqClose(void *handle) {
SCqContext *pContext = handle; SCqContext *pContext = handle;
if (handle == NULL) return; if (handle == NULL) return;
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
// stop all CQs // stop all CQs
cqStop(pContext); cqStop(pContext);
...@@ -154,8 +161,10 @@ void cqStop(void *handle) { ...@@ -154,8 +161,10 @@ void cqStop(void *handle) {
taos_close_stream(pObj->pStream); taos_close_stream(pObj->pStream);
pObj->pStream = NULL; pObj->pStream = NULL;
cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
} }
pObj = pObj->next; pObj = pObj->next;
} }
...@@ -211,8 +220,13 @@ void cqDrop(void *handle) { ...@@ -211,8 +220,13 @@ void cqDrop(void *handle) {
} }
// free the resources associated // free the resources associated
if (pObj->pStream) taos_close_stream(pObj->pStream); if (pObj->pStream) {
pObj->pStream = NULL; taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
}
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema); tdFreeSchema(pObj->pSchema);
...@@ -222,18 +236,30 @@ void cqDrop(void *handle) { ...@@ -222,18 +236,30 @@ void cqDrop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
return;
} }
} }
cqCreateStream(pContext, pObj);
}
int64_t lastKey = 0; static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext; pObj->pContext = pContext;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
if (pContext->dbConn == NULL) {
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl);
return;
}
pObj->tmrId = 0;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) { if (pObj->pStream) {
pContext->num++; pContext->num++;
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
......
...@@ -22,7 +22,6 @@ extern "C" { ...@@ -22,7 +22,6 @@ extern "C" {
int32_t dnodeInitModules(); int32_t dnodeInitModules();
void dnodeStartModules(); void dnodeStartModules();
void dnodeStartStream();
void dnodeCleanupModules(); void dnodeCleanupModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus); void dnodeProcessModuleStatus(uint32_t moduleStatus);
......
...@@ -123,7 +123,6 @@ int32_t dnodeInitSystem() { ...@@ -123,7 +123,6 @@ int32_t dnodeInitSystem() {
dnodeStartModules(); dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
dnodeStartStream();
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
......
...@@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() { ...@@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeStartStream() {
int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeStartStream(vnodeList[i]);
}
dInfo("streams started");
}
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES]= {0}; int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
......
...@@ -117,7 +117,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); ...@@ -117,7 +117,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
void tsdbStartStream(TSDB_REPO_T *repo);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
......
...@@ -44,7 +44,6 @@ typedef struct { ...@@ -44,7 +44,6 @@ typedef struct {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeStartStream(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
......
...@@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); ...@@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
// Function declaration // Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
...@@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { ...@@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
goto _err; goto _err;
} }
tsdbStartStream(pRepo);
// pRepo->state = TSDB_REPO_STATE_ACTIVE; // pRepo->state = TSDB_REPO_STATE_ACTIVE;
tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo)); tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo));
...@@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
int vgId = REPO_ID(pRepo); int vgId = REPO_ID(pRepo);
tsdbStopStream(repo);
if (toCommit) { if (toCommit) {
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo);
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); if (pRepo->commit) pthread_join(pRepo->commitThread, NULL);
...@@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ ...@@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
return magic; return magic;
} }
void tsdbStartStream(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
}
}
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) {
ASSERT(repo != NULL); ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config; return &((STsdbRepo *)repo)->config;
...@@ -1120,4 +1112,27 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { ...@@ -1120,4 +1112,27 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
return TSDB_GET_TABLE_LAST_KEY(pTable); return TSDB_GET_TABLE_LAST_KEY(pTable);
} }
#endif #endif
\ No newline at end of file
static void tsdbStartStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
}
}
static void tsdbStopStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
}
}
}
...@@ -303,10 +303,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -303,10 +303,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
#endif #endif
// start continuous query
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
pVnode->events = NULL; pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
...@@ -317,15 +313,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -317,15 +313,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeStartStream(int32_t vnode) {
SVnodeObj* pVnode = vnodeAcquireVnode(vnode);
if (pVnode != NULL) {
tsdbStartStream(pVnode->tsdb);
vnodeRelease(pVnode);
}
return TSDB_CODE_SUCCESS;
}
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) return 0; if (ppVnode == NULL || *ppVnode == NULL) return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册