diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 42956b6bdde2b27f3921817b90a1cc871ca0a8dd..e8a56b77c95dacc3c457f84e1b5c2784e8c1043e 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -30,11 +30,12 @@ extern char tsLocalEp[]; extern uint16_t tsServerPort; extern int32_t tsStatusInterval; extern int8_t tsEnableTelemetryReporting; +extern int32_t tsNumOfSupportVnodes; // common extern int tsRpcTimer; extern int tsRpcMaxTime; -extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled +extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled extern int32_t tsMaxConnections; extern int32_t tsMaxShellConns; extern int32_t tsShellActivityTimer; @@ -48,14 +49,18 @@ extern int32_t tsCompressMsgSize; extern int32_t tsCompressColData; extern int32_t tsMaxNumOfDistinctResults; extern char tsTempDir[]; -extern int64_t tsMaxVnodeQueuedBytes; -extern int tsCompatibleModel; // 2.0 compatible model - -//query buffer management -extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing -extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node during query processing -extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked -extern int8_t tsKeepOriginalColumnName; +extern int tsCompatibleModel; // 2.0 compatible model +extern int8_t tsEnableSlaveQuery; +extern int8_t tsEnableAdjustMaster; +extern int8_t tsPrintAuth; +extern int64_t tsTickPerDay[3]; + +// query buffer management +extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing +extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node +extern int32_t tsRetrieveBlockingModel; // retrieve threads will be blocked +extern int8_t tsKeepOriginalColumnName; +extern int8_t tsDeadLockKillQuery; // client extern int32_t tsMaxSQLStringLen; @@ -72,27 +77,17 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the extern int32_t tsProjectExecInterval; extern int64_t tsMaxRetentWindow; -// balance -extern int8_t tsEnableSlaveQuery; - - -// interna -extern int8_t tsPrintAuth; -extern char tsVnodeDir[]; -extern char tsMnodeDir[]; -extern int64_t tsTickPerDay[3]; - // system info -extern float tsTotalLogDirGB; -extern float tsTotalTmpDirGB; -extern float tsTotalDataDirGB; -extern float tsAvailLogDirGB; -extern float tsAvailTmpDirectorySpace; -extern float tsAvailDataDirGB; -extern float tsUsedDataDirGB; -extern float tsMinimalLogDirGB; -extern float tsReservedTmpDirectorySpace; -extern float tsMinimalDataDirGB; +extern float tsTotalLogDirGB; +extern float tsTotalTmpDirGB; +extern float tsTotalDataDirGB; +extern float tsAvailLogDirGB; +extern float tsAvailTmpDirectorySpace; +extern float tsAvailDataDirGB; +extern float tsUsedDataDirGB; +extern float tsMinimalLogDirGB; +extern float tsReservedTmpDirectorySpace; +extern float tsMinimalDataDirGB; extern uint32_t tsVersion; // build info @@ -102,17 +97,13 @@ extern char gitinfo[]; extern char gitinfoOfInternal[]; extern char buildinfo[]; -#ifdef TD_TSZ -// lossy -extern char lossyColumns[]; -extern double fPrecision; -extern double dPrecision; -extern uint32_t maxRange; -extern uint32_t curRange; -extern char Compressor[]; -#endif -// long query -extern int8_t tsDeadLockKillQuery; +// lossy +extern char tsLossyColumns[]; +extern double tsFPrecision; +extern double tsDPrecision; +extern uint32_t tsMaxRange; +extern uint32_t tsCurRange; +extern char tsCompressor[]; typedef struct { char dir[TSDB_FILENAME_LEN]; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ab18554b2794ce5e5db89d0ec9da8e5cd9062a75..884379eabdfdea5eb507c212417823a4c3e1cd05 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -308,19 +308,7 @@ typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; uint64_t suid; - int32_t sverson; - uint32_t ttl; - uint32_t keep; - int32_t numOfTags; - int32_t numOfColumns; - SSchema pSchema[]; -} SCreateStbInternalMsg; - -typedef struct { - SMsgHead head; - char name[TSDB_TABLE_FNAME_LEN]; - uint64_t suid; -} SDropStbInternalMsg; +} SVDropStbReq; typedef struct { SMsgHead head; @@ -706,8 +694,8 @@ typedef struct { int64_t clusterId; int64_t rebootTime; int64_t updateTime; - int16_t numOfCores; - int16_t numOfSupportVnodes; + int32_t numOfCores; + int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; SVnodeLoads vnodeLoads; @@ -898,17 +886,13 @@ typedef struct { typedef struct { int32_t dnodeId; -} SCreateMnodeMsg, SDropMnodeMsg; +} SMCreateMnodeMsg, SMDropMnodeMsg, SDDropMnodeMsg; typedef struct { int32_t dnodeId; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; -} SCreateMnodeInMsg, SAlterMnodeInMsg; - -typedef struct { - int32_t dnodeId; -} SDropMnodeInMsg; +} SDCreateMnodeMsg, SDAlterMnodeMsg; typedef struct { int32_t dnodeId; @@ -1251,13 +1235,13 @@ typedef struct { char* executor; int32_t sqlLen; char* sql; -} SCreateTopicInternalMsg; +} SDCreateTopicMsg; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; uint64_t tuid; -} SDropTopicInternalMsg; +} SDDropTopicMsg; typedef struct SVCreateTbReq { uint64_t ver; // use a general definition @@ -1300,13 +1284,13 @@ typedef struct SVShowTablesReq { } SVShowTablesReq; typedef struct SVShowTablesRsp { - int64_t id; + int32_t id; STableMetaMsg metaInfo; } SVShowTablesRsp; typedef struct SVShowTablesFetchReq { SMsgHead head; - int64_t id; + int32_t id; } SVShowTablesFetchReq; typedef struct SVShowTablesFetchRsp { diff --git a/include/common/tname.h b/include/common/tname.h index 3ac7f8b27b40178e73f8f9726ed45a8d27d9f952..11d97dac06d7240664f48626cd9d58ad6c555afa 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -17,6 +17,7 @@ #define TDENGINE_TNAME_H #include "tdef.h" +#include "tmsg.h" #define TSDB_DB_NAME_T 1 #define TSDB_TABLE_NAME_T 2 @@ -58,4 +59,6 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type); int32_t tNameSetAcctId(SName* dst, int32_t acctId); +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); + #endif // TDENGINE_TNAME_H diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index e12ce0942281f4c55a76a2ce3aa007b5fd888b1d..2e3863c3a190db25c5461a0a2afcdb13dac24028 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -27,8 +27,8 @@ typedef struct SDnode SDnode; typedef struct { int32_t sver; - int16_t numOfCores; - int16_t numOfSupportVnodes; + int32_t numOfCores; + int32_t numOfSupportVnodes; int16_t numOfCommitThreads; int8_t enableTelem; int32_t statusInterval; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index dd3d92866fbf1a1864d37db32a00576e39693b0f..a9e1f26d2014225a5a8481b497906f1d1f34aab4 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -74,7 +74,6 @@ int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo); STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex); SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex); -SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name); int32_t getNewResColId(); void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn); diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 83047a44de3e210f611ff8d83426758a79384f9d..9897467230675dae1b4b3e7909acd748504dcec7 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -54,8 +54,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -void qWorkerDestroy(void **qWorkerMgmt); +int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); + +int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +void qWorkerDestroy(void **qWorkerMgmt); #ifdef __cplusplus } diff --git a/include/os/osEnv.h b/include/os/osEnv.h index b857670214699c52975fff1020e548f6b40242a6..a7fd86776c06ea6f399f6094ac52a78c0345a7e5 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -21,7 +21,6 @@ extern "C" { #endif extern char tsOsName[]; -extern char tsDnodeDir[]; extern char tsDataDir[]; extern char tsLogDir[]; extern char tsScriptDir[]; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 021f6502544aeb3c09e284b8b5c9205a92be8dab..706e838b948b0b92d8efce73a7809c66c0c181c3 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -121,6 +121,7 @@ typedef struct SRequestObj { char *msgBuf; void *pInfo; // sql parse info, generated by parser module int32_t code; + uint64_t affectedRows; SQueryExecMetric metric; SRequestSendRecvBody body; } SRequestObj; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index db1ea435f1f0dec72fecb024a2649cb7f1ae420f..960ba95324ca90d96fc9af6f1106acdc2c37737d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -18,6 +18,7 @@ #include "clientInt.h" #include "clientLog.h" #include "query.h" +#include "scheduler.h" #include "tmsg.h" #include "tcache.h" #include "tconfig.h" @@ -230,6 +231,8 @@ void taos_init_imp(void) { SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; catalogInit(&cfg); + SSchedulerCfg scfg = {.maxJobNum = 100}; + schedulerInit(&scfg); tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp); taosSetCoreDump(true); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3d295830c75f17adb86bf5d64093be77be5a6b4a..89c31092b43c877a2972f746c45b8fb11c2d825d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -196,7 +196,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { return TSDB_CODE_SUCCESS; } +int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { + pRequest->type = pQuery->type; + return qCreateQueryDag(pQuery, pDag); +} + int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { + if (TSDB_SQL_INSERT == pRequest->type) { + return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); + } return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } @@ -283,7 +291,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { if (qIsDdlQuery(pQuery)) { CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); } else { - CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); } @@ -490,7 +498,13 @@ void* doFetchRow(SRequestObj* pRequest) { SReqResultInfo* pResultInfo = &pRequest->body.resInfo; if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { - pRequest->type = TDMT_MND_SHOW_RETRIEVE; + if (pRequest->type == TDMT_MND_SHOW) { + pRequest->type = TDMT_MND_SHOW_RETRIEVE; + } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { + pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; + } else { + // do nothing + } SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f7cf6610197f60e476c11ea7cbd87b7f602e1e02..6575102f81bde4e8bbed632513cbabfaa2a61734 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -18,23 +18,26 @@ #include "tname.h" #include "clientInt.h" #include "clientLog.h" -#include "trpc.h" int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); +static void setErrno(SRequestObj* pRequest, int32_t code) { + pRequest->code = code; + terrno = code; +} + int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; - pRequest->code = code; + setErrno(pRequest, code); + sem_post(&pRequest->body.rspSem); - return 0; + return code; } int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { - pRequest->code = code; - terrno = code; - + setErrno(pRequest, code); sem_post(&pRequest->body.rspSem); return code; } @@ -74,46 +77,48 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) { - pMsgSendInfo->msgType = TDMT_MND_SHOW_RETRIEVE; - pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg); - pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->param = pRequest; - pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; - - SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); - if (pRetrieveMsg == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - pRetrieveMsg->showId = htonl(pRequest->body.execId); - pMsgSendInfo->msgInfo.pData = pRetrieveMsg; - return TSDB_CODE_SUCCESS; -} - SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); - if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { - buildRetrieveMnodeMsg(pRequest, pMsgSendInfo); + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->param = pRequest; + pMsgSendInfo->msgType = pRequest->type; + + if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { + if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { + SRetrieveTableMsg* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); + if (pRetrieveMsg == NULL) { + return NULL; + } + + pRetrieveMsg->showId = htonl(pRequest->body.execId); + pMsgSendInfo->msgInfo.pData = pRetrieveMsg; + pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg); + } else { + SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq)); + if (pFetchMsg == NULL) { + return NULL; + } + + pFetchMsg->id = htonl(pRequest->body.execId); + pFetchMsg->head.vgId = htonl(13); + pMsgSendInfo->msgInfo.pData = pFetchMsg; + pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq); + } } else { assert(pRequest != NULL); - pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->msgInfo = pRequest->body.requestMsg; - pMsgSendInfo->msgType = pRequest->type; - pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->param = pRequest; - - pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; } + pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; return pMsgSendInfo; } int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { - pRequest->code = code; + setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); return code; } @@ -128,6 +133,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { pMetaMsg->tuid = htobe64(pMetaMsg->tuid); for (int i = 0; i < pMetaMsg->numOfColumns; ++i) { pSchema->bytes = htonl(pSchema->bytes); + pSchema->colId = htonl(pSchema->colId); pSchema++; } @@ -154,22 +160,25 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) { - assert(pMsg->len >= sizeof(SRetrieveTableRsp)); + SRequestObj *pRequest = param; + SReqResultInfo *pResInfo = &pRequest->body.resInfo; + tfree(pResInfo->pRspMsg); - SRequestObj* pRequest = param; -// tfree(pRequest->body.resInfo.pRspMsg); -// pRequest->body.resInfo.pRspMsg = pMsg->pData; + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + tsem_post(&pRequest->body.rspSem); + return code; + } + + assert(pMsg->len >= sizeof(SRetrieveTableRsp)); SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData; pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->precision = htons(pRetrieve->precision); - SReqResultInfo* pResInfo = &pRequest->body.resInfo; - - tfree(pResInfo->pRspMsg); pResInfo->pRspMsg = pMsg->pData; pResInfo->numOfRows = pRetrieve->numOfRows; - pResInfo->pData = pRetrieve->data; // todo fix this in async model + pResInfo->pData = pRetrieve->data; pResInfo->current = 0; setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); @@ -181,6 +190,40 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) return 0; } +int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) { + assert(pMsg->len >= sizeof(SRetrieveTableRsp)); + + SRequestObj* pRequest = param; + tfree(pRequest->body.resInfo.pRspMsg); + + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + tsem_post(&pRequest->body.rspSem); + return code; + } + + pRequest->body.resInfo.pRspMsg = pMsg->pData; + + SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData; + pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows); + pFetchRsp->precision = htons(pFetchRsp->precision); + + SReqResultInfo* pResInfo = &pRequest->body.resInfo; + + pResInfo->pRspMsg = pMsg->pData; + pResInfo->numOfRows = pFetchRsp->numOfRows; + pResInfo->pData = pFetchRsp->data; + + pResInfo->current = 0; + setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); + + tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows, + pFetchRsp->completed, pRequest->body.execId); + + tsem_post(&pRequest->body.rspSem); + return 0; +} + int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { // todo rsp with the vnode id list SRequestObj* pRequest = param; @@ -191,34 +234,48 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { - pRequest->code = code; + setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); return code; } - SUseDbRsp* pUseDbRsp = (SUseDbRsp*)pMsg->pData; - SName name = {0}; - tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT | T_NAME_DB); + SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData; + SName name = {0}; + tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB); char db[TSDB_DB_NAME_LEN] = {0}; tNameGetDbName(&name, db); setConnectionDB(pRequest->pTscObj, db); - tsem_post(&pRequest->body.rspSem); return 0; } int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { - assert(pMsg != NULL); + assert(pMsg != NULL && param != NULL); SRequestObj* pRequest = param; + + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + tsem_post(&pRequest->body.rspSem); + return code; + } + tsem_post(&pRequest->body.rspSem); + return code; } int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { // todo: Remove cache in catalog cache. SRequestObj* pRequest = param; + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + tsem_post(&pRequest->body.rspSem); + return code; + } + tsem_post(&pRequest->body.rspSem); + return code; } void initMsgHandleFp() { @@ -304,4 +361,7 @@ void initMsgHandleFp() { handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; + + handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp; } \ No newline at end of file diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e88e7411bfa3562b6ad182d8ba6bed21fc24ebae..be6e048378572342f010b0b806771cd34197c978 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -227,25 +227,25 @@ TEST(testCase, use_db_test) { } TEST(testCase, drop_db_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - showDB(pConn); - - TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - showDB(pConn); - - pRes = taos_query(pConn, "create database abc1"); - if (taos_errno(pRes) != 0) { - printf("create to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - taos_close(pConn); +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// showDB(pConn); +// +// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// showDB(pConn); +// +// pRes = taos_query(pConn, "create database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// taos_close(pConn); } TEST(testCase, create_stable_Test) { @@ -302,12 +302,12 @@ TEST(testCase, create_ctable_Test) { } taos_free_result(pRes); - pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); +// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); taos_close(pConn); } @@ -443,7 +443,23 @@ TEST(testCase, show_table_Test) { taos_free_result(pRes); pRes = taos_query(pConn, "show tables"); - taos_free_result(pRes); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); taos_close(pConn); } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 910d05e9a3eec9518f76891240cd8f5af22e4c6d..9ddadc9ba60c3cc7f9284ab68d6b71fb12f3824a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -15,17 +15,18 @@ #define _DEFAULT_SOURCE #include "os.h" + #include "taosdef.h" #include "taoserror.h" -#include "ulog.h" -#include "tlog.h" +#include "tcompare.h" #include "tconfig.h" +#include "tep.h" #include "tglobal.h" -#include "tcompare.h" -#include "tutil.h" -#include "ttimezone.h" #include "tlocale.h" -#include "tep.h" +#include "tlog.h" +#include "ttimezone.h" +#include "tutil.h" +#include "ulog.h" // cluster char tsFirst[TSDB_EP_LEN] = {0}; @@ -36,22 +37,24 @@ uint16_t tsServerPort = 6030; int32_t tsStatusInterval = 1; // second int8_t tsEnableTelemetryReporting = 0; char tsEmail[TSDB_FQDN_LEN] = {0}; +int32_t tsNumOfSupportVnodes = 16; // common -int32_t tsRpcTimer = 300; -int32_t tsRpcMaxTime = 600; // seconds; -int32_t tsRpcForceTcp = 1; //disable this, means query, show command use udp protocol as default -int32_t tsMaxShellConns = 50000; +int32_t tsRpcTimer = 300; +int32_t tsRpcMaxTime = 600; // seconds; +int32_t tsRpcForceTcp = 1; // disable this, means query, show command use udp protocol as default +int32_t tsMaxShellConns = 50000; int32_t tsMaxConnections = 5000; -int32_t tsShellActivityTimer = 3; // second +int32_t tsShellActivityTimer = 3; // second float tsNumOfThreadsPerCore = 1.0f; int32_t tsNumOfCommitThreads = 4; float tsRatioOfQueryCores = 1.0f; -int8_t tsDaylight = 0; +int8_t tsDaylight = 0; int8_t tsEnableCoreFile = 0; int32_t tsMaxBinaryDisplayWidth = 30; -int64_t tsMaxVnodeQueuedBytes = 1024*1024*1024; //1GB - +int8_t tsEnableSlaveQuery = 1; +int8_t tsEnableAdjustMaster = 1; +int8_t tsPrintAuth = 0; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, * metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server. @@ -79,7 +82,7 @@ int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN; int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN; int32_t tsMaxRegexStringLen = TSDB_REGEX_STRING_DEFAULT_LEN; -int8_t tsTscEnableRecordSql = 0; +int8_t tsTscEnableRecordSql = 0; // the maximum number of results for projection query on super table that are returned from // one virtual node, to order according to timestamp @@ -89,7 +92,7 @@ int32_t tsMaxNumOfOrderedResults = 100000; int32_t tsMinSlidingTime = 10; // the maxinum number of distict query result -int32_t tsMaxNumOfDistinctResults = 1000 * 10000; +int32_t tsMaxNumOfDistinctResults = 1000 * 10000; // 1 us for interval time range, changed accordingly int32_t tsMinIntervalTime = 1; @@ -101,7 +104,7 @@ int32_t tsMaxStreamComputDelay = 20000; int32_t tsStreamCompStartDelay = 10000; // the stream computing delay time after executing failed, change accordingly -int32_t tsRetryStreamCompDelay = 10*1000; +int32_t tsRetryStreamCompDelay = 10 * 1000; // The delayed computing ration. 10% of the whole computing time window by default. float tsStreamComputDelayRatio = 0.1f; @@ -120,30 +123,16 @@ int64_t tsQueryBufferSizeBytes = -1; int32_t tsRetrieveBlockingModel = 0; // last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name -int8_t tsKeepOriginalColumnName = 0; +int8_t tsKeepOriginalColumnName = 0; + +// long query death-lock +int8_t tsDeadLockKillQuery = 0; -// tsdb config +// tsdb config // For backward compatibility bool tsdbForceKeepFile = false; -// balance -int8_t tsEnableFlowCtrl = 1; -int8_t tsEnableSlaveQuery = 1; -int8_t tsEnableAdjustMaster = 1; - - -// monitor -char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; -char tsInternalPass[] = "secretkey"; - -// internal -int8_t tsCompactMnodeWal = 0; -int8_t tsPrintAuth = 0; -char tsVnodeDir[PATH_MAX] = {0}; -char tsDnodeDir[PATH_MAX] = {0}; -char tsMnodeDir[PATH_MAX] = {0}; - -int32_t tsDiskCfgNum = 0; +int32_t tsDiskCfgNum = 0; #ifndef _STORAGE SDiskCfg tsDiskCfg[1]; @@ -160,31 +149,28 @@ SDiskCfg tsDiskCfg[TSDB_MAX_DISKS]; int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L}; // system info -float tsTotalTmpDirGB = 0; -float tsTotalDataDirGB = 0; -float tsAvailTmpDirectorySpace = 0; -float tsAvailDataDirGB = 0; -float tsUsedDataDirGB = 0; -float tsReservedTmpDirectorySpace = 1.0f; -float tsMinimalDataDirGB = 2.0f; -int32_t tsTotalMemoryMB = 0; +float tsTotalTmpDirGB = 0; +float tsTotalDataDirGB = 0; +float tsAvailTmpDirectorySpace = 0; +float tsAvailDataDirGB = 0; +float tsUsedDataDirGB = 0; +float tsReservedTmpDirectorySpace = 1.0f; +float tsMinimalDataDirGB = 2.0f; +int32_t tsTotalMemoryMB = 0; uint32_t tsVersion = 0; -#ifdef TD_TSZ // // lossy compress 6 // -char lossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty can close lossy compress. -// below option can take effect when tsLossyColumns not empty -double fPrecision = 1E-8; // float column precision -double dPrecision = 1E-16; // double column precision -uint32_t maxRange = 500; // max range -uint32_t curRange = 100; // range -char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR -#endif +char tsLossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty + // can close lossy compress. +// below option can take effect when tsLossyColumns not empty +double tsFPrecision = 1E-8; // float column precision +double tsDPrecision = 1E-16; // double column precision +uint32_t tsMaxRange = 500; // max range +uint32_t tsCurRange = 100; // range +char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR -// long query death-lock -int8_t tsDeadLockKillQuery = 0; int32_t (*monStartSystemFp)() = NULL; void (*monStopSystemFp)() = NULL; @@ -195,13 +181,12 @@ char *qtypeStr[] = {"rpc", "fwd", "wal", "cq", "query"}; static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT; void taosSetAllDebugFlag() { - if (debugFlag != 0) { + if (debugFlag != 0) { mDebugFlag = debugFlag; dDebugFlag = debugFlag; vDebugFlag = debugFlag; jniDebugFlag = debugFlag; - odbcDebugFlag = debugFlag; - qDebugFlag = debugFlag; + qDebugFlag = debugFlag; rpcDebugFlag = debugFlag; uDebugFlag = debugFlag; sDebugFlag = debugFlag; @@ -213,12 +198,12 @@ void taosSetAllDebugFlag() { } int32_t taosCfgDynamicOptions(char *msg) { - char *option, *value; - int32_t olen, vlen; - int32_t vint = 0; + char *option, *value; + int32_t olen, vlen; + int32_t vint = 0; paGetToken(msg, &option, &olen); - if (olen == 0) return -1;; + if (olen == 0) return -1; paGetToken(option + olen + 1, &value, &vlen); if (vlen == 0) @@ -231,9 +216,9 @@ int32_t taosCfgDynamicOptions(char *msg) { for (int32_t i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; - //if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; + // if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; if (cfg->valType != TAOS_CFG_VTYPE_INT32 && cfg->valType != TAOS_CFG_VTYPE_INT8) continue; - + int32_t cfgLen = (int32_t)strlen(cfg->option); if (cfgLen != olen) continue; if (strncasecmp(option, cfg->option, olen) != 0) continue; @@ -262,7 +247,7 @@ int32_t taosCfgDynamicOptions(char *msg) { return 0; } if (strncasecmp(cfg->option, "debugFlag", olen) == 0) { - taosSetAllDebugFlag(); + taosSetAllDebugFlag(); } return 0; } @@ -323,7 +308,7 @@ static void doInitGlobalConfig(void) { srand(taosSafeRand()); SGlobalCfg cfg = {0}; - + // ip address cfg.option = "firstEp"; cfg.ptr = tsFirst; @@ -366,6 +351,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); + cfg.option = "supportVnodes"; + cfg.ptr = &tsNumOfSupportVnodes; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 65536; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosAddConfigOption(cfg); + // directory cfg.option = "configDir"; cfg.ptr = configDir; @@ -442,8 +437,8 @@ static void doInitGlobalConfig(void) { cfg.ptr = &tsMaxNumOfDistinctResults; cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; - cfg.minValue = 10*10000; - cfg.maxValue = 10000*10000; + cfg.minValue = 10 * 10000; + cfg.maxValue = 10000 * 10000; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); @@ -749,17 +744,6 @@ static void doInitGlobalConfig(void) { cfg.maxValue = 10000000; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_GB; - taosAddConfigOption(cfg); - - // module configs - cfg.option = "flowctrl"; - cfg.ptr = &tsEnableFlowCtrl; - cfg.valType = TAOS_CFG_VTYPE_INT8; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; - cfg.minValue = 0; - cfg.maxValue = 1; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); cfg.option = "slaveQuery"; @@ -893,16 +877,6 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); - cfg.option = "odbcDebugFlag"; - cfg.ptr = &odbcDebugFlag; - cfg.valType = TAOS_CFG_VTYPE_INT32; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; - cfg.minValue = 0; - cfg.maxValue = 255; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosAddConfigOption(cfg); - cfg.option = "uDebugFlag"; cfg.ptr = &uDebugFlag; cfg.valType = TAOS_CFG_VTYPE_INT32; @@ -1034,7 +1008,7 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); - // enable kill long query + // enable kill long query cfg.option = "deadLockKillQuery"; cfg.ptr = &tsDeadLockKillQuery; cfg.valType = TAOS_CFG_VTYPE_INT8; @@ -1066,7 +1040,6 @@ static void doInitGlobalConfig(void) { cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosAddConfigOption(cfg); cfg.option = "dPrecision"; @@ -1100,23 +1073,20 @@ static void doInitGlobalConfig(void) { taosAddConfigOption(cfg); assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); #else - //assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); + // assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); #endif - } -void taosInitGlobalCfg() { - pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig); -} +void taosInitGlobalCfg() { pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig); } int32_t taosCheckAndPrintCfg() { - char fqdn[TSDB_FQDN_LEN]; + char fqdn[TSDB_FQDN_LEN]; uint16_t port; if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) { taosSetAllDebugFlag(); } - + if (tsLocalFqdn[0] == 0) { taosGetFqdn(tsLocalFqdn); } @@ -1143,7 +1113,7 @@ int32_t taosCheckAndPrintCfg() { if (taosDirExist(tsTempDir) != 0) { return -1; } - + taosGetSystemInfo(); tsSetLocale(); diff --git a/source/common/src/tname.c b/source/common/src/tname.c index 88a5ebb7f6eda28ddfb0b65ae7e5dc7030fada9f..f8ef9f097951c346fd2147c19b70fb20dc3ab920 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -259,3 +259,13 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { return 0; } + +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { + SSchema s = {0}; + s.type = type; + s.bytes = bytes; + s.colId = colId; + + tstrncpy(s.name, name, tListLen(s.name)); + return s; +} \ No newline at end of file diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 70dca0e4dfebbf5dab16e4ac6d00a9239724c51b..8161b8d125ebb9f9fec4762f0327e2adc3a5449f 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -139,7 +139,7 @@ void dmnWaitSignal() { void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = 30000000; //3.0.0.0 pOption->numOfCores = tsNumOfCores; - pOption->numOfSupportVnodes = 16; + pOption->numOfSupportVnodes = tsNumOfSupportVnodes; pOption->numOfCommitThreads = 1; pOption->statusInterval = tsStatusInterval; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 8807579564746a5ad08ebf046be105396e1174af..30b069f349c4e3cb3f33e31bd0d93df42df3fce2 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -370,8 +370,8 @@ void dndSendStatusMsg(SDnode *pDnode) { pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->rebootTime = htobe64(pMgmt->rebootTime); pStatus->updateTime = htobe64(pMgmt->updateTime); - pStatus->numOfCores = htons(pDnode->opt.numOfCores); - pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfSupportVnodes); + pStatus->numOfCores = htonl(pDnode->opt.numOfCores); + pStatus->numOfSupportVnodes = htonl(pDnode->opt.numOfSupportVnodes); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); @@ -397,7 +397,7 @@ void dndSendStatusMsg(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0) { - dInfo("set dnodeId:%d clusterId:% " PRId64, pCfg->dnodeId, pCfg->clusterId); + dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; pMgmt->clusterId = pCfg->clusterId; @@ -440,19 +440,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { } SStatusRsp *pRsp = pMsg->pCont; - SDnodeCfg *pCfg = &pRsp->dnodeCfg; - pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->clusterId = htobe64(pCfg->clusterId); - dndUpdateDnodeCfg(pDnode, pCfg); + if (pMsg->pCont != NULL && pMsg->contLen != 0) { + SDnodeCfg *pCfg = &pRsp->dnodeCfg; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->clusterId = htobe64(pCfg->clusterId); + dndUpdateDnodeCfg(pDnode, pCfg); + + SDnodeEps *pDnodeEps = &pRsp->dnodeEps; + pDnodeEps->num = htonl(pDnodeEps->num); + for (int32_t i = 0; i < pDnodeEps->num; ++i) { + pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); + pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); + } - SDnodeEps *pDnodeEps = &pRsp->dnodeEps; - pDnodeEps->num = htonl(pDnodeEps->num); - for (int32_t i = 0; i < pDnodeEps->num; ++i) { - pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); - pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); + dndUpdateDnodeEps(pDnode, pDnodeEps); } - - dndUpdateDnodeEps(pDnode, pDnodeEps); pMgmt->statusSent = 0; } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 8fb95c0b754b46d55afd5e3dffafbf63623ce3ea..577cb0c3b07115a1d4f3eb0932430a9671ebaa0d 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -299,7 +299,7 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) { +static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) { dndInitMnodeOption(pDnode, pOption); pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); @@ -417,8 +417,8 @@ static int32_t dndDropMnode(SDnode *pDnode) { return 0; } -static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { - SCreateMnodeInMsg *pMsg = pRpcMsg->pCont; +static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { + SDCreateMnodeMsg *pMsg = pRpcMsg->pCont; pMsg->dnodeId = htonl(pMsg->dnodeId); for (int32_t i = 0; i < pMsg->replica; ++i) { pMsg->replicas[i].id = htonl(pMsg->replicas[i].id); @@ -429,7 +429,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { } int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); + SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -445,7 +445,7 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); + SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -465,7 +465,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDropMnodeInMsg *pMsg = pRpcMsg->pCont; + SDDropMnodeMsg *pMsg = pRpcMsg->pCont; pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index ba93d1913de75638a438697d635cad78da362b63..e5be16937bb4bc5eb909eba8cccf4f94ac2578c6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -103,7 +103,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; @@ -141,6 +141,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -383,6 +385,7 @@ void dndCleanupTrans(SDnode *pDnode) { void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) { STransMgmt *pMgmt = &pDnode->tmgmt; + if (pMgmt->clientRpc == NULL) return; rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL); } diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index ec2c2d9a4453396950fa94635c4d78e6f693882e..54d7e73be6542d8391dfdfddaf5f3f7857a7db1d 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -57,7 +57,7 @@ TEST_F(DndTestDnode, 01_ShowDnode) { CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id"); CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint"); CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes"); - CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "max_vnodes"); + CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "support_vnodes"); CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status"); CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason"); @@ -68,7 +68,7 @@ TEST_F(DndTestDnode, 01_ShowDnode) { CheckInt16(1); CheckBinary("localhost:9041", TSDB_EP_LEN); CheckInt16(0); - CheckInt16(1); + CheckInt16(16); CheckBinary("ready", 10); CheckTimestamp(); CheckBinary("", 24); @@ -112,8 +112,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { CheckBinary("localhost:9042", TSDB_EP_LEN); CheckInt16(0); CheckInt16(0); - CheckInt16(1); - CheckInt16(1); + CheckInt16(16); + CheckInt16(16); CheckBinary("ready", 10); CheckBinary("ready", 10); CheckTimestamp(); @@ -140,7 +140,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { CheckInt16(1); CheckBinary("localhost:9041", TSDB_EP_LEN); CheckInt16(0); - CheckInt16(1); + CheckInt16(16); CheckBinary("ready", 10); CheckTimestamp(); CheckBinary("", 24); @@ -199,10 +199,10 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { CheckInt16(0); CheckInt16(0); CheckInt16(0); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); + CheckInt16(16); + CheckInt16(16); + CheckInt16(16); + CheckInt16(16); CheckBinary("ready", 10); CheckBinary("ready", 10); CheckBinary("ready", 10); @@ -242,10 +242,10 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { CheckInt16(0); CheckInt16(0); CheckInt16(0); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); + CheckInt16(16); + CheckInt16(16); + CheckInt16(16); + CheckInt16(16); CheckBinary("ready", 10); CheckBinary("ready", 10); CheckBinary("ready", 10); diff --git a/source/dnode/mgmt/impl/test/mnode/mnode.cpp b/source/dnode/mgmt/impl/test/mnode/mnode.cpp index a6cec935395f7461556d06a2ab8919800e3b44ed..e9b1ef45bdd419f17aa97b0f60cb43d9837efebe 100644 --- a/source/dnode/mgmt/impl/test/mnode/mnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/mnode.cpp @@ -72,9 +72,9 @@ TEST_F(DndTestMnode, 01_ShowDnode) { TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeMsg); - SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -85,9 +85,9 @@ TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) { TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeMsg); - SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -117,9 +117,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) { { // create mnode - int32_t contLen = sizeof(SCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeMsg); - SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -144,9 +144,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) { { // drop mnode - int32_t contLen = sizeof(SDropMnodeMsg); + int32_t contLen = sizeof(SMDropMnodeMsg); - SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen); + SMDropMnodeMsg* pReq = (SMDropMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_MNODE, pReq, contLen); diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mgmt/impl/test/stb/stb.cpp index ca168fb6d735e364bac63d457634f0670904422c..dca0f4851658caa57ec264a9ce9c896e5e44fc45 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mgmt/impl/test/stb/stb.cpp @@ -146,8 +146,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { pSchema->bytes = htonl(pSchema->bytes); } - EXPECT_STREQ(pRsp->tbFname, ""); - EXPECT_STREQ(pRsp->stbFname, "1.d1.stb"); + EXPECT_STREQ(pRsp->tbFname, "1.d1.stb"); + EXPECT_STREQ(pRsp->stbFname, ""); EXPECT_EQ(pRsp->numOfColumns, 2); EXPECT_EQ(pRsp->numOfTags, 3); EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); diff --git a/source/dnode/mgmt/impl/test/sut/src/base.cpp b/source/dnode/mgmt/impl/test/sut/src/base.cpp index 98371e989397c56e0fea907f5d7752ab4703e096..e14dc94d31bb126a286780c79d19cfb42c37e114 100644 --- a/source/dnode/mgmt/impl/test/sut/src/base.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/base.cpp @@ -24,7 +24,6 @@ void Testbase::InitLog(const char* path) { tmrDebugFlag = 0; uDebugFlag = 143; rpcDebugFlag = 0; - odbcDebugFlag = 0; qDebugFlag = 0; wDebugFlag = 0; sDebugFlag = 0; diff --git a/source/dnode/mgmt/impl/test/sut/src/server.cpp b/source/dnode/mgmt/impl/test/sut/src/server.cpp index f29b2fad1d538a5ffe18782618e1d1581f712e4d..8ac5f6214430a1e46f757f755e97b2fca28c4119 100644 --- a/source/dnode/mgmt/impl/test/sut/src/server.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/server.cpp @@ -26,7 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p SDnodeOpt option = {0}; option.sver = 1; option.numOfCores = 1; - option.numOfSupportVnodes = 1; + option.numOfSupportVnodes = 16; option.numOfCommitThreads = 1; option.statusInterval = 1; option.numOfThreadsPerCore = 1; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index b93fc0951c6fc9391592d9921b6461b1bbb7d49c..338024c20abef97d785bd6faaa5956e0d46cc1d5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -74,13 +74,6 @@ typedef enum { typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; -typedef enum { - DND_STATUS_OFFLINE = 0, - DND_STATUS_READY = 1, - DND_STATUS_CREATING = 2, - DND_STATUS_DROPPING = 3 -} EDndStatus; - typedef enum { DND_REASON_ONLINE = 0, DND_REASON_STATUS_MSG_TIMEOUT, @@ -125,9 +118,8 @@ typedef struct { int64_t lastAccessTime; int32_t accessTimes; int16_t numOfVnodes; - int16_t numOfSupportVnodes; - int16_t numOfCores; - EDndStatus status; + int32_t numOfSupportVnodes; + int32_t numOfCores; EDndReason offlineReason; uint16_t port; char fqdn[TSDB_FQDN_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index 764dfbffc150a6d09cb9162264a079a823792368..c76186c0a29bc6a0c78458afbf6781a77cd81cb3 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); int32_t mndGetDnodeSize(SMnode *pMnode); -bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode); +bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 43b458a52ad3965b3557adcbdd067346f796ae83..ca20ba61a10e2b499aab65e3a25829a2f7419e22 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -40,8 +40,6 @@ static const char *offlineReason[] = { "unknown", }; -static const char *dnodeStatus[] = {"offline", "ready", "creating", "dropping"}; - static int32_t mndCreateDefaultDnode(SMnode *pMnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); @@ -208,10 +206,12 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { return sdbGetSize(pSdb, SDB_DNODE); } -bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) { - int64_t ms = taosGetTimestampMs(); - int64_t interval = ABS(pDnode->lastAccessTime - ms); +bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { + int64_t interval = ABS(pDnode->lastAccessTime - curMs); if (interval > 3500 * pMnode->cfg.statusInterval) { + if (pDnode->rebootTime > 0) { + pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; + } return false; } return true; @@ -278,8 +278,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) { pStatus->clusterId = htobe64(pStatus->clusterId); pStatus->rebootTime = htobe64(pStatus->rebootTime); pStatus->updateTime = htobe64(pStatus->updateTime); - pStatus->numOfCores = htons(pStatus->numOfCores); - pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes); + pStatus->numOfCores = htonl(pStatus->numOfCores); + pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes); pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); } @@ -287,96 +287,99 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) { static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SStatusMsg *pStatus = pMsg->rpcMsg.pCont; + SDnodeObj *pDnode = NULL; + int32_t code = -1; + mndParseStatusMsg(pStatus); - SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); if (pDnode == NULL) { mDebug("dnode:%s, not created yet", pStatus->dnodeEp); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - return -1; + goto PROCESS_STATUS_MSG_OVER; } } else { pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); if (pDnode == NULL) { pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; } mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); - mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - return -1; + goto PROCESS_STATUS_MSG_OVER; } } - if (pStatus->sver != pMnode->cfg.sver) { - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { - pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; - } - mndReleaseDnode(pMnode, pDnode); - mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); - terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; - return -1; - } + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime); - if (pStatus->dnodeId == 0) { - mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); - } else { - if (pStatus->clusterId != pMnode->clusterId) { - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { - pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + if (needCheckCfg) { + if (pStatus->sver != pMnode->cfg.sver) { + if (pDnode != NULL) { + pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; } - mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, - pMnode->clusterId); - mndReleaseDnode(pMnode, pDnode); - terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID; - return -1; + mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); + terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; + goto PROCESS_STATUS_MSG_OVER; + } + + if (pStatus->dnodeId == 0) { + mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); } else { - pDnode->accessTimes++; - mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); + if (pStatus->clusterId != pMnode->clusterId) { + if (pDnode != NULL) { + pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + } + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, + pMnode->clusterId); + terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; + goto PROCESS_STATUS_MSG_OVER; + } else { + pDnode->accessTimes++; + mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); + } } - } - if (pDnode->status == DND_STATUS_OFFLINE) { // Verify whether the cluster parameters are consistent when status change from offline to ready int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); if (0 != ret) { pDnode->offlineReason = ret; mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); - mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG; - return -1; + goto PROCESS_STATUS_MSG_OVER; } mInfo("dnode:%d, from offline to online", pDnode->id); - } - pDnode->rebootTime = pStatus->rebootTime; - pDnode->numOfCores = pStatus->numOfCores; - pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; - pDnode->lastAccessTime = taosGetTimestampMs(); - pDnode->status = DND_STATUS_READY; + pDnode->rebootTime = pStatus->rebootTime; + pDnode->numOfCores = pStatus->numOfCores; + pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; - int32_t numOfEps = mndGetDnodeSize(pMnode); - int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); - SStatusRsp *pRsp = rpcMallocCont(contLen); - if (pRsp == NULL) { - mndReleaseDnode(pMnode, pDnode); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + int32_t numOfEps = mndGetDnodeSize(pMnode); + int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); + SStatusRsp *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto PROCESS_STATUS_MSG_OVER; + } + + pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); + pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); + mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + + pMsg->contLen = contLen; + pMsg->pCont = pRsp; } - pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); - pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); - mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + pDnode->lastAccessTime = curMs; + code = 0; - pMsg->contLen = contLen; - pMsg->pCont = pRsp; +PROCESS_STATUS_MSG_OVER: mndReleaseDnode(pMnode, pDnode); - - return 0; + return code; } static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) { @@ -638,7 +641,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "max_vnodes"); + strcpy(pSchema[cols].name, "support_vnodes"); pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; @@ -682,10 +685,12 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i int32_t cols = 0; SDnodeObj *pDnode = NULL; char *pWrite; + int64_t curMs = taosGetTimestampMs(); while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode); if (pShow->pIter == NULL) break; + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); cols = 0; @@ -706,8 +711,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - const char *status = dnodeStatus[pDnode->status]; - STR_TO_VARSTR(pWrite, status); + STR_TO_VARSTR(pWrite, online ? "ready" : "offline"); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -715,11 +719,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (pDnode->status == DND_STATUS_READY) { - STR_TO_VARSTR(pWrite, ""); - } else { - STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]); - } + STR_TO_VARSTR(pWrite, online ? "" : offlineReason[pDnode->offlineReason]); cols++; numOfRows++; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 04e77c013667b9632056f32d9a312d6289f21145..b76d72d79c1cba7443e6405717295d38c9671fac 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -251,7 +251,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno void *pIter = NULL; int32_t numOfReplicas = 0; - SCreateMnodeInMsg createMsg = {0}; + SDCreateMnodeMsg createMsg = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); @@ -281,18 +281,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; - SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg)); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg)); pMsg->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.pCont = pMsg; - action.contLen = sizeof(SAlterMnodeInMsg); + action.contLen = sizeof(SDAlterMnodeMsg); action.msgType = TDMT_DND_ALTER_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -309,14 +309,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SCreateMnodeInMsg *pMsg = malloc(sizeof(SCreateMnodeInMsg)); + SDCreateMnodeMsg *pMsg = malloc(sizeof(SDCreateMnodeMsg)); if (pMsg == NULL) return -1; - memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg)); pMsg->dnodeId = htonl(pObj->id); action.epSet = mndGetDnodeEpset(pDnode); action.pCont = pMsg; - action.contLen = sizeof(SCreateMnodeInMsg); + action.contLen = sizeof(SDCreateMnodeMsg); action.msgType = TDMT_DND_CREATE_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -327,7 +327,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno return 0; } -static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SCreateMnodeMsg *pCreate) { +static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeMsg *pCreate) { SMnodeObj mnodeObj = {0}; mnodeObj.id = pDnode->id; mnodeObj.createdTime = taosGetTimestampMs(); @@ -370,7 +370,7 @@ CREATE_MNODE_OVER: static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; + SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; pCreate->dnodeId = htonl(pCreate->dnodeId); @@ -423,7 +423,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode void *pIter = NULL; int32_t numOfReplicas = 0; - SAlterMnodeInMsg alterMsg = {0}; + SDAlterMnodeMsg alterMsg = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); @@ -449,18 +449,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode if (pMObj->id != pObj->id) { STransAction action = {0}; - SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg)); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &alterMsg, sizeof(SAlterMnodeInMsg)); + memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeMsg)); pMsg->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.pCont = pMsg; - action.contLen = sizeof(SAlterMnodeInMsg); + action.contLen = sizeof(SDAlterMnodeMsg); action.msgType = TDMT_DND_ALTER_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -478,7 +478,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SDropMnodeInMsg *pMsg = malloc(sizeof(SDropMnodeInMsg)); + SDDropMnodeMsg *pMsg = malloc(sizeof(SDDropMnodeMsg)); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -487,7 +487,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode action.epSet = mndGetDnodeEpset(pDnode); action.pCont = pMsg; - action.contLen = sizeof(SDropMnodeInMsg); + action.contLen = sizeof(SDDropMnodeMsg); action.msgType = TDMT_DND_DROP_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -537,7 +537,7 @@ DROP_MNODE_OVER: static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; + SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); mDebug("mnode:%d, start to drop", pDrop->dnodeId); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index af18c814caad97fbfb836f174e85551e35c179f1..918f43f2bdbfb695bc5fec741171dd0e882c8336 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -201,7 +201,6 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { } static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) { -#if 1 SVCreateTbReq req; void * buf; int bsize; @@ -235,43 +234,12 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb *pContLen = sizeof(SMsgHead) + bsize; return buf; - -#else - int32_t totalCols = pStb->numOfTags + pStb->numOfColumns; - int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg); - - SCreateStbInternalMsg *pCreate = calloc(1, contLen); - if (pCreate == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pCreate->head.contLen = htonl(contLen); - pCreate->head.vgId = htonl(pVgroup->vgId); - memcpy(pCreate->name, pStb->name, TSDB_TABLE_FNAME_LEN); - pCreate->suid = htobe64(pStb->uid); - pCreate->sverson = htonl(pStb->version); - pCreate->ttl = 0; - pCreate->keep = 0; - pCreate->numOfTags = htonl(pStb->numOfTags); - pCreate->numOfColumns = htonl(pStb->numOfColumns); - - memcpy(pCreate->pSchema, pStb->pSchema, totalCols * sizeof(SSchema)); - for (int32_t t = 0; t < totalCols; ++t) { - SSchema *pSchema = &pCreate->pSchema[t]; - pSchema->bytes = htonl(pSchema->bytes); - pSchema->colId = htonl(pSchema->colId); - } - - *pContLen = contLen; - return pCreate; -#endif } -static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { - int32_t contLen = sizeof(SDropStbInternalMsg); +static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { + int32_t contLen = sizeof(SVDropStbReq); - SDropStbInternalMsg *pDrop = calloc(1, contLen); + SVDropStbReq *pDrop = calloc(1, contLen); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -402,7 +370,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - SDropStbInternalMsg *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); + SVDropStbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -413,7 +381,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pMsg; - action.contLen = sizeof(SDropStbInternalMsg); + action.contLen = sizeof(SVDropStbReq); action.msgType = TDMT_VND_DROP_STB; if (mndTransAppendUndoAction(pTrans, &action) != 0) { free(pMsg); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 49c96967e6de28eade6740d542db1de8f4825f49..24e32e07b47c7eb5e4f0f500a7a0b76ce3c0d578 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -162,10 +162,10 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { return mndAcquireDb(pMnode, db); } -static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { - int32_t contLen = sizeof(SDropTopicInternalMsg); +static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { + int32_t contLen = sizeof(SDDropTopicMsg); - SDropTopicInternalMsg *pDrop = calloc(1, contLen); + SDDropTopicMsg *pDrop = calloc(1, contLen); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 78205d5f6a4e2c701f59a404e320a3095a2db5f8..1b0026cd139ea298d12ec3fa143bf2be3e1b3f8e 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -120,6 +120,9 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId) + if (pVgroup->replica == 1) { + pVgid->role = TAOS_SYNC_STATE_LEADER; + } } SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE) @@ -257,13 +260,14 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) { pDnode->numOfVnodes++; } - bool isReady = mndIsDnodeInReadyStatus(pMnode, pDnode); - if (isReady) { + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + if (online) { taosArrayPush(pArray, pDnode); } - mDebug("dnode:%d, numOfVnodes:%d numOfSupportVnodes:%d isMnode:%d ready:%d", pDnode->id, numOfVnodes, - pDnode->numOfSupportVnodes, isMnode, isReady); + mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes, + pDnode->numOfSupportVnodes, isMnode, online); sdbRelease(pSdb, pDnode); } @@ -330,6 +334,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { uint32_t hashMax = UINT32_MAX; uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups; + if (maxVgId < 2) maxVgId = 2; + for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) { SVgObj *pVgroup = &pVgroups[v]; pVgroup->vgId = maxVgId++; diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 170319681a1a285eea21e5abc8edbddfc4defa35..2dde9e03e8346f377eced92876cb60fa32e5cf9b 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -36,12 +36,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); case TDMT_VND_DROP_TASK: return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_SHOW_TABLES: + return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_SHOW_TABLES_FETCH: + return qWorkerProcessShowFetchMsg(pVnode, pVnode->pQuery, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; - break; } - return 0; } static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index bdbc095861b1199d95eea8952d92ba596369691a..6b0e0ed828ff70739a09659b1cf2bba8b6c6e3cb 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -90,7 +90,6 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg)); pShowMsg->type = pShowInfo->showType; - if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) { SToken* pPattern = &pShowInfo->pattern; if (pPattern->type > 0) { // only show tables support wildcard query diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 7160b13eba2a94a8629f1f1176452a2d0c1fa29a..8a09ea9ed8a22aa2a22778562c846618d7db1868 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -18,7 +18,7 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) { } static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen, - SMsgBuf* pMsgBuf) { + SEpSet* pEpSet, SMsgBuf* pMsgBuf) { const char* msg1 = "invalid name"; const char* msg2 = "wildcard string should be less than %d characters"; const char* msg3 = "database name too long"; @@ -31,57 +31,69 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou * wildcard in like clause in pInfo->pMiscInfo->a[1] */ int16_t showType = pShowInfo->showType; - if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) { - SToken* pDbPrefixToken = &pShowInfo->prefix; - if (pDbPrefixToken->type != 0) { - if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long - return buildInvalidOperationMsg(pMsgBuf, msg3); - } + if (showType == TSDB_MGMT_TABLE_TABLE) { + SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); + *pEpSet = pCtx->mgmtEpSet; + + // catalogGetDBVgroupVersion() + pShowReq->head.vgId = htonl(13); + *outputLen = sizeof(SVShowTablesReq); + *output = pShowReq; + } else { + if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) { + SToken* pDbPrefixToken = &pShowInfo->prefix; + if (pDbPrefixToken->type != 0) { + if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long + return buildInvalidOperationMsg(pMsgBuf, msg3); + } - if (pDbPrefixToken->n <= 0) { - return buildInvalidOperationMsg(pMsgBuf, msg5); - } + if (pDbPrefixToken->n <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg5); + } - if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + + // int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken); + // if (ret != TSDB_CODE_SUCCESS) { + // return buildInvalidOperationMsg(pMsgBuf, msg1); + // } } - // int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken); - // if (ret != TSDB_CODE_SUCCESS) { - // return buildInvalidOperationMsg(pMsgBuf, msg1); - // } - } + // show table/stable like 'xxxx', set the like pattern for show tables + SToken* pPattern = &pShowInfo->pattern; + if (pPattern->type != 0) { + if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) { + return buildInvalidOperationMsg(pMsgBuf, msg4); + } - // show table/stable like 'xxxx', set the like pattern for show tables - SToken* pPattern = &pShowInfo->pattern; - if (pPattern->type != 0) { - if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) { - return buildInvalidOperationMsg(pMsgBuf, msg4); - } + pPattern->n = strdequote(pPattern->z); + if (pPattern->n <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg6); + } - pPattern->n = strdequote(pPattern->z); - if (pPattern->n <= 0) { - return buildInvalidOperationMsg(pMsgBuf, msg6); + if (pPattern->n > tsMaxWildCardsLen) { + char tmp[64] = {0}; + sprintf(tmp, msg2, tsMaxWildCardsLen); + return buildInvalidOperationMsg(pMsgBuf, tmp); + } + } + } else if (showType == TSDB_MGMT_TABLE_VNODES) { + if (pShowInfo->prefix.type == 0) { + return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep"); } - if (pPattern->n > tsMaxWildCardsLen) { - char tmp[64] = {0}; - sprintf(tmp, msg2, tsMaxWildCardsLen); - return buildInvalidOperationMsg(pMsgBuf, tmp); + if (pShowInfo->prefix.type == TK_STRING) { + pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z); } } - } else if (showType == TSDB_MGMT_TABLE_VNODES) { - if (pShowInfo->prefix.type == 0) { - return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep"); - } - if (pShowInfo->prefix.type == TK_STRING) { - pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z); - } + *pEpSet = pCtx->mgmtEpSet; + *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len); + *outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/; } - *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len); - *outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/; return TSDB_CODE_SUCCESS; } @@ -608,8 +620,9 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } case TSDB_SQL_SHOW: { - code = setShowInfo(&pInfo->pMiscInfo->showOpt, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, pMsgBuf); - pDcl->msgType = TDMT_MND_SHOW; + SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; + code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, pMsgBuf); + pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW; break; } diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 991bde5ed251079c1095a3ee813520f5dbc5d7fc..66966f75db59f9e01a710b7429312bfc5d911e8a 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -64,64 +64,6 @@ typedef struct SInsertParseContext { SInsertStmtInfo* pOutput; } SInsertParseContext; -static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) { - errno = 0; - *value = strtold(pToken->z, endPtr); - - // not a valid integer number, return error - if ((*endPtr - pToken->z) != pToken->n) { - return TK_ILLEGAL; - } - - return pToken->type; -} - -static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, bool issigned) { - errno = 0; - int32_t ret = 0; - - char* endPtr = NULL; - if (type == TK_FLOAT) { - double v = strtod(z, &endPtr); - if ((errno == ERANGE && v == HUGE_VALF) || isinf(v) || isnan(v)) { - ret = -1; - } else if ((issigned && (v < INT64_MIN || v > INT64_MAX)) || ((!issigned) && (v < 0 || v > UINT64_MAX))) { - ret = -1; - } else { - *value = (int64_t) round(v); - } - - errno = 0; - return ret; - } - - int32_t radix = 10; - if (type == TK_HEX) { - radix = 16; - } else if (type == TK_BIN) { - radix = 2; - } - - // the string may be overflow according to errno - if (!issigned) { - const char *p = z; - while(*p != 0 && *p == ' ') p++; - if (*p != 0 && *p == '-') { return -1;} - - *value = strtoull(z, &endPtr, radix); - } else { - *value = strtoll(z, &endPtr, radix); - } - - // not a valid integer number, return error - if (endPtr - z != n || errno == ERANGE) { - ret = -1; - } - - errno = 0; - return ret; -} - static int32_t skipInsertInto(SInsertParseContext* pCxt) { SToken sToken; NEXT_TOKEN(pCxt->pSql, sToken); @@ -159,10 +101,8 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { char tableName[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&name, tableName); - SParseBasicCtx* pBasicCtx = &pCxt->pComCxt->ctx; CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta)); - SVgroupInfo vg; CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); @@ -349,207 +289,6 @@ static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *p return TSDB_CODE_SUCCESS; } -//static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { -// if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && -// type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || -// (pToken->n == 0) || (type == TK_RP)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); -// } -// -// if (IS_NUMERIC_TYPE(type) && pToken->n == 0) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z); -// } -// -// // Remove quotation marks -// if (TK_STRING == type) { -// if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) { -// return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z); -// } -// -// // delete escape character: \\, \', \" -// char delim = pToken->z[0]; -// int32_t cnt = 0; -// int32_t j = 0; -// for (uint32_t k = 1; k < pToken->n - 1; ++k) { -// if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) { -// tmpTokenBuf[j] = pToken->z[k + 1]; -// cnt++; -// j++; -// k++; -// continue; -// } -// tmpTokenBuf[j] = pToken->z[k]; -// j++; -// } -// -// tmpTokenBuf[j] = 0; -// pToken->z = tmpTokenBuf; -// pToken->n -= 2 + cnt; -// } -// -// return TSDB_CODE_SUCCESS; -//} - -//static FORCE_INLINE int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) { -// int64_t iv; -// char *endptr = NULL; -// bool isSigned = false; -// -// CHECK_CODE(checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf)); -// -// if (isNullStr(pToken)) { -// if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { -// int64_t tmpVal = 0; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// return func(getNullValue(pSchema->type), 0, param); -// } -// -// switch (pSchema->type) { -// case TSDB_DATA_TYPE_BOOL: { -// if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { -// if (strncmp(pToken->z, "true", pToken->n) == 0) { -// return func(&TRUE_VALUE, pSchema->bytes, param); -// } else if (strncmp(pToken->z, "false", pToken->n) == 0) { -// return func(&FALSE_VALUE, pSchema->bytes, param); -// } else { -// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); -// } -// } else if (pToken->type == TK_INTEGER) { -// return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); -// } else if (pToken->type == TK_FLOAT) { -// return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); -// } else { -// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); -// } -// } -// -// case TSDB_DATA_TYPE_TINYINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z); -// } else if (!IS_VALID_TINYINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z); -// } -// -// uint8_t tmpVal = (uint8_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_UTINYINT:{ -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z); -// } else if (!IS_VALID_UTINYINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z); -// } -// uint8_t tmpVal = (uint8_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_SMALLINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z); -// } else if (!IS_VALID_SMALLINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z); -// } -// int16_t tmpVal = (int16_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_USMALLINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z); -// } else if (!IS_VALID_USMALLINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z); -// } -// uint16_t tmpVal = (uint16_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_INT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z); -// } else if (!IS_VALID_INT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z); -// } -// int32_t tmpVal = (int32_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_UINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z); -// } else if (!IS_VALID_UINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z); -// } -// uint32_t tmpVal = (uint32_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_BIGINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z); -// } else if (!IS_VALID_BIGINT(iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z); -// } -// return func(&iv, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_UBIGINT: { -// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z); -// } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { -// return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z); -// } -// uint64_t tmpVal = (uint64_t)iv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_FLOAT: { -// double dv; -// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z); -// } -// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z); -// } -// float tmpVal = (float)dv; -// return func(&tmpVal, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_DOUBLE: { -// double dv; -// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z); -// } -// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { -// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z); -// } -// return func(&dv, pSchema->bytes, param); -// } -// -// case TSDB_DATA_TYPE_BINARY: { -// // too long values will return invalid sql, not be truncated automatically -// if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { -// return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z); -// } -// return func(pToken->z, pToken->n, param); -// } -// case TSDB_DATA_TYPE_NCHAR: { -// return func(pToken->z, pToken->n, param); -// } -// case TSDB_DATA_TYPE_TIMESTAMP: { -// int64_t tmpVal; -// if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) { -// return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z); -// } -// return func(&tmpVal, pSchema->bytes, param); -// } -// } -// -// return TSDB_CODE_FAILED; -//} - // pSql -> tag1_name, ...) static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) { int32_t nCols = pColList->numOfCols; diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 20f330247e57c756874cab299b41a45095cfb25a..6c7ecbe0ed8094f89a2fdf0451d758f5203a8b7b 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -563,16 +563,6 @@ TAOS_FIELD createField(const SSchema* pSchema) { return f; } -SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name) { - SSchema s = {0}; - s.type = type; - s.bytes = bytes; - s.colId = colId; - - tstrncpy(s.name, name, tListLen(s.name)); - return s; -} - void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) { pColumn->uid = uid; pColumn->flag = flag; @@ -1649,9 +1639,9 @@ static bool isNullStr(SToken *pToken) { } static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { - if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && - type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || - (pToken->n == 0) || (type == TK_RP)) { + if ((pToken->type != TK_NOW && pToken->type != TK_INTEGER && pToken->type != TK_STRING && pToken->type != TK_FLOAT && pToken->type != TK_BOOL && + pToken->type != TK_NULL && pToken->type != TK_HEX && pToken->type != TK_OCT && pToken->type != TK_BIN) || + (pToken->n == 0) || (pToken->type == TK_RP)) { return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); } @@ -1795,7 +1785,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_TINYINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z); } else if (!IS_VALID_TINYINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z); @@ -1806,7 +1796,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_UTINYINT:{ - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z); } else if (!IS_VALID_UTINYINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z); @@ -1816,7 +1806,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_SMALLINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z); } else if (!IS_VALID_SMALLINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z); @@ -1826,7 +1816,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_USMALLINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z); } else if (!IS_VALID_USMALLINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z); @@ -1836,7 +1826,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_INT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z); } else if (!IS_VALID_INT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z); @@ -1846,7 +1836,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_UINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z); } else if (!IS_VALID_UINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z); @@ -1856,7 +1846,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_BIGINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z); } else if (!IS_VALID_BIGINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z); @@ -1865,7 +1855,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti } case TSDB_DATA_TYPE_UBIGINT: { - if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) { + if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z); } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z); diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index a651e6c1df8b24abf3b044a901e9d659df0e56b1..3be358fec8478ffd482da55bfabe64f917c62a3c 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -97,8 +97,8 @@ public: int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const { std::unique_ptr table; - char db[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(pTableName, db); + char db[TSDB_DB_NAME_LEN] = {0}; + tNameGetDbName(pTableName, db); const char* tname = tNameGetTableName(pTableName); int32_t code = copyTableSchemaMeta(db, tname, &table); @@ -111,6 +111,7 @@ public: int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { // todo + vgInfo->vgId = 1; return 0; } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 8388458b4c0ea0d04d505aac5f765d890a485d88..97c9cec7c7b31345c2065daf79f6a4f389a3e10b 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -207,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { } taosArrayPush(currentLevel, &subplan); pCxt->pCurrentSubplan = subplan; + ++(pCxt->pDag->numOfSubplans); return subplan; } @@ -293,11 +294,14 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SArray* vgs = (SArray*)pPlanNode->pExtInfo; size_t numOfVg = taosArrayGetSize(vgs); for (int32_t i = 0; i < numOfVg; ++i) { + STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i); vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet); subplan->pNode = NULL; subplan->pDataSink = createDataInserter(pCxt, blocks); + subplan->type = QUERY_TYPE_MODIFY; + RECOVERY_CURRENT_SUBPLAN(pCxt); } } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 4296e82a5617b2847c02011fb76b5f808602cff2..d5ecd40ccd140a5c140604bfd6ba69cf17e21342 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1,8 +1,9 @@ -#include "tmsg.h" -#include "query.h" #include "qworker.h" -#include "qworkerInt.h" +#include "tname.h" #include "planner.h" +#include "query.h" +#include "qworkerInt.h" +#include "tmsg.h" int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t code = 0; @@ -634,7 +635,6 @@ int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { return TSDB_CODE_SUCCESS; } - int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); pRsp->code = code; @@ -665,11 +665,68 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { }; rpcSendResponse(&rpcRsp); + return TSDB_CODE_SUCCESS; +} + +int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { + int32_t numOfCols = 6; + SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols); + + int32_t cols = 0; + SSchema *pSchema = pRsp->metaInfo.pSchema; + + const SSchema *s = tGetTbnameColumnSchema(); + *pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "name"); + pSchema++; + + int32_t type = TSDB_DATA_TYPE_TIMESTAMP; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "created"); + pSchema++; + + type = TSDB_DATA_TYPE_SMALLINT; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "columns"); + pSchema++; + + *pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "stable"); + pSchema++; + + type = TSDB_DATA_TYPE_BIGINT; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "uid"); + pSchema++; + + type = TSDB_DATA_TYPE_INT; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "vgId"); + + pRsp->metaInfo.numOfColumns = htonl(cols); + + SRpcMsg rpcMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + rpcSendResponse(&rpcMsg); return TSDB_CODE_SUCCESS; } +int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { + SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); + int32_t handle = htonl(pFetchReq->id); + + pRsp->numOfRows = 0; + SRpcMsg rpcMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = 0, + }; + rpcSendResponse(&rpcMsg); + return TSDB_CODE_SUCCESS; +} int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { SQWSchStatus *sch = NULL; @@ -801,7 +858,6 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI return TSDB_CODE_SUCCESS; } - int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; @@ -911,7 +967,6 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 return TSDB_CODE_SUCCESS; } - int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); if (NULL == mgmt) { @@ -1157,6 +1212,25 @@ _return: return TSDB_CODE_SUCCESS; } +int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + int32_t code = 0; + SVShowTablesReq *pReq = pMsg->pCont; + QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code)); +} + +int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SVShowTablesFetchReq *pFetchReq = pMsg->pCont; + QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); +} + int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; int8_t status = 0; @@ -1182,7 +1256,6 @@ int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_RET(code); } - void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { return; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 7bd2205e436c580633159a39c3ddcdc3be54fe9e..12c07d69eeafb017297b83f423b753fde527cb55 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -166,7 +166,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { } for (int32_t n = 0; n < levelPlanNum; ++n) { - SSubplan *plan = taosArrayGet(levelPlans, n); + SSubplan *plan = taosArrayGetP(levelPlans, n); SSchTask task = {0}; if (plan->type == QUERY_TYPE_MODIFY) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 7e7f673943c7e7b92b7895c61c3dcf311afd967b..95f2fe76e6b0a70dba8fa575147f62952391109a 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -85,7 +85,6 @@ int32_t dDebugFlag = 135; int32_t vDebugFlag = 135; int32_t cDebugFlag = 131; int32_t jniDebugFlag = 131; -int32_t odbcDebugFlag = 131; int32_t qDebugFlag = 131; int32_t rpcDebugFlag = 131; int32_t uDebugFlag = 131; diff --git a/source/util/test/hashTest.cpp b/source/util/test/hashTest.cpp index d31fcfb7efaafb0c05ba2921bf6e4af1847b7cfd..ac1bae243468a13e97b219a25cab479ab1d28ac2 100644 --- a/source/util/test/hashTest.cpp +++ b/source/util/test/hashTest.cpp @@ -154,9 +154,9 @@ void acquireRleaseTest() { int32_t code = 0; int32_t num = 0; TESTSTRUCT data = {0}; - char *str1 = "abcdefg"; - char *str2 = "aaaaaaa"; - char *str3 = "123456789"; + const char *str1 = "abcdefg"; + const char *str2 = "aaaaaaa"; + const char *str3 = "123456789"; data.p = (char *)malloc(10); strcpy(data.p, str1); diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 0ccc85c3473ed95d8329901b016efded8f0173e0..fcc11ca2136df3b57aa64247a5d0452afa87e0c0 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -120,6 +120,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG echo "fqdn ${HOSTNAME}" >> $TAOS_CFG echo "serverPort ${NODE}" >> $TAOS_CFG +echo "supportVnodes 16" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG diff --git a/tests/script/tmp/dnodes.sim b/tests/script/tmp/dnodes.sim index f13f6026f9ad886ba7dcd31e4de31ab92ef2936d..a3f9a0c17385773698e2d8ab1cadbf56c5cf343d 100644 --- a/tests/script/tmp/dnodes.sim +++ b/tests/script/tmp/dnodes.sim @@ -1,8 +1,5 @@ -system sh/stop_dnodes.sh - - ############## config parameter ##################### -$node1 = 192.168.0.201 +$node1 = 192.168.0.201 $node2 = 192.168.0.202 $node3 = 192.168.0.203 $node4 = 192.168.0.204 @@ -10,54 +7,75 @@ $node4 = 192.168.0.204 $self = $node1 $num = 25 -############### deploy firstEp ##################### +#deploy = 0, start = 1, stop = 2 +$option = 0 +print =============== option:$option + + +############### stop dnodes ##################### +if $option == 0 then + system sh/stop_dnodes.sh +endi + +############### process firstEp ##################### $firstEp = $node1 . :7100 $firstPort = 7100 if $self == $node1 then - system sh/deploy.sh -n dnode1 -i 1 - system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp - system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp - system sh/cfg.sh -n dnode1 -c fqdn -v $node1 - system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort - - system sh/exec.sh -n dnode1 -s start - sql connect - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node1 port $port - endw - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node2 port $port - endw - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node3 port $port - endw - - $i = 0 - while $i < $num - $port = $i * 100 - $port = $port + 8100 - $i = $i + 1 - sql create dnode $node4 port $port - endw + if $option == 1 then + system sh/exec.sh -n dnode1 -s start + endi + + if $option == 2 then + system sh/exec.sh -n dnode1 -s stop -x SIGINT + endi + + if $option == 0 then + system sh/deploy.sh -n dnode1 -i 1 + system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp + system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp + system sh/cfg.sh -n dnode1 -c fqdn -v $node1 + system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort + system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + + system sh/exec.sh -n dnode1 -s start + sql connect + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node1 port $port + endw + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node2 port $port + endw + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node3 port $port + endw + + $i = 0 + while $i < $num + $port = $i * 100 + $port = $port + 8100 + $i = $i + 1 + sql create dnode $node4 port $port + endw + endi endi -############### deploy nodes ##################### +############### process nodes ##################### $i = 0 while $i < $num @@ -67,11 +85,21 @@ while $i < $num $dnodename = dnode . $index $i = $i + 1 - system sh/deploy.sh -n $dnodename -i 1 - system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp - system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp - system sh/cfg.sh -n $dnodename -c fqdn -v $self - system sh/cfg.sh -n $dnodename -c serverPort -v $port + if $option == 1 then + system sh/exec.sh -n $dnodename -s start + endi + + if $option == 2 then + system sh/exec.sh -n $dnodename -s stop -x SIGINT + endi + + if $option == 0 then + system sh/deploy.sh -n $dnodename -i 1 + system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp + system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp + system sh/cfg.sh -n $dnodename -c fqdn -v $self + system sh/cfg.sh -n $dnodename -c serverPort -v $port - system sh/exec.sh -n $dnodename -s start + system sh/exec.sh -n $dnodename -s start + endi endw