提交 aaf2bb46 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

......@@ -30,11 +30,12 @@ extern char tsLocalEp[];
extern uint16_t tsServerPort;
extern int32_t tsStatusInterval;
extern int8_t tsEnableTelemetryReporting;
extern int32_t tsNumOfSupportVnodes;
// common
extern int tsRpcTimer;
extern int tsRpcMaxTime;
extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled
extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled
extern int32_t tsMaxConnections;
extern int32_t tsMaxShellConns;
extern int32_t tsShellActivityTimer;
......@@ -48,14 +49,18 @@ extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[];
extern int64_t tsMaxVnodeQueuedBytes;
extern int tsCompatibleModel; // 2.0 compatible model
//query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node during query processing
extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked
extern int8_t tsKeepOriginalColumnName;
extern int tsCompatibleModel; // 2.0 compatible model
extern int8_t tsEnableSlaveQuery;
extern int8_t tsEnableAdjustMaster;
extern int8_t tsPrintAuth;
extern int64_t tsTickPerDay[3];
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
extern int32_t tsRetrieveBlockingModel; // retrieve threads will be blocked
extern int8_t tsKeepOriginalColumnName;
extern int8_t tsDeadLockKillQuery;
// client
extern int32_t tsMaxSQLStringLen;
......@@ -72,27 +77,17 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the
extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow;
// balance
extern int8_t tsEnableSlaveQuery;
// interna
extern int8_t tsPrintAuth;
extern char tsVnodeDir[];
extern char tsMnodeDir[];
extern int64_t tsTickPerDay[3];
// system info
extern float tsTotalLogDirGB;
extern float tsTotalTmpDirGB;
extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB;
extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB;
extern float tsUsedDataDirGB;
extern float tsMinimalLogDirGB;
extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB;
extern float tsTotalLogDirGB;
extern float tsTotalTmpDirGB;
extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB;
extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB;
extern float tsUsedDataDirGB;
extern float tsMinimalLogDirGB;
extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB;
extern uint32_t tsVersion;
// build info
......@@ -102,17 +97,13 @@ extern char gitinfo[];
extern char gitinfoOfInternal[];
extern char buildinfo[];
#ifdef TD_TSZ
// lossy
extern char lossyColumns[];
extern double fPrecision;
extern double dPrecision;
extern uint32_t maxRange;
extern uint32_t curRange;
extern char Compressor[];
#endif
// long query
extern int8_t tsDeadLockKillQuery;
// lossy
extern char tsLossyColumns[];
extern double tsFPrecision;
extern double tsDPrecision;
extern uint32_t tsMaxRange;
extern uint32_t tsCurRange;
extern char tsCompressor[];
typedef struct {
char dir[TSDB_FILENAME_LEN];
......
......@@ -308,19 +308,7 @@ typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t suid;
int32_t sverson;
uint32_t ttl;
uint32_t keep;
int32_t numOfTags;
int32_t numOfColumns;
SSchema pSchema[];
} SCreateStbInternalMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t suid;
} SDropStbInternalMsg;
} SVDropStbReq;
typedef struct {
SMsgHead head;
......@@ -706,8 +694,8 @@ typedef struct {
int64_t clusterId;
int64_t rebootTime;
int64_t updateTime;
int16_t numOfCores;
int16_t numOfSupportVnodes;
int32_t numOfCores;
int32_t numOfSupportVnodes;
char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads;
......@@ -898,17 +886,13 @@ typedef struct {
typedef struct {
int32_t dnodeId;
} SCreateMnodeMsg, SDropMnodeMsg;
} SMCreateMnodeMsg, SMDropMnodeMsg, SDDropMnodeMsg;
typedef struct {
int32_t dnodeId;
int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA];
} SCreateMnodeInMsg, SAlterMnodeInMsg;
typedef struct {
int32_t dnodeId;
} SDropMnodeInMsg;
} SDCreateMnodeMsg, SDAlterMnodeMsg;
typedef struct {
int32_t dnodeId;
......@@ -1251,13 +1235,13 @@ typedef struct {
char* executor;
int32_t sqlLen;
char* sql;
} SCreateTopicInternalMsg;
} SDCreateTopicMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t tuid;
} SDropTopicInternalMsg;
} SDDropTopicMsg;
typedef struct SVCreateTbReq {
uint64_t ver; // use a general definition
......@@ -1300,13 +1284,13 @@ typedef struct SVShowTablesReq {
} SVShowTablesReq;
typedef struct SVShowTablesRsp {
int64_t id;
int32_t id;
STableMetaMsg metaInfo;
} SVShowTablesRsp;
typedef struct SVShowTablesFetchReq {
SMsgHead head;
int64_t id;
int32_t id;
} SVShowTablesFetchReq;
typedef struct SVShowTablesFetchRsp {
......
......@@ -17,6 +17,7 @@
#define TDENGINE_TNAME_H
#include "tdef.h"
#include "tmsg.h"
#define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2
......@@ -58,4 +59,6 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
int32_t tNameSetAcctId(SName* dst, int32_t acctId);
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name);
#endif // TDENGINE_TNAME_H
......@@ -27,8 +27,8 @@ typedef struct SDnode SDnode;
typedef struct {
int32_t sver;
int16_t numOfCores;
int16_t numOfSupportVnodes;
int32_t numOfCores;
int32_t numOfSupportVnodes;
int16_t numOfCommitThreads;
int8_t enableTelem;
int32_t statusInterval;
......
......@@ -74,7 +74,6 @@ int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo);
STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
int32_t getNewResColId();
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
......
......@@ -54,8 +54,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
void qWorkerDestroy(void **qWorkerMgmt);
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
void qWorkerDestroy(void **qWorkerMgmt);
#ifdef __cplusplus
}
......
......@@ -21,7 +21,6 @@ extern "C" {
#endif
extern char tsOsName[];
extern char tsDnodeDir[];
extern char tsDataDir[];
extern char tsLogDir[];
extern char tsScriptDir[];
......
......@@ -121,6 +121,7 @@ typedef struct SRequestObj {
char *msgBuf;
void *pInfo; // sql parse info, generated by parser module
int32_t code;
uint64_t affectedRows;
SQueryExecMetric metric;
SRequestSendRecvBody body;
} SRequestObj;
......
......@@ -18,6 +18,7 @@
#include "clientInt.h"
#include "clientLog.h"
#include "query.h"
#include "scheduler.h"
#include "tmsg.h"
#include "tcache.h"
#include "tconfig.h"
......@@ -230,6 +231,8 @@ void taos_init_imp(void) {
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
catalogInit(&cfg);
SSchedulerCfg scfg = {.maxJobNum = 100};
schedulerInit(&scfg);
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
taosSetCoreDump(true);
......
......@@ -196,7 +196,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
return TSDB_CODE_SUCCESS;
}
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
pRequest->type = pQuery->type;
return qCreateQueryDag(pQuery, pDag);
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
}
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
}
......@@ -283,7 +291,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if (qIsDdlQuery(pQuery)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else {
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
}
......@@ -490,7 +498,13 @@ void* doFetchRow(SRequestObj* pRequest) {
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
if (pRequest->type == TDMT_MND_SHOW) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
} else {
// do nothing
}
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
......
......@@ -18,23 +18,26 @@
#include "tname.h"
#include "clientInt.h"
#include "clientLog.h"
#include "trpc.h"
int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
static void setErrno(SRequestObj* pRequest, int32_t code) {
pRequest->code = code;
terrno = code;
}
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
pRequest->code = code;
setErrno(pRequest, code);
sem_post(&pRequest->body.rspSem);
return 0;
return code;
}
int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
terrno = code;
setErrno(pRequest, code);
sem_post(&pRequest->body.rspSem);
return code;
}
......@@ -74,46 +77,48 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) {
pMsgSendInfo->msgType = TDMT_MND_SHOW_RETRIEVE;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
if (pRetrieveMsg == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
return TSDB_CODE_SUCCESS;
}
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
buildRetrieveMnodeMsg(pRequest, pMsgSendInfo);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->msgType = pRequest->type;
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
SRetrieveTableMsg* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
if (pRetrieveMsg == NULL) {
return NULL;
}
pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
} else {
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
if (pFetchMsg == NULL) {
return NULL;
}
pFetchMsg->id = htonl(pRequest->body.execId);
pFetchMsg->head.vgId = htonl(13);
pMsgSendInfo->msgInfo.pData = pFetchMsg;
pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
}
} else {
assert(pRequest != NULL);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
pMsgSendInfo->msgType = pRequest->type;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
}
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
return pMsgSendInfo;
}
int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
......@@ -128,6 +133,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
pSchema++;
}
......@@ -154,22 +160,25 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
SRequestObj *pRequest = param;
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
tfree(pResInfo->pRspMsg);
SRequestObj* pRequest = param;
// tfree(pRequest->body.resInfo.pRspMsg);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
pRetrieve->precision = htons(pRetrieve->precision);
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
tfree(pResInfo->pRspMsg);
pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pRetrieve->numOfRows;
pResInfo->pData = pRetrieve->data; // todo fix this in async model
pResInfo->pData = pRetrieve->data;
pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
......@@ -181,6 +190,40 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
return 0;
}
int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
SRequestObj* pRequest = param;
tfree(pRequest->body.resInfo.pRspMsg);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
pRequest->body.resInfo.pRspMsg = pMsg->pData;
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData;
pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows);
pFetchRsp->precision = htons(pFetchRsp->precision);
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pFetchRsp->numOfRows;
pResInfo->pData = pFetchRsp->data;
pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
pFetchRsp->completed, pRequest->body.execId);
tsem_post(&pRequest->body.rspSem);
return 0;
}
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list
SRequestObj* pRequest = param;
......@@ -191,34 +234,48 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
SUseDbRsp* pUseDbRsp = (SUseDbRsp*)pMsg->pData;
SName name = {0};
tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT | T_NAME_DB);
SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData;
SName name = {0};
tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB);
char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db);
setConnectionDB(pRequest->pTscObj, db);
tsem_post(&pRequest->body.rspSem);
return 0;
}
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL);
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
tsem_post(&pRequest->body.rspSem);
return code;
}
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo: Remove cache in catalog cache.
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
tsem_post(&pRequest->body.rspSem);
return code;
}
void initMsgHandleFp() {
......@@ -304,4 +361,7 @@ void initMsgHandleFp() {
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
}
\ No newline at end of file
......@@ -227,25 +227,25 @@ TEST(testCase, use_db_test) {
}
TEST(testCase, drop_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
showDB(pConn);
TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
showDB(pConn);
pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("create to drop db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// showDB(pConn);
//
// TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// showDB(pConn);
//
// pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("create to drop db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
// taos_close(pConn);
}
TEST(testCase, create_stable_Test) {
......@@ -302,12 +302,12 @@ TEST(testCase, create_ctable_Test) {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
taos_close(pConn);
}
......@@ -443,7 +443,23 @@ TEST(testCase, show_table_Test) {
taos_free_result(pRes);
pRes = taos_query(pConn, "show tables");
taos_free_result(pRes);
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
......@@ -15,17 +15,18 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "ulog.h"
#include "tlog.h"
#include "tcompare.h"
#include "tconfig.h"
#include "tep.h"
#include "tglobal.h"
#include "tcompare.h"
#include "tutil.h"
#include "ttimezone.h"
#include "tlocale.h"
#include "tep.h"
#include "tlog.h"
#include "ttimezone.h"
#include "tutil.h"
#include "ulog.h"
// cluster
char tsFirst[TSDB_EP_LEN] = {0};
......@@ -36,22 +37,24 @@ uint16_t tsServerPort = 6030;
int32_t tsStatusInterval = 1; // second
int8_t tsEnableTelemetryReporting = 0;
char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsNumOfSupportVnodes = 16;
// common
int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsRpcForceTcp = 1; //disable this, means query, show command use udp protocol as default
int32_t tsMaxShellConns = 50000;
int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsRpcForceTcp = 1; // disable this, means query, show command use udp protocol as default
int32_t tsMaxShellConns = 50000;
int32_t tsMaxConnections = 5000;
int32_t tsShellActivityTimer = 3; // second
int32_t tsShellActivityTimer = 3; // second
float tsNumOfThreadsPerCore = 1.0f;
int32_t tsNumOfCommitThreads = 4;
float tsRatioOfQueryCores = 1.0f;
int8_t tsDaylight = 0;
int8_t tsDaylight = 0;
int8_t tsEnableCoreFile = 0;
int32_t tsMaxBinaryDisplayWidth = 30;
int64_t tsMaxVnodeQueuedBytes = 1024*1024*1024; //1GB
int8_t tsEnableSlaveQuery = 1;
int8_t tsEnableAdjustMaster = 1;
int8_t tsPrintAuth = 0;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
......@@ -79,7 +82,7 @@ int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN;
int32_t tsMaxRegexStringLen = TSDB_REGEX_STRING_DEFAULT_LEN;
int8_t tsTscEnableRecordSql = 0;
int8_t tsTscEnableRecordSql = 0;
// the maximum number of results for projection query on super table that are returned from
// one virtual node, to order according to timestamp
......@@ -89,7 +92,7 @@ int32_t tsMaxNumOfOrderedResults = 100000;
int32_t tsMinSlidingTime = 10;
// the maxinum number of distict query result
int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 us for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1;
......@@ -101,7 +104,7 @@ int32_t tsMaxStreamComputDelay = 20000;
int32_t tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10*1000;
int32_t tsRetryStreamCompDelay = 10 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f;
......@@ -120,30 +123,16 @@ int64_t tsQueryBufferSizeBytes = -1;
int32_t tsRetrieveBlockingModel = 0;
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
int8_t tsKeepOriginalColumnName = 0;
int8_t tsKeepOriginalColumnName = 0;
// long query death-lock
int8_t tsDeadLockKillQuery = 0;
// tsdb config
// tsdb config
// For backward compatibility
bool tsdbForceKeepFile = false;
// balance
int8_t tsEnableFlowCtrl = 1;
int8_t tsEnableSlaveQuery = 1;
int8_t tsEnableAdjustMaster = 1;
// monitor
char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
char tsInternalPass[] = "secretkey";
// internal
int8_t tsCompactMnodeWal = 0;
int8_t tsPrintAuth = 0;
char tsVnodeDir[PATH_MAX] = {0};
char tsDnodeDir[PATH_MAX] = {0};
char tsMnodeDir[PATH_MAX] = {0};
int32_t tsDiskCfgNum = 0;
int32_t tsDiskCfgNum = 0;
#ifndef _STORAGE
SDiskCfg tsDiskCfg[1];
......@@ -160,31 +149,28 @@ SDiskCfg tsDiskCfg[TSDB_MAX_DISKS];
int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L};
// system info
float tsTotalTmpDirGB = 0;
float tsTotalDataDirGB = 0;
float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0;
float tsUsedDataDirGB = 0;
float tsReservedTmpDirectorySpace = 1.0f;
float tsMinimalDataDirGB = 2.0f;
int32_t tsTotalMemoryMB = 0;
float tsTotalTmpDirGB = 0;
float tsTotalDataDirGB = 0;
float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0;
float tsUsedDataDirGB = 0;
float tsReservedTmpDirectorySpace = 1.0f;
float tsMinimalDataDirGB = 2.0f;
int32_t tsTotalMemoryMB = 0;
uint32_t tsVersion = 0;
#ifdef TD_TSZ
//
// lossy compress 6
//
char lossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty can close lossy compress.
// below option can take effect when tsLossyColumns not empty
double fPrecision = 1E-8; // float column precision
double dPrecision = 1E-16; // double column precision
uint32_t maxRange = 500; // max range
uint32_t curRange = 100; // range
char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
#endif
char tsLossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty
// can close lossy compress.
// below option can take effect when tsLossyColumns not empty
double tsFPrecision = 1E-8; // float column precision
double tsDPrecision = 1E-16; // double column precision
uint32_t tsMaxRange = 500; // max range
uint32_t tsCurRange = 100; // range
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
// long query death-lock
int8_t tsDeadLockKillQuery = 0;
int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL;
......@@ -195,13 +181,12 @@ char *qtypeStr[] = {"rpc", "fwd", "wal", "cq", "query"};
static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT;
void taosSetAllDebugFlag() {
if (debugFlag != 0) {
if (debugFlag != 0) {
mDebugFlag = debugFlag;
dDebugFlag = debugFlag;
vDebugFlag = debugFlag;
jniDebugFlag = debugFlag;
odbcDebugFlag = debugFlag;
qDebugFlag = debugFlag;
qDebugFlag = debugFlag;
rpcDebugFlag = debugFlag;
uDebugFlag = debugFlag;
sDebugFlag = debugFlag;
......@@ -213,12 +198,12 @@ void taosSetAllDebugFlag() {
}
int32_t taosCfgDynamicOptions(char *msg) {
char *option, *value;
int32_t olen, vlen;
int32_t vint = 0;
char *option, *value;
int32_t olen, vlen;
int32_t vint = 0;
paGetToken(msg, &option, &olen);
if (olen == 0) return -1;;
if (olen == 0) return -1;
paGetToken(option + olen + 1, &value, &vlen);
if (vlen == 0)
......@@ -231,9 +216,9 @@ int32_t taosCfgDynamicOptions(char *msg) {
for (int32_t i = 0; i < tsGlobalConfigNum; ++i) {
SGlobalCfg *cfg = tsGlobalConfig + i;
//if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
// if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
if (cfg->valType != TAOS_CFG_VTYPE_INT32 && cfg->valType != TAOS_CFG_VTYPE_INT8) continue;
int32_t cfgLen = (int32_t)strlen(cfg->option);
if (cfgLen != olen) continue;
if (strncasecmp(option, cfg->option, olen) != 0) continue;
......@@ -262,7 +247,7 @@ int32_t taosCfgDynamicOptions(char *msg) {
return 0;
}
if (strncasecmp(cfg->option, "debugFlag", olen) == 0) {
taosSetAllDebugFlag();
taosSetAllDebugFlag();
}
return 0;
}
......@@ -323,7 +308,7 @@ static void doInitGlobalConfig(void) {
srand(taosSafeRand());
SGlobalCfg cfg = {0};
// ip address
cfg.option = "firstEp";
cfg.ptr = tsFirst;
......@@ -366,6 +351,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "supportVnodes";
cfg.ptr = &tsNumOfSupportVnodes;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 65536;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
// directory
cfg.option = "configDir";
cfg.ptr = configDir;
......@@ -442,8 +437,8 @@ static void doInitGlobalConfig(void) {
cfg.ptr = &tsMaxNumOfDistinctResults;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 10*10000;
cfg.maxValue = 10000*10000;
cfg.minValue = 10 * 10000;
cfg.maxValue = 10000 * 10000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
......@@ -749,17 +744,6 @@ static void doInitGlobalConfig(void) {
cfg.maxValue = 10000000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_GB;
taosAddConfigOption(cfg);
// module configs
cfg.option = "flowctrl";
cfg.ptr = &tsEnableFlowCtrl;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "slaveQuery";
......@@ -893,16 +877,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "odbcDebugFlag";
cfg.ptr = &odbcDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "uDebugFlag";
cfg.ptr = &uDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......@@ -1034,7 +1008,7 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
// enable kill long query
// enable kill long query
cfg.option = "deadLockKillQuery";
cfg.ptr = &tsDeadLockKillQuery;
cfg.valType = TAOS_CFG_VTYPE_INT8;
......@@ -1066,7 +1040,6 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "dPrecision";
......@@ -1100,23 +1073,20 @@ static void doInitGlobalConfig(void) {
taosAddConfigOption(cfg);
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM);
#else
//assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
// assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
#endif
}
void taosInitGlobalCfg() {
pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig);
}
void taosInitGlobalCfg() { pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig); }
int32_t taosCheckAndPrintCfg() {
char fqdn[TSDB_FQDN_LEN];
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) {
taosSetAllDebugFlag();
}
if (tsLocalFqdn[0] == 0) {
taosGetFqdn(tsLocalFqdn);
}
......@@ -1143,7 +1113,7 @@ int32_t taosCheckAndPrintCfg() {
if (taosDirExist(tsTempDir) != 0) {
return -1;
}
taosGetSystemInfo();
tsSetLocale();
......
......@@ -259,3 +259,13 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return 0;
}
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) {
SSchema s = {0};
s.type = type;
s.bytes = bytes;
s.colId = colId;
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
\ No newline at end of file
......@@ -139,7 +139,7 @@ void dmnWaitSignal() {
void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = 30000000; //3.0.0.0
pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportVnodes = 16;
pOption->numOfSupportVnodes = tsNumOfSupportVnodes;
pOption->numOfCommitThreads = 1;
pOption->statusInterval = tsStatusInterval;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
......
......@@ -370,8 +370,8 @@ void dndSendStatusMsg(SDnode *pDnode) {
pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->rebootTime = htobe64(pMgmt->rebootTime);
pStatus->updateTime = htobe64(pMgmt->updateTime);
pStatus->numOfCores = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfSupportVnodes);
pStatus->numOfCores = htonl(pDnode->opt.numOfCores);
pStatus->numOfSupportVnodes = htonl(pDnode->opt.numOfSupportVnodes);
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
......@@ -397,7 +397,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:% " PRId64, pCfg->dnodeId, pCfg->clusterId);
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
......@@ -440,19 +440,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
}
SStatusRsp *pRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg);
if (pMsg->pCont != NULL && pMsg->contLen != 0) {
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg);
SDnodeEps *pDnodeEps = &pRsp->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num);
for (int32_t i = 0; i < pDnodeEps->num; ++i) {
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port);
}
SDnodeEps *pDnodeEps = &pRsp->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num);
for (int32_t i = 0; i < pDnodeEps->num; ++i) {
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port);
dndUpdateDnodeEps(pDnode, pDnodeEps);
}
dndUpdateDnodeEps(pDnode, pDnodeEps);
pMgmt->statusSent = 0;
}
......
......@@ -299,7 +299,7 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) {
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) {
dndInitMnodeOption(pDnode, pOption);
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
......@@ -417,8 +417,8 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return 0;
}
static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
SCreateMnodeInMsg *pMsg = pRpcMsg->pCont;
static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
SDCreateMnodeMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
for (int32_t i = 0; i < pMsg->replica; ++i) {
pMsg->replicas[i].id = htonl(pMsg->replicas[i].id);
......@@ -429,7 +429,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
}
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
......@@ -445,7 +445,7 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
......@@ -465,7 +465,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDropMnodeInMsg *pMsg = pRpcMsg->pCont;
SDDropMnodeMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
......
......@@ -103,7 +103,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
......@@ -141,6 +141,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
......@@ -383,6 +385,7 @@ void dndCleanupTrans(SDnode *pDnode) {
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->clientRpc == NULL) return;
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
}
......
......@@ -57,7 +57,7 @@ TEST_F(DndTestDnode, 01_ShowDnode) {
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "max_vnodes");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "support_vnodes");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status");
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason");
......@@ -68,7 +68,7 @@ TEST_F(DndTestDnode, 01_ShowDnode) {
CheckInt16(1);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(1);
CheckInt16(16);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
......@@ -112,8 +112,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
CheckBinary("localhost:9042", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
......@@ -140,7 +140,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
CheckInt16(1);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(1);
CheckInt16(16);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
......@@ -199,10 +199,10 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
......@@ -242,10 +242,10 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
......
......@@ -72,9 +72,9 @@ TEST_F(DndTestMnode, 01_ShowDnode) {
TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SCreateMnodeMsg);
int32_t contLen = sizeof(SMCreateMnodeMsg);
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen);
......@@ -85,9 +85,9 @@ TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) {
TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SCreateMnodeMsg);
int32_t contLen = sizeof(SMCreateMnodeMsg);
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen);
......@@ -117,9 +117,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
{
// create mnode
int32_t contLen = sizeof(SCreateMnodeMsg);
int32_t contLen = sizeof(SMCreateMnodeMsg);
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen);
......@@ -144,9 +144,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
{
// drop mnode
int32_t contLen = sizeof(SDropMnodeMsg);
int32_t contLen = sizeof(SMDropMnodeMsg);
SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen);
SMDropMnodeMsg* pReq = (SMDropMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_MNODE, pReq, contLen);
......
......@@ -146,8 +146,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pSchema->bytes = htonl(pSchema->bytes);
}
EXPECT_STREQ(pRsp->tbFname, "");
EXPECT_STREQ(pRsp->stbFname, "1.d1.stb");
EXPECT_STREQ(pRsp->tbFname, "1.d1.stb");
EXPECT_STREQ(pRsp->stbFname, "");
EXPECT_EQ(pRsp->numOfColumns, 2);
EXPECT_EQ(pRsp->numOfTags, 3);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
......
......@@ -24,7 +24,6 @@ void Testbase::InitLog(const char* path) {
tmrDebugFlag = 0;
uDebugFlag = 143;
rpcDebugFlag = 0;
odbcDebugFlag = 0;
qDebugFlag = 0;
wDebugFlag = 0;
sDebugFlag = 0;
......
......@@ -26,7 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p
SDnodeOpt option = {0};
option.sver = 1;
option.numOfCores = 1;
option.numOfSupportVnodes = 1;
option.numOfSupportVnodes = 16;
option.numOfCommitThreads = 1;
option.statusInterval = 1;
option.numOfThreadsPerCore = 1;
......
......@@ -74,13 +74,6 @@ typedef enum {
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
typedef enum {
DND_STATUS_OFFLINE = 0,
DND_STATUS_READY = 1,
DND_STATUS_CREATING = 2,
DND_STATUS_DROPPING = 3
} EDndStatus;
typedef enum {
DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT,
......@@ -125,9 +118,8 @@ typedef struct {
int64_t lastAccessTime;
int32_t accessTimes;
int16_t numOfVnodes;
int16_t numOfSupportVnodes;
int16_t numOfCores;
EDndStatus status;
int32_t numOfSupportVnodes;
int32_t numOfCores;
EDndReason offlineReason;
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
......
......@@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode);
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs);
#ifdef __cplusplus
}
......
......@@ -40,8 +40,6 @@ static const char *offlineReason[] = {
"unknown",
};
static const char *dnodeStatus[] = {"offline", "ready", "creating", "dropping"};
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
......@@ -208,10 +206,12 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
return sdbGetSize(pSdb, SDB_DNODE);
}
bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) {
int64_t ms = taosGetTimestampMs();
int64_t interval = ABS(pDnode->lastAccessTime - ms);
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
int64_t interval = ABS(pDnode->lastAccessTime - curMs);
if (interval > 3500 * pMnode->cfg.statusInterval) {
if (pDnode->rebootTime > 0) {
pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
}
return false;
}
return true;
......@@ -278,8 +278,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->rebootTime = htobe64(pStatus->rebootTime);
pStatus->updateTime = htobe64(pStatus->updateTime);
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes);
pStatus->numOfCores = htonl(pStatus->numOfCores);
pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
}
......@@ -287,96 +287,99 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
SDnodeObj *pDnode = NULL;
int32_t code = -1;
mndParseStatusMsg(pStatus);
SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
if (pDnode == NULL) {
mDebug("dnode:%s, not created yet", pStatus->dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
goto PROCESS_STATUS_MSG_OVER;
}
} else {
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId);
if (pDnode == NULL) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
}
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp);
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
goto PROCESS_STATUS_MSG_OVER;
}
}
if (pStatus->sver != pMnode->cfg.sver) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
}
mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
return -1;
}
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime);
if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else {
if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
if (needCheckCfg) {
if (pStatus->sver != pMnode->cfg.sver) {
if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
}
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
pMnode->clusterId);
mndReleaseDnode(pMnode, pDnode);
terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID;
return -1;
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
goto PROCESS_STATUS_MSG_OVER;
}
if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else {
pDnode->accessTimes++;
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
}
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
pMnode->clusterId);
terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
goto PROCESS_STATUS_MSG_OVER;
} else {
pDnode->accessTimes++;
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
}
}
}
if (pDnode->status == DND_STATUS_OFFLINE) {
// Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg);
if (0 != ret) {
pDnode->offlineReason = ret;
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
return -1;
goto PROCESS_STATUS_MSG_OVER;
}
mInfo("dnode:%d, from offline to online", pDnode->id);
}
pDnode->rebootTime = pStatus->rebootTime;
pDnode->numOfCores = pStatus->numOfCores;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
pDnode->lastAccessTime = taosGetTimestampMs();
pDnode->status = DND_STATUS_READY;
pDnode->rebootTime = pStatus->rebootTime;
pDnode->numOfCores = pStatus->numOfCores;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
int32_t numOfEps = mndGetDnodeSize(pMnode);
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
SStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
int32_t numOfEps = mndGetDnodeSize(pMnode);
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
SStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto PROCESS_STATUS_MSG_OVER;
}
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->contLen = contLen;
pMsg->pCont = pRsp;
}
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pDnode->lastAccessTime = curMs;
code = 0;
pMsg->contLen = contLen;
pMsg->pCont = pRsp;
PROCESS_STATUS_MSG_OVER:
mndReleaseDnode(pMnode, pDnode);
return 0;
return code;
}
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
......@@ -638,7 +641,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "max_vnodes");
strcpy(pSchema[cols].name, "support_vnodes");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
......@@ -682,10 +685,12 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
int32_t cols = 0;
SDnodeObj *pDnode = NULL;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode);
if (pShow->pIter == NULL) break;
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
cols = 0;
......@@ -706,8 +711,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
const char *status = dnodeStatus[pDnode->status];
STR_TO_VARSTR(pWrite, status);
STR_TO_VARSTR(pWrite, online ? "ready" : "offline");
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -715,11 +719,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pDnode->status == DND_STATUS_READY) {
STR_TO_VARSTR(pWrite, "");
} else {
STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]);
}
STR_TO_VARSTR(pWrite, online ? "" : offlineReason[pDnode->offlineReason]);
cols++;
numOfRows++;
......
......@@ -251,7 +251,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
void *pIter = NULL;
int32_t numOfReplicas = 0;
SCreateMnodeInMsg createMsg = {0};
SDCreateMnodeMsg createMsg = {0};
while (1) {
SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
......@@ -281,18 +281,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0};
SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg));
SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg));
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj);
return -1;
}
memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg));
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg));
pMsg->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SAlterMnodeInMsg);
action.contLen = sizeof(SDAlterMnodeMsg);
action.msgType = TDMT_DND_ALTER_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......@@ -309,14 +309,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
SCreateMnodeInMsg *pMsg = malloc(sizeof(SCreateMnodeInMsg));
SDCreateMnodeMsg *pMsg = malloc(sizeof(SDCreateMnodeMsg));
if (pMsg == NULL) return -1;
memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg));
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg));
pMsg->dnodeId = htonl(pObj->id);
action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SCreateMnodeInMsg);
action.contLen = sizeof(SDCreateMnodeMsg);
action.msgType = TDMT_DND_CREATE_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
......@@ -327,7 +327,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
return 0;
}
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SCreateMnodeMsg *pCreate) {
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeMsg *pCreate) {
SMnodeObj mnodeObj = {0};
mnodeObj.id = pDnode->id;
mnodeObj.createdTime = taosGetTimestampMs();
......@@ -370,7 +370,7 @@ CREATE_MNODE_OVER:
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont;
SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
......@@ -423,7 +423,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
void *pIter = NULL;
int32_t numOfReplicas = 0;
SAlterMnodeInMsg alterMsg = {0};
SDAlterMnodeMsg alterMsg = {0};
while (1) {
SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
......@@ -449,18 +449,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
if (pMObj->id != pObj->id) {
STransAction action = {0};
SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg));
SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg));
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj);
return -1;
}
memcpy(pMsg, &alterMsg, sizeof(SAlterMnodeInMsg));
memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeMsg));
pMsg->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SAlterMnodeInMsg);
action.contLen = sizeof(SDAlterMnodeMsg);
action.msgType = TDMT_DND_ALTER_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......@@ -478,7 +478,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
SDropMnodeInMsg *pMsg = malloc(sizeof(SDropMnodeInMsg));
SDDropMnodeMsg *pMsg = malloc(sizeof(SDDropMnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -487,7 +487,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDropMnodeInMsg);
action.contLen = sizeof(SDDropMnodeMsg);
action.msgType = TDMT_DND_DROP_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
......@@ -537,7 +537,7 @@ DROP_MNODE_OVER:
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont;
SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
mDebug("mnode:%d, start to drop", pDrop->dnodeId);
......
......@@ -201,7 +201,6 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
}
static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) {
#if 1
SVCreateTbReq req;
void * buf;
int bsize;
......@@ -235,43 +234,12 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
*pContLen = sizeof(SMsgHead) + bsize;
return buf;
#else
int32_t totalCols = pStb->numOfTags + pStb->numOfColumns;
int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg);
SCreateStbInternalMsg *pCreate = calloc(1, contLen);
if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pCreate->head.contLen = htonl(contLen);
pCreate->head.vgId = htonl(pVgroup->vgId);
memcpy(pCreate->name, pStb->name, TSDB_TABLE_FNAME_LEN);
pCreate->suid = htobe64(pStb->uid);
pCreate->sverson = htonl(pStb->version);
pCreate->ttl = 0;
pCreate->keep = 0;
pCreate->numOfTags = htonl(pStb->numOfTags);
pCreate->numOfColumns = htonl(pStb->numOfColumns);
memcpy(pCreate->pSchema, pStb->pSchema, totalCols * sizeof(SSchema));
for (int32_t t = 0; t < totalCols; ++t) {
SSchema *pSchema = &pCreate->pSchema[t];
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
}
*pContLen = contLen;
return pCreate;
#endif
}
static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SDropStbInternalMsg);
static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropStbReq);
SDropStbInternalMsg *pDrop = calloc(1, contLen);
SVDropStbReq *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -402,7 +370,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
SDropStbInternalMsg *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
SVDropStbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
......@@ -413,7 +381,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg;
action.contLen = sizeof(SDropStbInternalMsg);
action.contLen = sizeof(SVDropStbReq);
action.msgType = TDMT_VND_DROP_STB;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
......
......@@ -162,10 +162,10 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
return mndAcquireDb(pMnode, db);
}
static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) {
int32_t contLen = sizeof(SDropTopicInternalMsg);
static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicMsg);
SDropTopicInternalMsg *pDrop = calloc(1, contLen);
SDDropTopicMsg *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......
......@@ -120,6 +120,9 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId)
if (pVgroup->replica == 1) {
pVgid->role = TAOS_SYNC_STATE_LEADER;
}
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE)
......@@ -257,13 +260,14 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) {
pDnode->numOfVnodes++;
}
bool isReady = mndIsDnodeInReadyStatus(pMnode, pDnode);
if (isReady) {
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
if (online) {
taosArrayPush(pArray, pDnode);
}
mDebug("dnode:%d, numOfVnodes:%d numOfSupportVnodes:%d isMnode:%d ready:%d", pDnode->id, numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, isReady);
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, online);
sdbRelease(pSdb, pDnode);
}
......@@ -330,6 +334,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
uint32_t hashMax = UINT32_MAX;
uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;
if (maxVgId < 2) maxVgId = 2;
for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
SVgObj *pVgroup = &pVgroups[v];
pVgroup->vgId = maxVgId++;
......
......@@ -36,12 +36,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_SHOW_TABLES:
return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_SHOW_TABLES_FETCH:
return qWorkerProcessShowFetchMsg(pVnode, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
break;
}
return 0;
}
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
......
......@@ -90,7 +90,6 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf,
SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg));
pShowMsg->type = pShowInfo->showType;
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
SToken* pPattern = &pShowInfo->pattern;
if (pPattern->type > 0) { // only show tables support wildcard query
......
......@@ -18,7 +18,7 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) {
}
static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen,
SMsgBuf* pMsgBuf) {
SEpSet* pEpSet, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid name";
const char* msg2 = "wildcard string should be less than %d characters";
const char* msg3 = "database name too long";
......@@ -31,57 +31,69 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
* wildcard in like clause in pInfo->pMiscInfo->a[1]
*/
int16_t showType = pShowInfo->showType;
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
SToken* pDbPrefixToken = &pShowInfo->prefix;
if (pDbPrefixToken->type != 0) {
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (showType == TSDB_MGMT_TABLE_TABLE) {
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
*pEpSet = pCtx->mgmtEpSet;
// catalogGetDBVgroupVersion()
pShowReq->head.vgId = htonl(13);
*outputLen = sizeof(SVShowTablesReq);
*output = pShowReq;
} else {
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
SToken* pDbPrefixToken = &pShowInfo->prefix;
if (pDbPrefixToken->type != 0) {
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (pDbPrefixToken->n <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg5);
}
if (pDbPrefixToken->n <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg5);
}
if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
// int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken);
// if (ret != TSDB_CODE_SUCCESS) {
// return buildInvalidOperationMsg(pMsgBuf, msg1);
// }
}
// int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken);
// if (ret != TSDB_CODE_SUCCESS) {
// return buildInvalidOperationMsg(pMsgBuf, msg1);
// }
}
// show table/stable like 'xxxx', set the like pattern for show tables
SToken* pPattern = &pShowInfo->pattern;
if (pPattern->type != 0) {
if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
// show table/stable like 'xxxx', set the like pattern for show tables
SToken* pPattern = &pShowInfo->pattern;
if (pPattern->type != 0) {
if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
pPattern->n = strdequote(pPattern->z);
if (pPattern->n <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg6);
}
pPattern->n = strdequote(pPattern->z);
if (pPattern->n <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg6);
if (pPattern->n > tsMaxWildCardsLen) {
char tmp[64] = {0};
sprintf(tmp, msg2, tsMaxWildCardsLen);
return buildInvalidOperationMsg(pMsgBuf, tmp);
}
}
} else if (showType == TSDB_MGMT_TABLE_VNODES) {
if (pShowInfo->prefix.type == 0) {
return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep");
}
if (pPattern->n > tsMaxWildCardsLen) {
char tmp[64] = {0};
sprintf(tmp, msg2, tsMaxWildCardsLen);
return buildInvalidOperationMsg(pMsgBuf, tmp);
if (pShowInfo->prefix.type == TK_STRING) {
pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z);
}
}
} else if (showType == TSDB_MGMT_TABLE_VNODES) {
if (pShowInfo->prefix.type == 0) {
return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep");
}
if (pShowInfo->prefix.type == TK_STRING) {
pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z);
}
*pEpSet = pCtx->mgmtEpSet;
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
*outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/;
}
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
*outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/;
return TSDB_CODE_SUCCESS;
}
......@@ -608,8 +620,9 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
}
case TSDB_SQL_SHOW: {
code = setShowInfo(&pInfo->pMiscInfo->showOpt, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, pMsgBuf);
pDcl->msgType = TDMT_MND_SHOW;
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, pMsgBuf);
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW;
break;
}
......
......@@ -64,64 +64,6 @@ typedef struct SInsertParseContext {
SInsertStmtInfo* pOutput;
} SInsertParseContext;
static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) {
errno = 0;
*value = strtold(pToken->z, endPtr);
// not a valid integer number, return error
if ((*endPtr - pToken->z) != pToken->n) {
return TK_ILLEGAL;
}
return pToken->type;
}
static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, bool issigned) {
errno = 0;
int32_t ret = 0;
char* endPtr = NULL;
if (type == TK_FLOAT) {
double v = strtod(z, &endPtr);
if ((errno == ERANGE && v == HUGE_VALF) || isinf(v) || isnan(v)) {
ret = -1;
} else if ((issigned && (v < INT64_MIN || v > INT64_MAX)) || ((!issigned) && (v < 0 || v > UINT64_MAX))) {
ret = -1;
} else {
*value = (int64_t) round(v);
}
errno = 0;
return ret;
}
int32_t radix = 10;
if (type == TK_HEX) {
radix = 16;
} else if (type == TK_BIN) {
radix = 2;
}
// the string may be overflow according to errno
if (!issigned) {
const char *p = z;
while(*p != 0 && *p == ' ') p++;
if (*p != 0 && *p == '-') { return -1;}
*value = strtoull(z, &endPtr, radix);
} else {
*value = strtoll(z, &endPtr, radix);
}
// not a valid integer number, return error
if (endPtr - z != n || errno == ERANGE) {
ret = -1;
}
errno = 0;
return ret;
}
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
SToken sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
......@@ -159,10 +101,8 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
char tableName[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&name, tableName);
SParseBasicCtx* pBasicCtx = &pCxt->pComCxt->ctx;
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
SVgroupInfo vg;
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
......@@ -349,207 +289,6 @@ static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *p
return TSDB_CODE_SUCCESS;
}
//static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
// if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
// type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
// (pToken->n == 0) || (type == TK_RP)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
// }
//
// if (IS_NUMERIC_TYPE(type) && pToken->n == 0) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z);
// }
//
// // Remove quotation marks
// if (TK_STRING == type) {
// if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
// return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
// }
//
// // delete escape character: \\, \', \"
// char delim = pToken->z[0];
// int32_t cnt = 0;
// int32_t j = 0;
// for (uint32_t k = 1; k < pToken->n - 1; ++k) {
// if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
// tmpTokenBuf[j] = pToken->z[k + 1];
// cnt++;
// j++;
// k++;
// continue;
// }
// tmpTokenBuf[j] = pToken->z[k];
// j++;
// }
//
// tmpTokenBuf[j] = 0;
// pToken->z = tmpTokenBuf;
// pToken->n -= 2 + cnt;
// }
//
// return TSDB_CODE_SUCCESS;
//}
//static FORCE_INLINE int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
// int64_t iv;
// char *endptr = NULL;
// bool isSigned = false;
//
// CHECK_CODE(checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf));
//
// if (isNullStr(pToken)) {
// if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
// int64_t tmpVal = 0;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// return func(getNullValue(pSchema->type), 0, param);
// }
//
// switch (pSchema->type) {
// case TSDB_DATA_TYPE_BOOL: {
// if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
// if (strncmp(pToken->z, "true", pToken->n) == 0) {
// return func(&TRUE_VALUE, pSchema->bytes, param);
// } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
// return func(&FALSE_VALUE, pSchema->bytes, param);
// } else {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
// }
// } else if (pToken->type == TK_INTEGER) {
// return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
// } else if (pToken->type == TK_FLOAT) {
// return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
// } else {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
// }
// }
//
// case TSDB_DATA_TYPE_TINYINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
// } else if (!IS_VALID_TINYINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
// }
//
// uint8_t tmpVal = (uint8_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UTINYINT:{
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
// } else if (!IS_VALID_UTINYINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
// }
// uint8_t tmpVal = (uint8_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_SMALLINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
// } else if (!IS_VALID_SMALLINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
// }
// int16_t tmpVal = (int16_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_USMALLINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
// } else if (!IS_VALID_USMALLINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
// }
// uint16_t tmpVal = (uint16_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_INT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
// } else if (!IS_VALID_INT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
// }
// int32_t tmpVal = (int32_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
// } else if (!IS_VALID_UINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
// }
// uint32_t tmpVal = (uint32_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_BIGINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
// } else if (!IS_VALID_BIGINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
// }
// return func(&iv, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UBIGINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
// } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
// }
// uint64_t tmpVal = (uint64_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_FLOAT: {
// double dv;
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
// }
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
// }
// float tmpVal = (float)dv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_DOUBLE: {
// double dv;
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
// }
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
// }
// return func(&dv, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_BINARY: {
// // too long values will return invalid sql, not be truncated automatically
// if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
// return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
// }
// return func(pToken->z, pToken->n, param);
// }
// case TSDB_DATA_TYPE_NCHAR: {
// return func(pToken->z, pToken->n, param);
// }
// case TSDB_DATA_TYPE_TIMESTAMP: {
// int64_t tmpVal;
// if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
// }
// return func(&tmpVal, pSchema->bytes, param);
// }
// }
//
// return TSDB_CODE_FAILED;
//}
// pSql -> tag1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
int32_t nCols = pColList->numOfCols;
......
......@@ -563,16 +563,6 @@ TAOS_FIELD createField(const SSchema* pSchema) {
return f;
}
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name) {
SSchema s = {0};
s.type = type;
s.bytes = bytes;
s.colId = colId;
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) {
pColumn->uid = uid;
pColumn->flag = flag;
......@@ -1649,9 +1639,9 @@ static bool isNullStr(SToken *pToken) {
}
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(pToken->n == 0) || (type == TK_RP)) {
if ((pToken->type != TK_NOW && pToken->type != TK_INTEGER && pToken->type != TK_STRING && pToken->type != TK_FLOAT && pToken->type != TK_BOOL &&
pToken->type != TK_NULL && pToken->type != TK_HEX && pToken->type != TK_OCT && pToken->type != TK_BIN) ||
(pToken->n == 0) || (pToken->type == TK_RP)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
}
......@@ -1795,7 +1785,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_TINYINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
......@@ -1806,7 +1796,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_UTINYINT:{
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
......@@ -1816,7 +1806,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_SMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
......@@ -1826,7 +1816,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_USMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
......@@ -1836,7 +1826,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_INT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
......@@ -1846,7 +1836,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_UINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
......@@ -1856,7 +1846,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_BIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
......@@ -1865,7 +1855,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case TSDB_DATA_TYPE_UBIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
......
......@@ -97,8 +97,8 @@ public:
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
std::unique_ptr<STableMeta> table;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(pTableName, db);
const char* tname = tNameGetTableName(pTableName);
int32_t code = copyTableSchemaMeta(db, tname, &table);
......@@ -111,6 +111,7 @@ public:
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
// todo
vgInfo->vgId = 1;
return 0;
}
......
......@@ -207,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
}
taosArrayPush(currentLevel, &subplan);
pCxt->pCurrentSubplan = subplan;
++(pCxt->pDag->numOfSubplans);
return subplan;
}
......@@ -293,11 +294,14 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
size_t numOfVg = taosArrayGetSize(vgs);
for (int32_t i = 0; i < numOfVg; ++i) {
STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
subplan->pNode = NULL;
subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->type = QUERY_TYPE_MODIFY;
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
}
......
#include "tmsg.h"
#include "query.h"
#include "qworker.h"
#include "qworkerInt.h"
#include "tname.h"
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "tmsg.h"
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
......@@ -634,7 +635,6 @@ int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code;
......@@ -665,11 +665,68 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
int32_t numOfCols = 6;
SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols);
int32_t cols = 0;
SSchema *pSchema = pRsp->metaInfo.pSchema;
const SSchema *s = tGetTbnameColumnSchema();
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "name");
pSchema++;
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "created");
pSchema++;
type = TSDB_DATA_TYPE_SMALLINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "columns");
pSchema++;
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "stable");
pSchema++;
type = TSDB_DATA_TYPE_BIGINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "uid");
pSchema++;
type = TSDB_DATA_TYPE_INT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "vgId");
pRsp->metaInfo.numOfColumns = htonl(cols);
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id);
pRsp->numOfRows = 0;
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
SQWSchStatus *sch = NULL;
......@@ -801,7 +858,6 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
......@@ -911,7 +967,6 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) {
......@@ -1157,6 +1212,25 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code));
}
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
}
int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
......@@ -1182,7 +1256,6 @@ int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_RET(code);
}
void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
return;
......
......@@ -166,7 +166,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
}
for (int32_t n = 0; n < levelPlanNum; ++n) {
SSubplan *plan = taosArrayGet(levelPlans, n);
SSubplan *plan = taosArrayGetP(levelPlans, n);
SSchTask task = {0};
if (plan->type == QUERY_TYPE_MODIFY) {
......
......@@ -85,7 +85,6 @@ int32_t dDebugFlag = 135;
int32_t vDebugFlag = 135;
int32_t cDebugFlag = 131;
int32_t jniDebugFlag = 131;
int32_t odbcDebugFlag = 131;
int32_t qDebugFlag = 131;
int32_t rpcDebugFlag = 131;
int32_t uDebugFlag = 131;
......
......@@ -154,9 +154,9 @@ void acquireRleaseTest() {
int32_t code = 0;
int32_t num = 0;
TESTSTRUCT data = {0};
char *str1 = "abcdefg";
char *str2 = "aaaaaaa";
char *str3 = "123456789";
const char *str1 = "abcdefg";
const char *str2 = "aaaaaaa";
const char *str3 = "123456789";
data.p = (char *)malloc(10);
strcpy(data.p, str1);
......
......@@ -120,6 +120,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG
echo "fqdn ${HOSTNAME}" >> $TAOS_CFG
echo "serverPort ${NODE}" >> $TAOS_CFG
echo "supportVnodes 16" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
......
system sh/stop_dnodes.sh
############## config parameter #####################
$node1 = 192.168.0.201
$node1 = 192.168.0.201
$node2 = 192.168.0.202
$node3 = 192.168.0.203
$node4 = 192.168.0.204
......@@ -10,54 +7,75 @@ $node4 = 192.168.0.204
$self = $node1
$num = 25
############### deploy firstEp #####################
#deploy = 0, start = 1, stop = 2
$option = 0
print =============== option:$option
############### stop dnodes #####################
if $option == 0 then
system sh/stop_dnodes.sh
endi
############### process firstEp #####################
$firstEp = $node1 . :7100
$firstPort = 7100
if $self == $node1 then
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp
system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp
system sh/cfg.sh -n dnode1 -c fqdn -v $node1
system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort
system sh/exec.sh -n dnode1 -s start
sql connect
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node1 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node2 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node3 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node4 port $port
endw
if $option == 1 then
system sh/exec.sh -n dnode1 -s start
endi
if $option == 2 then
system sh/exec.sh -n dnode1 -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp
system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp
system sh/cfg.sh -n dnode1 -c fqdn -v $node1
system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
sql connect
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node1 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node2 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node3 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node4 port $port
endw
endi
endi
############### deploy nodes #####################
############### process nodes #####################
$i = 0
while $i < $num
......@@ -67,11 +85,21 @@ while $i < $num
$dnodename = dnode . $index
$i = $i + 1
system sh/deploy.sh -n $dnodename -i 1
system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp
system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp
system sh/cfg.sh -n $dnodename -c fqdn -v $self
system sh/cfg.sh -n $dnodename -c serverPort -v $port
if $option == 1 then
system sh/exec.sh -n $dnodename -s start
endi
if $option == 2 then
system sh/exec.sh -n $dnodename -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n $dnodename -i 1
system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp
system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp
system sh/cfg.sh -n $dnodename -c fqdn -v $self
system sh/cfg.sh -n $dnodename -c serverPort -v $port
system sh/exec.sh -n $dnodename -s start
system sh/exec.sh -n $dnodename -s start
endi
endw
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册