提交 66dbdcdd 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

...@@ -54,7 +54,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -54,7 +54,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
return; return;
} }
strtolower(pSql->sqlstr, sqlstr); strntolower(pSql->sqlstr, sqlstr, sqlLen);
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
pSql->cmd.curSql = pSql->sqlstr; pSql->cmd.curSql = pSql->sqlstr;
...@@ -551,8 +551,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -551,8 +551,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) { if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false); tsParseSql(pSql, false);
sem_post(&pSql->rspSem);
} }
(*pSql->fp)(pSql->param, pSql, code);
return; return;
} }
......
...@@ -4651,15 +4651,18 @@ int32_t validateEp(char* ep) { ...@@ -4651,15 +4651,18 @@ int32_t validateEp(char* ep) {
char buf[TSDB_EP_LEN + 1] = {0}; char buf[TSDB_EP_LEN + 1] = {0};
tstrncpy(buf, ep, TSDB_EP_LEN); tstrncpy(buf, ep, TSDB_EP_LEN);
char *pos = strchr(buf, ':'); char* pos = strchr(buf, ':');
if (NULL == pos) { if (NULL == pos) {
int32_t val = strtol(ep, NULL, 10);
if (val <= 0 || val > 65536) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
} else {
uint16_t port = atoi(pos+1); uint16_t port = atoi(pos + 1);
if (0 == port) { if (0 == port) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4669,13 +4672,13 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { ...@@ -4669,13 +4672,13 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
const int DNODE_DYNAMIC_CFG_OPTIONS_SIZE = 17; const int DNODE_DYNAMIC_CFG_OPTIONS_SIZE = 19;
const SDNodeDynConfOption DNODE_DYNAMIC_CFG_OPTIONS[] = { const SDNodeDynConfOption DNODE_DYNAMIC_CFG_OPTIONS[] = {
{"resetLog", 8}, {"resetQueryCache", 15}, {"debugFlag", 9}, {"mDebugFlag", 10}, {"resetLog", 8}, {"resetQueryCache", 15}, {"debugFlag", 9}, {"mDebugFlag", 10},
{"dDebugFlag", 10}, {"sdbDebugFlag", 12}, {"vDebugFlag", 10}, {"cDebugFlag", 10}, {"dDebugFlag", 10}, {"sdbDebugFlag", 12}, {"vDebugFlag", 10}, {"cDebugFlag", 10},
{"httpDebugFlag", 13}, {"monitorDebugFlag", 16}, {"rpcDebugFlag", 12}, {"uDebugFlag", 10}, {"httpDebugFlag", 13}, {"monitorDebugFlag", 16}, {"rpcDebugFlag", 12}, {"uDebugFlag", 10},
{"tmrDebugFlag", 12}, {"qDebugflag", 10}, {"sDebugflag", 10}, {"tsdbDebugFlag", 13}, {"tmrDebugFlag", 12}, {"qDebugflag", 10}, {"sDebugflag", 10}, {"tsdbDebugFlag", 13},
{"monitor", 7}}; {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"monitor", 7}};
SSQLToken* pOptionToken = &pOptions->a[1]; SSQLToken* pOptionToken = &pOptions->a[1];
...@@ -4699,7 +4702,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { ...@@ -4699,7 +4702,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
SSQLToken* pValToken = &pOptions->a[2]; SSQLToken* pValToken = &pOptions->a[2];
int32_t val = strtol(pValToken->z, NULL, 10); int32_t val = strtol(pValToken->z, NULL, 10);
if (val < 131 || val > 199) { if (val < 0 || val > 256) {
/* options value is out of valid range */ /* options value is out of valid range */
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
......
...@@ -263,12 +263,29 @@ TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { ...@@ -263,12 +263,29 @@ TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
return pSql; return pSql;
} }
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
char* buf = malloc(sqlLen + 1); STscObj *pObj = (STscObj *)taos;
buf[sqlLen] = 0; if (pObj == NULL || pObj->signature != pObj) {
strncpy(buf, sqlstr, sqlLen); terrno = TSDB_CODE_TSC_DISCONNECTED;
TAOS_RES *res = taos_query(taos, buf); return NULL;
free(buf); }
return res;
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_INVALID_SQL;
return NULL;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
tsem_wait(&pSql->rspSem);
return pSql;
} }
int taos_result_precision(TAOS_RES *res) { int taos_result_precision(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
......
...@@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { ...@@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlObj * pSql = pStream->pSql; SSqlObj * pSql = pStream->pSql;
pSql->fp = tscProcessStreamQueryCallback; pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
pSql->param = pStream; pSql->param = pStream;
pSql->res.completed = false; pSql->res.completed = false;
...@@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { ...@@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
} }
} }
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
SSqlCmd* pCmd = &pSql->cmd;
if (code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code);
pStream->fp(pStream->param, NULL, NULL);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
pStream->isProject = isProjectStream(pQueryInfo);
pStream->precision = tinfo.precision;
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey;
tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, pSql->sqlstr);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) { int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
...@@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
} }
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL; return NULL;
} }
pStream->stime = stime;
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pSql->pStream = pStream; pSql->pStream = pStream;
pSql->param = pStream;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
...@@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_SUCCESS) {
sem_wait(&pSql->rspSem); tscCreateStream(pStream, pSql, code);
} } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code));
if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, pRes->code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
free(pStream);
return NULL; return NULL;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
pStream->isProject = isProjectStream(pQueryInfo);
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pStream->precision = tinfo.precision;
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey;
tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime);
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, sqlstr);
return pStream; return pStream;
} }
......
...@@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { ...@@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->maxPoints = maxRows; pCols->maxPoints = maxRows;
pCols->bufSize = maxRowSize * maxRows; pCols->bufSize = maxRowSize * maxRows;
pCols->buf = malloc(pCols->bufSize); pCols->buf = calloc(1, pCols->bufSize);
if (pCols->buf == NULL) { if (pCols->buf == NULL) {
free(pCols); free(pCols);
return NULL; return NULL;
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "ttimer.h"
#include "tcq.h" #include "tcq.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -45,10 +46,12 @@ typedef struct { ...@@ -45,10 +46,12 @@ typedef struct {
struct SCqObj *pHead; struct SCqObj *pHead;
void *dbConn; void *dbConn;
int master; int master;
void *tmrCtrl;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SCqContext; } SCqContext;
typedef struct SCqObj { typedef struct SCqObj {
tmr_h tmrId;
uint64_t uid; uint64_t uid;
int32_t tid; // table ID int32_t tid; // table ID
int rowSize; // bytes of a row int rowSize; // bytes of a row
...@@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); ...@@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
void *cqOpen(void *ahandle, const SCqCfg *pCfg) { void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
SCqContext *pContext = calloc(sizeof(SCqContext), 1); SCqContext *pContext = calloc(sizeof(SCqContext), 1);
if (pContext == NULL) { if (pContext == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass)); tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass));
const char* db = pCfg->db; const char* db = pCfg->db;
...@@ -99,6 +103,9 @@ void cqClose(void *handle) { ...@@ -99,6 +103,9 @@ void cqClose(void *handle) {
SCqContext *pContext = handle; SCqContext *pContext = handle;
if (handle == NULL) return; if (handle == NULL) return;
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
// stop all CQs // stop all CQs
cqStop(pContext); cqStop(pContext);
...@@ -154,8 +161,10 @@ void cqStop(void *handle) { ...@@ -154,8 +161,10 @@ void cqStop(void *handle) {
taos_close_stream(pObj->pStream); taos_close_stream(pObj->pStream);
pObj->pStream = NULL; pObj->pStream = NULL;
cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
} }
pObj = pObj->next; pObj = pObj->next;
} }
...@@ -211,8 +220,13 @@ void cqDrop(void *handle) { ...@@ -211,8 +220,13 @@ void cqDrop(void *handle) {
} }
// free the resources associated // free the resources associated
if (pObj->pStream) taos_close_stream(pObj->pStream); if (pObj->pStream) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL; pObj->pStream = NULL;
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
}
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema); tdFreeSchema(pObj->pSchema);
...@@ -222,18 +236,30 @@ void cqDrop(void *handle) { ...@@ -222,18 +236,30 @@ void cqDrop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
return;
} }
} }
int64_t lastKey = 0; cqCreateStream(pContext, pObj);
}
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext; pObj->pContext = pContext;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
if (pContext->dbConn == NULL) {
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl);
return;
}
pObj->tmrId = 0;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) { if (pObj->pStream) {
pContext->num++; pContext->num++;
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
......
...@@ -22,7 +22,6 @@ extern "C" { ...@@ -22,7 +22,6 @@ extern "C" {
int32_t dnodeInitModules(); int32_t dnodeInitModules();
void dnodeStartModules(); void dnodeStartModules();
void dnodeStartStream();
void dnodeCleanupModules(); void dnodeCleanupModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus); void dnodeProcessModuleStatus(uint32_t moduleStatus);
......
...@@ -123,7 +123,6 @@ int32_t dnodeInitSystem() { ...@@ -123,7 +123,6 @@ int32_t dnodeInitSystem() {
dnodeStartModules(); dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
dnodeStartStream();
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
......
...@@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() { ...@@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeStartStream() {
int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeStartStream(vnodeList[i]);
}
dInfo("streams started");
}
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES]= {0}; int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
...@@ -416,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -416,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -430,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -430,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId); void *pVnode = vnodeAcquire(pAlter->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pAlter); int32_t code = vnodeAlter(pVnode, pAlter);
...@@ -723,6 +706,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -723,6 +706,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
// fill cluster cfg parameters // fill cluster cfg parameters
pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes); pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes);
pStatus->clusterCfg.enableBalance = htonl(tsEnableBalance);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum); pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum);
pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold); pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold);
pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval); pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
......
...@@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
void *pVnode;
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pVnode = vnodeAcquireVnode(pHead->vgId); taos_queue queue = vnodeAcquireRqueue(pHead->vgId);
if (pVnode == NULL) { if (queue == NULL) {
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
pCont -= pHead->contLen; pCont -= pHead->contLen;
continue; continue;
} }
// put message into queue // put message into queue
taos_queue queue = vnodeGetRqueue(pVnode);
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
pRead->pCont = pCont; pRead->pCont = pCont;
...@@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) { ...@@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads // dynamically adjust the number of threads
} }
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
assert(pVnode != NULL);
taos_queue queue = vnodeAcquireRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
......
...@@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { ...@@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
taos_queue queue = vnodeGetWqueue(pHead->vgId); taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) { if (queue) {
// put message into queue // put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
......
...@@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); ...@@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue); void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode); void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue); void dnodeFreeVnodeRqueue(void *rqueue);
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
......
...@@ -54,12 +54,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" ) ...@@ -54,12 +54,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
...@@ -564,15 +563,16 @@ typedef struct { ...@@ -564,15 +563,16 @@ typedef struct {
typedef struct { typedef struct {
int32_t numOfMnodes; // tsNumOfMnodes int32_t numOfMnodes; // tsNumOfMnodes
int32_t enableBalance; // tsEnableBalance
int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum
int32_t offlineThreshold; // tsOfflineThreshold int32_t offlineThreshold; // tsOfflineThreshold
int32_t statusInterval; // tsStatusInterval int32_t statusInterval; // tsStatusInterval
int32_t maxtablesPerVnode;
int32_t maxVgroupsPerDb;
char arbitrator[TSDB_EP_LEN]; // tsArbitrator char arbitrator[TSDB_EP_LEN]; // tsArbitrator
char timezone[64]; // tsTimezone char timezone[64]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale char locale[TSDB_LOCALE_LEN]; // tsLocale
char charset[TSDB_LOCALE_LEN]; // tsCharset char charset[TSDB_LOCALE_LEN]; // tsCharset
int32_t maxtablesPerVnode;
int32_t maxVgroupsPerDb;
} SClusterCfg; } SClusterCfg;
typedef struct { typedef struct {
......
...@@ -117,7 +117,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); ...@@ -117,7 +117,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
void tsdbStartStream(TSDB_REPO_T *repo);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
......
...@@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); ...@@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
typedef void (*FNotifyRole)(void *ahandle, int8_t role); typedef void (*FNotifyRole)(void *ahandle, int8_t role);
// when data file is synced successfully, notity app // when data file is synced successfully, notity app
typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
typedef struct { typedef struct {
int32_t vgId; // vgroup ID int32_t vgId; // vgroup ID
......
...@@ -22,10 +22,10 @@ extern "C" { ...@@ -22,10 +22,10 @@ extern "C" {
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT, TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_READY, TAOS_VN_STATUS_READY,
TAOS_VN_STATUS_CLOSING, TAOS_VN_STATUS_CLOSING,
TAOS_VN_STATUS_DELETING, TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_RESET,
} EVnStatus; } EVnStatus;
typedef struct { typedef struct {
...@@ -44,17 +44,13 @@ typedef struct { ...@@ -44,17 +44,13 @@ typedef struct {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeStartStream(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode); void* vnodeAcquire(int32_t vgId); // add refcount
void* vnodeAcquireVnode(int32_t vgId); // add refcount void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue
void vnodeRelease(void *pVnode); // dec refCount
void* vnodeAcquireRqueue(void *);
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
......
...@@ -269,18 +269,37 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) { ...@@ -269,18 +269,37 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) {
} }
static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
if (strcmp(pMsg->pUser->user, TSDB_DEFAULT_USER) != 0) {
mError("failed to cfg dnode, no rights");
return TSDB_CODE_MND_NO_RIGHTS;
}
SCMCfgDnodeMsg *pCmCfgDnode = pMsg->rpcMsg.pCont; SCMCfgDnodeMsg *pCmCfgDnode = pMsg->rpcMsg.pCont;
if (pCmCfgDnode->ep[0] == 0) { if (pCmCfgDnode->ep[0] == 0) {
strcpy(pCmCfgDnode->ep, tsLocalEp); tstrncpy(pCmCfgDnode->ep, tsLocalEp, TSDB_EP_LEN);
} else {
// TODO temporary disabled for compiling: strcpy(pCmCfgDnode->ep, pCmCfgDnode->ep);
} }
if (strcmp(pMsg->pUser->user, TSDB_DEFAULT_USER) != 0) { int32_t dnodeId = 0;
return TSDB_CODE_MND_NO_RIGHTS; char* pos = strchr(pCmCfgDnode->ep, ':');
if (NULL == pos) {
dnodeId = strtol(pCmCfgDnode->ep, NULL, 10);
if (dnodeId <= 0 || dnodeId > 65536) {
mError("failed to cfg dnode, invalid dnodeId:%s", pCmCfgDnode->ep);
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
} }
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pCmCfgDnode->ep); SRpcIpSet ipSet = mnodeGetIpSetFromIp(pCmCfgDnode->ep);
if (dnodeId != 0) {
SDnodeObj *pDnode = mnodeGetDnode(dnodeId);
if (pDnode == NULL) {
mError("failed to cfg dnode, invalid dnodeId:%d", dnodeId);
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp);
mnodeDecDnodeRef(pDnode);
}
SMDCfgDnodeMsg *pMdCfgDnode = rpcMallocCont(sizeof(SMDCfgDnodeMsg)); SMDCfgDnodeMsg *pMdCfgDnode = rpcMallocCont(sizeof(SMDCfgDnodeMsg));
strcpy(pMdCfgDnode->ep, pCmCfgDnode->ep); strcpy(pMdCfgDnode->ep, pCmCfgDnode->ep);
strcpy(pMdCfgDnode->config, pCmCfgDnode->config); strcpy(pMdCfgDnode->config, pCmCfgDnode->config);
...@@ -292,9 +311,9 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { ...@@ -292,9 +311,9 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
.pCont = pMdCfgDnode, .pCont = pMdCfgDnode,
.contLen = sizeof(SMDCfgDnodeMsg) .contLen = sizeof(SMDCfgDnodeMsg)
}; };
dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg);
mInfo("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user); mInfo("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user);
dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -305,6 +324,7 @@ static void mnodeProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { ...@@ -305,6 +324,7 @@ static void mnodeProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) {
static bool mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) { static bool mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
if (clusterCfg->numOfMnodes != htonl(tsNumOfMnodes)) return false; if (clusterCfg->numOfMnodes != htonl(tsNumOfMnodes)) return false;
if (clusterCfg->enableBalance != htonl(tsEnableBalance)) return false;
if (clusterCfg->mnodeEqualVnodeNum != htonl(tsMnodeEqualVnodeNum)) return false; if (clusterCfg->mnodeEqualVnodeNum != htonl(tsMnodeEqualVnodeNum)) return false;
if (clusterCfg->offlineThreshold != htonl(tsOfflineThreshold)) return false; if (clusterCfg->offlineThreshold != htonl(tsOfflineThreshold)) return false;
if (clusterCfg->statusInterval != htonl(tsStatusInterval)) return false; if (clusterCfg->statusInterval != htonl(tsStatusInterval)) return false;
......
...@@ -593,7 +593,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -593,7 +593,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
pShow->bytes[cols] = 4; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "maxTables"); strcpy(pSchema[cols].name, "onlineVnodes");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -692,8 +692,15 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v ...@@ -692,8 +692,15 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
*(int32_t *)pWrite = taosIdPoolMaxSize(pVgroup->idPool); *(int32_t *)pWrite = taosIdPoolMaxSize(pVgroup->idPool);
cols++; cols++;
int32_t onlineVnodes = 0;
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
if (pVgroup->vnodeGid[i].role == TAOS_SYNC_ROLE_SLAVE || pVgroup->vnodeGid[i].role == TAOS_SYNC_ROLE_MASTER) {
onlineVnodes++;
}
}
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = tsMaxTablePerVnode; *(int32_t *)pWrite = onlineVnodes;
cols++; cols++;
for (int32_t i = 0; i < pShow->maxReplica; ++i) { for (int32_t i = 0; i < pShow->maxReplica; ++i) {
......
...@@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); ...@@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
// Function declaration // Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
...@@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { ...@@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
goto _err; goto _err;
} }
tsdbStartStream(pRepo);
// pRepo->state = TSDB_REPO_STATE_ACTIVE; // pRepo->state = TSDB_REPO_STATE_ACTIVE;
tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo)); tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo));
...@@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
int vgId = REPO_ID(pRepo); int vgId = REPO_ID(pRepo);
tsdbStopStream(repo);
if (toCommit) { if (toCommit) {
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo);
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); if (pRepo->commit) pthread_join(pRepo->commitThread, NULL);
...@@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ ...@@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
return magic; return magic;
} }
void tsdbStartStream(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
}
}
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) {
ASSERT(repo != NULL); ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config; return &((STsdbRepo *)repo)->config;
...@@ -1121,3 +1113,26 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { ...@@ -1121,3 +1113,26 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
} }
#endif #endif
static void tsdbStartStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
}
}
static void tsdbStopStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
}
}
}
...@@ -133,6 +133,8 @@ char **strsplit(char *src, const char *delim, int32_t *num); ...@@ -133,6 +133,8 @@ char **strsplit(char *src, const char *delim, int32_t *num);
char* strtolower(char *dst, const char *src); char* strtolower(char *dst, const char *src);
char* strntolower(char *dst, const char *src, int32_t n);
int64_t strnatoi(char *num, int32_t len); int64_t strnatoi(char *num, int32_t len);
//char* strreplace(const char* str, const char* pattern, const char* rep); //char* strreplace(const char* str, const char* pattern, const char* rep);
......
...@@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI ...@@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
return -1; return -1;
} }
/* set REUSEADDR option, so the portnumber can be re-used */
int reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
close(sockFd);
return -1;
};
if ( clientIp != 0) { if ( clientIp != 0) {
memset((char *)&clientAddr, 0, sizeof(clientAddr)); memset((char *)&clientAddr, 0, sizeof(clientAddr));
clientAddr.sin_family = AF_INET; clientAddr.sin_family = AF_INET;
......
...@@ -234,6 +234,32 @@ char* strtolower(char *dst, const char *src) { ...@@ -234,6 +234,32 @@ char* strtolower(char *dst, const char *src) {
*p = 0; *p = 0;
return dst; return dst;
} }
char* strntolower(char *dst, const char *src, int32_t n) {
int esc = 0;
char quote = 0, *p = dst, c;
assert(dst != NULL);
for (c = *src++; n-- > 0; c = *src++) {
if (esc) {
esc = 0;
} else if (quote) {
if (c == '\\') {
esc = 1;
} else if (c == quote) {
quote = 0;
}
} else if (c >= 'A' && c <= 'Z') {
c -= 'A' - 'a';
} else if (c == '\'' || c == '"') {
quote = c;
}
*p++ = c;
}
*p = 0;
return dst;
}
char *paGetToken(char *string, char **token, int32_t *tokenLen) { char *paGetToken(char *string, char **token, int32_t *tokenLen) {
char quote = 0; char quote = 0;
......
...@@ -37,7 +37,7 @@ extern int32_t vDebugFlag; ...@@ -37,7 +37,7 @@ extern int32_t vDebugFlag;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int status; int8_t status;
int8_t role; int8_t role;
int8_t accessState; int8_t accessState;
int64_t version; // current version int64_t version; // current version
...@@ -55,6 +55,8 @@ typedef struct { ...@@ -55,6 +55,8 @@ typedef struct {
SWalCfg walCfg; SWalCfg walCfg;
void *qMgmt; void *qMgmt;
char *rootDir; char *rootDir;
tsem_t sem;
int8_t dropped;
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
} SVnodeObj; } SVnodeObj;
......
...@@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); ...@@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
#ifndef _SYNC #ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; } tsync_h syncStart(const SSyncInfo *info) { return NULL; }
...@@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount);
pVnode->status = TAOS_VN_STATUS_DELETING; pVnode->dropped = 1;
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg // cfgVersion can be corrected by status msg
if (pVnode->status != TAOS_VN_STATUS_READY) { if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// the vnode may always fail to synchronize because of it in low cfgVersion
// so cannot use the following codes
// if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED)
// return TSDB_CODE_VND_NOT_SYNCED;
pVnode->status = TAOS_VN_STATUS_UPDATING;
int32_t code = vnodeSaveCfg(pVnodeCfg); int32_t code = vnodeSaveCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
...@@ -194,11 +187,13 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -194,11 +187,13 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
return code; return code;
} }
if (pVnode->tsdb) {
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
return code; return code;
} }
}
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is altered", pVnode->vgId); vDebug("vgId:%d, vnode is altered", pVnode->vgId);
...@@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->tsdbCfg.tsdbId = pVnode->vgId;
pVnode->rootDir = strdup(rootDir); pVnode->rootDir = strdup(rootDir);
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
tsem_init(&pVnode->sem, 0, 0);
int32_t code = vnodeReadCfg(pVnode); int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -303,10 +299,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -303,10 +299,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
#endif #endif
// start continuous query
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
pVnode->events = NULL; pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
...@@ -317,22 +309,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -317,22 +309,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeStartStream(int32_t vnode) {
SVnodeObj* pVnode = vnodeAcquireVnode(vnode);
if (pVnode != NULL) {
tsdbStartStream(pVnode->tsdb);
vnodeRelease(pVnode);
}
return TSDB_CODE_SUCCESS;
}
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) return 0; if (ppVnode == NULL || *ppVnode == NULL) return 0;
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
vDebug("vgId:%d, vnode will be closed", pVnode->vgId); vDebug("vgId:%d, vnode will be closed", pVnode->vgId);
pVnode->status = TAOS_VN_STATUS_CLOSING;
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return 0; return 0;
...@@ -347,6 +329,8 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -347,6 +329,8 @@ void vnodeRelease(void *pVnodeRaw) {
if (refCount > 0) { if (refCount > 0) {
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2)
tsem_post(&pVnode->sem);
return; return;
} }
...@@ -357,11 +341,6 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -357,11 +341,6 @@ void vnodeRelease(void *pVnodeRaw) {
tsdbCloseRepo(pVnode->tsdb, 1); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
// stop continuous query
if (pVnode->cq)
cqClose(pVnode->cq);
pVnode->cq = NULL;
if (pVnode->wal) if (pVnode->wal)
walClose(pVnode->wal); walClose(pVnode->wal);
pVnode->wal = NULL; pVnode->wal = NULL;
...@@ -376,20 +355,21 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -376,20 +355,21 @@ void vnodeRelease(void *pVnodeRaw) {
tfree(pVnode->rootDir); tfree(pVnode->rootDir);
if (pVnode->status == TAOS_VN_STATUS_DELETING) { if (pVnode->dropped) {
char rootDir[TSDB_FILENAME_LEN] = {0}; char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
taosMvDir(tsVnodeBakDir, rootDir); taosMvDir(tsVnodeBakDir, rootDir);
taosRemoveDir(rootDir); taosRemoveDir(rootDir);
} }
tsem_destroy(&pVnode->sem);
free(pVnode); free(pVnode);
int32_t count = taosHashGetSize(tsDnodeVnodesHash); int32_t count = taosHashGetSize(tsDnodeVnodesHash);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
} }
void *vnodeGetVnode(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
...@@ -397,35 +377,38 @@ void *vnodeGetVnode(int32_t vgId) { ...@@ -397,35 +377,38 @@ void *vnodeGetVnode(int32_t vgId) {
return NULL; return NULL;
} }
return *ppVnode; SVnodeObj *pVnode = *ppVnode;
}
void *vnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
return pVnode; return pVnode;
} }
void *vnodeAcquireRqueue(void *param) { void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
atomic_add_fetch_32(&pVnode->refCount, 1); if (pVnode->status == TAOS_VN_STATUS_RESET) {
vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); terrno = TSDB_CODE_VND_INVALID_STATUS;
return ((SVnodeObj *)pVnode)->rqueue; vInfo("vgId:%d, status is in reset", vgId);
} vnodeRelease(pVnode);
return NULL;
}
void *vnodeGetRqueue(void *pVnode) { return pVnode->rqueue;
return ((SVnodeObj *)pVnode)->rqueue;
} }
void *vnodeGetWqueue(int32_t vgId) { void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquireVnode(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->wqueue; return pVnode->wqueue;
} }
...@@ -497,7 +480,7 @@ void vnodeBuildStatusMsg(void *param) { ...@@ -497,7 +480,7 @@ void vnodeBuildStatusMsg(void *param) {
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId); pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId); SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState; pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
...@@ -511,11 +494,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { ...@@ -511,11 +494,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed // remove from hash, so new messages wont be consumed
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
int i = 0;
if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) {
sched_yield();
}
}
}
// stop replication module // stop replication module
if (pVnode->sync) { if (pVnode->sync) {
syncStop(pVnode->sync); void *sync = pVnode->sync;
pVnode->sync = NULL; pVnode->sync = NULL;
syncStop(sync);
}
// stop continuous query
if (pVnode->cq) {
void *cq = pVnode->cq;
pVnode->cq = NULL;
cqClose(cq);
} }
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
...@@ -562,18 +563,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -562,18 +563,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop(pVnode->cq); cqStop(pVnode->cq);
} }
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { static int vnodeResetTsdb(SVnodeObj *pVnode)
SVnodeObj *pVnode = ahandle; {
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
// clsoe tsdb, then open tsdb
tsdbCloseRepo(pVnode->tsdb, 0); if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY)
return -1;
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
// acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2)
tsem_wait(&pVnode->sem);
// close tsdb, then open tsdb
tsdbCloseRepo(tsdb, 0);
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus; appH.notifyStatus = vnodeProcessTsdbStatus;
...@@ -582,6 +590,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { ...@@ -582,6 +590,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH.cqDropFunc = cqDrop; appH.cqDropFunc = cqDrop;
appH.configFunc = dnodeSendCfgTableToRecv; appH.configFunc = dnodeSendCfgTableToRecv;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
pVnode->status = TAOS_VN_STATUS_READY;
vnodeRelease(pVnode);
return 0;
}
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
SVnodeObj *pVnode = ahandle;
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
return vnodeResetTsdb(pVnode);
} }
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
...@@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return TSDB_CODE_VND_INVALID_STATUS; return TSDB_CODE_VND_INVALID_STATUS;
} }
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY;
// TODO: Later, let slave to support query // TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
...@@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
} }
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
atomic_add_fetch_32(&pVnode->refCount, 1);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void *pCont = pReadMsg->pCont; void *pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen; int32_t contLen = pReadMsg->contLen;
...@@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) { if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
dnodePutItemIntoReadQueue(pVnode, *handle); vnodePutItemIntoReadQueue(pVnode, *handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
...@@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} else { // if failed to dump result, free qhandle immediately } else { // if failed to dump result, free qhandle immediately
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
if (qHasMoreResultsToRetrieve(*handle)) { if (qHasMoreResultsToRetrieve(*handle)) {
dnodePutItemIntoReadQueue(pVnode, *handle); vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
freeHandle = false; freeHandle = false;
} else { } else {
......
...@@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
} }
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY;
if (pHead->version == 0) { // from client or CQ if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state
} }
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_RPC_NOT_READY;
} }
......
...@@ -143,6 +143,7 @@ python3 ./test.py -f query/filterOtherTypes.py ...@@ -143,6 +143,7 @@ python3 ./test.py -f query/filterOtherTypes.py
python3 ./test.py -f query/querySort.py python3 ./test.py -f query/querySort.py
python3 ./test.py -f query/queryJoin.py python3 ./test.py -f query/queryJoin.py
python3 ./test.py -f query/select_last_crash.py python3 ./test.py -f query/select_last_crash.py
python3 ./test.py -f query/queryNullValueTest.py
#stream #stream
python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/metric_1.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numOfRecords = 10
self.ts = 1537146000000
def restartTaosd(self):
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.execute("use db")
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute(
"create table st (ts timestamp, speed int) tags(areaid int, loc nchar(20))")
tdSql.execute("create table t1 using st tags(1, 'beijing')")
tdSql.execute("insert into t1 values(now, 1)")
tdSql.query("select * from st")
tdSql.checkRows(1)
tdSql.execute("alter table st add column length int")
tdSql.execute("insert into t1 values(now, 1, 2)")
tdSql.query("select last(*) from st")
tdSql.checkData(0, 2, 2);
self.restartTaosd();
tdSql.query("select last(*) from st")
tdSql.checkData(0, 2, 2);
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numOfRecords = 10
self.ts = 1537146000000
def checkNullValue(self, result):
mx = np.array(result)
[rows, cols] = mx.shape
for i in range(rows):
for j in range(cols):
if j + 1 < cols and mx[i, j + 1] is not None:
print(mx[i, j + 1])
return False
return True
def restartTaosd(self):
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.execute("use db")
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute(
"create table meters (ts timestamp, col1 int) tags(tgcol1 int)")
tdSql.execute("create table t0 using meters tags(NULL)")
for i in range (self.numOfRecords):
tdSql.execute("insert into t0 values (%d, %d)" % (self.ts + i, i));
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col2 tinyint")
tdSql.execute("alter table meters drop column col1")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col2 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col1 int")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col1 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col3 smallint")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col3 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col4 bigint")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col4 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col5 float")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col5 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col6 double")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col6 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col7 bool")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col7 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col8 binary(20)")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col8 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add column col9 nchar(20)")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select col9 from meters")
tdSql.checkRows(10)
tdSql.execute("alter table meters add tag tgcol2 tinyint")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol2 from meters")
tdSql.checkRows(1)
tdSql.execute("alter table meters add tag tgcol3 smallint")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol3 from meters")
tdSql.checkRows(1)
tdSql.execute("alter table meters add tag tgcol4 bigint")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol4 from meters")
tdSql.checkRows(1)
tdSql.execute("alter table meters add tag tgcol5 float")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol5 from meters")
tdSql.checkRows(1)
tdSql.execute("alter table meters add tag tgcol6 double")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol6 from meters")
tdSql.checkRows(1)
tdSql.execute("alter table meters add tag tgcol7 bool")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol7 from meters")
tdSql.checkRows(1)
tdSql.execute("alter table meters add tag tgcol8 binary(20)")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol8 from meters")
tdSql.checkRows(1)
tdSql.execute("alter table meters add tag tgcol9 nchar(20)")
tdSql.query("select * from meters")
tdSql.checkRows(10)
tdSql.query("select tgcol9 from meters")
tdSql.checkRows(1)
self.restartTaosd()
tdSql.query("select * from meters")
tdSql.checkRows(10)
if self.checkNullValue(tdSql.queryResult) is False:
tdLog.exit("non None value is detected")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -140,6 +140,7 @@ python3 ./test.py -f query/queryJoin.py ...@@ -140,6 +140,7 @@ python3 ./test.py -f query/queryJoin.py
python3 ./test.py -f query/filterCombo.py python3 ./test.py -f query/filterCombo.py
python3 ./test.py -f query/queryNormal.py python3 ./test.py -f query/queryNormal.py
python3 ./test.py -f query/select_last_crash.py python3 ./test.py -f query/select_last_crash.py
python3 ./test.py -f query/queryNullValueTest.py
#stream #stream
python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream1.py
......
...@@ -136,6 +136,8 @@ echo "defaultPass taosdata" >> $TAOS_CFG ...@@ -136,6 +136,8 @@ echo "defaultPass taosdata" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG
echo "clog 2" >> $TAOS_CFG echo "clog 2" >> $TAOS_CFG
#echo "cache 1" >> $TAOS_CFG
#echo "block 2" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG
echo "numOfTotalVnodes 4" >> $TAOS_CFG echo "numOfTotalVnodes 4" >> $TAOS_CFG
echo "maxVgroupsPerDb 4" >> $TAOS_CFG echo "maxVgroupsPerDb 4" >> $TAOS_CFG
......
...@@ -88,7 +88,9 @@ if [ "$EXEC_OPTON" = "start" ]; then ...@@ -88,7 +88,9 @@ if [ "$EXEC_OPTON" = "start" ]; then
echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR
if [ "$SHELL_OPTION" = "true" ]; then if [ "$SHELL_OPTION" = "true" ]; then
nohup valgrind --log-file=${LOG_DIR}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & TT=`date +%s`
mkdir ${LOG_DIR}/${TT}
nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 &
else else
nohup $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & nohup $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 &
fi fi
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode2 -c wallevel -v 2
system sh/cfg.sh -n dnode3 -c wallevel -v 2
system sh/cfg.sh -n dnode4 -c wallevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c debugFlag -v 131
system sh/cfg.sh -n dnode2 -c debugFlag -v 131
system sh/cfg.sh -n dnode3 -c debugFlag -v 131
system sh/cfg.sh -n dnode4 -c debugFlag -v 131
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start -t
system sh/exec.sh -n dnode3 -s start -t
sleep 3000
print ========= step1
sql create database db replica 2
#sql create table db.tb1 (ts timestamp, i int)
#sql create table db.tb2 (ts timestamp, i int)
#sql create table db.tb3 (ts timestamp, i int)
#sql create table db.tb4 (ts timestamp, i int)
#sql insert into db.tb1 values(now, 1)
#sql select count(*) from db.tb1
sql create database db replica 2
sql create table db.tb (ts timestamp, i int)
sql insert into db.tb values(now, 1)
sql select count(*) from db.tb
$lastRows = $rows
print ======== step2
#run_back unique/vnode/back_insert_many.sim
run_back unique/vnode/back_insert.sim
sleep 3000
print ======== step3
$x = 0
loop:
print ======== step4
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 10000
system sh/exec.sh -n dnode2 -s start -t
sleep 10000
print ======== step5
system sh/exec.sh -n dnode3 -s stop -x SIGINT
sleep 10000
system sh/exec.sh -n dnode3 -s start -t
sleep 10000
print ======== step6
#sql select count(*) from db.tb1
#print select count(*) from db.tb1 ==> $data00 $lastRows
sql select count(*) from db.tb
print select count(*) from db.tb ==> $data00 $lastRows
if $data00 <= $lastRows then
return -1
endi
print ======== step7
$lastRows = $data00
print ======== loop Times $x
if $x < 10 then
$x = $x + 1
goto loop
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册