diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 8132eab539426d06e36fa9f8407b7060caaf4aa1..fd0ada95d9043d143fe6eca3d295a18cc3b19849 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -18,7 +18,8 @@ "ms-vscode.cpptools", "ms-vscode.cmake-tools", "austin.code-gnu-global", - "visualstudioexptteam.vscodeintel" + "visualstudioexptteam.vscodeintel", + "eamodio.gitlens" ], // Use 'forwardPorts' to make a list of ports inside the container available locally. diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 42956b6bdde2b27f3921817b90a1cc871ca0a8dd..e8a56b77c95dacc3c457f84e1b5c2784e8c1043e 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -30,11 +30,12 @@ extern char tsLocalEp[]; extern uint16_t tsServerPort; extern int32_t tsStatusInterval; extern int8_t tsEnableTelemetryReporting; +extern int32_t tsNumOfSupportVnodes; // common extern int tsRpcTimer; extern int tsRpcMaxTime; -extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled +extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled extern int32_t tsMaxConnections; extern int32_t tsMaxShellConns; extern int32_t tsShellActivityTimer; @@ -48,14 +49,18 @@ extern int32_t tsCompressMsgSize; extern int32_t tsCompressColData; extern int32_t tsMaxNumOfDistinctResults; extern char tsTempDir[]; -extern int64_t tsMaxVnodeQueuedBytes; -extern int tsCompatibleModel; // 2.0 compatible model - -//query buffer management -extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing -extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node during query processing -extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked -extern int8_t tsKeepOriginalColumnName; +extern int tsCompatibleModel; // 2.0 compatible model +extern int8_t tsEnableSlaveQuery; +extern int8_t tsEnableAdjustMaster; +extern int8_t tsPrintAuth; +extern int64_t tsTickPerDay[3]; + +// query buffer management +extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing +extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node +extern int32_t tsRetrieveBlockingModel; // retrieve threads will be blocked +extern int8_t tsKeepOriginalColumnName; +extern int8_t tsDeadLockKillQuery; // client extern int32_t tsMaxSQLStringLen; @@ -72,27 +77,17 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the extern int32_t tsProjectExecInterval; extern int64_t tsMaxRetentWindow; -// balance -extern int8_t tsEnableSlaveQuery; - - -// interna -extern int8_t tsPrintAuth; -extern char tsVnodeDir[]; -extern char tsMnodeDir[]; -extern int64_t tsTickPerDay[3]; - // system info -extern float tsTotalLogDirGB; -extern float tsTotalTmpDirGB; -extern float tsTotalDataDirGB; -extern float tsAvailLogDirGB; -extern float tsAvailTmpDirectorySpace; -extern float tsAvailDataDirGB; -extern float tsUsedDataDirGB; -extern float tsMinimalLogDirGB; -extern float tsReservedTmpDirectorySpace; -extern float tsMinimalDataDirGB; +extern float tsTotalLogDirGB; +extern float tsTotalTmpDirGB; +extern float tsTotalDataDirGB; +extern float tsAvailLogDirGB; +extern float tsAvailTmpDirectorySpace; +extern float tsAvailDataDirGB; +extern float tsUsedDataDirGB; +extern float tsMinimalLogDirGB; +extern float tsReservedTmpDirectorySpace; +extern float tsMinimalDataDirGB; extern uint32_t tsVersion; // build info @@ -102,17 +97,13 @@ extern char gitinfo[]; extern char gitinfoOfInternal[]; extern char buildinfo[]; -#ifdef TD_TSZ -// lossy -extern char lossyColumns[]; -extern double fPrecision; -extern double dPrecision; -extern uint32_t maxRange; -extern uint32_t curRange; -extern char Compressor[]; -#endif -// long query -extern int8_t tsDeadLockKillQuery; +// lossy +extern char tsLossyColumns[]; +extern double tsFPrecision; +extern double tsDPrecision; +extern uint32_t tsMaxRange; +extern uint32_t tsCurRange; +extern char tsCompressor[]; typedef struct { char dir[TSDB_FILENAME_LEN]; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0df910b3997702a1551cfd458dcf26dae751c67b..17a4d4ad2cbcc446ebe860b8cb0d747189485053 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -25,7 +25,9 @@ extern "C" { #include "taoserror.h" #include "tcoding.h" #include "tdataformat.h" +#include "tlist.h" +/* ------------------------ MESSAGE DEFINITIONS ------------------------ */ #define TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ @@ -54,6 +56,47 @@ extern int tMsgDict[]; typedef uint16_t tmsg_t; +/* ------------------------ ENCODE/DECODE FUNCTIONS AND MACROS ------------------------ */ +struct SMEListNode { + TD_SLIST_NODE(SMEListNode); + SEncoder coder; +}; + +typedef struct SMsgEncoder { + SEncoder coder; + TD_SLIST(SMEListNode) eStack; // encode stack +} SMsgEncoder; + +struct SMDFreeListNode { + TD_SLIST_NODE(SMDFreeListNode); + char payload[]; +}; + +struct SMDListNode { + TD_SLIST_NODE(SMDListNode); + SDecoder coder; +}; + +typedef struct SMsgDecoder { + SDecoder coder; + TD_SLIST(SMDListNode) dStack; + TD_SLIST(SMDFreeListNode) freeList; +} SMsgDecoder; + +#define TMSG_MALLOC(SIZE, DECODER) \ + ({ \ + void* ptr = malloc((SIZE) + sizeof(struct SMDFreeListNode)); \ + if (ptr) { \ + TD_SLIST_PUSH(&((DECODER)->freeList), (struct SMDFreeListNode*)ptr); \ + ptr = POINTER_SHIFT(ptr, sizeof(struct SMDFreeListNode*)); \ + } \ + ptr; \ + }) + +void tmsgInitMsgDecoder(SMsgDecoder* pMD, td_endian_t endian, uint8_t* data, int64_t size); +void tmsgClearMsgDecoder(SMsgDecoder* pMD); + +/* ------------------------ OTHER DEFINITIONS ------------------------ */ // IE type #define TSDB_IE_TYPE_SEC 1 #define TSDB_IE_TYPE_META 2 @@ -651,8 +694,8 @@ typedef struct { int64_t clusterId; int64_t rebootTime; int64_t updateTime; - int16_t numOfCores; - int16_t numOfSupportVnodes; + int32_t numOfCores; + int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; SVnodeLoads vnodeLoads; @@ -1228,100 +1271,11 @@ typedef struct SVCreateTbReq { }; } SVCreateTbReq; -static FORCE_INLINE int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq) { - int tlen = 0; - - tlen += taosEncodeFixedU64(buf, pReq->ver); - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeFixedU32(buf, pReq->ttl); - tlen += taosEncodeFixedU32(buf, pReq->keep); - tlen += taosEncodeFixedU8(buf, pReq->type); - - switch (pReq->type) { - case TD_SUPER_TABLE: - tlen += taosEncodeFixedU64(buf, pReq->stbCfg.suid); - tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols); - for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { - tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); - tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].colId); - tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); - tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); - } - tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nTagCols); - for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { - tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); - tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].colId); - tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); - tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); - } - break; - case TD_CHILD_TABLE: - tlen += taosEncodeFixedU64(buf, pReq->ctbCfg.suid); - tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); - break; - case TD_NORMAL_TABLE: - tlen += taosEncodeFixedU32(buf, pReq->ntbCfg.nCols); - for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { - tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type); - tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].colId); - tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); - tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); - } - break; - default: - ASSERT(0); - } - - return tlen; -} - -static FORCE_INLINE void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq) { - buf = taosDecodeFixedU64(buf, &(pReq->ver)); - buf = taosDecodeString(buf, &(pReq->name)); - buf = taosDecodeFixedU32(buf, &(pReq->ttl)); - buf = taosDecodeFixedU32(buf, &(pReq->keep)); - buf = taosDecodeFixedU8(buf, &(pReq->type)); - - switch (pReq->type) { - case TD_SUPER_TABLE: - buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid)); - buf = taosDecodeFixedU32(buf, &(pReq->stbCfg.nCols)); - pReq->stbCfg.pSchema = (SSchema*)malloc(pReq->stbCfg.nCols * sizeof(SSchema)); - for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { - buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); - buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].colId)); - buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); - buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); - } - buf = taosDecodeFixedU32(buf, &pReq->stbCfg.nTagCols); - pReq->stbCfg.pTagSchema = (SSchema*)malloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); - for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { - buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); - buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].colId); - buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); - buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); - } - break; - case TD_CHILD_TABLE: - buf = taosDecodeFixedU64(buf, &pReq->ctbCfg.suid); - buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag); - break; - case TD_NORMAL_TABLE: - buf = taosDecodeFixedU32(buf, &pReq->ntbCfg.nCols); - pReq->ntbCfg.pSchema = (SSchema*)malloc(pReq->ntbCfg.nCols * sizeof(SSchema)); - for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { - buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type); - buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].colId); - buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); - buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); - } - break; - default: - ASSERT(0); - } +int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq); +int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq); +int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq); +void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); - return buf; -} typedef struct SVCreateTbRsp { } SVCreateTbRsp; diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index e12ce0942281f4c55a76a2ce3aa007b5fd888b1d..2e3863c3a190db25c5461a0a2afcdb13dac24028 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -27,8 +27,8 @@ typedef struct SDnode SDnode; typedef struct { int32_t sver; - int16_t numOfCores; - int16_t numOfSupportVnodes; + int32_t numOfCores; + int32_t numOfSupportVnodes; int16_t numOfCommitThreads; int8_t enableTelem; int32_t statusInterval; diff --git a/include/os/osEnv.h b/include/os/osEnv.h index b857670214699c52975fff1020e548f6b40242a6..a7fd86776c06ea6f399f6094ac52a78c0345a7e5 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -21,7 +21,6 @@ extern "C" { #endif extern char tsOsName[]; -extern char tsDnodeDir[]; extern char tsDataDir[]; extern char tsLogDir[]; extern char tsScriptDir[]; diff --git a/include/util/encode.h b/include/util/encode.h index 85be2b76c64aa16c3edb8fe37405b7c59bec11d3..ee35791012b61b5df89a44fa8bc9693381ce3df7 100644 --- a/include/util/encode.h +++ b/include/util/encode.h @@ -26,8 +26,8 @@ extern "C" { typedef struct { td_endian_t endian; uint8_t* data; - int64_t size; - int64_t pos; + int32_t size; + int32_t pos; } SEncoder, SDecoder; #define tPut(TYPE, BUF, VAL) ((TYPE*)(BUF))[0] = (VAL) @@ -62,7 +62,7 @@ typedef struct { #define TD_CHECK_CODER_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE)) /* ------------------------ FOR ENCODER ------------------------ */ -static FORCE_INLINE void tInitEncoder(SEncoder* pEncoder, td_endian_t endian, uint8_t* data, int64_t size) { +static FORCE_INLINE void tInitEncoder(SEncoder* pEncoder, td_endian_t endian, uint8_t* data, int32_t size) { pEncoder->endian = endian; pEncoder->data = data; pEncoder->size = (data) ? size : 0; @@ -266,7 +266,7 @@ static FORCE_INLINE int tEncodeCStr(SEncoder* pEncoder, const char* val) { } /* ------------------------ FOR DECODER ------------------------ */ -static FORCE_INLINE void tInitDecoder(SDecoder* pDecoder, td_endian_t endian, uint8_t* data, int64_t size) { +static FORCE_INLINE void tInitDecoder(SDecoder* pDecoder, td_endian_t endian, uint8_t* data, int32_t size) { ASSERT(!TD_IS_NULL(data)); pDecoder->endian = endian; pDecoder->data = data; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 165ea9b62fb57947dac5b70181f9f5b91b47e933..7aa4bc9ee968b2d7f643215964560fa709cffb8b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -128,6 +128,7 @@ typedef struct SRequestObj { char *msgBuf; void *pInfo; // sql parse info, generated by parser module int32_t code; + uint64_t affectedRows; SQueryExecMetric metric; SRequestSendRecvBody body; } SRequestObj; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index db1ea435f1f0dec72fecb024a2649cb7f1ae420f..960ba95324ca90d96fc9af6f1106acdc2c37737d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -18,6 +18,7 @@ #include "clientInt.h" #include "clientLog.h" #include "query.h" +#include "scheduler.h" #include "tmsg.h" #include "tcache.h" #include "tconfig.h" @@ -230,6 +231,8 @@ void taos_init_imp(void) { SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; catalogInit(&cfg); + SSchedulerCfg scfg = {.maxJobNum = 100}; + schedulerInit(&scfg); tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp); taosSetCoreDump(true); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 62ef63c858277ec535593ae7bb30322cd423158f..af288456731f003f9090f289241a4a556bc882a9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -196,7 +196,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { return TSDB_CODE_SUCCESS; } +int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { + pRequest->type = pQuery->type; + return qCreateQueryDag(pQuery, pDag); +} + int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { + if (TSDB_SQL_INSERT == pRequest->type) { + return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); + } return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } @@ -283,7 +291,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { if (qIsDdlQuery(pQuery)) { CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); } else { - CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 910d05e9a3eec9518f76891240cd8f5af22e4c6d..9ddadc9ba60c3cc7f9284ab68d6b71fb12f3824a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -15,17 +15,18 @@ #define _DEFAULT_SOURCE #include "os.h" + #include "taosdef.h" #include "taoserror.h" -#include "ulog.h" -#include "tlog.h" +#include "tcompare.h" #include "tconfig.h" +#include "tep.h" #include "tglobal.h" -#include "tcompare.h" -#include "tutil.h" -#include "ttimezone.h" #include "tlocale.h" -#include "tep.h" +#include "tlog.h" +#include "ttimezone.h" +#include "tutil.h" +#include "ulog.h" // cluster char tsFirst[TSDB_EP_LEN] = {0}; @@ -36,22 +37,24 @@ uint16_t tsServerPort = 6030; int32_t tsStatusInterval = 1; // second int8_t tsEnableTelemetryReporting = 0; char tsEmail[TSDB_FQDN_LEN] = {0}; +int32_t tsNumOfSupportVnodes = 16; // common -int32_t tsRpcTimer = 300; -int32_t tsRpcMaxTime = 600; // seconds; -int32_t tsRpcForceTcp = 1; //disable this, means query, show command use udp protocol as default -int32_t tsMaxShellConns = 50000; +int32_t tsRpcTimer = 300; +int32_t tsRpcMaxTime = 600; // seconds; +int32_t tsRpcForceTcp = 1; // disable this, means query, show command use udp protocol as default +int32_t tsMaxShellConns = 50000; int32_t tsMaxConnections = 5000; -int32_t tsShellActivityTimer = 3; // second +int32_t tsShellActivityTimer = 3; // second float tsNumOfThreadsPerCore = 1.0f; int32_t tsNumOfCommitThreads = 4; float tsRatioOfQueryCores = 1.0f; -int8_t tsDaylight = 0; +int8_t tsDaylight = 0; int8_t tsEnableCoreFile = 0; int32_t tsMaxBinaryDisplayWidth = 30; -int64_t tsMaxVnodeQueuedBytes = 1024*1024*1024; //1GB - +int8_t tsEnableSlaveQuery = 1; +int8_t tsEnableAdjustMaster = 1; +int8_t tsPrintAuth = 0; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, * metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server. @@ -79,7 +82,7 @@ int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN; int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN; int32_t tsMaxRegexStringLen = TSDB_REGEX_STRING_DEFAULT_LEN; -int8_t tsTscEnableRecordSql = 0; +int8_t tsTscEnableRecordSql = 0; // the maximum number of results for projection query on super table that are returned from // one virtual node, to order according to timestamp @@ -89,7 +92,7 @@ int32_t tsMaxNumOfOrderedResults = 100000; int32_t tsMinSlidingTime = 10; // the maxinum number of distict query result -int32_t tsMaxNumOfDistinctResults = 1000 * 10000; +int32_t tsMaxNumOfDistinctResults = 1000 * 10000; // 1 us for interval time range, changed accordingly int32_t tsMinIntervalTime = 1; @@ -101,7 +104,7 @@ int32_t tsMaxStreamComputDelay = 20000; int32_t tsStreamCompStartDelay = 10000; // the stream computing delay time after executing failed, change accordingly -int32_t tsRetryStreamCompDelay = 10*1000; +int32_t tsRetryStreamCompDelay = 10 * 1000; // The delayed computing ration. 10% of the whole computing time window by default. float tsStreamComputDelayRatio = 0.1f; @@ -120,30 +123,16 @@ int64_t tsQueryBufferSizeBytes = -1; int32_t tsRetrieveBlockingModel = 0; // last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name -int8_t tsKeepOriginalColumnName = 0; +int8_t tsKeepOriginalColumnName = 0; + +// long query death-lock +int8_t tsDeadLockKillQuery = 0; -// tsdb config +// tsdb config // For backward compatibility bool tsdbForceKeepFile = false; -// balance -int8_t tsEnableFlowCtrl = 1; -int8_t tsEnableSlaveQuery = 1; -int8_t tsEnableAdjustMaster = 1; - - -// monitor -char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; -char tsInternalPass[] = "secretkey"; - -// internal -int8_t tsCompactMnodeWal = 0; -int8_t tsPrintAuth = 0; -char tsVnodeDir[PATH_MAX] = {0}; -char tsDnodeDir[PATH_MAX] = {0}; -char tsMnodeDir[PATH_MAX] = {0}; - -int32_t tsDiskCfgNum = 0; +int32_t tsDiskCfgNum = 0; #ifndef _STORAGE SDiskCfg tsDiskCfg[1]; @@ -160,31 +149,28 @@ SDiskCfg tsDiskCfg[TSDB_MAX_DISKS]; int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L}; // system info -float tsTotalTmpDirGB = 0; -float tsTotalDataDirGB = 0; -float tsAvailTmpDirectorySpace = 0; -float tsAvailDataDirGB = 0; -float tsUsedDataDirGB = 0; -float tsReservedTmpDirectorySpace = 1.0f; -float tsMinimalDataDirGB = 2.0f; -int32_t tsTotalMemoryMB = 0; +float tsTotalTmpDirGB = 0; +float tsTotalDataDirGB = 0; +float tsAvailTmpDirectorySpace = 0; +float tsAvailDataDirGB = 0; +float tsUsedDataDirGB = 0; +float tsReservedTmpDirectorySpace = 1.0f; +float tsMinimalDataDirGB = 2.0f; +int32_t tsTotalMemoryMB = 0; uint32_t tsVersion = 0; -#ifdef TD_TSZ // // lossy compress 6 // -char lossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty can close lossy compress. -// below option can take effect when tsLossyColumns not empty -double fPrecision = 1E-8; // float column precision -double dPrecision = 1E-16; // double column precision -uint32_t maxRange = 500; // max range -uint32_t curRange = 100; // range -char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR -#endif +char tsLossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty + // can close lossy compress. +// below option can take effect when tsLossyColumns not empty +double tsFPrecision = 1E-8; // float column precision +double tsDPrecision = 1E-16; // double column precision +uint32_t tsMaxRange = 500; // max range +uint32_t tsCurRange = 100; // range +char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR -// long query death-lock -int8_t tsDeadLockKillQuery = 0; int32_t (*monStartSystemFp)() = NULL; void (*monStopSystemFp)() = NULL; @@ -195,13 +181,12 @@ char *qtypeStr[] = {"rpc", "fwd", "wal", "cq", "query"}; static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT; void taosSetAllDebugFlag() { - if (debugFlag != 0) { + if (debugFlag != 0) { mDebugFlag = debugFlag; dDebugFlag = debugFlag; vDebugFlag = debugFlag; jniDebugFlag = debugFlag; - odbcDebugFlag = debugFlag; - qDebugFlag = debugFlag; + qDebugFlag = debugFlag; rpcDebugFlag = debugFlag; uDebugFlag = debugFlag; sDebugFlag = debugFlag; @@ -213,12 +198,12 @@ void taosSetAllDebugFlag() { } int32_t taosCfgDynamicOptions(char *msg) { - char *option, *value; - int32_t olen, vlen; - int32_t vint = 0; + char *option, *value; + int32_t olen, vlen; + int32_t vint = 0; paGetToken(msg, &option, &olen); - if (olen == 0) return -1;; + if (olen == 0) return -1; paGetToken(option + olen + 1, &value, &vlen); if (vlen == 0) @@ -231,9 +216,9 @@ int32_t taosCfgDynamicOptions(char *msg) { for (int32_t i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; - //if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; + // if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; if (cfg->valType != TAOS_CFG_VTYPE_INT32 && cfg->valType != TAOS_CFG_VTYPE_INT8) continue; - + int32_t cfgLen = (int32_t)strlen(cfg->option); if (cfgLen != olen) continue; if (strncasecmp(option, cfg->option, olen) != 0) continue; @@ -262,7 +247,7 @@ int32_t taosCfgDynamicOptions(char *msg) { return 0; } if (strncasecmp(cfg->option, "debugFlag", olen) == 0) { - taosSetAllDebugFlag(); + taosSetAllDebugFlag(); } return 0; } @@ -323,7 +308,7 @@ static void doInitGlobalConfig(void) { srand(taosSafeRand()); SGlobalCfg cfg = {0}; - + // ip address cfg.option = "firstEp"; cfg.ptr = tsFirst; @@ -366,6 +351,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); + cfg.option = "supportVnodes"; + cfg.ptr = &tsNumOfSupportVnodes; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 65536; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosAddConfigOption(cfg); + // directory cfg.option = "configDir"; cfg.ptr = configDir; @@ -442,8 +437,8 @@ static void doInitGlobalConfig(void) { cfg.ptr = &tsMaxNumOfDistinctResults; cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; - cfg.minValue = 10*10000; - cfg.maxValue = 10000*10000; + cfg.minValue = 10 * 10000; + cfg.maxValue = 10000 * 10000; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); @@ -749,17 +744,6 @@ static void doInitGlobalConfig(void) { cfg.maxValue = 10000000; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_GB; - taosAddConfigOption(cfg); - - // module configs - cfg.option = "flowctrl"; - cfg.ptr = &tsEnableFlowCtrl; - cfg.valType = TAOS_CFG_VTYPE_INT8; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; - cfg.minValue = 0; - cfg.maxValue = 1; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); cfg.option = "slaveQuery"; @@ -893,16 +877,6 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); - cfg.option = "odbcDebugFlag"; - cfg.ptr = &odbcDebugFlag; - cfg.valType = TAOS_CFG_VTYPE_INT32; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; - cfg.minValue = 0; - cfg.maxValue = 255; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosAddConfigOption(cfg); - cfg.option = "uDebugFlag"; cfg.ptr = &uDebugFlag; cfg.valType = TAOS_CFG_VTYPE_INT32; @@ -1034,7 +1008,7 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); - // enable kill long query + // enable kill long query cfg.option = "deadLockKillQuery"; cfg.ptr = &tsDeadLockKillQuery; cfg.valType = TAOS_CFG_VTYPE_INT8; @@ -1066,7 +1040,6 @@ static void doInitGlobalConfig(void) { cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosAddConfigOption(cfg); cfg.option = "dPrecision"; @@ -1100,23 +1073,20 @@ static void doInitGlobalConfig(void) { taosAddConfigOption(cfg); assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); #else - //assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); + // assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); #endif - } -void taosInitGlobalCfg() { - pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig); -} +void taosInitGlobalCfg() { pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig); } int32_t taosCheckAndPrintCfg() { - char fqdn[TSDB_FQDN_LEN]; + char fqdn[TSDB_FQDN_LEN]; uint16_t port; if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) { taosSetAllDebugFlag(); } - + if (tsLocalFqdn[0] == 0) { taosGetFqdn(tsLocalFqdn); } @@ -1143,7 +1113,7 @@ int32_t taosCheckAndPrintCfg() { if (taosDirExist(tsTempDir) != 0) { return -1; } - + taosGetSystemInfo(); tsSetLocale(); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a7f285046c8074e1e4f476d7e5a1a272439735fe..b81143ee62b6a355b6ddf4a9a59b63733c62ac32 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -25,4 +25,230 @@ #undef TD_MSG_INFO_ #define TD_MSG_DICT_ #undef TD_MSG_SEG_CODE_ -#include "tmsgdef.h" \ No newline at end of file +#include "tmsgdef.h" + +static int tmsgStartEncode(SMsgEncoder *pME); +static void tmsgEndEncode(SMsgEncoder *pME); +static int tmsgStartDecode(SMsgDecoder *pMD); +static void tmsgEndDecode(SMsgDecoder *pMD); + +/* ------------------------ ENCODE/DECODE FUNCTIONS ------------------------ */ +void tmsgInitMsgEncoder(SMsgEncoder *pME, td_endian_t endian, uint8_t *data, int64_t size) { + tInitEncoder(&(pME->coder), endian, data, size); + TD_SLIST_INIT(&(pME->eStack)); +} + +void tmsgClearMsgEncoder(SMsgEncoder *pME) { + struct SMEListNode *pNode; + for (;;) { + pNode = TD_SLIST_HEAD(&(pME->eStack)); + if (TD_IS_NULL(pNode)) break; + TD_SLIST_POP(&(pME->eStack)); + free(pNode); + } +} + +void tmsgInitMsgDecoder(SMsgDecoder *pMD, td_endian_t endian, uint8_t *data, int64_t size) { + tInitDecoder(&pMD->coder, endian, data, size); + TD_SLIST_INIT(&(pMD->dStack)); + TD_SLIST_INIT(&(pMD->freeList)); +} + +void tmsgClearMsgDecoder(SMsgDecoder *pMD) { + { + struct SMDFreeListNode *pNode; + for (;;) { + pNode = TD_SLIST_HEAD(&(pMD->freeList)); + if (TD_IS_NULL(pNode)) break; + TD_SLIST_POP(&(pMD->freeList)); + free(pNode); + } + } + { + struct SMDListNode *pNode; + for (;;) { + pNode = TD_SLIST_HEAD(&(pMD->dStack)); + if (TD_IS_NULL(pNode)) break; + TD_SLIST_POP(&(pMD->dStack)); + free(pNode); + } + } +} + +/* ------------------------ MESSAGE ENCODE/DECODE ------------------------ */ +int tmsgSVCreateTbReqEncode(SMsgEncoder *pCoder, SVCreateTbReq *pReq) { + tmsgStartEncode(pCoder); + // TODO + + tmsgEndEncode(pCoder); + return 0; +} + +int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) { + tmsgStartDecode(pCoder); + + // TODO: decode + + // Decode is not end + if (pCoder->coder.pos != pCoder->coder.size) { + // Continue decode + } + + tmsgEndDecode(pCoder); + return 0; +} + +int tSerializeSVCreateTbReq(void **buf, const SVCreateTbReq *pReq) { + int tlen = 0; + + tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeFixedU32(buf, pReq->ttl); + tlen += taosEncodeFixedU32(buf, pReq->keep); + tlen += taosEncodeFixedU8(buf, pReq->type); + + switch (pReq->type) { + case TD_SUPER_TABLE: + tlen += taosEncodeFixedU64(buf, pReq->stbCfg.suid); + tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols); + for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { + tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].colId); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); + tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); + } + tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nTagCols); + for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { + tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].colId); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); + tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); + } + break; + case TD_CHILD_TABLE: + tlen += taosEncodeFixedU64(buf, pReq->ctbCfg.suid); + tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); + break; + case TD_NORMAL_TABLE: + tlen += taosEncodeFixedU32(buf, pReq->ntbCfg.nCols); + for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { + tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type); + tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].colId); + tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); + tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); + } + break; + default: + ASSERT(0); + } + + return tlen; +} + +void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { + buf = taosDecodeFixedU64(buf, &(pReq->ver)); + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeFixedU32(buf, &(pReq->ttl)); + buf = taosDecodeFixedU32(buf, &(pReq->keep)); + buf = taosDecodeFixedU8(buf, &(pReq->type)); + + switch (pReq->type) { + case TD_SUPER_TABLE: + buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid)); + buf = taosDecodeFixedU32(buf, &(pReq->stbCfg.nCols)); + pReq->stbCfg.pSchema = (SSchema *)malloc(pReq->stbCfg.nCols * sizeof(SSchema)); + for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { + buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); + buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].colId)); + buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); + buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); + } + buf = taosDecodeFixedU32(buf, &pReq->stbCfg.nTagCols); + pReq->stbCfg.pTagSchema = (SSchema *)malloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); + for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { + buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); + buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].colId); + buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); + buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); + } + break; + case TD_CHILD_TABLE: + buf = taosDecodeFixedU64(buf, &pReq->ctbCfg.suid); + buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag); + break; + case TD_NORMAL_TABLE: + buf = taosDecodeFixedU32(buf, &pReq->ntbCfg.nCols); + pReq->ntbCfg.pSchema = (SSchema *)malloc(pReq->ntbCfg.nCols * sizeof(SSchema)); + for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { + buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type); + buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].colId); + buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); + buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); + } + break; + default: + ASSERT(0); + } + + return buf; +} + +/* ------------------------ STATIC METHODS ------------------------ */ +static int tmsgStartEncode(SMsgEncoder *pME) { + struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode)); + if (TD_IS_NULL(pNode)) return -1; + + pNode->coder = pME->coder; + TD_SLIST_PUSH(&(pME->eStack), pNode); + TD_CODER_MOVE_POS(&(pME->coder), sizeof(int32_t)); + + return 0; +} + +static void tmsgEndEncode(SMsgEncoder *pME) { + int32_t size; + struct SMEListNode *pNode; + + pNode = TD_SLIST_HEAD(&(pME->eStack)); + ASSERT(pNode); + TD_SLIST_POP(&(pME->eStack)); + + size = pME->coder.pos - pNode->coder.pos; + tEncodeI32(&(pNode->coder), size); + + free(pNode); +} + +static int tmsgStartDecode(SMsgDecoder *pMD) { + struct SMDListNode *pNode; + int32_t size; + + pNode = (struct SMDListNode *)malloc(sizeof(*pNode)); + if (pNode == NULL) return -1; + + tDecodeI32(&(pMD->coder), &size); + + pNode->coder = pMD->coder; + TD_SLIST_PUSH(&(pMD->dStack), pNode); + + pMD->coder.pos = 0; + pMD->coder.size = size - sizeof(int32_t); + pMD->coder.data = TD_CODER_CURRENT(&(pNode->coder)); + + return 0; +} + +static void tmsgEndDecode(SMsgDecoder *pMD) { + ASSERT(pMD->coder.pos == pMD->coder.size); + struct SMDListNode *pNode; + + pNode = TD_SLIST_HEAD(&(pMD->dStack)); + ASSERT(pNode); + TD_SLIST_POP(&(pMD->dStack)); + + pNode->coder.pos += pMD->coder.size; + + pMD->coder = pNode->coder; + + free(pNode); +} \ No newline at end of file diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 03ca95ff649723a3cae5ba40d29278dd8d0cd858..58dde913f0814750c08cccadaaeead2ed0c0d207 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -18,11 +18,11 @@ TARGET_INCLUDE_DIRECTORIES( ) # tmsg test -add_executable(tmsgTest "") -target_sources(tmsgTest - PRIVATE - "tmsgTest.cpp" - "../src/tmsg.c" -) -target_include_directories(tmsgTest PUBLIC "${CMAKE_SOURCE_DIR}/include/common/") -target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) \ No newline at end of file +# add_executable(tmsgTest "") +# target_sources(tmsgTest +# PRIVATE +# "tmsgTest.cpp" +# "../src/tmsg.c" +# ) +# target_include_directories(tmsgTest PUBLIC "${CMAKE_SOURCE_DIR}/include/common/") +# target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main) \ No newline at end of file diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 70dca0e4dfebbf5dab16e4ac6d00a9239724c51b..8161b8d125ebb9f9fec4762f0327e2adc3a5449f 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -139,7 +139,7 @@ void dmnWaitSignal() { void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = 30000000; //3.0.0.0 pOption->numOfCores = tsNumOfCores; - pOption->numOfSupportVnodes = 16; + pOption->numOfSupportVnodes = tsNumOfSupportVnodes; pOption->numOfCommitThreads = 1; pOption->statusInterval = tsStatusInterval; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 8807579564746a5ad08ebf046be105396e1174af..30b069f349c4e3cb3f33e31bd0d93df42df3fce2 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -370,8 +370,8 @@ void dndSendStatusMsg(SDnode *pDnode) { pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->rebootTime = htobe64(pMgmt->rebootTime); pStatus->updateTime = htobe64(pMgmt->updateTime); - pStatus->numOfCores = htons(pDnode->opt.numOfCores); - pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfSupportVnodes); + pStatus->numOfCores = htonl(pDnode->opt.numOfCores); + pStatus->numOfSupportVnodes = htonl(pDnode->opt.numOfSupportVnodes); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); @@ -397,7 +397,7 @@ void dndSendStatusMsg(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0) { - dInfo("set dnodeId:%d clusterId:% " PRId64, pCfg->dnodeId, pCfg->clusterId); + dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; pMgmt->clusterId = pCfg->clusterId; @@ -440,19 +440,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { } SStatusRsp *pRsp = pMsg->pCont; - SDnodeCfg *pCfg = &pRsp->dnodeCfg; - pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->clusterId = htobe64(pCfg->clusterId); - dndUpdateDnodeCfg(pDnode, pCfg); + if (pMsg->pCont != NULL && pMsg->contLen != 0) { + SDnodeCfg *pCfg = &pRsp->dnodeCfg; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->clusterId = htobe64(pCfg->clusterId); + dndUpdateDnodeCfg(pDnode, pCfg); + + SDnodeEps *pDnodeEps = &pRsp->dnodeEps; + pDnodeEps->num = htonl(pDnodeEps->num); + for (int32_t i = 0; i < pDnodeEps->num; ++i) { + pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); + pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); + } - SDnodeEps *pDnodeEps = &pRsp->dnodeEps; - pDnodeEps->num = htonl(pDnodeEps->num); - for (int32_t i = 0; i < pDnodeEps->num; ++i) { - pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); - pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); + dndUpdateDnodeEps(pDnode, pDnodeEps); } - - dndUpdateDnodeEps(pDnode, pDnodeEps); pMgmt->statusSent = 0; } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 2c3be4a700a47354a1fd379c0a33547314b19c06..e5be16937bb4bc5eb909eba8cccf4f94ac2578c6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -103,7 +103,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index 96bb6e53909b7f6fc2b802e70f0e0655e59f3268..54d7e73be6542d8391dfdfddaf5f3f7857a7db1d 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -57,7 +57,7 @@ TEST_F(DndTestDnode, 01_ShowDnode) { CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id"); CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint"); CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes"); - CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "max_vnodes"); + CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "support_vnodes"); CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status"); CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason"); diff --git a/source/dnode/mgmt/impl/test/sut/src/base.cpp b/source/dnode/mgmt/impl/test/sut/src/base.cpp index 98371e989397c56e0fea907f5d7752ab4703e096..e14dc94d31bb126a286780c79d19cfb42c37e114 100644 --- a/source/dnode/mgmt/impl/test/sut/src/base.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/base.cpp @@ -24,7 +24,6 @@ void Testbase::InitLog(const char* path) { tmrDebugFlag = 0; uDebugFlag = 143; rpcDebugFlag = 0; - odbcDebugFlag = 0; qDebugFlag = 0; wDebugFlag = 0; sDebugFlag = 0; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 419af6df883fd4c31dec0a78dac0495517b00040..e66ffd4426c9bf2efa97ecb0addf2a241a691341 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -74,13 +74,6 @@ typedef enum { typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; -typedef enum { - DND_STATUS_OFFLINE = 0, - DND_STATUS_READY = 1, - DND_STATUS_CREATING = 2, - DND_STATUS_DROPPING = 3 -} EDndStatus; - typedef enum { DND_REASON_ONLINE = 0, DND_REASON_STATUS_MSG_TIMEOUT, @@ -125,9 +118,8 @@ typedef struct { int64_t lastAccessTime; int32_t accessTimes; int16_t numOfVnodes; - int16_t numOfSupportVnodes; - int16_t numOfCores; - EDndStatus status; + int32_t numOfSupportVnodes; + int32_t numOfCores; EDndReason offlineReason; uint16_t port; char fqdn[TSDB_FQDN_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index 764dfbffc150a6d09cb9162264a079a823792368..c76186c0a29bc6a0c78458afbf6781a77cd81cb3 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); int32_t mndGetDnodeSize(SMnode *pMnode); -bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode); +bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 43b458a52ad3965b3557adcbdd067346f796ae83..ca20ba61a10e2b499aab65e3a25829a2f7419e22 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -40,8 +40,6 @@ static const char *offlineReason[] = { "unknown", }; -static const char *dnodeStatus[] = {"offline", "ready", "creating", "dropping"}; - static int32_t mndCreateDefaultDnode(SMnode *pMnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); @@ -208,10 +206,12 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { return sdbGetSize(pSdb, SDB_DNODE); } -bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) { - int64_t ms = taosGetTimestampMs(); - int64_t interval = ABS(pDnode->lastAccessTime - ms); +bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { + int64_t interval = ABS(pDnode->lastAccessTime - curMs); if (interval > 3500 * pMnode->cfg.statusInterval) { + if (pDnode->rebootTime > 0) { + pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; + } return false; } return true; @@ -278,8 +278,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) { pStatus->clusterId = htobe64(pStatus->clusterId); pStatus->rebootTime = htobe64(pStatus->rebootTime); pStatus->updateTime = htobe64(pStatus->updateTime); - pStatus->numOfCores = htons(pStatus->numOfCores); - pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes); + pStatus->numOfCores = htonl(pStatus->numOfCores); + pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes); pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); } @@ -287,96 +287,99 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) { static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SStatusMsg *pStatus = pMsg->rpcMsg.pCont; + SDnodeObj *pDnode = NULL; + int32_t code = -1; + mndParseStatusMsg(pStatus); - SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); if (pDnode == NULL) { mDebug("dnode:%s, not created yet", pStatus->dnodeEp); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - return -1; + goto PROCESS_STATUS_MSG_OVER; } } else { pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); if (pDnode == NULL) { pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; } mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); - mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - return -1; + goto PROCESS_STATUS_MSG_OVER; } } - if (pStatus->sver != pMnode->cfg.sver) { - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { - pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; - } - mndReleaseDnode(pMnode, pDnode); - mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); - terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; - return -1; - } + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime); - if (pStatus->dnodeId == 0) { - mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); - } else { - if (pStatus->clusterId != pMnode->clusterId) { - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { - pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + if (needCheckCfg) { + if (pStatus->sver != pMnode->cfg.sver) { + if (pDnode != NULL) { + pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; } - mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, - pMnode->clusterId); - mndReleaseDnode(pMnode, pDnode); - terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID; - return -1; + mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); + terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; + goto PROCESS_STATUS_MSG_OVER; + } + + if (pStatus->dnodeId == 0) { + mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); } else { - pDnode->accessTimes++; - mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); + if (pStatus->clusterId != pMnode->clusterId) { + if (pDnode != NULL) { + pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + } + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, + pMnode->clusterId); + terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; + goto PROCESS_STATUS_MSG_OVER; + } else { + pDnode->accessTimes++; + mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); + } } - } - if (pDnode->status == DND_STATUS_OFFLINE) { // Verify whether the cluster parameters are consistent when status change from offline to ready int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); if (0 != ret) { pDnode->offlineReason = ret; mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); - mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG; - return -1; + goto PROCESS_STATUS_MSG_OVER; } mInfo("dnode:%d, from offline to online", pDnode->id); - } - pDnode->rebootTime = pStatus->rebootTime; - pDnode->numOfCores = pStatus->numOfCores; - pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; - pDnode->lastAccessTime = taosGetTimestampMs(); - pDnode->status = DND_STATUS_READY; + pDnode->rebootTime = pStatus->rebootTime; + pDnode->numOfCores = pStatus->numOfCores; + pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; - int32_t numOfEps = mndGetDnodeSize(pMnode); - int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); - SStatusRsp *pRsp = rpcMallocCont(contLen); - if (pRsp == NULL) { - mndReleaseDnode(pMnode, pDnode); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + int32_t numOfEps = mndGetDnodeSize(pMnode); + int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); + SStatusRsp *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto PROCESS_STATUS_MSG_OVER; + } + + pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); + pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); + mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + + pMsg->contLen = contLen; + pMsg->pCont = pRsp; } - pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); - pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); - mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + pDnode->lastAccessTime = curMs; + code = 0; - pMsg->contLen = contLen; - pMsg->pCont = pRsp; +PROCESS_STATUS_MSG_OVER: mndReleaseDnode(pMnode, pDnode); - - return 0; + return code; } static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) { @@ -638,7 +641,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "max_vnodes"); + strcpy(pSchema[cols].name, "support_vnodes"); pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; @@ -682,10 +685,12 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i int32_t cols = 0; SDnodeObj *pDnode = NULL; char *pWrite; + int64_t curMs = taosGetTimestampMs(); while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode); if (pShow->pIter == NULL) break; + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); cols = 0; @@ -706,8 +711,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - const char *status = dnodeStatus[pDnode->status]; - STR_TO_VARSTR(pWrite, status); + STR_TO_VARSTR(pWrite, online ? "ready" : "offline"); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -715,11 +719,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (pDnode->status == DND_STATUS_READY) { - STR_TO_VARSTR(pWrite, ""); - } else { - STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]); - } + STR_TO_VARSTR(pWrite, online ? "" : offlineReason[pDnode->offlineReason]); cols++; numOfRows++; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 23b62d2dfb07f90563ee95c77d9bbefdb51b11ca..1b0026cd139ea298d12ec3fa143bf2be3e1b3f8e 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -260,13 +260,14 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) { pDnode->numOfVnodes++; } - bool isReady = mndIsDnodeInReadyStatus(pMnode, pDnode); - if (isReady) { + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + if (online) { taosArrayPush(pArray, pDnode); } - mDebug("dnode:%d, numOfVnodes:%d numOfSupportVnodes:%d isMnode:%d ready:%d", pDnode->id, numOfVnodes, - pDnode->numOfSupportVnodes, isMnode, isReady); + mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes, + pDnode->numOfSupportVnodes, isMnode, online); sdbRelease(pSdb, pDnode); } @@ -333,6 +334,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { uint32_t hashMax = UINT32_MAX; uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups; + if (maxVgId < 2) maxVgId = 2; + for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) { SVgObj *pVgroup = &pVgroups[v]; pVgroup->vgId = maxVgId++; diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 51e3330ebf74843ed46dd3c652b35a17d6109405..4ef7dc01a46eb937f402fc8656c9cf8366427995 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -444,6 +444,7 @@ int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg) { void * pBuf; int tlen; STableMetaMsg *pMsg; + SSchema * pSchema; key.data = tbname; key.size = strlen(tbname) + 1; @@ -487,10 +488,9 @@ int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg) { pMsg->tversion = 0; pMsg->suid = tbCfg.stbCfg.suid; pMsg->tuid = tbCfg.stbCfg.suid; - for (size_t i = 0; i < tbCfg.stbCfg.nTagCols; i++) { - - } - + memcpy(pMsg->pSchema, tbCfg.stbCfg.pSchema, sizeof(SSchema) * tbCfg.stbCfg.nCols); + memcpy(POINTER_SHIFT(pMsg->pSchema, sizeof(SSchema) * tbCfg.stbCfg.nCols), tbCfg.stbCfg.pTagSchema, + sizeof(SSchema) * tbCfg.stbCfg.nTagCols); break; case META_CHILD_TABLE: ASSERT(0); diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 991bde5ed251079c1095a3ee813520f5dbc5d7fc..66966f75db59f9e01a710b7429312bfc5d911e8a 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -64,64 +64,6 @@ typedef struct SInsertParseContext { SInsertStmtInfo* pOutput; } SInsertParseContext; -static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) { - errno = 0; - *value = strtold(pToken->z, endPtr); - - // not a valid integer number, return error - if ((*endPtr - pToken->z) != pToken->n) { - return TK_ILLEGAL; - } - - return pToken->type; -} - -static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, bool issigned) { - errno = 0; - int32_t ret = 0; - - char* endPtr = NULL; - if (type == TK_FLOAT) { - double v = strtod(z, &endPtr); - if ((errno == ERANGE && v == HUGE_VALF) || isinf(v) || isnan(v)) { - ret = -1; - } else if ((issigned && (v < INT64_MIN || v > INT64_MAX)) || ((!issigned) && (v < 0 || v > UINT64_MAX))) { - ret = -1; - } else { - *value = (int64_t) round(v); - } - - errno = 0; - return ret; - } - - int32_t radix = 10; - if (type == TK_HEX) { - radix = 16; - } else if (type == TK_BIN) { - radix = 2; - } - - // the string may be overflow according to errno - if (!issigned) { - const char *p = z; - while(*p != 0 && *p == ' ') p++; - if (*p != 0 && *p == '-') { return -1;} - - *value = strtoull(z, &endPtr, radix); - } else { - *value = strtoll(z, &endPtr, radix); - } - - // not a valid integer number, return error - if (endPtr - z != n || errno == ERANGE) { - ret = -1; - } - - errno = 0; - return ret; -} - static int32_t skipInsertInto(SInsertParseContext* pCxt) { SToken sToken; NEXT_TOKEN(pCxt->pSql, sToken); @@ -159,10 +101,8 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { char tableName[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&name, tableName); - SParseBasicCtx* pBasicCtx = &pCxt->pComCxt->ctx; CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta)); - SVgroupInfo vg; CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); @@ -349,207 +289,6 @@ static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *p return TSDB_CODE_SUCCESS; } -//static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { -// if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && -// type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || -// (pToken->n == 0) || (type == TK_RP)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); -// } -// -// if (IS_NUMERIC_TYPE(type) && pToken->n == 0) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z); -// } -// -// // Remove quotation marks -// if (TK_STRING == type) { -// if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) { -// return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z); -// } -// -// // delete escape character: \\, \', \" -// char delim = pToken->z[0]; -// int32_t cnt = 0; -// int32_t j = 0; -// for (uint32_t k = 1; k < pToken->n - 1; ++k) { -// if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) { -// tmpTokenBuf[j] = pToken->z[k + 1]; -// cnt++; -// j++; -// k++; -// continue; -// } -// tmpTokenBuf[j] = pToken->z[k]; -// j++; -// } -// -// tmpTokenBuf[j] = 0; -// pToken->z = tmpTokenBuf; -// pToken->n -= 2 + cnt; -// } -// -// return TSDB_CODE_SUCCESS; -//} - -//static FORCE_INLINE int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) { -// int64_t iv; -// char *endptr = NULL; -// bool isSigned = false; -// -// CHECK_CODE(checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf)); -// -// if (isNullStr(pToken)) { -// if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { -// int64_t tmpVal = 0; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// return func(getNullValue(pSchema->type), 0, param); -// } -// -// switch (pSchema->type) { -// case TSDB_DATA_TYPE_BOOL: { -// if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { -// if (strncmp(pToken->z, "true", pToken->n) == 0) { -// return func(&TRUE_VALUE, pSchema->bytes, param); -// } else if (strncmp(pToken->z, "false", pToken->n) == 0) { -// return func(&FALSE_VALUE, pSchema->bytes, param); -// } else { -// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); -// } -// } else if (pToken->type == TK_INTEGER) { -// return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); -// } else if (pToken->type == TK_FLOAT) { -// return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); -// } else { -// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); -// } -// } -// -// case TSDB_DATA_TYPE_TINYINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z); -// } else if (!IS_VALID_TINYINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z); -// } -// -// uint8_t tmpVal = (uint8_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_UTINYINT:{ -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z); -// } else if (!IS_VALID_UTINYINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z); -// } -// uint8_t tmpVal = (uint8_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_SMALLINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z); -// } else if (!IS_VALID_SMALLINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z); -// } -// int16_t tmpVal = (int16_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_USMALLINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z); -// } else if (!IS_VALID_USMALLINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z); -// } -// uint16_t tmpVal = (uint16_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_INT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z); -// } else if (!IS_VALID_INT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z); -// } -// int32_t tmpVal = (int32_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_UINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z); -// } else if (!IS_VALID_UINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z); -// } -// uint32_t tmpVal = (uint32_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_BIGINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z); -// } else if (!IS_VALID_BIGINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z); -// } -// return func(&iv, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_UBIGINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z); -// } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z); -// } -// uint64_t tmpVal = (uint64_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_FLOAT: { -// double dv; -// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z); -// } -// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z); -// } -// float tmpVal = (float)dv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_DOUBLE: { -// double dv; -// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z); -// } -// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z); -// } -// return func(&dv, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_BINARY: { -// // too long values will return invalid sql, not be truncated automatically -// if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { -// return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z); -// } -// return func(pToken->z, pToken->n, param); -// } -// case TSDB_DATA_TYPE_NCHAR: { -// return func(pToken->z, pToken->n, param); -// } -// case TSDB_DATA_TYPE_TIMESTAMP: { -// int64_t tmpVal; -// if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z); -// } -// return func(&tmpVal, pSchema->bytes, param); -// } -// } -// -// return TSDB_CODE_FAILED; -//} - // pSql -> tag1_name, ...) static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) { int32_t nCols = pColList->numOfCols; diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index a454061fd7dd28c3fccd4f3e489b58a4091cfd9b..6c7ecbe0ed8094f89a2fdf0451d758f5203a8b7b 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1639,9 +1639,9 @@ static bool isNullStr(SToken *pToken) { } static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { - if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && - type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || - (pToken->n == 0) || (type == TK_RP)) { + if ((pToken->type != TK_NOW && pToken->type != TK_INTEGER && pToken->type != TK_STRING && pToken->type != TK_FLOAT && pToken->type != TK_BOOL && + pToken->type != TK_NULL && pToken->type != TK_HEX && pToken->type != TK_OCT && pToken->type != TK_BIN) || + (pToken->n == 0) || (pToken->type == TK_RP)) { return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); } @@ -1785,7 +1785,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_TINYINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z); } else if (!IS_VALID_TINYINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z); @@ -1796,7 +1796,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_UTINYINT:{ - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z); } else if (!IS_VALID_UTINYINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z); @@ -1806,7 +1806,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_SMALLINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z); } else if (!IS_VALID_SMALLINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z); @@ -1816,7 +1816,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_USMALLINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z); } else if (!IS_VALID_USMALLINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z); @@ -1826,7 +1826,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_INT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z); } else if (!IS_VALID_INT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z); @@ -1836,7 +1836,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_UINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z); } else if (!IS_VALID_UINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z); @@ -1846,7 +1846,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_BIGINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z); } else if (!IS_VALID_BIGINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z); @@ -1855,7 +1855,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_UBIGINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z); } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z); diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index a651e6c1df8b24abf3b044a901e9d659df0e56b1..3be358fec8478ffd482da55bfabe64f917c62a3c 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -97,8 +97,8 @@ public: int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const { std::unique_ptr table; - char db[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(pTableName, db); + char db[TSDB_DB_NAME_LEN] = {0}; + tNameGetDbName(pTableName, db); const char* tname = tNameGetTableName(pTableName); int32_t code = copyTableSchemaMeta(db, tname, &table); @@ -111,6 +111,7 @@ public: int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { // todo + vgInfo->vgId = 1; return 0; } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 8388458b4c0ea0d04d505aac5f765d890a485d88..97c9cec7c7b31345c2065daf79f6a4f389a3e10b 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -207,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { } taosArrayPush(currentLevel, &subplan); pCxt->pCurrentSubplan = subplan; + ++(pCxt->pDag->numOfSubplans); return subplan; } @@ -293,11 +294,14 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SArray* vgs = (SArray*)pPlanNode->pExtInfo; size_t numOfVg = taosArrayGetSize(vgs); for (int32_t i = 0; i < numOfVg; ++i) { + STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i); vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet); subplan->pNode = NULL; subplan->pDataSink = createDataInserter(pCxt, blocks); + subplan->type = QUERY_TYPE_MODIFY; + RECOVERY_CURRENT_SUBPLAN(pCxt); } } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 7bd2205e436c580633159a39c3ddcdc3be54fe9e..12c07d69eeafb017297b83f423b753fde527cb55 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -166,7 +166,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { } for (int32_t n = 0; n < levelPlanNum; ++n) { - SSubplan *plan = taosArrayGet(levelPlans, n); + SSubplan *plan = taosArrayGetP(levelPlans, n); SSchTask task = {0}; if (plan->type == QUERY_TYPE_MODIFY) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 7e7f673943c7e7b92b7895c61c3dcf311afd967b..95f2fe76e6b0a70dba8fa575147f62952391109a 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -85,7 +85,6 @@ int32_t dDebugFlag = 135; int32_t vDebugFlag = 135; int32_t cDebugFlag = 131; int32_t jniDebugFlag = 131; -int32_t odbcDebugFlag = 131; int32_t qDebugFlag = 131; int32_t rpcDebugFlag = 131; int32_t uDebugFlag = 131; diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 0ccc85c3473ed95d8329901b016efded8f0173e0..fcc11ca2136df3b57aa64247a5d0452afa87e0c0 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -120,6 +120,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG echo "fqdn ${HOSTNAME}" >> $TAOS_CFG echo "serverPort ${NODE}" >> $TAOS_CFG +echo "supportVnodes 16" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG diff --git a/tests/script/tmp/dnodes.sim b/tests/script/tmp/dnodes.sim index f13f6026f9ad886ba7dcd31e4de31ab92ef2936d..a3f9a0c17385773698e2d8ab1cadbf56c5cf343d 100644 --- a/tests/script/tmp/dnodes.sim +++ b/tests/script/tmp/dnodes.sim @@ -1,8 +1,5 @@ -system sh/stop_dnodes.sh - - ############## config parameter ##################### -$node1 = 192.168.0.201 +$node1 = 192.168.0.201 $node2 = 192.168.0.202 $node3 = 192.168.0.203 $node4 = 192.168.0.204 @@ -10,54 +7,75 @@ $node4 = 192.168.0.204 $self = $node1 $num = 25 -############### deploy firstEp ##################### +#deploy = 0, start = 1, stop = 2 +$option = 0 +print =============== option:$option + + +############### stop dnodes ##################### +if $option == 0 then + system sh/stop_dnodes.sh +endi + +############### process firstEp ##################### $firstEp = $node1 . :7100 $firstPort = 7100 if $self == $node1 then - system sh/deploy.sh -n dnode1 -i 1 - system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp - system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp - system sh/cfg.sh -n dnode1 -c fqdn -v $node1 - system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort - - system sh/exec.sh -n dnode1 -s start - sql connect - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node1 port $port - endw - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node2 port $port - endw - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node3 port $port - endw - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node4 port $port - endw + if $option == 1 then + system sh/exec.sh -n dnode1 -s start + endi + + if $option == 2 then + system sh/exec.sh -n dnode1 -s stop -x SIGINT + endi + + if $option == 0 then + system sh/deploy.sh -n dnode1 -i 1 + system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp + system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp + system sh/cfg.sh -n dnode1 -c fqdn -v $node1 + system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort + system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + + system sh/exec.sh -n dnode1 -s start + sql connect + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node1 port $port + endw + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node2 port $port + endw + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node3 port $port + endw + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node4 port $port + endw + endi endi -############### deploy nodes ##################### +############### process nodes ##################### $i = 0 while $i < $num @@ -67,11 +85,21 @@ while $i < $num $dnodename = dnode . $index $i = $i + 1 - system sh/deploy.sh -n $dnodename -i 1 - system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp - system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp - system sh/cfg.sh -n $dnodename -c fqdn -v $self - system sh/cfg.sh -n $dnodename -c serverPort -v $port + if $option == 1 then + system sh/exec.sh -n $dnodename -s start + endi + + if $option == 2 then + system sh/exec.sh -n $dnodename -s stop -x SIGINT + endi + + if $option == 0 then + system sh/deploy.sh -n $dnodename -i 1 + system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp + system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp + system sh/cfg.sh -n $dnodename -c fqdn -v $self + system sh/cfg.sh -n $dnodename -c serverPort -v $port - system sh/exec.sh -n $dnodename -s start + system sh/exec.sh -n $dnodename -s start + endi endw