diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f7562a4b9b30aa3314fe19c789da15b4681322b8..38609c23483ece8ce97e4065d42952d6f58d0ab8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -301,6 +301,8 @@ typedef struct SSchema { typedef struct { int32_t nCols; int32_t sver; + int32_t tagVer; + int32_t colVer; SSchema* pSchema; } SSchemaWrapper; @@ -309,6 +311,8 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p if (pSW == NULL) return pSW; pSW->nCols = pSchemaWrapper->nCols; pSW->sver = pSchemaWrapper->sver; + pSW->tagVer = pSchemaWrapper->tagVer; + pSW->colVer = pSchemaWrapper->colVer; pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { taosMemoryFree(pSW); @@ -364,6 +368,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr int32_t tlen = 0; tlen += taosEncodeVariantI32(buf, pSW->nCols); tlen += taosEncodeVariantI32(buf, pSW->sver); + tlen += taosEncodeVariantI32(buf, pSW->tagVer); + tlen += taosEncodeVariantI32(buf, pSW->colVer); for (int32_t i = 0; i < pSW->nCols; i++) { tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); } @@ -373,6 +379,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->sver); + buf = taosDecodeVariantI32(buf, &pSW->tagVer); + buf = taosDecodeVariantI32(buf, &pSW->colVer); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { return NULL; @@ -387,6 +395,8 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; + if (tEncodeI32v(pEncoder, pSW->tagVer) < 0) return -1; + if (tEncodeI32v(pEncoder, pSW->colVer) < 0) return -1; for (int32_t i = 0; i < pSW->nCols; i++) { if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; } @@ -397,6 +407,8 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1; pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; @@ -410,6 +422,8 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) { if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1; pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; @@ -455,6 +469,7 @@ int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; + int32_t verInBlock; int32_t numOfFields; SArray* pFields; int32_t ttl; diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index aec14766637a6f74a1a723f7cffeacbf6eb9c6f8..89fbc92992bf783b1d8896fbf636f2468b6fa4c6 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -142,6 +142,8 @@ void fmFuncMgtDestroy(); int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc); +bool fmIsBuiltinFunc(const char* pFunc); + bool fmIsAggFunc(int32_t funcId); bool fmIsScalarFunc(int32_t funcId); bool fmIsNonstandardSQLFunc(int32_t funcId); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 5e294ae45564daa5007edc8e9362406601ffaa77..82bf4e1f45a0cab5c7f1b61d04e08d137148e44d 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -78,7 +78,7 @@ typedef struct SAlterDatabaseStmt { typedef struct STableOptions { ENodeType type; - char comment[TSDB_STB_COMMENT_LEN]; + char comment[TSDB_TB_COMMENT_LEN]; int32_t delay; float filesFactor; SNodeList* pRollupFuncs; @@ -90,7 +90,7 @@ typedef struct SColumnDefNode { ENodeType type; char colName[TSDB_COL_NAME_LEN]; SDataType dataType; - char comments[TSDB_STB_COMMENT_LEN]; + char comments[TSDB_TB_COMMENT_LEN]; bool sma; } SColumnDefNode; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index be6005d46b6d5f701d81ac7c9de5d7d1e92d79ff..fd3e008e676dfc640c31d8999b2b57e275738d12 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -647,6 +647,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x264A) #define TSDB_CODE_PAR_INVALID_MODIFY_COL TAOS_DEF_ERROR_CODE(0, 0x264B) #define TSDB_CODE_PAR_INVALID_TBNAME TAOS_DEF_ERROR_CODE(0, 0x264C) +#define TSDB_CODE_PAR_INVALID_FUNCTION_NAME TAOS_DEF_ERROR_CODE(0, 0x264D) +#define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/include/util/tdef.h b/include/util/tdef.h index f95d96be56d40a24cb227820058807eac9e7f051..5cc687d7ab141c0eedabfcf6331f56af1a6175e5 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -218,8 +218,8 @@ typedef enum ELogicConditionType { #define TSDB_MAX_SQL_SHOW_LEN 1024 #define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb -#define TSDB_APP_NAME_LEN TSDB_UNI_LEN -#define TSDB_STB_COMMENT_LEN 1024 +#define TSDB_APP_NAME_LEN TSDB_UNI_LEN +#define TSDB_TB_COMMENT_LEN 1025 /** * In some scenarios uint16_t (0~65535) is used to store the row len. diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 278fc2c193830dfa199f07c4598c80ce8576b4ec..677fbb43b4d88f195460678cf5b3a99f7358ea5d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -600,6 +600,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1; + if (tEncodeI32(&encoder, pReq->verInBlock) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1; for (int32_t i = 0; i < pReq->numOfFields; ++i) { SField *pField = taosArrayGet(pReq->pFields, i); @@ -626,6 +627,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->verInBlock) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1; pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField)); if (pReq->pFields == NULL) { diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index 2637e0af04401a3e11d13a30cd93fd1e62428746..d2e547078e925401ed3154133c0f96f2e48eec50 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "bmInt.h" -static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {} +void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {} int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonBmInfo bmInfo = {0}; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index f90fd72c6b5a836f9bb38b185cea058eb71ff3bb..ae8879326d6da92b6bd5ab3ea89584b347817fd4 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -32,7 +32,9 @@ typedef struct SDnodeMgmt { SSingleWorker mgmtWorker; ProcessCreateNodeFp processCreateNodeFp; ProcessDropNodeFp processDropNodeFp; - IsNodeRequiredFp isNodeRequiredFp; + SendMonitorReportFp sendMonitorReportFp; + GetVnodeLoadsFp getVnodeLoadsFp; + GetMnodeLoadsFp getMnodeLoadsFp; } SDnodeMgmt; // dmHandle.c @@ -43,11 +45,6 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); -// dmMonitor.c -void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo); -void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo); -void dmSendMonitorReport(SDnodeMgmt *pMgmt); - // dmWorker.c int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmStartStatusThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 38e71754b60bd25baa7008b238bea20f151a28b3..7c1162ec10797b5e58fbbe56552bb8ba3503e05d 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -72,11 +72,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { taosRUnLockLatch(&pMgmt->pData->latch); SMonVloadInfo vinfo = {0}; - dmGetVnodeLoads(pMgmt, &vinfo); + (*pMgmt->getVnodeLoadsFp)(&vinfo); req.pVloads = vinfo.pVloads; SMonMloadInfo minfo = {0}; - dmGetMnodeLoads(pMgmt, &minfo); + (*pMgmt->getMnodeLoadsFp)(&minfo); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); @@ -115,7 +115,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { SServerStatusRsp statusRsp = {0}; SMonMloadInfo minfo = {0}; - dmGetMnodeLoads(pMgmt, &minfo); + (*pMgmt->getMnodeLoadsFp)(&minfo); if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) { pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState)); @@ -123,7 +123,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { } SMonVloadInfo vinfo = {0}; - dmGetVnodeLoads(pMgmt, &vinfo); + (*pMgmt->getVnodeLoadsFp)(&vinfo); for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) { SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i); if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 3b343d491642229da7b0e6db63301476853856e6..59c926545e6f565a124a4846532e4f74efeecd5e 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -45,7 +45,9 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->name = pInput->name; pMgmt->processCreateNodeFp = pInput->processCreateNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; - pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp; + pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; + pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; + pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; if (dmStartWorker(pMgmt) != 0) { return -1; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c b/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c deleted file mode 100644 index 3547c769377733ac9b60a5e3859f395e469505f0..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dmInt.h" - -#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \ - if (!tsMultiProcess) { \ - SRpcMsg rsp = {0}; \ - SRpcMsg req = {.msgType = mtype}; \ - SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ - tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \ - epset.eps[0].port = tsServerPort; \ - rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \ - if (rsp.code == 0 && rsp.contLen > 0) { \ - func(rsp.pCont, rsp.contLen, pInfo); \ - } \ - rpcFreeCont(rsp.pCont); \ - } - -static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) { - pInfo->protocol = 1; - pInfo->dnode_id = pMgmt->pData->dnodeId; - pInfo->cluster_id = pMgmt->pData->clusterId; - tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); -} - -static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) { - pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f); - pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(MNODE); - pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(QNODE); - pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(SNODE); - pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(BNODE); - tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); - pInfo->logdir.size = tsLogSpace.size; - tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); - pInfo->tempdir.size = tsTempSpace.size; -} - -static void dmGetMonitorInfo(SDnodeMgmt *pMgmt, SMonDmInfo *pInfo) { - dmGetMonitorBasicInfo(pMgmt, &pInfo->basic); - dmGetMonitorDnodeInfo(pMgmt, &pInfo->dnode); - dmGetMonitorSystemInfo(&pInfo->sys); -} - -void dmSendMonitorReport(SDnodeMgmt *pMgmt) { - if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; - dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort); - - SMonDmInfo dmInfo = {0}; - SMonMmInfo mmInfo = {0}; - SMonVmInfo vmInfo = {0}; - SMonQmInfo qmInfo = {0}; - SMonSmInfo smInfo = {0}; - SMonBmInfo bmInfo = {0}; - - dmGetMonitorInfo(pMgmt, &dmInfo); - dmSendLocalRecv(pMgmt, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo); - if (dmInfo.dnode.has_mnode) { - dmSendLocalRecv(pMgmt, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo); - } - if (dmInfo.dnode.has_qnode) { - dmSendLocalRecv(pMgmt, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo); - } - if (dmInfo.dnode.has_snode) { - dmSendLocalRecv(pMgmt, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo); - } - if (dmInfo.dnode.has_bnode) { - dmSendLocalRecv(pMgmt, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo); - } - - monSetDmInfo(&dmInfo); - monSetMmInfo(&mmInfo); - monSetVmInfo(&vmInfo); - monSetQmInfo(&qmInfo); - monSetSmInfo(&smInfo); - monSetBmInfo(&bmInfo); - tFreeSMonMmInfo(&mmInfo); - tFreeSMonVmInfo(&vmInfo); - tFreeSMonQmInfo(&qmInfo); - tFreeSMonSmInfo(&smInfo); - tFreeSMonBmInfo(&bmInfo); - monSendReport(); -} - -void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { - dmSendLocalRecv(pMgmt, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo); -} - -void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { - dmSendLocalRecv(pMgmt, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo); -} diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 599fa07e1ca9501502669d439b0543c46e31f7bd..6a7e0ad322efca98991c362d90d602c3ca692c26 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -50,7 +50,7 @@ static void *dmMonitorThreadFp(void *param) { int64_t curTime = taosGetTimestampMs(); float interval = (curTime - lastTime) / 1000.0f; if (interval >= tsMonitorInterval) { - dmSendMonitorReport(pMgmt); + (*pMgmt->sendMonitorReportFp)(); lastTime = curTime; } } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index df377fefe796960c6d793c7b3550ccec8c4e50c3..0bab05f9737059b5fd3a79ca1e5288814b65a76e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -154,6 +154,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { return -1; } - dInfo("successed to write %s, deployed:%d", realfile, deployed); + dDebug("successed to write %s, deployed:%d", realfile, deployed); return 0; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index a09fd2627e8b795859fb135aa90b200ae7146258..8b2d8f8badf9e83a210c59298b233efd3b499842 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -16,8 +16,13 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) { - mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant); +void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) { + mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant); +} + +void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { + pInfo->isMnode = 1; + mndGetLoad(pMgmt->pMnode, &pInfo->load); } int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { @@ -45,11 +50,6 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { return 0; } -static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { - pInfo->isMnode = 1; - mndGetLoad(pMgmt->pMnode, &pInfo->load); -} - int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonMloadInfo mloads = {0}; mmGetMnodeLoads(pMgmt, &mloads); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 4518a558bba131a2ad93a66d3f4c70ae2e657b7f..ac0868d8ef4ed6e6ef9aab1d5e445807b209bd5d 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "qmInt.h" -static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} +void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonQmInfo qmInfo = {0}; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index ad83639a8d956d5251fc3a988152c82e0925ae21..d6ff73b132fea374e89b4b0d348081abefebb875 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "smInt.h" -static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} +void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonSmInfo smInfo = {0}; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index f6c7bb33e61595f886552baae062401c53aacf77..7a6c5f982ed13317a4d047f95b8893f9798d4a09 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -128,7 +128,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes = vnodesNum; code = 0; - dInfo("succcessed to read file %s", file); + dDebug("succcessed to read file %s", file); _OVER: if (content != NULL) taosMemoryFree(content); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 902998f28e5739d82a38bdca1e5bff1c94fc22be..a4da6d089c6b2d21db01207747d1b291ffbbd5fd 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { +void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; @@ -37,7 +37,7 @@ static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { taosRUnLockLatch(&pMgmt->latch); } -static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { +void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { SMonVloadInfo vloads = {0}; vmGetVnodeLoads(pMgmt, &vloads); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 7484c1e18f0e9e555e6f1c51e7d2d3bdad36ad7e..9d092a93bc1fcd8a482a6771c2613515f38e578b 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_IMP_H_ -#define _TD_DND_IMP_H_ +#ifndef _TD_DND_MGMT_H_ +#define _TD_DND_MGMT_H_ // tobe deleted #include "uv.h" @@ -165,16 +165,13 @@ SMsgCb dmGetMsgcb(SDnode *pDnode); int32_t dmInitMsgHandle(SDnode *pDnode); int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -// mgmt nodes -SMgmtFunc dmGetMgmtFunc(); -SMgmtFunc bmGetMgmtFunc(); -SMgmtFunc qmGetMgmtFunc(); -SMgmtFunc smGetMgmtFunc(); -SMgmtFunc vmGetMgmtFunc(); -SMgmtFunc mmGetMgmtFunc(); +// dmMonitor.c +void dmSendMonitorReport(); +void dmGetVnodeLoads(SMonVloadInfo *pInfo); +void dmGetMnodeLoads(SMonMloadInfo *pInfo); #ifdef __cplusplus } #endif -#endif /*_TD_DND_IMP_H_*/ \ No newline at end of file +#endif /*_TD_DND_MGMT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h new file mode 100644 index 0000000000000000000000000000000000000000..3ac71de530d4dd9dad6ccd6b29b7789f56a85b1e --- /dev/null +++ b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_NODES_H_ +#define _TD_DND_NODES_H_ + +#include "dmInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +SMgmtFunc dmGetMgmtFunc(); +SMgmtFunc bmGetMgmtFunc(); +SMgmtFunc qmGetMgmtFunc(); +SMgmtFunc smGetMgmtFunc(); +SMgmtFunc vmGetMgmtFunc(); +SMgmtFunc mmGetMgmtFunc(); + +void mmGetMonitorInfo(void *pMgmt, SMonMmInfo *pInfo); +void vmGetMonitorInfo(void *pMgmt, SMonVmInfo *pInfo); +void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo); +void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo); +void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo); + +void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo); +void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_NODES_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index af5c0f00db64e16e3b659e9c124fd3f367caa421..07d0c43360a5de639f5af2b64208d13c79192687 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -168,11 +168,6 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { return code; } -static bool dmIsNodeRequired(EDndNodeType ntype) { - SDnode *pDnode = dmInstance(); - return pDnode->wrappers[ntype].required; -} - SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { SMgmtInputOpt opt = { .path = pWrapper->path, @@ -180,7 +175,9 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .pData = &pWrapper->pDnode->data, .processCreateNodeFp = dmProcessCreateNodeReq, .processDropNodeFp = dmProcessDropNodeReq, - .isNodeRequiredFp = dmIsNodeRequired, + .sendMonitorReportFp = dmSendMonitorReport, + .getVnodeLoadsFp = dmGetVnodeLoads, + .getMnodeLoadsFp = dmGetMnodeLoads, }; opt.msgCb = dmGetMsgcb(pWrapper->pDnode); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 30d7750f79c2aa31cac179f4228d27147e282180..3cbb9ff04601a36326eb80a352b9abebd85e39c6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" +#include "dmNodes.h" static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); @@ -189,7 +190,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { } dmReportStartup("dnode-transport", "initialized"); - dInfo("dnode is created, ptr:%p", pDnode); + dDebug("dnode is created, ptr:%p", pDnode); code = 0; _OVER: @@ -208,7 +209,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupClient(pDnode); dmCleanupServer(pDnode); dmClearVars(pDnode); - dInfo("dnode is closed, ptr:%p", pDnode); + dDebug("dnode is closed, ptr:%p", pDnode); } void dmSetStatus(SDnode *pDnode, EDndRunStatus status) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c new file mode 100644 index 0000000000000000000000000000000000000000..2497da13ec3313429a6f418ab503c8d5cb06f9b9 --- /dev/null +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dmMgmt.h" +#include "dmNodes.h" + +#define dmSendLocalRecv(pDnode, mtype, func, pInfo) \ + SRpcMsg rsp = {0}; \ + SRpcMsg req = {.msgType = mtype}; \ + SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ + tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \ + epset.eps[0].port = tsServerPort; \ + rpcSendRecv(pDnode->trans.clientRpc, &epset, &req, &rsp); \ + if (rsp.code == 0 && rsp.contLen > 0) { \ + func(rsp.pCont, rsp.contLen, pInfo); \ + } \ + rpcFreeCont(rsp.pCont); + +static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { + pInfo->protocol = 1; + pInfo->dnode_id = pDnode->data.dnodeId; + pInfo->cluster_id = pDnode->data.clusterId; + tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); +} + +static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { + pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f); + pInfo->has_mnode = pDnode->wrappers[MNODE].required; + pInfo->has_qnode = pDnode->wrappers[QNODE].required; + pInfo->has_snode = pDnode->wrappers[SNODE].required; + pInfo->has_bnode = pDnode->wrappers[BNODE].required; + tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); + pInfo->logdir.size = tsLogSpace.size; + tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); + pInfo->tempdir.size = tsTempSpace.size; +} + +static void dmGetDmMonitorInfo(SDnode *pDnode) { + SMonDmInfo dmInfo = {0}; + dmGetMonitorBasicInfo(pDnode, &dmInfo.basic); + dmGetMonitorDnodeInfo(pDnode, &dmInfo.dnode); + dmGetMonitorSystemInfo(&dmInfo.sys); + monSetDmInfo(&dmInfo); +} + +static void dmGetMmMonitorInfo(SDnode *pDnode) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + SMonMmInfo mmInfo = {0}; + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo); + } else if (pWrapper->pMgmt != NULL) { + mmGetMonitorInfo(pWrapper->pMgmt, &mmInfo); + } + dmReleaseWrapper(pWrapper); + monSetMmInfo(&mmInfo); + tFreeSMonMmInfo(&mmInfo); + } +} + +static void dmGetVmMonitorInfo(SDnode *pDnode) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + SMonVmInfo vmInfo = {0}; + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo); + } else if (pWrapper->pMgmt != NULL) { + vmGetMonitorInfo(pWrapper->pMgmt, &vmInfo); + } + dmReleaseWrapper(pWrapper); + monSetVmInfo(&vmInfo); + tFreeSMonVmInfo(&vmInfo); + } +} + +static void dmGetQmMonitorInfo(SDnode *pDnode) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + SMonQmInfo qmInfo = {0}; + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo); + } else if (pWrapper->pMgmt != NULL) { + qmGetMonitorInfo(pWrapper->pMgmt, &qmInfo); + } + dmReleaseWrapper(pWrapper); + monSetQmInfo(&qmInfo); + tFreeSMonQmInfo(&qmInfo); + } +} + +static void dmGetSmMonitorInfo(SDnode *pDnode) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[SNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + SMonSmInfo smInfo = {0}; + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo); + } else if (pWrapper->pMgmt != NULL) { + smGetMonitorInfo(pWrapper->pMgmt, &smInfo); + } + dmReleaseWrapper(pWrapper); + monSetSmInfo(&smInfo); + tFreeSMonSmInfo(&smInfo); + } +} + +static void dmGetBmMonitorInfo(SDnode *pDnode) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[BNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + SMonBmInfo bmInfo = {0}; + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo); + } else if (pWrapper->pMgmt != NULL) { + bmGetMonitorInfo(pWrapper->pMgmt, &bmInfo); + } + dmReleaseWrapper(pWrapper); + monSetBmInfo(&bmInfo); + tFreeSMonBmInfo(&bmInfo); + } +} + +void dmSendMonitorReport() { + if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; + dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort); + + SDnode *pDnode = dmInstance(); + dmGetDmMonitorInfo(pDnode); + dmGetMmMonitorInfo(pDnode); + dmGetVmMonitorInfo(pDnode); + dmGetQmMonitorInfo(pDnode); + dmGetSmMonitorInfo(pDnode); + dmGetBmMonitorInfo(pDnode); + monSendReport(); +} + +void dmGetVnodeLoads(SMonVloadInfo *pInfo) { + SDnode *pDnode = dmInstance(); + SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo); + } else if (pWrapper->pMgmt != NULL) { + vmGetVnodeLoads(pWrapper->pMgmt, pInfo); + } + dmReleaseWrapper(pWrapper); + } +} + +void dmGetMnodeLoads(SMonMloadInfo *pInfo) { + SDnode *pDnode = dmInstance(); + SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; + if (tsMultiProcess) { + dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo); + } else if (pWrapper->pMgmt != NULL) { + mmGetMnodeLoads(pWrapper->pMgmt, pInfo); + } + dmReleaseWrapper(pWrapper); +} diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 45bd3e4f64d99dd30e3e4138fcfec10bb47216cd..b0e764bf8e5ee1d35c3323cbeff70ada218b53ec 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -89,21 +89,23 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); -typedef bool (*IsNodeRequiredFp)(EDndNodeType ntype); +typedef void (*SendMonitorReportFp)(); +typedef void (*GetVnodeLoadsFp)(); +typedef void (*GetMnodeLoadsFp)(); typedef struct { - int32_t dnodeId; - int64_t clusterId; - int64_t dnodeVer; - int64_t updateTime; - int64_t rebootTime; - bool dropped; - bool stopped; - SEpSet mnodeEps; - SArray *dnodeEps; - SHashObj *dnodeHash; - SRWLatch latch; - SMsgCb msgCb; + int32_t dnodeId; + int64_t clusterId; + int64_t dnodeVer; + int64_t updateTime; + int64_t rebootTime; + bool dropped; + bool stopped; + SEpSet mnodeEps; + SArray *dnodeEps; + SHashObj *dnodeHash; + SRWLatch latch; + SMsgCb msgCb; } SDnodeData; typedef struct { @@ -113,7 +115,9 @@ typedef struct { SMsgCb msgCb; ProcessCreateNodeFp processCreateNodeFp; ProcessDropNodeFp processDropNodeFp; - IsNodeRequiredFp isNodeRequiredFp; + SendMonitorReportFp sendMonitorReportFp; + GetVnodeLoadsFp getVnodeLoadsFp; + GetMnodeLoadsFp getMnodeLoadsFp; } SMgmtInputOpt; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a04417736a32d01619a011b17eab23291460ab82..81f4c5ed1ef87431b639d256acde0faa596692fe 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -365,6 +365,8 @@ typedef struct { int64_t uid; int64_t dbUid; int32_t version; + int32_t tagVer; + int32_t colVer; int32_t nextColId; float xFilesFactor; int32_t delay; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e8a0b31e2f902f4ba878cf3384b5ab47d9801b33..7485510bc6f5db1873dced18e911adfe493a5d24 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -88,6 +88,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER) SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->version, _OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER) SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER) @@ -166,6 +168,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER) SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->version, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER) int32_t xFilesFactor = 0; SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER) @@ -317,6 +321,8 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->version = pNew->version; + pOld->tagVer = pNew->tagVer; + pOld->colVer = pNew->colVer; pOld->nextColId = pNew->nextColId; pOld->ttl = pNew->ttl; pOld->numOfColumns = pNew->numOfColumns; @@ -384,6 +390,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.schema.nCols = pStb->numOfColumns; req.schema.sver = pStb->version; + req.schema.tagVer = pStb->tagVer; + req.schema.colVer = pStb->colVer; req.schema.pSchema = pStb->pColumns; req.schemaTag.nCols = pStb->numOfTags; req.schemaTag.sver = 1; @@ -657,6 +665,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; pDst->version = 1; + pDst->tagVer = 1; + pDst->colVer = 1; pDst->nextColId = 1; pDst->xFilesFactor = pCreate->xFilesFactor; pDst->delay = pCreate->delay; @@ -949,6 +959,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p } pNew->version++; + pNew->tagVer++; return 0; } @@ -967,6 +978,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch pNew->numOfTags--; pNew->version++; + pNew->tagVer++; mDebug("stb:%s, start to drop tag %s", pNew->name, tagName); return 0; } @@ -1007,6 +1019,7 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN); pNew->version++; + pNew->tagVer++; mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName); return 0; } @@ -1036,6 +1049,7 @@ static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SFi pTag->bytes = pField->bytes; pNew->version++; + pNew->tagVer++; mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes); return 0; @@ -1075,6 +1089,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray } pNew->version++; + pNew->colVer++; return 0; } @@ -1103,6 +1118,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const pNew->numOfColumns--; pNew->version++; + pNew->colVer++; mDebug("stb:%s, start to drop col %s", pNew->name, colName); return 0; } @@ -1141,6 +1157,7 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const pCol->bytes = pField->bytes; pNew->version++; + pNew->colVer++; mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes); return 0; @@ -1300,6 +1317,13 @@ static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) { goto _OVER; } + if (alterReq.verInBlock > 0 && alterReq.verInBlock <= pStb->version) { + mDebug("stb:%s, already exist, verInBlock:%d smaller than verInStb:%d, alter success", alterReq.name, + alterReq.verInBlock, pStb->version); + code = 0; + goto _OVER; + } + pUser = mndAcquireUser(pMnode, pReq->conn.user); if (pUser == NULL) { goto _OVER; diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index 16974ad54158e29464d29af9f3deede38ca3b1a8..b8873210ab995bde06f6c2baf17d14975bc591e1 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -32,7 +32,7 @@ class MndTestStb : public ::testing::Test { void* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, int32_t* pContLen); void* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen); void* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen); - void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen); + void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen, int32_t verInBlock); }; Testbase MndTestStb::test; @@ -271,12 +271,13 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co } void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, - int32_t* pContLen) { + int32_t* pContLen, int32_t verInBlock) { SMAlterStbReq req = {0}; strcpy(req.name, stbname); req.numOfFields = 1; req.pFields = taosArrayInit(1, sizeof(SField)); req.alterType = TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES; + req.verInBlock = verInBlock; SField field = {0}; field.bytes = bytes; @@ -781,31 +782,40 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) { } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_COLUMN_NOT_EXIST); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_STB_OPTION); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen, 0); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + + test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables", dbname); + EXPECT_EQ(test.GetShowRows(), 1); + } + + { + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col_not_exist", 20, &contLen, 1); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, 0); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 73ec7f510b83650d58ac9ed569261b9b569bd390..e46cb815617baa7adbf4b59232b3698feb1a6558 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -100,6 +100,10 @@ int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) { return getUdfInfo(pParam, pFunc); } +bool fmIsBuiltinFunc(const char* pFunc) { + return NULL != taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc, strlen(pFunc)); +} + EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { if (fmIsUserDefinedFunc(pFunc->funcId) || pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) { return FUNC_DATA_REQUIRED_DATA_LOAD; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 763ccbf7a02a9fd3649c3e0466304aa4ccc46db0..bc49f36afe121db0c676662c972f088a660f897c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -946,6 +946,23 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { return code; } +static const char* jkLogicPlanSubplans = "Subplans"; + +static int32_t logicPlanToJson(const void* pObj, SJson* pJson) { + const SQueryLogicPlan* pNode = (const SQueryLogicPlan*)pObj; + return tjsonAddObject(pJson, jkLogicPlanSubplans, nodeToJson, nodesListGetNode(pNode->pTopSubplans, 0)); +} + +static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { + SQueryLogicPlan* pNode = (SQueryLogicPlan*)pObj; + SNode* pChild = NULL; + int32_t code = jsonToNodeObject(pJson, jkLogicPlanSubplans, &pChild); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pNode->pTopSubplans, pChild); + } + return code; +} + static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; @@ -3029,7 +3046,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_LOGIC_SUBPLAN: return logicSubplanToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN: - break; + return logicPlanToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: return physiTagScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: @@ -3126,6 +3143,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicPartitionNode(pJson, pObj); case QUERY_NODE_LOGIC_SUBPLAN: return jsonToLogicSubplan(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN: + return jsonToLogicPlan(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 894659ddd15f52144727117aa076fede9087eadc..80c4593d9bb683788953d0410f654b813d0c126a 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -196,6 +196,15 @@ static bool checkIndexName(SAstCreateContext* pCxt, SToken* pIndexName) { return true; } +static bool checkComment(SAstCreateContext* pCxt, const SToken* pCommentToken, bool demand) { + if (NULL == pCommentToken) { + pCxt->errCode = demand ? TSDB_CODE_PAR_SYNTAX_ERROR : TSDB_CODE_SUCCESS; + } else if (pCommentToken->n >= (TSDB_TB_COMMENT_LEN + 2)) { + pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_COMMENT_TOO_LONG); + } + return TSDB_CODE_SUCCESS == pCxt->errCode; +} + SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* pNode) { SRawExprNode* target = (SRawExprNode*)nodesMakeNode(QUERY_NODE_RAW_EXPR); CHECK_OUT_OF_MEM(target); @@ -823,8 +832,10 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) { SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal) { switch (type) { case TABLE_OPTION_COMMENT: - copyStringFormStringToken((SToken*)pVal, ((STableOptions*)pOptions)->comment, - sizeof(((STableOptions*)pOptions)->comment)); + if (checkComment(pCxt, (SToken*)pVal, true)) { + copyStringFormStringToken((SToken*)pVal, ((STableOptions*)pOptions)->comment, + sizeof(((STableOptions*)pOptions)->comment)); + } break; case TABLE_OPTION_DELAY: ((STableOptions*)pOptions)->delay = taosStr2Int32(((SToken*)pVal)->z, NULL, 10); @@ -848,7 +859,7 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType } SNode* createColumnDefNode(SAstCreateContext* pCxt, SToken* pColName, SDataType dataType, const SToken* pComment) { - if (!checkColumnName(pCxt, pColName)) { + if (!checkColumnName(pCxt, pColName) || !checkComment(pCxt, pComment, false)) { return NULL; } SColumnDefNode* pCol = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0d214fd85e38cfe5500696115e9400f0298bde98..074a5bc30cd298deb40433eda34868225106472e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -272,6 +272,10 @@ static bool isTimelineFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId)); } +static bool isScanPseudoColumnFunc(const SNode* pNode) { + return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsScanPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); +} + static bool isDistinctOrderBy(STranslateContext* pCxt) { return (SQL_CLAUSE_ORDER_BY == pCxt->currClause && pCxt->pCurrStmt->isDistinct); } @@ -892,7 +896,7 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { return DEAL_RES_IGNORE_CHILD; } } - if (QUERY_NODE_COLUMN == nodeType(*pNode)) { + if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { if (pCxt->selectFuncNum > 1) { return generateDealNodeErrMsg(pCxt->pTranslateCxt, getGroupByErrorCode(pCxt->pTranslateCxt)); } else { @@ -930,7 +934,7 @@ static EDealRes rewriteColsToSelectValFuncImpl(SNode** pNode, void* pContext) { if (isAggFunc(*pNode)) { return DEAL_RES_IGNORE_CHILD; } - if (QUERY_NODE_COLUMN == nodeType(*pNode)) { + if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { return rewriteColToSelectValFunc((STranslateContext*)pContext, NULL, pNode); } return DEAL_RES_CONTINUE; @@ -958,7 +962,7 @@ static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) { pCxt->existAggFunc = true; return DEAL_RES_IGNORE_CHILD; } - if (QUERY_NODE_COLUMN == nodeType(pNode)) { + if (isScanPseudoColumnFunc(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)) { pCxt->existCol = true; } return DEAL_RES_CONTINUE; @@ -3261,6 +3265,9 @@ static int32_t readFromFile(char* pName, int32_t* len, char** buf) { } static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionStmt* pStmt) { + if (fmIsBuiltinFunc(pStmt->funcName)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNCTION_NAME); + } SCreateFuncReq req = {0}; strcpy(req.name, pStmt->funcName); req.igExists = pStmt->ignoreExists; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 1ae066a8c251601a2ba2f94719b2df18a4b69f69..7b9147beab4ebd83430fc569ea78d0effd575631 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -160,6 +160,10 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Only binary/nchar column length could be modified"; case TSDB_CODE_PAR_INVALID_TBNAME: return "Invalid tbname pseudo column"; + case TSDB_CODE_PAR_INVALID_FUNCTION_NAME: + return "Invalid function name"; + case TSDB_CODE_PAR_COMMENT_TOO_LONG: + return "Comment too long"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index d0ac239bcb9b0a82cda60a5e9b63452b61ca344a..fb716af3e5563a6adf9f6c239c9c946c9b5d9be9 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -24,6 +24,7 @@ #define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) typedef struct SSplitContext { + int32_t queryId; int32_t groupId; bool split; } SSplitContext; @@ -62,6 +63,7 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* if (NULL == pSubplan) { return NULL; } + pSubplan->id.queryId = pCxt->queryId; pSubplan->id.groupId = pCxt->groupId; pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan); @@ -242,6 +244,7 @@ static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) if (NULL == pSubplan) { return NULL; } + pSubplan->id.queryId = pCxt->queryId; pSubplan->id.groupId = pCxt->groupId; pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->pNode = pNode; @@ -406,7 +409,7 @@ static const SSplitRule splitRuleSet[] = {{.pName = "SuperTableScan", .splitFunc static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); static int32_t applySplitRule(SLogicSubplan* pSubplan) { - SSplitContext cxt = {.groupId = pSubplan->id.groupId + 1, .split = false}; + SSplitContext cxt = {.queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false}; do { cxt.split = false; for (int32_t i = 0; i < splitRuleNum; ++i) { diff --git a/source/libs/planner/test/planSetOpTest.cpp b/source/libs/planner/test/planSetOpTest.cpp index 3c7bf9e43a34f88166a0b7716a38db03d56f224b..717384aae69fe26973216c996aef199954225e23 100644 --- a/source/libs/planner/test/planSetOpTest.cpp +++ b/source/libs/planner/test/planSetOpTest.cpp @@ -32,6 +32,15 @@ TEST_F(PlanSetOpTest, unionAllSubquery) { run("SELECT * FROM (SELECT c1, c2 FROM t1 UNION ALL SELECT c1, c2 FROM t1)"); } +TEST_F(PlanSetOpTest, unionAllWithSubquery) { + useDb("root", "test"); + + // child table + run("SELECT ts FROM (SELECT ts FROM st1s1) UNION ALL SELECT ts FROM (SELECT ts FROM st1s2)"); + // super table + run("SELECT ts FROM (SELECT ts FROM st1) UNION ALL SELECT ts FROM (SELECT ts FROM st1)"); +} + TEST_F(PlanSetOpTest, union) { useDb("root", "test"); diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index af8ec8715876624d79ccb4f9d09eb1a40b8ad4d2..94a28f46a82de78fd038f05682f0521796b716fe 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -322,6 +322,7 @@ class PlannerTestBaseImpl { } void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { + pCxt->queryId = 1; if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) { pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery; pCxt->topicQuery = true;