diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 25df797c04b0b85d9f5297b45c392e2898ebd70a..419390379505a966096b733d8b75772b97c9ebed 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -106,7 +106,7 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString); static int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql); -static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate); +static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate); static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex); @@ -4689,7 +4689,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* return TSDB_CODE_SUCCESS; } -static int32_t setKeepOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { +static int32_t setKeepOption(SSqlCmd* pCmd, SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { const char* msg = "invalid number of options"; pMsg->daysToKeep = htonl(-1); @@ -4720,7 +4720,7 @@ static int32_t setKeepOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* p return TSDB_CODE_SUCCESS; } -static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDbInfo) { +static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDbInfo) { const char* msg = "invalid time precision"; pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default @@ -4744,7 +4744,7 @@ static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreate return TSDB_CODE_SUCCESS; } -static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { +static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { pMsg->blocksPerTable = htons(pCreateDb->numOfBlocksPerTable); pMsg->compression = pCreateDb->compressionLevel; @@ -4759,7 +4759,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { } int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) { - SCreateDbMsg* pMsg = (SCreateDbMsg*)(pCmd->payload); + SCMCreateDbMsg* pMsg = (SCMCreateDbMsg*)(pCmd->payload); setCreateDBOption(pMsg, pCreateDbSql); if (setKeepOption(pCmd, pMsg, pCreateDbSql) != TSDB_CODE_SUCCESS) { @@ -5251,7 +5251,7 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { } // can only perform the parameters based on the macro definitation -int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) { +int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { char msg[512] = {0}; if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e86a2afbcde2a550a2412ef2397e837f15301d3e..83cf638133a4d673118594483a09a365abd06e22 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -116,7 +116,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SSqlRes *pRes = &pSql->res; if (code == 0) { - SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; + SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SRpcIpSet * pIpList = &pRsp->ipList; tscSetMgmtIpList(pIpList); @@ -942,7 +942,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); - SVPeerDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; + SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; if (numOfRows > 0) { assert(pRes->numOfRows == numOfRows); @@ -1141,7 +1141,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { int32_t idx = pMeterMetaInfo->vnodeIndex; SVnodeSidList *vnodeInfo = NULL; - SVPeerDesc * pSvd = NULL; + SVnodeDesc * pSvd = NULL; if (pMeterMetaInfo->pMetricMeta != NULL) { vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; @@ -1684,7 +1684,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SCreateDbMsg); + pCmd->payloadLen = sizeof(SCMCreateDbMsg); pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { @@ -1692,7 +1692,7 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg*)pCmd->payload; + SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload; assert(pCmd->numOfClause == 1); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); @@ -1703,13 +1703,13 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SCreateDnodeMsg); + pCmd->payloadLen = sizeof(SCMCreateDnodeMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload; + SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload; strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n); pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE; @@ -1718,13 +1718,13 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SCreateAcctMsg); + pCmd->payloadLen = sizeof(SCMCreateAcctMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload; + SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload; SSQLToken *pName = &pInfo->pDCLInfo->user.user; SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd; @@ -1763,14 +1763,14 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SCreateUserMsg); + pCmd->payloadLen = sizeof(SCMCreateUserMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SCreateUserMsg *pAlterMsg = (SCreateUserMsg*)pCmd->payload; + SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload; SUserInfo *pUser = &pInfo->pDCLInfo->user; strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n); @@ -1808,14 +1808,14 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SDropDbMsg); + pCmd->payloadLen = sizeof(SCMDropDbMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SDropDbMsg *pDropDbMsg = (SDropDbMsg*)pCmd->payload; + SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db)); @@ -1827,14 +1827,14 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SDropTableMsg); + pCmd->payloadLen = sizeof(SCMDropTableMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SDropTableMsg *pDropTableMsg = (SDropTableMsg*)pCmd->payload; + SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name); pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; @@ -1845,13 +1845,13 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SDropDnodeMsg); + pCmd->payloadLen = sizeof(SCMDropDnodeMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCmd->payload; + SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pDrop->ip, pMeterMetaInfo->name); pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE; @@ -1861,7 +1861,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SDropUserMsg); + pCmd->payloadLen = sizeof(SCMDropUserMsg); pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { @@ -1869,7 +1869,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SDropUserMsg *pDropMsg = (SDropUserMsg*)pCmd->payload; + SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pDropMsg->user, pMeterMetaInfo->name); @@ -1878,14 +1878,14 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SUseDbMsg); + pCmd->payloadLen = sizeof(SCMUseDbMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SUseDbMsg *pUseDbMsg = (SUseDbMsg*)pCmd->payload; + SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pUseDbMsg->db, pMeterMetaInfo->name); pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB; @@ -1897,14 +1897,14 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { STscObj *pObj = pSql->pTscObj; SSqlCmd *pCmd = &pSql->cmd; pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW; - pCmd->payloadLen = sizeof(SShowMsg) + 100; + pCmd->payloadLen = sizeof(SCMShowMsg) + 100; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SShowMsg *pShowMsg = (SShowMsg*)pCmd->payload; + SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); size_t nameLen = strlen(pMeterMetaInfo->name); @@ -1931,20 +1931,20 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShowMsg->payloadLen = htons(pIpAddr->n); } - pCmd->payloadLen = sizeof(SShowMsg) + pShowMsg->payloadLen; + pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen; return TSDB_CODE_SUCCESS; } int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SKillQueryMsg); + pCmd->payloadLen = sizeof(SCMKillQueryMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SKillQueryMsg *pKill = (SKillQueryMsg*)pCmd->payload; + SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload; strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n); switch (pCmd->command) { case TSDB_SQL_KILL_QUERY: @@ -1963,7 +1963,7 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &(pSql->cmd); - int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg); + int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg); SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo; if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) { @@ -1996,7 +1996,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } - SCreateTableMsg *pCreateTableMsg = (SCreateTableMsg *)pCmd->payload; + SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload; strcpy(pCreateTableMsg->tableId, pMeterMetaInfo->name); // use dbinfo from table id without modifying current db info @@ -2051,12 +2051,12 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + + return minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + TSDB_EXTRA_PAYLOAD_SIZE; } int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SAlterTableMsg *pAlterTableMsg; + SCMAlterTableMsg *pAlterTableMsg; char * pMsg; int msgLen = 0; int size = 0; @@ -2072,7 +2072,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return -1; } - pAlterTableMsg = (SAlterTableMsg *)pCmd->payload; + pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload; tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pAlterTableMsg->db); @@ -2107,7 +2107,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SAlterDbMsg); + pCmd->payloadLen = sizeof(SCMAlterDbMsg); pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { @@ -2115,7 +2115,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg*)pCmd->payload; + SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pAlterDbMsg->db, pMeterMetaInfo->name); @@ -2244,14 +2244,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { STscObj *pObj = pSql->pTscObj; SSqlCmd *pCmd = &pSql->cmd; pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT; - pCmd->payloadLen = sizeof(SConnectMsg); + pCmd->payloadLen = sizeof(SCMConnectMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload; + SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; char *db; // ugly code to move the space db = strstr(pObj->db, TS_PATH_DELIMITER); @@ -2264,7 +2264,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - STableInfoMsg *pInfoMsg; + SCMTableInfoMsg *pInfoMsg; char * pMsg; int msgLen = 0; @@ -2284,11 +2284,11 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - pInfoMsg = (STableInfoMsg *)pCmd->payload; + pInfoMsg = (SCMTableInfoMsg *)pCmd->payload; strcpy(pInfoMsg->tableId, pMeterMetaInfo->name); pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0); - pMsg = (char*)pInfoMsg + sizeof(STableInfoMsg); + pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg); if (pSql->cmd.createOnDemand) { memcpy(pInfoMsg->tags, tmpData, sizeof(STagData)); @@ -2307,7 +2307,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { /** * multi meter meta req pkg format: - * | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ...... + * | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ...... * no used 4B **/ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { @@ -2325,7 +2325,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize); memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db - SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead)); + SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead)); pInfoMsg->numOfTables = htonl((int32_t)pCmd->count); if (pCmd->payloadLen > 0) { @@ -2334,7 +2334,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tfree(tmpData); - pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg); + pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg); pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META; assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); @@ -2651,7 +2651,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { /** * multi meter meta rsp pkg format: - * | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 + * | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 * |...... 1B 1B 4B **/ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { @@ -2672,9 +2672,9 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { rsp++; - SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp; + SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp; totalNum = htonl(pInfo->numOfTables); - rsp += sizeof(SMultiTableInfoMsg); + rsp += sizeof(SCMMultiTableInfoMsg); for (i = 0; i < totalNum; i++) { SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp; @@ -2887,7 +2887,7 @@ _error_clean: */ int tscProcessShowRsp(SSqlObj *pSql) { STableMeta * pMeta; - SShowRsp *pShow; + SCMShowRsp *pShow; SSchema * pSchema; char key[20]; @@ -2898,7 +2898,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - pShow = (SShowRsp *)pRes->pRsp; + pShow = (SCMShowRsp *)pRes->pRsp; pShow->qhandle = htobe64(pShow->qhandle); pRes->qhandle = pShow->qhandle; @@ -2946,7 +2946,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; - SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp; + SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp; strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); @@ -2954,7 +2954,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { strncpy(pObj->db, temp, tListLen(pObj->db)); // SIpList * pIpList; -// char *rsp = pRes->pRsp + sizeof(SConnectRsp); +// char *rsp = pRes->pRsp + sizeof(SCMConnectRsp); // pIpList = (SIpList *)rsp; // tscSetMgmtIpList(pIpList); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 397ef2f0d03ed9fe4cc3c67b85dfdefcbdb0439a..3386b73acd104c702b8f528d9a808476abf9e863 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -769,8 +769,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi void tscCloseTscObj(STscObj* pObj) { pObj->signature = NULL; SSqlObj* pSql = pObj->pSql; - globalCode = pSql->res.code; - + if (pSql) { + globalCode = pSql->res.code; + } + taosTmrStopA(&(pObj->pTimer)); tscFreeSqlObj(pSql); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 3a653797f0f6f894bb361b1ef9fceeb12ac8dd66..7131440ec12ed16c8100fe5085acf3f8cb675645 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -325,7 +325,7 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { } static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { -// SDAlterStreamMsg *pStream = pCont; +// SMDAlterStreamMsg *pStream = pCont; // pStream->uid = htobe64(pStream->uid); // pStream->stime = htobe64(pStream->stime); // pStream->vnode = htonl(pStream->vnode); @@ -350,8 +350,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { return; } -// int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); -// SStatusMsg *pStatus = rpcMallocCont(contLen); +// int32_t contLen = sizeof(SDMStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); +// SDMStatusMsg *pStatus = rpcMallocCont(contLen); // if (pStatus == NULL) { // dError("Failed to malloc status message"); // return; diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 15f1e653c51e2d119e576ac0505d18ad7d98290e..d206577cdc63f4d2f1d9c4a124c09b02b7ed123a 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -25,8 +25,6 @@ extern "C" { #include "taosdef.h" #include "taosmsg.h" #include "taoserror.h" - - #include "sdb.h" #include "tglobalcfg.h" #include "thash.h" @@ -44,7 +42,6 @@ extern "C" { // internal globals extern char version[]; extern void *tsMgmtTmr; -extern void *tsMgmtTranQhandle; extern char tsMgmtDirectory[]; typedef struct { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d97a253d3668af79d289e6f10a373a6522698a48..b83b81d358a8fbcba28bb9bad44e5765431a1ef6 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -240,7 +240,7 @@ typedef struct SSchema { typedef struct { int32_t vnode; //the index of vnode uint32_t ip; -} SVPeerDesc; +} SVnodeDesc; typedef struct { int8_t tableType; @@ -255,7 +255,7 @@ typedef struct { uint64_t uid; uint64_t superTableUid; uint64_t createdTime; - SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; + SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; char tableId[TSDB_TABLE_ID_LEN + 1]; char superTableId[TSDB_TABLE_ID_LEN + 1]; char data[]; @@ -270,12 +270,12 @@ typedef struct { int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string int16_t reserved[16]; char schema[]; -} SCreateTableMsg; +} SCMCreateTableMsg; typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t igNotExists; -} SDropTableMsg; +} SCMDropTableMsg; typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; @@ -284,13 +284,13 @@ typedef struct { char tagVal[TSDB_MAX_BYTES_PER_ROW]; int8_t numOfCols; /* number of schema */ SSchema schema[]; -} SAlterTableMsg; +} SCMAlterTableMsg; typedef struct { char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; char db[TSDB_TABLE_ID_LEN + 1]; -} SConnectMsg; +} SCMConnectMsg; typedef struct { char acctId[TSDB_ACCT_LEN + 1]; @@ -298,7 +298,7 @@ typedef struct { int8_t writeAuth; int8_t superAuth; SRpcIpSet ipList; -} SConnectRsp; +} SCMConnectRsp; typedef struct { int32_t maxUsers; @@ -318,18 +318,18 @@ typedef struct { char user[TSDB_USER_LEN + 1]; char pass[TSDB_KEY_LEN + 1]; SAcctCfg cfg; -} SCreateAcctMsg, SAlterAcctMsg; +} SCMCreateAcctMsg, SCMAlterAcctMsg; typedef struct { char user[TSDB_USER_LEN + 1]; -} SDropUserMsg, SDropAcctMsg; +} SCMDropUserMsg, SCMDropAcctMsg; typedef struct { char user[TSDB_USER_LEN + 1]; char pass[TSDB_KEY_LEN + 1]; int8_t privilege; int8_t flag; -} SCreateUserMsg, SAlterUserMsg; +} SCMCreateUserMsg, SCMAlterUserMsg; typedef struct { char db[TSDB_TABLE_ID_LEN + 1]; @@ -339,7 +339,7 @@ typedef struct { int32_t sid; int32_t numOfVPeers; uint64_t uid; - SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; + SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropTableMsg; @@ -562,12 +562,12 @@ typedef struct { int8_t loadLatest; // load into mem or not uint8_t precision; // time resolution int8_t reserved[16]; -} SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg; +} SVnodeCfg, SDbCfg, SCMCreateDbMsg, SCMAlterDbMsg; typedef struct { char db[TSDB_TABLE_ID_LEN + 1]; uint8_t ignoreNotExists; -} SDropDbMsg, SUseDbMsg; +} SCMDropDbMsg, SCMUseDbMsg; // IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed // TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE @@ -598,52 +598,40 @@ typedef struct { uint8_t alternativeRole; uint8_t reserve[15]; SVnodeLoad load[]; -} SStatusMsg; +} SDMStatusMsg; typedef struct { int32_t code; SDnodeState dnodeState; SRpcIpSet ipList; SVnodeAccess vnodeAccess[]; -} SStatusRsp; - -typedef struct { - char spi; - char encrypt; - char secret[TSDB_KEY_LEN]; // key is changed if updated - char cipheringKey[TSDB_KEY_LEN]; -} SSecIe; - -typedef struct { - int32_t numOfVPeers; - SVPeerDesc vpeerDesc[]; -} SVpeerDescArray; +} SDMStatusRsp; typedef struct { int32_t vnode; SVnodeCfg cfg; - SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; + SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; } SMDCreateVnodeMsg; typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int16_t createFlag; char tags[]; -} STableInfoMsg; +} SCMTableInfoMsg; typedef struct { int32_t numOfTables; char tableIds[]; -} SMultiTableInfoMsg; +} SCMMultiTableInfoMsg; typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; -} SSuperTableInfoMsg; +} SCMSuperTableInfoMsg; typedef struct { int32_t numOfDnodes; uint32_t dnodeIps[]; -} SSuperTableInfoRsp; +} SCMSuperTableInfoRsp; typedef struct { int16_t elemLen; @@ -675,7 +663,7 @@ typedef struct { } SSuperTableMetaMsg; typedef struct { - SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; + SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; int16_t index; // used locally int32_t numOfSids; int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo @@ -699,7 +687,7 @@ typedef struct STableMeta { int16_t rowSize; // used locally, calculated in client int16_t sversion; int8_t numOfVpeers; - SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; + SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; int32_t sid; int32_t vgid; uint64_t uid; @@ -727,27 +715,27 @@ typedef struct { char db[TSDB_DB_NAME_LEN + 1]; uint16_t payloadLen; char payload[]; -} SShowMsg; +} SCMShowMsg; typedef struct { uint64_t qhandle; STableMeta tableMeta; -} SShowRsp; +} SCMShowRsp; typedef struct { char ip[32]; -} SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg; +} SCMCreateDnodeMsg, SCMDropDnodeMsg; typedef struct { uint32_t dnode; int32_t vnode; int32_t sid; -} STableCfgMsg; +} SDMConfigTableMsg; typedef struct { uint32_t dnode; int32_t vnode; -} SVpeerCfgMsg; +} SDMConfigVnodeMsg; typedef struct { char ip[32]; @@ -785,18 +773,18 @@ typedef struct { typedef struct { SQqueryList qlist; SStreamList slist; -} SHeartBeatMsg; +} SCMHeartBeatMsg; typedef struct { uint32_t queryId; uint32_t streamId; int8_t killConnection; SRpcIpSet ipList; -} SHeartBeatRsp; +} SCMHeartBeatRsp; typedef struct { char queryId[TSDB_KILL_MSG_LEN + 1]; -} SKillQueryMsg, SKillStreamMsg, SKillConnectionMsg; +} SCMKillQueryMsg, SCMKillStreamMsg, SCMKillConnMsg; typedef struct { int32_t vnode; @@ -805,7 +793,7 @@ typedef struct { uint64_t stime; // stream starting time int32_t status; char tableId[TSDB_TABLE_ID_LEN + 1]; -} SDAlterStreamMsg; +} SMDAlterStreamMsg; #pragma pack(pop) diff --git a/src/inc/trpc.h b/src/inc/trpc.h index c4374a5c98cde4c29e6115264710fee58a9ea8d3..e545abfed378f661c6ab58278590784c985b2672 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -84,7 +84,7 @@ void *rpcReallocCont(void *ptr, int contLen); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); void rpcSendResponse(SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); -void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); #ifdef __cplusplus } diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 6b184b53b62b11e13dff737f50d2d2bd12aabce9..a7b7e8383bafab2f76682488d131c0d2bfbe65d3 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -81,17 +81,6 @@ struct arguments args = { */ int main(int argc, char* argv[]) { /*setlocale(LC_ALL, "en_US.UTF-8"); */ - // - if (argc == 1) - { - printf("=== this a test for debug usage\n"); - void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0); - taos_query(taos, "select * from d1.t6"); - while (1) { - sleep(1000); - } - } - // if (!checkVersion()) { exit(EXIT_FAILURE); diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index e84add1c48c033e7514f77ed39ee989128ad4558..5012ae8f17d21bd6f2817e6ed7dc6b0bd0184d30 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -30,7 +30,7 @@ int32_t mgmtInitChildTables(); void mgmtCleanUpChildTables(); void * mgmtGetChildTable(char *tableId); -int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, +int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index d46adaf3cb5ccb303596f7d8b9ae98814cbe2be0..f740765ed1475e951a0f5e5cc08e9667dbc11d15 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -28,7 +28,7 @@ int32_t mgmtInitNormalTables(); void mgmtCleanUpNormalTables(); void * mgmtGetNormalTable(char *tableId); -int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, +int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 609e8d079f90e82be6aa6b6faf1bf7f685d69eae..e9da9e546df7e2215f340dca6850a1d28e214e50 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -31,7 +31,7 @@ void mgmtCleanUpSuperTables(); void * mgmtGetSuperTable(char *tableId); -int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate); +int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate); int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); diff --git a/src/mnode/inc/mgmtSystem.h b/src/mnode/inc/mgmtSystem.h deleted file mode 100644 index 5d71809f36de9ab44797c907c89fad44de0d47f6..0000000000000000000000000000000000000000 --- a/src/mnode/inc/mgmtSystem.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MGMT_SYSTEM_H -#define TDENGINE_MGMT_SYSTEM_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include - -int32_t mgmtInitSystem(); -int32_t mgmtStartSystem(); -void mgmtCleanUpSystem(); -void mgmtStopSystem(); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 2871a5ae6c7be51f77923f1e7d396d0f1caa64c3..2d94d48882ab5b337041f06af11787bec7b747e8 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -33,9 +33,9 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); +int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); -int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); +int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter); void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); @@ -45,8 +45,8 @@ SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); -void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); -void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); +void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); +void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index e19c804e2113632beaf5633b4df16d537fa9fbd1..0c7518eef73d32844aecd7b10846465c9edabe41 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -314,7 +314,7 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou return pCreateTable; } -int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, +int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= tsMaxTables) { @@ -350,7 +350,7 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj * } pTagData += (TSDB_TABLE_ID_LEN + 1); - int32_t tagDataLen = contLen - sizeof(SCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; + int32_t tagDataLen = contLen - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; *pDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen); if (*pDCreateOut == NULL) { mError("table:%s, failed to build create table message", pCreate->tableId); diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index 3c5d365f72e8ac4ff7c6d572ae76d47bb47838a7..469c903d833a265bd4307dceae420f89b9859aab 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -89,7 +89,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s // // //static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) { -// STableCfgMsg *pCfg = (STableCfgMsg *) pCont; +// SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont; // pCfg->dnode = htonl(pCfg->dnode); // pCfg->vnode = htonl(pCfg->vnode); // pCfg->sid = htonl(pCfg->sid); @@ -121,7 +121,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s // return; // } // -// SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *) pCont; +// SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont; // pCfg->dnode = htonl(pCfg->dnode); // pCfg->vnode = htonl(pCfg->vnode); // @@ -317,7 +317,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s //} // //void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { -// SStatusMsg *pStatus = (SStatusMsg *)pCont; +// SDMStatusMsg *pStatus = (SDMStatusMsg *)pCont; // // SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp)); // if (pObj == NULL) { diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 2cbbfedbb079efefc54295f33fb6b2c6ae2271d9..5670d2a384a8e2376eeb824c5851dba96593ed77 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -35,7 +35,7 @@ static void *tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mgmtUpdateDb(SDbObj *pDb); -static int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate); +static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists); static int32_t mgmtDropDb(SDbObj *pDb); @@ -81,7 +81,7 @@ int32_t mgmtInitDbs() { SDbObj tObj; tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; - tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "db", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction); + tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction); if (tsDbSdb == NULL) { mError("failed to init db data"); return -1; @@ -133,7 +133,7 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { return (SDbObj *)sdbGetRow(tsDbSdb, db); } -static int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) { +static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) { if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); return TSDB_CODE_INVALID_OPTION; @@ -206,7 +206,7 @@ static int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) { +static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) { // assign default parameters if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; // if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; // @@ -251,7 +251,7 @@ static int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) { +static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb); if (numOfDbs >= tsMaxDbs) { mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); @@ -442,7 +442,7 @@ static void mgmtMonitorDbDrop(void *unused, void *unusedt) { } } -static int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { +static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) { return 0; // int32_t code = TSDB_CODE_SUCCESS; // @@ -915,7 +915,7 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { return; } - SCreateDbMsg *pCreate = (SCreateDbMsg *) rpcMsg->pCont; + SCMCreateDbMsg *pCreate = (SCMCreateDbMsg *) rpcMsg->pCont; pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); @@ -953,7 +953,7 @@ static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) { return; } - SAlterDbMsg *pAlter = (SAlterDbMsg *) rpcMsg->pCont; + SCMAlterDbMsg *pAlter = (SCMAlterDbMsg *) rpcMsg->pCont; pAlter->daysPerFile = htonl(pAlter->daysPerFile); pAlter->daysToKeep = htonl(pAlter->daysToKeep); pAlter->maxSessions = htonl(pAlter->maxSessions) + 1; @@ -983,7 +983,7 @@ static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) { } if (pUser->superAuth) { - SDropDbMsg *pDrop = rpcMsg->pCont; + SCMDropDbMsg *pDrop = rpcMsg->pCont; rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); if (rpcRsp.code == TSDB_CODE_SUCCESS) { mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtMain.c similarity index 83% rename from src/mnode/src/mgmtSystem.c rename to src/mnode/src/mgmtMain.c index 96e0abcb700b9d1cb529ee91093b7fb68b9d5aee..3e142e595d15e93a6e7c42f8a7572794de9b1b10 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtMain.c @@ -27,41 +27,28 @@ #include "mgmtDServer.h" #include "mgmtVgroup.h" #include "mgmtUser.h" -#include "mgmtSystem.h" #include "mgmtTable.h" #include "mgmtShell.h" +static int32_t mgmtCheckMgmtRunning(); char tsMgmtDirectory[128] = {0}; -void *tsMgmtTmr = NULL; -void *tsMgmtTranQhandle = NULL; +void *tsMgmtTmr = NULL; - -void mgmtCleanUpSystem() { - mPrint("starting to clean up mgmt"); - - sdbCleanUpPeers(); - mgmtCleanupBalance(); - mgmtCleanupDClient(); - mgmtCleanupDServer(); - mgmtCleanUpShell(); - mgmtCleanUpTables(); - mgmtCleanUpVgroups(); - mgmtCleanUpDbs(); - mgmtCleanUpDnodes(); - mgmtCleanUpUsers(); - mgmtCleanUpAccts(); - taosTmrCleanUp(tsMgmtTmr); - taosCleanUpScheduler(tsMgmtTranQhandle); - - mPrint("mgmt is cleaned up"); -} - -int32_t mgmtCheckMgmtRunning() { - if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) { +int32_t mgmtInitSystem() { + if (mgmtInitShell() != 0) { + mError("failed to init shell"); return -1; } - tsetModuleStatus(TSDB_MOD_MGMT); + struct stat dirstat; + bool fileExist = (stat(tsMgmtDirectory, &dirstat) == 0); + bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0); + + if (asMaster || fileExist) { + if (mgmtStartSystem() != 0) { + return -1; + } + } return 0; } @@ -79,11 +66,9 @@ int32_t mgmtStartSystem() { return 0; } - tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT"); - tsMgmtTmr = taosTmrInit((tsMaxDnodes + tsMaxShellConns) * 3, 200, 3600000, "MND"); if (tsMgmtTmr == NULL) { - mError("failed to init timer, exit"); + mError("failed to init timer"); return -1; } @@ -125,11 +110,6 @@ int32_t mgmtStartSystem() { return -1; } - if (mgmtInitShell() < 0) { - mError("failed to init shell"); - return -1; - } - if (sdbInitPeers(tsMgmtDirectory) < 0) { mError("failed to init peers"); return -1; @@ -139,30 +119,11 @@ int32_t mgmtStartSystem() { mError("failed to init dnode balance") } - mPrint("TDengine mgmt is initialized successfully"); return 0; } -int32_t mgmtInitSystem() { - struct stat dirstat; - bool directoryExist = (stat(tsMgmtDirectory, &dirstat) == 0); - bool equalWithMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0); - - if (equalWithMaster || directoryExist) { - if (mgmtStartSystem() != 0) { - return -1; - } - } - - if (mgmtInitShell() < 0) { - mError("failed to init shell"); - return -1; - } - - return 0; -} void mgmtStopSystem() { if (sdbMaster) { @@ -172,5 +133,30 @@ void mgmtStopSystem() { mgmtCleanUpSystem(); remove(tsMgmtDirectory); -// mgmtInitRedirect(); } + +void mgmtCleanUpSystem() { + mPrint("starting to clean up mgmt"); + sdbCleanUpPeers(); + mgmtCleanupBalance(); + mgmtCleanUpShell(); + mgmtCleanupDClient(); + mgmtCleanupDServer(); + mgmtCleanUpTables(); + mgmtCleanUpVgroups(); + mgmtCleanUpDbs(); + mgmtCleanUpDnodes(); + mgmtCleanUpUsers(); + mgmtCleanUpAccts(); + taosTmrCleanUp(tsMgmtTmr); + mPrint("mgmt is cleaned up"); +} + +static int32_t mgmtCheckMgmtRunning() { + if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) { + return -1; + } + + tsetModuleStatus(TSDB_MOD_MGMT); + return 0; +} \ No newline at end of file diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 9e1b3f8ceb70a8c37e571dd3aeb54dc83ffe8c2a..637d0a1107fa60d564d01f68d13323940360cf38 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -18,5 +18,5 @@ #include "mgmtMnode.h" bool mgmtCheckRedirect(void *handle) { - return true; + return false; } \ No newline at end of file diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index ff2ed52ac339a2bf1b0a7fa403cd04b6861bd725..d6623ee7009a8be93d0af69642fa6d04c74758cf 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -328,7 +328,7 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr return pCreateTable; } -int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, +int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb); if (numOfTables >= TSDB_MAX_NORMAL_TABLES) { diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 2061fc3409c64ae2c9c881951c471797ce2be9c2..e16a7f166c7b1c25c34c4b662caedaf57acc7e0e 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -22,7 +22,7 @@ #include "mgmtShell.h" #include "mgmtUser.h" -int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); +int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); int32_t mgmtKillQuery(char *qidstr, void *pConn); int32_t mgmtKillStream(char *qidstr, void *pConn); @@ -63,7 +63,7 @@ typedef struct { SStreamDesc sdesc[]; } SStreamShow; -int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg) { +int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) { // SAcctObj *pAcct = pConn->pAcct; // // if (contLen <= 0 || pAcct == NULL) { @@ -684,7 +684,7 @@ void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) { return; } - SKillQueryMsg *pKill = (SKillQueryMsg *) rpcMsg->pCont; + SCMKillQueryMsg *pKill = (SCMKillQueryMsg *) rpcMsg->pCont; int32_t code; if (!pUser->writeAuth) { @@ -708,7 +708,7 @@ void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) { return; } - SKillStreamMsg *pKill = (SKillStreamMsg *) rpcMsg->pCont; + SCMKillStreamMsg *pKill = (SCMKillStreamMsg *) rpcMsg->pCont; int32_t code; if (!pUser->writeAuth) { @@ -732,7 +732,7 @@ void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) { return; } - SKillConnectionMsg *pKill = (SKillConnectionMsg *) rpcMsg->pCont; + SCMKillConnMsg *pKill = (SCMKillConnMsg *) rpcMsg->pCont; int32_t code; if (!pUser->writeAuth) { diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 380508874692b408c5d3c91655dff6201691d166..094848c8c081ba60495d28493ca6b05139d99a38 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -51,6 +51,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg); static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg); static void *tsMgmtShellRpc = NULL; +static void *tsMgmtTranQhandle = NULL; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *) = {0}; static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; @@ -59,6 +60,8 @@ int32_t mgmtInitShell() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); + tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT"); + int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; if (numOfThreads < 1) { numOfThreads = 1; @@ -89,6 +92,11 @@ int32_t mgmtInitShell() { } void mgmtCleanUpShell() { + if (tsMgmtTranQhandle) { + taosCleanUpScheduler(tsMgmtTranQhandle); + tsMgmtTranQhandle = NULL; + } + if (tsMgmtShellRpc) { rpcClose(tsMgmtShellRpc); tsMgmtShellRpc = NULL; @@ -112,12 +120,16 @@ void mgmtProcessTranRequest(SSchedMsg *sched) { SRpcMsg *rpcMsg = sched->msg; (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); } void mgmtAddToTranRequest(SRpcMsg *rpcMsg) { + SRpcMsg *queuedRpcMsg = malloc(sizeof(SRpcMsg)); + memcpy(queuedRpcMsg, rpcMsg, sizeof(SRpcMsg)); + SSchedMsg schedMsg; - schedMsg.msg = rpcMsg; - schedMsg.fp = mgmtProcessTranRequest; + schedMsg.msg = queuedRpcMsg; + schedMsg.fp = mgmtProcessTranRequest; taosScheduleTask(tsMgmtTranQhandle, &schedMsg); } @@ -130,6 +142,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } + mTrace("%s is received", taosMsg[rpcMsg->msgType]); if (tsMgmtProcessShellMsgFp[rpcMsg->msgType]) { if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); @@ -147,15 +160,15 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SShowMsg *pShowMsg = rpcMsg->pCont; + SCMShowMsg *pShowMsg = rpcMsg->pCont; if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } } - int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; - SShowRsp *pShowRsp = rpcMallocCont(size); + int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; + SCMShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; rpcSendResponse(&rpcRsp); @@ -179,9 +192,9 @@ static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { mgmtSaveQhandle(pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow); if (tsMgmtShowMetaFp[pShowMsg->type]) { - code = (*tsMgmtShowMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle); + code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle); if (code == 0) { - size = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; + size = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns; } else { mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, taosMsg[(uint8_t) pShowMsg->type], code); @@ -245,7 +258,7 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { // if free flag is set, client wants to clean the resources if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) - rowsRead = (*tsMgmtShowRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle); + rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle); if (rowsRead < 0) { rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS; @@ -267,10 +280,10 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - //SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) rpcMsg->pCont; + //SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont; //mgmtSaveQueryStreamList(pHBMsg); - SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *) rpcMallocCont(sizeof(SHeartBeatRsp)); + SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); if (pHBRsp == NULL) { rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; rpcSendResponse(&rpcRsp); @@ -278,7 +291,10 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { } SRpcConnInfo connInfo; - rpcGetConnInfo(rpcMsg->handle, &connInfo); + if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { + mError("conn:%p is already released while process heart beat msg", rpcMsg->handle); + return; + } pHBRsp->ipList.inUse = 0; pHBRsp->ipList.port = htons(tsMnodeShellPort); @@ -305,7 +321,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { pHBRsp->killConnection = 0; rpcRsp.pCont = pHBRsp; - rpcRsp.contLen = sizeof(SHeartBeatRsp); + rpcRsp.contLen = sizeof(SCMHeartBeatRsp); rpcSendResponse(&rpcRsp); } @@ -326,12 +342,15 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SConnectMsg *pConnectMsg = (SConnectMsg *) rpcMsg->pCont; + SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) rpcMsg->pCont; SRpcConnInfo connInfo; - rpcGetConnInfo(rpcMsg->handle, &connInfo); - int32_t code; + if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { + mError("conn:%p is already released while process connect msg", rpcMsg->handle); + return; + } + int32_t code; SUserObj *pUser = mgmtGetUser(connInfo.user); if (pUser == NULL) { code = TSDB_CODE_INVALID_USER; @@ -364,7 +383,7 @@ static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { } } - SConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SConnectRsp)); + SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); if (pConnectRsp == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto connect_over; @@ -397,7 +416,7 @@ connect_over: } else { mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); rpcRsp.pCont = pConnectRsp; - rpcRsp.contLen = sizeof(SConnectRsp); + rpcRsp.contLen = sizeof(SCMConnectRsp); } rpcSendResponse(&rpcRsp); } @@ -406,7 +425,7 @@ connect_over: * check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one. */ static bool mgmtCheckMeterMetaMsgType(void *pMsg) { - STableInfoMsg *pInfo = (STableInfoMsg *) pMsg; + SCMTableInfoMsg *pInfo = (SCMTableInfoMsg *) pMsg; int16_t autoCreate = htons(pInfo->createFlag); STableInfo *pTable = mgmtGetTable(pInfo->tableId); diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 1d7db53a5e71191068f77437f4c4039b034f6618..ed4fa3b6a6a706b43a68084daae341a6b1d50d91 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -200,7 +200,7 @@ void mgmtCleanUpSuperTables() { sdbCloseTable(tsSuperTableSdb); } -int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { +int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) { int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb); if (numOfTables >= TSDB_MAX_SUPER_TABLES) { mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES); @@ -259,7 +259,7 @@ void* mgmtGetSuperTable(char *tableId) { void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { //TODO get vgroup of dnodes - SSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum()); + SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum()); rsp->numOfDnodes = 1; rsp->dnodeIps[0] = 0; return rsp; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 6e7c617558ffe19f92cea6df17874f452c08c0ed..188083d516904165fde44c6a8a600f90200ca8bd 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -133,7 +133,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo -void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { +void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb == NULL) { @@ -189,7 +189,7 @@ void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *th // -void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { +void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { assert(pVgroup != NULL); SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; @@ -239,7 +239,7 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c mgmtSendMsgToDnode(&ipSet, &rpcMsg); } -int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { +int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb == NULL) { mError("table:%s, failed to create table, db not selected", pCreate->tableId); @@ -331,7 +331,7 @@ int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { } } -int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { +int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) { STableInfo *pTable = mgmtGetTable(pAlter->tableId); if (pTable == NULL) { return TSDB_CODE_INVALID_TABLE; @@ -553,7 +553,7 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SCreateTableMsg *pCreate = (SCreateTableMsg *) rpcMsg->pCont; + SCMCreateTableMsg *pCreate = (SCMCreateTableMsg *) rpcMsg->pCont; pCreate->numOfColumns = htons(pCreate->numOfColumns); pCreate->numOfTags = htons(pCreate->numOfTags); pCreate->sqlLen = htons(pCreate->sqlLen); @@ -594,7 +594,7 @@ void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) { void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SDropTableMsg *pDrop = (SDropTableMsg *) rpcMsg->pCont; + SCMDropTableMsg *pDrop = (SCMDropTableMsg *) rpcMsg->pCont; if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("table:%s, failed to drop table, need redirect message", pDrop->tableId); @@ -644,7 +644,7 @@ void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) { return; } - SAlterTableMsg *pAlter = (SAlterTableMsg *) rpcMsg->pCont; + SCMAlterTableMsg *pAlter = (SCMAlterTableMsg *) rpcMsg->pCont; if (!pUser->writeAuth) { rpcRsp.code = TSDB_CODE_NO_RIGHTS; @@ -686,7 +686,11 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { } SRpcConnInfo connInfo; - rpcGetConnInfo(thandle, &connInfo); + if (rpcGetConnInfo(thandle, &connInfo) != 0) { + mError("conn:%p is already released while get table meta", thandle); + return; + } + bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); @@ -710,7 +714,7 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { rpcRsp.pCont = NULL; rpcRsp.contLen = 0; - STableInfoMsg *pInfo = rpcMsg->pCont; + SCMTableInfoMsg *pInfo = rpcMsg->pCont; pInfo->createFlag = htons(pInfo->createFlag); SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); @@ -735,8 +739,8 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { return; } - int32_t contLen = sizeof(SCreateTableMsg) + sizeof(STagData); - SCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); + int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(STagData); + SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); if (pCreateMsg == NULL) { mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -762,7 +766,10 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { rpcRsp.contLen = 0; SRpcConnInfo connInfo; - rpcGetConnInfo(rpcMsg->handle, &connInfo); + if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { + mError("conn:%p is already released while get mulit table meta", rpcMsg->handle); + return; + } bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); SUserObj *pUser = mgmtGetUser(connInfo.user); @@ -772,7 +779,7 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { return; } - SMultiTableInfoMsg *pInfo = rpcMsg->pCont; + SCMMultiTableInfoMsg *pInfo = rpcMsg->pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice @@ -823,7 +830,7 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SSuperTableInfoMsg *pInfo = rpcMsg->pCont; + SCMSuperTableInfoMsg *pInfo = rpcMsg->pCont; STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); if (pTable == NULL) { rpcRsp.code = TSDB_CODE_INVALID_TABLE; @@ -831,7 +838,7 @@ void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) { return; } - SSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); + SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); if (pRsp != NULL) { int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); rpcRsp.pCont = pRsp; diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 2d898fa34c6ee9dff443e5e23cc1ba41de0c0672..821e553810d7f430dbe2eb8594eb31bf6e3ec34d 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -59,7 +59,7 @@ int32_t mgmtInitUsers() { SUserObj tObj; tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "user", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction); + tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction); if (tsUserSdb == NULL) { mError("failed to init user data"); return -1; @@ -123,7 +123,7 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { SUserObj *pUser = (SUserObj *)sdbGetRow(tsUserSdb, name); if (pUser != NULL) { - mWarn("user:%s is already there", name); + mTrace("user:%s is already there", name); return TSDB_CODE_USER_ALREADY_EXIST; } @@ -330,9 +330,11 @@ static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t * SUserObj *mgmtGetUserFromConn(void *pConn) { SRpcConnInfo connInfo; - rpcGetConnInfo(pConn, &connInfo); + if (rpcGetConnInfo(pConn, &connInfo) == 0) { + return mgmtGetUser(connInfo.user); + } - return mgmtGetUser(connInfo.user); + return NULL; } static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { @@ -347,7 +349,7 @@ static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { } if (pUser->superAuth) { - SCreateUserMsg *pCreate = rpcMsg->pCont; + SCMCreateUserMsg *pCreate = rpcMsg->pCont; rpcRsp.code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); if (rpcRsp.code == TSDB_CODE_SUCCESS) { mLPrint("user:%s is created by %s", pCreate->user, pUser->user); @@ -370,7 +372,7 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { return; } - SAlterUserMsg *pAlter = rpcMsg->pCont; + SCMAlterUserMsg *pAlter = rpcMsg->pCont; SUserObj *pUser = mgmtGetUser(pAlter->user); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; @@ -477,7 +479,7 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { return ; } - SDropUserMsg *pDrop = rpcMsg->pCont; + SCMDropUserMsg *pDrop = rpcMsg->pCont; SUserObj *pUser = mgmtGetUser(pDrop->user); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 2b158f3f35a1967fb4a233457a21c9e96b10b5c6..c4a11bd11ea269ba5a1cdc2dacc99348c8297de3 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -534,7 +534,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - SVPeerDesc *vpeerDesc = pVPeers->vpeerDesc; + SVnodeDesc *vpeerDesc = pVPeers->vpeerDesc; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 291cb5effcdd2978c1d7c0f331d2363d72dceae9..d62d0651fc0f04682bb7e18e7bd130512d57f74c 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -441,15 +441,16 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { return; } -void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { SRpcConn *pConn = (SRpcConn *)thandle; + if (pConn->user[0] == 0) return -1; pInfo->clientIp = pConn->peerIp; pInfo->clientPort = pConn->peerPort; pInfo->serverIp = pConn->destIp; - assert(pConn->user[0]); strcpy(pInfo->user, pConn->user); + return 0; } static void rpcFreeMsg(void *msg) {