提交 64d2a3e3 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/stream

......@@ -128,7 +128,6 @@ typedef struct setConfRet {
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT int taos_init(void);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
......
......@@ -151,6 +151,7 @@ void taosCfgDynamicOptions(const char *option, const char *value);
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
struct SConfig *taosGetCfg();
int32_t taosSetCfg(SConfig *pCfg, char* name);
#ifdef __cplusplus
}
......
......@@ -510,7 +510,8 @@ typedef struct {
int8_t superUser;
int8_t connType;
SEpSet epSet;
char sVersion[128];
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
} SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
......@@ -836,6 +837,20 @@ typedef struct {
int32_t tSerializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
int32_t tDeserializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
typedef struct {
int32_t useless; // useless
} SServerVerReq;
int32_t tSerializeSServerVerReq(void* buf, int32_t bufLen, SServerVerReq* pReq);
int32_t tDeserializeSServerVerReq(void* buf, int32_t bufLen, SServerVerReq* pReq);
typedef struct {
char ver[TSDB_VERSION_LEN];
} SServerVerRsp;
int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
......@@ -1229,6 +1244,21 @@ typedef struct {
int32_t tSerializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq);
int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq);
typedef struct {
char name[TSDB_CONFIG_OPTION_LEN + 1];
char value[TSDB_CONFIG_VALUE_LEN + 1];
} SVariablesInfo;
typedef struct {
SArray *variables; //SArray<SVariablesInfo>
} SShowVariablesRsp;
int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq);
int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq);
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp);
/*
* sql: show tables like '%a_%'
* payload is the query condition, e.g., '%a_%'
......
......@@ -163,6 +163,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
......
......@@ -57,6 +57,8 @@ void tNameAssign(SName* dst, const SName* src);
int32_t tNameSetDbName(SName* dst, int32_t acctId, const char* dbName, size_t nameLen);
int32_t tNameAddTbName(SName* dst, const char* tbName, size_t nameLen);
int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
int32_t tNameSetAcctId(SName* dst, int32_t acctId);
......
......@@ -71,6 +71,7 @@ typedef struct SCatalogReq {
SArray* pTableCfg; // element is SNAME
bool qNodeRequired; // valid qnode
bool dNodeRequired; // valid dnode
bool svrVerRequired;
bool forceUpdate;
} SCatalogReq;
......@@ -80,18 +81,19 @@ typedef struct SMetaRes {
} SMetaRes;
typedef struct SMetaData {
SArray* pDbVgroup; // pRes = SArray<SVgroupInfo>*
SArray* pDbCfg; // pRes = SDbCfgInfo*
SArray* pDbInfo; // pRes = SDbInfo*
SArray* pTableMeta; // pRes = STableMeta*
SArray* pTableHash; // pRes = SVgroupInfo*
SArray* pTableIndex; // pRes = SArray<STableIndexInfo>*
SArray* pUdfList; // pRes = SFuncInfo*
SArray* pIndex; // pRes = SIndexInfo*
SArray* pUser; // pRes = bool*
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
SArray* pTableCfg; // pRes = STableCfg*
SArray* pDnodeList; // pRes = SArray<SEpSet>*
SArray* pDbVgroup; // pRes = SArray<SVgroupInfo>*
SArray* pDbCfg; // pRes = SDbCfgInfo*
SArray* pDbInfo; // pRes = SDbInfo*
SArray* pTableMeta; // pRes = STableMeta*
SArray* pTableHash; // pRes = SVgroupInfo*
SArray* pTableIndex; // pRes = SArray<STableIndexInfo>*
SArray* pUdfList; // pRes = SFuncInfo*
SArray* pIndex; // pRes = SIndexInfo*
SArray* pUser; // pRes = bool*
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
SArray* pTableCfg; // pRes = STableCfg*
SArray* pDnodeList; // pRes = SArray<SEpSet>*
SMetaRes* pSvrVer; // pRes = char*
} SMetaData;
typedef struct SCatalogCfg {
......@@ -268,7 +270,7 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, c
*/
int32_t catalogGetAllMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
......@@ -298,6 +300,8 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo *pConn, char** pVersion);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, bool forceUpdate);
int32_t catalogClearCache(void);
......
......@@ -51,6 +51,8 @@ typedef struct SParseContext {
bool isSuperUser;
bool async;
int8_t schemalessType;
const char* svrVer;
bool nodeOffline;
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -266,19 +266,19 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
......
......@@ -33,7 +33,8 @@ typedef enum {
CFG_STYPE_ENV_CMD,
CFG_STYPE_APOLLO_URL,
CFG_STYPE_ARG_LIST,
CFG_STYPE_TAOS_OPTIONS
CFG_STYPE_TAOS_OPTIONS,
CFG_STYPE_ALTER_CMD,
} ECfgSrcType;
typedef enum {
......
......@@ -54,6 +54,11 @@ enum {
RES_TYPE__TMQ_META,
};
#define SHOW_VARIABLES_RESULT_COLS 2
#define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
......@@ -104,6 +109,8 @@ typedef struct SHeartBeatInfo {
struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
int32_t totalDnodes;
int32_t onlineDnodes;
TdThreadMutex qnodeMutex;
SArray* pQnodeList;
SAppClusterSummary summary;
......@@ -127,7 +134,8 @@ typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char ver[128];
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
int8_t connType;
int32_t acctId;
uint32_t connId;
......@@ -215,6 +223,7 @@ typedef struct SRequestObj {
SQueryExecMetric metric;
SRequestSendRecvBody body;
bool stableQuery;
bool validateOnly;
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
......@@ -235,7 +244,12 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveT
bool freeAfterUse);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
TAOS_RES *taosQueryImpl(TAOS *taos, const char *sql, bool validateOnly);
void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, bool validateOnly);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
......@@ -301,7 +315,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
......
......@@ -161,6 +161,9 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
}
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
pTscObj->connId = pRsp->query->connId;
if (pRsp->query->killRid) {
......
......@@ -178,7 +178,9 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.pStmtCb = pStmtCb,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))};
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
......@@ -237,6 +239,10 @@ static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscOb
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
if (pRequest->validateOnly) {
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
return;
}
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
......@@ -261,6 +267,11 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
}
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
if (pRequest->validateOnly) {
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
return TSDB_CODE_SUCCESS;
}
// drop table if exists not_exists_table
if (NULL == pQuery->pCmdMsg) {
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
......@@ -276,8 +287,11 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
return TSDB_CODE_SUCCESS;
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
if (code) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
return code;
}
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
......@@ -851,15 +865,19 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL:
code = execLocalCmd(pRequest, pQuery);
if (!pRequest->validateOnly) {
code = execLocalCmd(pRequest, pQuery);
}
break;
case QUERY_EXEC_MODE_RPC:
code = execDdlQuery(pRequest, pQuery);
if (!pRequest->validateOnly) {
code = execDdlQuery(pRequest, pQuery);
}
break;
case QUERY_EXEC_MODE_SCHEDULE: {
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pMnodeList);
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
......@@ -894,7 +912,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
return pRequest;
}
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
......@@ -904,6 +922,8 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
return NULL;
}
pRequest->validateOnly = validateOnly;
code = parseSql(pRequest, false, &pQuery, NULL);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
......@@ -945,7 +965,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
pRequest->requestId);
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
......@@ -962,7 +982,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
taosArrayDestroy(pNodeList);
} else {
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
......@@ -1045,14 +1065,14 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
return TSDB_CODE_SUCCESS;
}
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
do {
destroyRequest(pRequest);
pRequest = launchQuery(pTscObj, sql, sqlLen);
pRequest = launchQuery(pTscObj, sql, sqlLen, validateOnly);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
......@@ -1819,3 +1839,251 @@ _OVER:
}
return code;
}
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, int32_t acctId, char* db) {
SName name;
if (len1 <= 0) {
return -1;
}
const char *dbName = db;
const char *tbName = NULL;
int32_t dbLen = 0;
int32_t tbLen = 0;
if (len2 > 0) {
dbName = str + pos1;
dbLen = len1;
tbName = str + pos2;
tbLen = len2;
} else {
dbLen = strlen(db);
tbName = str + pos1;
tbLen = len1;
}
if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
return -1;
}
if (tNameAddTbName(&name, tbName, tbLen)) {
return -1;
}
taosArrayPush(pList, &name);
return TSDB_CODE_SUCCESS;
}
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
*pReq = taosArrayInit(10, sizeof(SName));
if (NULL == *pReq) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
bool inEscape = false;
int32_t code = 0;
int32_t vIdx = 0;
int32_t vPos[2];
int32_t vLen[2];
memset(vPos, -1, sizeof(vPos));
memset(vLen, 0, sizeof(vLen));
for (int32_t i = 0; ; ++i) {
if (0 == *(tbList + i)) {
if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
vLen[vIdx] = i - vPos[vIdx];
}
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) {
goto _return;
}
break;
}
if ('`' == *(tbList + i)) {
inEscape = !inEscape;
if (!inEscape) {
if (vPos[vIdx] >= 0) {
vLen[vIdx] = i - vPos[vIdx];
} else {
goto _return;
}
}
continue;
}
if (inEscape) {
if (vPos[vIdx] < 0) {
vPos[vIdx] = i;
}
continue;
}
if ('.' == *(tbList + i)) {
if (vPos[vIdx] < 0) {
goto _return;
}
if (vLen[vIdx] <= 0) {
vLen[vIdx] = i - vPos[vIdx];
}
vIdx++;
if (vIdx >= 2) {
goto _return;
}
continue;
}
if (',' == *(tbList + i)) {
if (vPos[vIdx] < 0) {
goto _return;
}
if (vLen[vIdx] <= 0) {
vLen[vIdx] = i - vPos[vIdx];
}
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
if (code) {
goto _return;
}
memset(vPos, -1, sizeof(vPos));
memset(vLen, 0, sizeof(vLen));
vIdx = 0;
continue;
}
if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
vLen[vIdx] = i - vPos[vIdx];
}
continue;
}
if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) ||
('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
('0' <= *(tbList + i) && '9' >= *(tbList + i))) {
if (vLen[vIdx] > 0) {
goto _return;
}
if (vPos[vIdx] < 0) {
vPos[vIdx] = i;
}
continue;
}
goto _return;
}
return TSDB_CODE_SUCCESS;
_return:
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
taosArrayDestroy(*pReq);
*pReq = NULL;
return terrno;
}
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
SSyncQueryParam *pParam = param;
pParam->pRequest->code = code;
tsem_post(&pParam->sem);
}
void syncQueryFn(void *param, void *res, int32_t code) {
SSyncQueryParam *pParam = param;
pParam->pRequest = res;
pParam->pRequest->code = code;
tsem_post(&pParam->sem);
}
void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, bool validateOnly) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj(*(int64_t *)taos);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
fp(param, NULL, terrno);
return;
}
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
fp(param, NULL, terrno);
return;
}
SRequestObj *pRequest = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, terrno);
return;
}
pRequest->validateOnly = validateOnly;
pRequest->body.queryFp = fp;
pRequest->body.param = param;
doAsyncQuery(pRequest, false);
}
TAOS_RES *taosQueryImpl(TAOS *taos, const char *sql, bool validateOnly) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taosAsyncQueryImpl(taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(&param->sem);
releaseTscObj(*(int64_t *)taos);
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(*(int64_t *)taos);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen, validateOnly);
releaseTscObj(*(int64_t *)taos);
return pRes;
#endif
}
......@@ -81,12 +81,6 @@ void taos_cleanup(void) {
taosCloseLog();
}
setConfRet taos_set_config(const char *config) {
// TODO
setConfRet ret = {SET_CONF_RET_SUCC, {0}};
return ret;
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, port, user, db);
if (user == NULL) {
......@@ -205,51 +199,9 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return pResInfo->userFields;
}
static void syncQueryFn(void *param, void *res, int32_t code) {
SSyncQueryParam *pParam = param;
pParam->pRequest = res;
pParam->pRequest->code = code;
tsem_post(&pParam->sem);
}
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taos_query_a(taos, sql, syncQueryFn, param);
tsem_wait(&param->sem);
releaseTscObj(*(int64_t *)taos);
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(*(int64_t *)taos);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen);
releaseTscObj(*(int64_t *)taos);
return pRes;
#endif
return taosQueryImpl(taos, sql, false);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
......@@ -641,7 +593,14 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
return pResInfo->pCol[columnIndex].offset;
}
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
int taos_validate_sql(TAOS *taos, const char *sql) {
TAOS_RES* pObj = taosQueryImpl(taos, sql, true);
int code = taos_errno(pObj);
taos_free_result(pObj);
return code;
}
void taos_reset_current_db(TAOS *taos) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
......@@ -664,7 +623,7 @@ const char *taos_get_server_info(TAOS *taos) {
releaseTscObj(*(int64_t *)taos);
return pTscObj->ver;
return pTscObj->sDetailVer;
}
typedef struct SqlParseWrapper {
......@@ -731,38 +690,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj(*(int64_t *)taos);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
fp(param, NULL, terrno);
return;
}
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
fp(param, NULL, terrno);
return;
}
SRequestObj *pRequest = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, terrno);
return;
}
pRequest->body.queryFp = fp;
pRequest->body.param = param;
doAsyncQuery(pRequest, false);
taosAsyncQueryImpl(taos, sql, fp, param, false);
}
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
......@@ -838,7 +766,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
.requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet};
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId, &catalogReq, retrieveMetaCallback, pWrapper,
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob);
if (code == TSDB_CODE_SUCCESS) {
return;
......@@ -955,10 +883,75 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
// TODO
return -1;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0;
SRequestObj *pRequest = NULL;
SCatalogReq catalogReq = {0};
if (NULL == tableNameList) {
return TSDB_CODE_SUCCESS;
}
int32_t length = (int32_t)strlen(tableNameList);
if (0 == length) {
return TSDB_CODE_SUCCESS;
} else if (length > MAX_TABLE_NAME_LENGTH) {
tscError("tableNameList too long, length:%d, maximum allowed:%d", length, MAX_TABLE_NAME_LENGTH);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
if (code) {
goto _return;
}
SCatalog* pCtg = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
if (code != TSDB_CODE_SUCCESS) {
goto _return;
}
char* sql = "taos_load_table_info";
code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _return;
}
SSyncQueryParam param = {0};
tsem_init(&param.sem, 0, 0);
param.pRequest = pRequest;
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, &param, NULL);
if (code) {
goto _return;
}
tsem_wait(&param.sem);
_return:
taosArrayDestroy(catalogReq.pTableMeta);
destroyRequest(pRequest);
releaseTscObj(*(int64_t *)taos);
return code;
}
TAOS_STMT *taos_stmt_init(TAOS *taos) {
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
......
......@@ -82,7 +82,8 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connId = connectRsp.connId;
pTscObj->acctId = connectRsp.acctId;
tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));
// update the appInstInfo
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
......@@ -287,6 +288,103 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
int32_t numOfCfg = taosArrayGetSize(pVars);
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SVariablesInfo *pInfo = taosArrayGet(pVars, i);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, value, false);
}
pBlock->info.rows = numOfCfg;
*block = pBlock;
return TSDB_CODE_SUCCESS;
}
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = NULL;
int32_t code = buildShowVariablesBlock(pVars, &pBlock);
if (code) {
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pRsp)->useconds = 0;
(*pRsp)->completed = 1;
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
return TSDB_CODE_SUCCESS;
}
int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
} else {
SShowVariablesRsp rsp = {0};
SRetrieveTableRsp* pRes = NULL;
code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
if (TSDB_CODE_SUCCESS == code) {
code = buildShowVariablesRsp(rsp.variables, &pRes);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false);
}
tFreeSShowVariablesRsp(&rsp);
}
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
tsem_post(&pRequest->body.rspSem);
}
return code;
}
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
switch (msgType) {
case TDMT_MND_CONNECT:
......@@ -301,6 +399,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
return processDropDbRsp;
case TDMT_MND_ALTER_STB:
return processAlterStbRsp;
case TDMT_MND_SHOW_VARIABLES:
return processShowVariablesRsp;
default:
return genericRspCallback;
}
......
......@@ -636,6 +636,373 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
return 0;
}
int32_t taosSetCfg(SConfig *pCfg, char* name) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
switch (lowcaseName[0]) {
case 'a': {
if (strcasecmp("asyncLog", name) == 0) {
tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval;
}
break;
}
case 'b': {
if (strcasecmp("bnodeShmSize", name) == 0) {
tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32;
}
break;
}
case 'c': {
if (strcasecmp("charset", name) == 0) {
const char *locale = cfgGetItem(pCfg, "locale")->str;
const char *charset = cfgGetItem(pCfg, "charset")->str;
taosSetSystemLocale(locale, charset);
osSetSystemLocale(locale, charset);
} else if (strcasecmp("compressMsgSize", name) == 0) {
tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32;
} else if (strcasecmp("compressColData", name) == 0) {
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
} else if (strcasecmp("countAlwaysReturnValue", name) == 0) {
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
} else if (strcasecmp("cDebugFlag", name) == 0) {
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
}
break;
}
case 'd': {
if (strcasecmp("deadLockKillQuery", name) == 0) {
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32;
} else if (strcasecmp("dDebugFlag", name) == 0) {
dDebugFlag = cfgGetItem(pCfg, "dDebugFlag")->i32;
}
break;
}
case 'e': {
if (strcasecmp("enableCoreFile", name) == 0) {
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
taosSetConsoleEcho(enableCore);
}
break;
}
case 'f': {
if (strcasecmp("fqdn", name) == 0) {
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
char defaultFirstEp[TSDB_EP_LEN] = {0};
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
SEp firstEp = {0};
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
} else if (strcasecmp("firstEp", name) == 0) {
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
char defaultFirstEp[TSDB_EP_LEN] = {0};
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
SEp firstEp = {0};
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
} else if (strcasecmp("fsDebugFlag", name) == 0) {
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
} else if (strcasecmp("fnDebugFlag", name) == 0) {
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
}
break;
}
case 'i': {
if (strcasecmp("idxDebugFlag", name) == 0) {
idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32;
}
break;
}
case 'j': {
if (strcasecmp("jniDebugFlag", name) == 0) {
jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32;
}
break;
}
case 'k': {
if (strcasecmp("keepColumnName", name) == 0) {
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
}
break;
}
case 'l': {
if (strcasecmp("locale", name) == 0) {
const char *locale = cfgGetItem(pCfg, "locale")->str;
const char *charset = cfgGetItem(pCfg, "charset")->str;
taosSetSystemLocale(locale, charset);
osSetSystemLocale(locale, charset);
} else if (strcasecmp("logDir", name) == 0) {
tstrncpy(tsLogDir, cfgGetItem(pCfg, "logDir")->str, PATH_MAX);
taosExpandDir(tsLogDir, tsLogDir, PATH_MAX);
} else if (strcasecmp("logKeepDays", name) == 0) {
tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32;
}
break;
}
case 'm': {
switch (lowcaseName[1]) {
case 'a': {
if (strcasecmp("maxShellConns", name) == 0) {
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxStreamCompDelay", name) == 0) {
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
} else if (strcasecmp("maxFirstStreamCompDelay", name) == 0) {
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
}
break;
}
case 'd': {
if (strcasecmp("mDebugFlag", name) == 0) {
mDebugFlag = cfgGetItem(pCfg, "mDebugFlag")->i32;
}
break;
}
case 'i': {
if (strcasecmp("minimalTempDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
} else if (strcasecmp("minSlidingTime", name) == 0) {
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
} else if (strcasecmp("minIntervalTime", name) == 0) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
}
break;
}
case 'n': {
if (strcasecmp("mnodeShmSize", name) == 0) {
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
}
break;
}
case 'o': {
if (strcasecmp("monitor", name) == 0) {
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
} else if (strcasecmp("monitorInterval", name) == 0) {
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
} else if (strcasecmp("monitorFqdn", name) == 0) {
tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN);
} else if (strcasecmp("monitorPort", name) == 0) {
tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32;
} else if (strcasecmp("monitorMaxLogs", name) == 0) {
tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32;
} else if (strcasecmp("monitorComp", name) == 0) {
tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval;
}
break;
}
case 'q': {
if (strcasecmp("mqRebalanceInterval", name) == 0) {
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
}
break;
}
case 'u': {
if (strcasecmp("multiProcess", name) == 0) {
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
}
break;
}
default:
terrno = TSDB_CODE_CFG_NOT_FOUND;
return -1;
}
break;
}
case 'n': {
if (strcasecmp("numOfTaskQueueThreads", name) == 0) {
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
} else if (strcasecmp("numOfRpcThreads", name) == 0) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
} else if (strcasecmp("numOfCommitThreads", name) == 0) {
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
} else if (strcasecmp("numOfMnodeQueryThreads", name) == 0) {
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
} else if (strcasecmp("numOfMnodeReadThreads", name) == 0) {
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
} else if (strcasecmp("numOfVnodeQueryThreads", name) == 0) {
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
} else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) {
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
} else if (strcasecmp("numOfVnodeWriteThreads", name) == 0) {
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
} else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
} else if (strcasecmp("numOfVnodeMergeThreads", name) == 0) {
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
} else if (strcasecmp("numOfLogLines", name) == 0) {
tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32;
}
break;
}
case 'p': {
if (strcasecmp("printAuth", name) == 0) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
}
break;
}
case 'q': {
if (strcasecmp("queryPolicy", name) == 0) {
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
} else if (strcasecmp("querySmaOptimize", name) == 0) {
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
} else if (strcasecmp("queryBufferSize", name) == 0) {
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
}
} else if (strcasecmp("qnodeShmSize", name) == 0) {
tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32;
} else if (strcasecmp("qDebugFlag", name) == 0) {
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
}
break;
}
case 'r': {
if (strcasecmp("retryStreamCompDelay", name) == 0) {
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
} else if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
} else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
} else if (strcasecmp("rpcDebugFlag", name) == 0) {
rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32;
}
break;
}
case 's': {
if (strcasecmp("secondEp", name) == 0) {
SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp");
SEp secondEp = {0};
taosGetFqdnPortFromEp(strlen(pSecondpItem->str) == 0 ? tsFirst : pSecondpItem->str, &secondEp);
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
} else if (strcasecmp("smlChildTableName", name) == 0) {
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
} else if (strcasecmp("smlTagName", name) == 0) {
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
} else if (strcasecmp("smlDataFormat", name) == 0) {
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
} else if (strcasecmp("shellActivityTimer", name) == 0) {
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
} else if (strcasecmp("supportVnodes", name) == 0) {
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
} else if (strcasecmp("statusInterval", name) == 0) {
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
} else if (strcasecmp("streamCompDelayRatio", name) == 0) {
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
} else if (strcasecmp("slaveQuery", name) == 0) {
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
} else if (strcasecmp("snodeShmSize", name) == 0) {
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
} else if (strcasecmp("serverPort", name) == 0) {
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
char defaultFirstEp[TSDB_EP_LEN] = {0};
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
SEp firstEp = {0};
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
} else if (strcasecmp("sDebugFlag", name) == 0) {
sDebugFlag = cfgGetItem(pCfg, "sDebugFlag")->i32;
} else if (strcasecmp("smaDebugFlag", name) == 0) {
smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32;
}
break;
}
case 't': {
if (strcasecmp("timezone", name) == 0) {
SConfigItem *pItem = cfgGetItem(pCfg, "timezone");
osSetTimezone(pItem->str);
uDebug("timezone format changed from %s to %s", pItem->str, tsTimezoneStr);
cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype);
} else if (strcasecmp("tempDir", name) == 0) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1;
}
} else if (strcasecmp("telemetryReporting", name) == 0) {
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
} else if (strcasecmp("telemetryInterval", name) == 0) {
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
} else if (strcasecmp("telemetryServer", name) == 0) {
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
} else if (strcasecmp("telemetryPort", name) == 0) {
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
} else if (strcasecmp("transPullupInterval", name) == 0) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
} else if (strcasecmp("tmrDebugFlag", name) == 0) {
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
} else if (strcasecmp("tsdbDebugFlag", name) == 0) {
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
} else if (strcasecmp("tqDebugFlag", name) == 0) {
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
}
break;
}
case 'u': {
if (strcasecmp("udf", name) == 0) {
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
} else if (strcasecmp("uDebugFlag", name) == 0) {
uDebugFlag = cfgGetItem(pCfg, "uDebugFlag")->i32;
}
break;
}
case 'v': {
if (strcasecmp("vnodeShmSize", name) == 0) {
tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32;
} else if (strcasecmp("vDebugFlag", name) == 0) {
vDebugFlag = cfgGetItem(pCfg, "vDebugFlag")->i32;
}
break;
}
case 'w': {
if (strcasecmp("wDebugFlag", name) == 0) {
wDebugFlag = cfgGetItem(pCfg, "wDebugFlag")->i32;
}
break;
}
default:
terrno = TSDB_CODE_CFG_NOT_FOUND;
return -1;
}
return 0;
}
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
if (tsCfg == NULL) osDefaultInit();
......
......@@ -2250,6 +2250,56 @@ int32_t tDeserializeSDnodeListReq(void *buf, int32_t bufLen, SDnodeListReq *pReq
return 0;
}
int32_t tSerializeSServerVerReq(void *buf, int32_t bufLen, SServerVerReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->useless) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSServerVerReq(void *buf, int32_t bufLen, SServerVerReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->ver) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->ver) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
SEncoder encoder = {0};
......@@ -2859,6 +2909,67 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR
return 0;
}
int32_t tEncodeSVariablesInfo(SEncoder* pEncoder, SVariablesInfo* pInfo) {
if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1;
return 0;
}
int32_t tDecodeSVariablesInfo(SDecoder* pDecoder, SVariablesInfo* pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1;
return 0;
}
int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t varNum = taosArrayGetSize(pRsp->variables);
if (tEncodeI32(&encoder, varNum) < 0) return -1;
for (int32_t i = 0; i < varNum; ++i) {
SVariablesInfo* pInfo = taosArrayGet(pRsp->variables, i);
if (tEncodeSVariablesInfo(&encoder, pInfo) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t varNum = 0;
if (tDecodeI32(&decoder, &varNum) < 0) return -1;
if (varNum > 0) {
pRsp->variables = taosArrayInit(varNum, sizeof(SVariablesInfo));
if (NULL == pRsp->variables) return -1;
for (int32_t i = 0; i < varNum; ++i) {
SVariablesInfo info = {0};
if (tDecodeSVariablesInfo(&decoder, &info) < 0) return -1;
if (NULL == taosArrayPush(pRsp->variables, &info)) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp) {
if (NULL == pRsp) {
return;
}
taosArrayDestroy(pRsp->variables);
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -3400,7 +3511,8 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -3420,7 +3532,8 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -214,6 +214,20 @@ int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t name
return 0;
}
int32_t tNameAddTbName(SName* dst, const char* tbName, size_t nameLen) {
assert(dst != NULL && tbName != NULL && nameLen > 0);
// too long account id or too long db name
if (nameLen >= tListLen(dst->tname) || nameLen <= 0) {
return -1;
}
dst->type = TSDB_TABLE_NAME_T;
tstrncpy(dst->tname, tbName, nameLen + 1);
return 0;
}
int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
assert(dst != NULL);
dst->acctId = acctId;
......
......@@ -206,6 +206,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
......
......@@ -48,6 +48,7 @@ static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq);
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq);
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
......@@ -78,6 +79,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
......@@ -554,6 +556,60 @@ _OVER:
return code;
}
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
SShowVariablesRsp rsp = {0};
int32_t code = -1;
rsp.variables = taosArrayInit(4, sizeof(SVariablesInfo));
if (NULL == rsp.variables) {
mError("failed to alloc SVariablesInfo array while process show variables req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
SVariablesInfo info = {0};
strcpy(info.name, "statusInterval");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "timezone");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "locale");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "charset");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
taosArrayPush(rsp.variables, &info);
int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSShowVariablesRsp(pRsp, rspLen, &rsp);
pReq->info.rspLen = rspLen;
pReq->info.rsp = pRsp;
code = 0;
_OVER:
if (code != 0) {
mError("failed to get show variables info since %s", terrstr());
}
tFreeSShowVariablesRsp(&rsp);
return code;
}
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
......
......@@ -70,6 +70,7 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
static void mndFreeApp(SAppObj *pApp);
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter);
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq);
int32_t mndInitProfile(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -94,6 +95,7 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
......@@ -262,8 +264,9 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
connectRsp.connId = pConn->id;
connectRsp.connType = connReq.connType;
connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
gitinfo);
mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
......@@ -460,6 +463,27 @@ static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnIn
return TSDB_CODE_SUCCESS;
}
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
SSdb *pSdb = pMnode->pSdb;
SDnodeObj *pDnode = NULL;
int64_t curMs = taosGetTimestampMs();
void *pIter = NULL;
while (true) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
bool online = mndIsDnodeOnline(pDnode, curMs);
if (online) {
(*num)++;
}
sdbRelease(pSdb, pDnode);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
SClientHbBatchRsp *pBatchRsp) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -503,7 +527,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
rspBasic->connId = pConn->id;
rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
rspBasic->onlineDnodes = 1; // TODO
mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
......@@ -694,6 +718,28 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
}
}
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
int32_t code = -1;
SServerVerRsp rsp = {0};
strcpy(rsp.ver, version);
int32_t contLen = tSerializeSServerVerRsp(NULL, 0, &rsp);
if (contLen < 0) goto _over;
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) goto _over;
tSerializeSServerVerRsp(pRsp, contLen, &rsp);
pReq->info.rspLen = contLen;
pReq->info.rsp = pRsp;
code = 0;
_over:
return code;
}
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
if(NOT TD_WINDOWS)
add_test(
NAME dbTest
COMMAND dbTest
)
endif(NOT TD_WINDOWS)
add_test(
NAME dbTest
COMMAND dbTest
)
......@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
# if(NOT TD_WINDOWS)
add_test(
NAME funcTest
COMMAND funcTest
)
# endif(NOT TD_WINDOWS)
add_test(
NAME funcTest
COMMAND funcTest
)
\ No newline at end of file
......@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
# if(NOT TD_WINDOWS)
add_test(
NAME profileTest
COMMAND profileTest
)
# endif(NOT TD_WINDOWS)
add_test(
NAME profileTest
COMMAND profileTest
)
......@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
# if(NOT TD_WINDOWS)
add_test(
NAME showTest
COMMAND showTest
)
# endif(NOT TD_WINDOWS)
add_test(
NAME showTest
COMMAND showTest
)
......@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
if(NOT TD_WINDOWS)
add_test(
NAME smaTest
COMMAND smaTest
)
endif(NOT TD_WINDOWS)
add_test(
NAME smaTest
COMMAND smaTest
)
......@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
if(NOT TD_WINDOWS)
add_test(
NAME stbTest
COMMAND stbTest
)
endif(NOT TD_WINDOWS)
\ No newline at end of file
add_test(
NAME stbTest
COMMAND stbTest
)
\ No newline at end of file
......@@ -5,9 +5,7 @@ target_link_libraries(
PUBLIC sut
)
if(NOT TD_WINDOWS)
add_test(
NAME userTest
COMMAND userTest
)
endif(NOT TD_WINDOWS)
add_test(
NAME userTest
COMMAND userTest
)
......@@ -76,6 +76,7 @@ typedef enum {
CTG_TASK_GET_INDEX,
CTG_TASK_GET_UDF,
CTG_TASK_GET_USER,
CTG_TASK_GET_SVR_VER,
} CTG_TASK_TYPE;
typedef enum {
......@@ -224,6 +225,7 @@ typedef struct SCtgJob {
int32_t dbInfoNum;
int32_t tbIndexNum;
int32_t tbCfgNum;
int32_t svrVerNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
......@@ -578,8 +580,9 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
......
......@@ -1051,7 +1051,7 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) {
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pConn || NULL == pReq || NULL == fp || NULL == param) {
......@@ -1060,7 +1060,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, uint64_t
int32_t code = 0, taskNum = 0;
SCtgJob *pJob = NULL;
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, reqId, pReq, fp, param, &taskNum));
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param, &taskNum));
if (taskNum <= 0) {
SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData));
fp(pMetaData, param, TSDB_CODE_SUCCESS);
......@@ -1247,6 +1247,22 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo *pConn, char** pVersion) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pConn || NULL == pVersion) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgGetSvrVerFromMnode(pCtg, pConn, pVersion, NULL));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
CTG_API_ENTER();
......
......@@ -255,6 +255,20 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_SVR_VER;
task.taskId = taskIdx;
task.pJob = pJob;
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " [%dth] task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param;
SCtgTask task = {0};
......@@ -413,7 +427,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t code = 0;
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
......@@ -421,6 +435,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
int32_t svrVerNum = pReq->svrVerRequired ? 1 : 0;
int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
......@@ -428,21 +443,21 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, pConn->requestId);
return TSDB_CODE_SUCCESS;
}
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) {
ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), reqId);
ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgJob *pJob = *job;
pJob->queryId = reqId;
pJob->queryId = pConn->requestId;
pJob->userFp = fp;
pJob->pCtg = pCtg;
pJob->conn = *pConn;
......@@ -460,6 +475,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
pJob->dbInfoNum = dbInfoNum;
pJob->tbIndexNum = tbIndexNum;
pJob->tbCfgNum = tbCfgNum;
pJob->svrVerNum = svrVerNum;
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
......@@ -530,6 +546,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DNODE, NULL, NULL));
}
if (svrVerNum) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_SVR_VER, NULL, NULL));
}
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
if (pJob->refId < 0) {
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
......@@ -728,6 +748,21 @@ int32_t ctgDumpUserRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpSvrVer(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pSvrVer) {
pJob->jobRes.pSvrVer = taosMemoryCalloc(1, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pSvrVer) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
pJob->jobRes.pSvrVer->code = pTask->code;
pJob->jobRes.pSvrVer->pRes = pTask->res;
return TSDB_CODE_SUCCESS;
}
int32_t ctgInvokeSubCb(SCtgTask *pTask) {
int32_t code = 0;
......@@ -1156,6 +1191,20 @@ _return:
CTG_RET(code);
}
int32_t ctgHandleGetSvrVerRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out);
_return:
ctgHandleTaskEnd(pTask, code);
CTG_RET(code);
}
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
......@@ -1459,6 +1508,15 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
ctgResetTbMetaTask(pTask);
......@@ -1532,6 +1590,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL},
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
};
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
......@@ -1633,7 +1692,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
qDebug("QID:0x%" PRIx64 " ctg start to launch task %d", pJob->queryId, pTask->taskId);
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
pTask->status = CTG_TASK_LAUNCHED;
}
......
......@@ -210,7 +210,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
int64_t jobId = 0;
CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, reqId, &req, ctgdUserCallback, param, &jobId));
CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, &req, ctgdUserCallback, param, &jobId));
_return:
......
......@@ -217,6 +217,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got stb cfg from mnode, tbFName:%s", target);
break;
}
case TDMT_MND_SERVER_VERSION: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process svr ver rsp failed, error:%s", tstrerror(code));
CTG_ERR_RET(code);
}
qDebug("Got svr ver from mnode");
break;
}
default:
qError("invalid req type %s", TMSG_INFO(reqType));
return TSDB_CODE_APP_ERROR;
......@@ -811,4 +826,38 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_SERVER_VERSION;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
qDebug("try to get svr ver from mnode");
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get svr ver msg failed, code:%s", tstrerror(code));
CTG_ERR_RET(code);
}
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
return TSDB_CODE_SUCCESS;
}
......@@ -45,6 +45,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
return "[get udf]";
case CTG_TASK_GET_USER:
return "[get user]";
case CTG_TASK_GET_SVR_VER:
return "[get svr ver]";
default:
return "unknown";
}
......@@ -103,8 +105,13 @@ void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pQnodeList);
pData->pQnodeList = NULL;
taosArrayDestroy(pData->pDnodeList);
pData->pDnodeList = NULL;
taosArrayDestroy(pData->pTableCfg);
pData->pTableCfg = NULL;
taosMemoryFreeClear(pData->pSvrVer);
}
void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) {
......@@ -346,20 +353,8 @@ void ctgResetTbMetaTask(SCtgTask* pTask) {
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
switch (type) {
case CTG_TASK_GET_QNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_DNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_META: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_QNODE:
case CTG_TASK_GET_DNODE:
case CTG_TASK_GET_DB_VGROUP: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
......@@ -373,14 +368,6 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
}
break;
}
case CTG_TASK_GET_DB_INFO: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_HASH: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_INDEX: {
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
*pRes = NULL;
......@@ -394,15 +381,13 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
}
break;
}
case CTG_TASK_GET_INDEX: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_UDF: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_USER: {
case CTG_TASK_GET_TB_HASH:
case CTG_TASK_GET_DB_INFO:
case CTG_TASK_GET_INDEX:
case CTG_TASK_GET_UDF:
case CTG_TASK_GET_USER:
case CTG_TASK_GET_SVR_VER:
case CTG_TASK_GET_TB_META: {
taosMemoryFreeClear(*pRes);
break;
}
......@@ -415,20 +400,12 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
switch (type) {
case CTG_TASK_GET_QNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_QNODE:
case CTG_TASK_GET_DNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_META: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_DB_VGROUP: {
if (*pRes) {
SDBVgInfo* pInfo = (SDBVgInfo*)*pRes;
......@@ -445,14 +422,6 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
}
break;
}
case CTG_TASK_GET_DB_INFO: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_HASH: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_INDEX: {
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
*pRes = NULL;
......@@ -466,14 +435,12 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
}
break;
}
case CTG_TASK_GET_INDEX: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_UDF: {
taosMemoryFreeClear(*pRes);
break;
}
case CTG_TASK_GET_TB_META:
case CTG_TASK_GET_DB_INFO:
case CTG_TASK_GET_TB_HASH:
case CTG_TASK_GET_INDEX:
case CTG_TASK_GET_UDF:
case CTG_TASK_GET_SVR_VER:
case CTG_TASK_GET_USER: {
taosMemoryFreeClear(*pRes);
break;
......@@ -497,10 +464,6 @@ void ctgClearSubTaskRes(SCtgSubRes *pRes) {
void ctgFreeTaskCtx(SCtgTask* pTask) {
switch (pTask->type) {
case CTG_TASK_GET_QNODE: {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_META: {
SCtgTbMetaCtx* taskCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
......@@ -511,18 +474,6 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_DB_VGROUP: {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_DB_CFG: {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_DB_INFO: {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TB_HASH: {
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
......@@ -542,14 +493,12 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_INDEX: {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_UDF: {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_DB_VGROUP:
case CTG_TASK_GET_DB_CFG:
case CTG_TASK_GET_DB_INFO:
case CTG_TASK_GET_INDEX:
case CTG_TASK_GET_UDF:
case CTG_TASK_GET_QNODE:
case CTG_TASK_GET_USER: {
taosMemoryFreeClear(pTask->taskCtx);
break;
......
......@@ -520,7 +520,17 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR
return execShowCreateTable(pStmt, pRsp);
}
static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return TSDB_CODE_FAILED; }
static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) {
return terrno;
}
if (taosSetCfg(tsCfg, pStmt->config)) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* buildLocalVariablesResultDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
......
......@@ -26,6 +26,7 @@
typedef struct SBlockKeyTuple {
TSKEY skey;
void* payloadAddr;
int16_t index;
} SBlockKeyTuple;
typedef struct SBlockKeyInfo {
......@@ -36,7 +37,6 @@ typedef struct SBlockKeyInfo {
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
TSKEY left = *(TSKEY*)lhs;
TSKEY right = *(TSKEY*)rhs;
if (left == right) {
return 0;
} else {
......@@ -44,6 +44,16 @@ static int32_t rowDataCompar(const void* lhs, const void* rhs) {
}
}
static int32_t rowDataComparStable(const void* lhs, const void* rhs) {
TSKEY left = *(TSKEY*)lhs;
TSKEY right = *(TSKEY*)rhs;
if (left == right) {
return ((SBlockKeyTuple*)lhs)->index - ((SBlockKeyTuple*)rhs)->index;
} else {
return left > right ? 1 : -1;
}
}
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
pColList->numOfCols = numOfCols;
pColList->numOfBound = numOfCols;
......@@ -343,6 +353,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey
while (n < nRows) {
pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
pBlkKeyTuple->payloadAddr = pBlockData;
pBlkKeyTuple->index = n;
// next loop
pBlockData += extendedRowSize;
......@@ -354,7 +365,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
// todo. qsort is unstable, if timestamp is same, should get the last one
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t i = 0;
......
......@@ -4641,7 +4641,7 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema**
return TSDB_CODE_SUCCESS;
}
static int32_t extractShowLocalVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) {
static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = 2;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
......@@ -4678,7 +4678,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
return extractShowCreateTableResultSchema(numOfCols, pSchema);
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
return extractShowLocalVariablesResultSchema(numOfCols, pSchema);
case QUERY_NODE_SHOW_VARIABLES_STMT:
return extractShowVariablesResultSchema(numOfCols, pSchema);
default:
break;
}
......@@ -5909,7 +5910,6 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
......@@ -6011,6 +6011,14 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_ALTER_LOCAL_STMT:
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
break;
case QUERY_NODE_SHOW_VARIABLES_STMT:
pQuery->haveResultSet = true;
pQuery->execMode = QUERY_EXEC_MODE_RPC;
if (NULL != pCxt->pCmdMsg) {
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg);
pQuery->msgType = pQuery->pCmdMsg->msgType;
}
break;
default:
pQuery->execMode = QUERY_EXEC_MODE_RPC;
if (NULL != pCxt->pCmdMsg) {
......
......@@ -54,12 +54,6 @@ class ParserDdlTest : public ParserTestBase {
virtual void checkDdl(const SQuery* pQuery, ParserStage stage) {
ASSERT_NE(pQuery, nullptr);
ASSERT_NE(pQuery->pRoot, nullptr);
if (QUERY_EXEC_MODE_RPC == pQuery->execMode) {
ASSERT_EQ(pQuery->haveResultSet, false);
ASSERT_EQ(pQuery->numOfResCols, 0);
ASSERT_EQ(pQuery->pResSchema, nullptr);
ASSERT_EQ(pQuery->precision, 0);
}
if (nullptr != checkDdl_) {
checkDdl_(pQuery, stage);
}
......
......@@ -32,9 +32,7 @@ if(${BUILD_WINGETOPT})
target_link_libraries(plannerTest PUBLIC wingetopt)
endif()
# if(NOT TD_WINDOWS)
add_test(
NAME plannerTest
COMMAND plannerTest
)
# endif(NOT TD_WINDOWS)
add_test(
NAME plannerTest
COMMAND plannerTest
)
......@@ -144,6 +144,23 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SServerVerReq req = {0};
int32_t bufLen = tSerializeSServerVerReq(NULL, 0, &req);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSServerVerReq(pBuf, bufLen, &req);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
......@@ -467,6 +484,26 @@ int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) {
return code;
}
int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) {
SServerVerRsp out = {0};
int32_t code = 0;
if (NULL == output || NULL == msg || msgSize <= 0) {
code = TSDB_CODE_TSC_INVALID_INPUT;
return code;
}
if (tDeserializeSServerVerRsp(msg, msgSize, &out) != 0) {
qError("invalid svr ver rsp msg, msgSize:%d", msgSize);
code = TSDB_CODE_INVALID_MSG;
return code;
}
*(char**)output = strdup(out.ver);
return code;
}
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
SDbCfgRsp out = {0};
......@@ -583,6 +620,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
......@@ -596,6 +634,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp;
}
#pragma GCC diagnostic pop
......@@ -125,6 +125,7 @@ typedef struct {
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
SCvtAddr cvtAddr;
bool setMaxRetry;
int hThrdIdx;
} STransConnCtx;
......
......@@ -980,7 +980,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tTrace("try to send req to next node");
pMsg->st = taosGetTimestampUs();
pCtx->retryCount += 1;
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL && pCtx->setMaxRetry == false) {
if (pCtx->retryCount < pEpSet->numOfEps * 3) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
if (pThrd->quit == false) {
......@@ -997,6 +997,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
}
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
pCtx->setMaxRetry = true;
if (pResp->contLen == 0) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
transPrintEpSet(&pCtx->epSet);
......@@ -1012,8 +1013,10 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
}
if (pThrd->quit == false) {
if (pConn->status != ConnInPool) {
addConnToPool(pThrd->pool, pConn);
if (pResp->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pConn->status != ConnInPool) addConnToPool(pThrd->pool, pConn);
} else {
transUnrefCliHandle(pConn);
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
......
......@@ -16,12 +16,12 @@ rem echo NODE: %NODE%
set SCRIPT_DIR=%~dp0..\
rem echo SCRIPT_DIR: %SCRIPT_DIR%
set BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\
echo %cd% | grep community > nul && set "BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\" || set "BUILD_DIR=%SCRIPT_DIR%..\..\debug\build\bin\"
set TSIM=%BUILD_DIR%tsim
rem echo BUILD_DIR: %BUILD_DIR%
rem echo TSIM: %TSIM%
set SIM_DIR=%SCRIPT_DIR%..\..\..\sim\
echo %cd% | grep community > nul && set "SIM_DIR=%SCRIPT_DIR%..\..\..\sim\" || set "SIM_DIR=%SCRIPT_DIR%..\..\sim\"
rem echo SIM_DIR: %SIM_DIR%
set NODE_DIR=%SIM_DIR%%NODE_NAME%\
......
......@@ -4,7 +4,7 @@ echo Executing copy_udf.bat
set SCRIPT_DIR=%cd%
echo SCRIPT_DIR: %SCRIPT_DIR%
cd ..\..\..
echo %cd% | grep community > nul && cd ..\..\.. || cd ..\..
set TAOS_DIR=%cd%
echo find udf library in %TAOS_DIR%
set UDF1_DIR=%TAOS_DIR%\debug\build\lib\udf1.dll
......
......@@ -13,12 +13,12 @@ rem echo NODE: %NODE%
set SCRIPT_DIR=%~dp0..\
rem echo SCRIPT_DIR: %SCRIPT_DIR%
set BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\
echo %cd% | grep community > nul && set "BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\" || set "BUILD_DIR=%SCRIPT_DIR%..\..\debug\build\bin\"
set TSIM=%BUILD_DIR%tsim
rem echo BUILD_DIR: %BUILD_DIR%
rem echo TSIM: %TSIM%
set SIM_DIR=%SCRIPT_DIR%..\..\..\sim\
echo %cd% | grep community > nul && set "SIM_DIR=%SCRIPT_DIR%..\..\..\sim\" || set "SIM_DIR=%SCRIPT_DIR%..\..\sim\"
rem echo SIM_DIR: %SIM_DIR%
set NODE_DIR=%SIM_DIR%%NODE_NAME%\
......
......@@ -13,12 +13,12 @@ rem echo NODE: %EXEC_OPTON%
set SCRIPT_DIR=%~dp0..\
rem echo SCRIPT_DIR: %SCRIPT_DIR%
set BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\
echo %cd% | grep community > nul && set "BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\" || set "BUILD_DIR=%SCRIPT_DIR%..\..\debug\build\bin\"
set TAOSD=%BUILD_DIR%taosd
rem echo BUILD_DIR: %BUILD_DIR%
rem echo TAOSD: %TAOSD%
set SIM_DIR=%SCRIPT_DIR%..\..\..\sim\
echo %cd% | grep community > nul && set "SIM_DIR=%SCRIPT_DIR%..\..\..\sim\" || set "SIM_DIR=%SCRIPT_DIR%..\..\sim\"
rem echo SIM_DIR: %SIM_DIR%
set NODE_DIR=%SIM_DIR%%NODE_NAME%\
......
......@@ -112,7 +112,7 @@ if $data(5)[4] != ready then
goto step2
endi
print =============== step31: move follower
print =============== step3: create database
$leaderExist = 0
$leaderVnode = 0
$follower1 = 0
......@@ -165,7 +165,7 @@ if $rows != 1 then
return -1
endi
print =============== step32: move follower2
print =============== step31:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql show d1.vgroups
......@@ -178,7 +178,7 @@ endi
return
print =============== step33: move follower1
print =============== step32:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql show d1.vgroups
......@@ -189,7 +189,7 @@ if $rows != 1 then
return -1
endi
print =============== step34: move follower2
print =============== step33:
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql show d1.vgroups
......@@ -200,7 +200,7 @@ if $rows != 1 then
return -1
endi
print =============== step35: move follower1
print =============== step34:
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql show d1.vgroups
......@@ -211,9 +211,9 @@ if $rows != 1 then
return -1
endi
print =============== step36: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
print =============== step34:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
......@@ -222,7 +222,7 @@ if $rows != 1 then
return -1
endi
print =============== step37: move follower1
print =============== step36:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql show d1.vgroups
......@@ -233,7 +233,7 @@ if $rows != 1 then
return -1
endi
print =============== step38: move follower2
print =============== step37:
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql show d1.vgroups
......@@ -244,7 +244,7 @@ if $rows != 1 then
return -1
endi
print =============== step39: move follower1
print =============== step38:
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql show d1.vgroups
......@@ -255,6 +255,17 @@ if $rows != 1 then
return -1
endi
print =============== step39:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
sql show d1.tables
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
......
......@@ -91,7 +91,7 @@ if $data(5)[4] != ready then
goto step2
endi
print =============== step31: move follower
print =============== step3: create database
$leaderExist = 0
$leaderVnode = 0
$follower1 = 0
......@@ -144,7 +144,7 @@ if $rows != 1 then
return -1
endi
print =============== step32: move leader
print =============== step40: move leader
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
sql show d1.vgroups
......@@ -155,9 +155,7 @@ if $rows != 1 then
return -1
endi
return
print =============== step33: move follower1
print =============== step41:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql show d1.vgroups
......@@ -168,7 +166,7 @@ if $rows != 1 then
return -1
endi
print =============== step34: move follower2
print =============== step42:
print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
sql show d1.vgroups
......@@ -179,7 +177,18 @@ if $rows != 1 then
return -1
endi
print =============== step35: move leader
print =============== step43:
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode $leaderVnode
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode $leaderVnode
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
sql show d1.tables
if $rows != 1 then
return -1
endi
print =============== step44:
print redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
sql show d1.vgroups
......@@ -190,7 +199,7 @@ if $rows != 1 then
return -1
endi
print =============== step36: move follower1
print =============== step45:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql show d1.vgroups
......@@ -201,7 +210,7 @@ if $rows != 1 then
return -1
endi
print =============== step37: move follower2
print =============== step46:
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql show d1.vgroups
......@@ -212,7 +221,7 @@ if $rows != 1 then
return -1
endi
print =============== step38: move leader
print =============== step47:
print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
sql show d1.vgroups
......@@ -223,7 +232,18 @@ if $rows != 1 then
return -1
endi
print =============== step39: move follower1
print =============== step48:
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
sql show d1.tables
if $rows != 1 then
return -1
endi
print =============== step49:
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql show d1.vgroups
......
......@@ -74,7 +74,7 @@ sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,t
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by tbname interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
......
......@@ -197,22 +197,22 @@ print $switch_loop_cnt
if $switch_loop_cnt == 1 then
sql show vgroups
$dnodeId = $data[1][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 2 then
sql show vgroups
$dnodeId = $data[2][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 3 then
sql show vgroups
$dnodeId = $data[3][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 4 then
sql show vgroups
$dnodeId = $data[4][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
else
......
......@@ -290,22 +290,22 @@ print $switch_loop_cnt
if $switch_loop_cnt == 1 then
sql show vgroups
$dnodeId = $data[1][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 2 then
sql show vgroups
$dnodeId = $data[2][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 3 then
sql show vgroups
$dnodeId = $data[3][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 4 then
sql show vgroups
$dnodeId = $data[4][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
else
......
......@@ -227,22 +227,22 @@ print $switch_loop_cnt
if $switch_loop_cnt == 1 then
sql show vgroups
$dnodeId = $data[1][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 2 then
sql show vgroups
$dnodeId = $data[2][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 3 then
sql show vgroups
$dnodeId = $data[3][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 4 then
sql show vgroups
$dnodeId = $data[4][3]
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
else
......
@echo off
set EXEC_OPTON=start
set DB_NAME=db
set CDB_NAME=db
set /a POLL_DELAY=5
set /a VALGRIND=0
set SIGNAL=SIGINT
set /a SHOW_MSG=0
set /a SHOW_ROW=0
set /a EXP_USE_SNAPSHOT=0
:param
if "%1"=="" (
goto :end
)
if %1 == -d ( set "DB_NAME=%2" && shift && shift && goto :param )
if %1 == -g ( set "SHOW_MSG=%2" && shift && shift && goto :param )
if %1 == -r ( set "SHOW_ROW=%2" && shift && shift && goto :param )
if %1 == -s ( set "EXEC_OPTON=%2" && shift && shift && goto :param )
if %1 == -v ( set "VALGRIND=1" && shift && goto :param )
if %1 == -y ( set "POLL_DELAY=%2" && shift && shift && goto :param )
if %1 == -x ( set "SIGNAL=%2" && shift && shift && goto :param )
if %1 == -w ( set "CDB_NAME=%2" && shift && shift && goto :param )
if %1 == -e ( set "EXP_USE_SNAPSHOT=%2" && shift && shift && goto :param )
echo unkown argument %1
goto :eof
:end
echo EXEC_OPTON %EXEC_OPTON%
echo DB_NAME %DB_NAME%
echo CDB_NAME %CDB_NAME%
echo POLL_DELAY %POLL_DELAY%
echo VALGRIND %VALGRIND%
echo SIGNAL %SIGNAL%
echo SHOW_MSG %SHOW_MSG%
echo SHOW_ROW %SHOW_ROW%
echo EXP_USE_SNAPSHOT %EXP_USE_SNAPSHOT%
echo %cd% | grep community > nul && cd ..\..\.. || cd ..\..
set BUILD_DIR=%cd%\debug\build\bin
set SIM_DIR=%cd%\sim
set PRG_DIR=%SIM_DIR%\tsim
set CFG_DIR=%PRG_DIR%\cfg
set LOG_DIR=%PRG_DIR%\log
set PROGRAM=%BUILD_DIR%\tmq_sim.exe
echo ------------------------------------------------------------------------
echo BUILD_DIR : %BUILD_DIR%
echo SIM_DIR : %SIM_DIR%
echo CFG_DIR : %CFG_DIR%
echo PRG_DIR : %PRG_DIR%
echo CFG_DIR : %CFG_DIR%
echo LOG_DIR : %LOG_DIR%
echo PROGRAM : %PROGRAM%
echo POLL_DELAY : %POLL_DELAY%
echo DB_NAME : %DB_NAME%
echo ------------------------------------------------------------------------
if "%EXEC_OPTON%" == "start" (
echo mintty -h never %PROGRAM% -c %CFG_DIR% -y %POLL_DELAY% -d %DB_NAME% -g %SHOW_MSG% -r %SHOW_ROW% -w %CDB_NAME% -e %EXP_USE_SNAPSHOT%
mintty -h never %PROGRAM% -c %CFG_DIR% -y %POLL_DELAY% -d %DB_NAME% -g %SHOW_MSG% -r %SHOW_ROW% -w %CDB_NAME% -e %EXP_USE_SNAPSHOT%
) else (
if "%SIGNAL%" == "SIGKILL" ( ps | grep tmq_sim | awk '{print $2}' | xargs kill -9 ) else ( ps | grep tmq_sim | awk '{print $2}' | xargs kill -SIGINT )
)
goto :eof
......@@ -6,12 +6,12 @@ rem echo Start TDengine Testing Case ...
set "SCRIPT_DIR=%~dp0"
rem echo SCRIPT_DIR: %SCRIPT_DIR%
set "BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\"
echo %cd% | grep community > nul && set "BUILD_DIR=%SCRIPT_DIR%..\..\..\debug\build\bin\" || set "BUILD_DIR=%SCRIPT_DIR%..\..\debug\build\bin\"
set "TSIM=%BUILD_DIR%tsim"
rem echo BUILD_DIR: %BUILD_DIR%
rem echo TSIM: %TSIM%
set "SIM_DIR=%SCRIPT_DIR%..\..\..\sim\"
echo %cd% | grep community > nul && set "SIM_DIR=%SCRIPT_DIR%..\..\..\sim\" || set "SIM_DIR=%SCRIPT_DIR%..\..\sim\"
rem echo SIM_DIR: %SIM_DIR%
set "TSIM_DIR=%SIM_DIR%tsim\"
......
......@@ -44,13 +44,13 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key
tdLog.info ("taos cmd: %s" % taosCmd)
child = taosExpect.spawn(taosCmd, timeout=3)
child = taosExpect.spawn(taosCmd, timeout=10)
#output = child.readline()
#print (output.decode())
if len(expectString) != 0:
i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=6)
i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=10)
else:
i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=6)
i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=10)
if platform.system().lower() == 'windows':
retResult = child.before
......
@REM python3 .\test.py -f 0-others\taosShell.py
@REM python3 .\test.py -f 0-others\taosShellError.py
python3 .\test.py -f 0-others\taosShell.py
python3 .\test.py -f 0-others\taosShellError.py
python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py
python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py
@REM python3 .\test.py -f 0-others\udf_create.py
@REM python3 .\test.py -f 0-others\udf_restart_taosd.py
python3 .\test.py -f 0-others\udf_create.py
python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py
@REM python3 .\test.py -f 0-others\user_control.py
......
......@@ -38,7 +38,9 @@ def checkRunTimeError():
while 1:
time.sleep(1)
timeCount = timeCount + 1
print("checkRunTimeError",timeCount)
if (timeCount>900):
print("stop the test.")
os.system("TASKKILL /F /IM taosd.exe")
os.system("TASKKILL /F /IM taos.exe")
os.system("TASKKILL /F /IM tmq_sim.exe")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册