提交 54405164 编写于 作者: S slguan

[TD-10] change message name, and the create user commnad can be executed

上级 7041b747
...@@ -106,7 +106,7 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString); ...@@ -106,7 +106,7 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString);
static int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql); static int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate); static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate);
static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex); static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex);
...@@ -4689,7 +4689,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* ...@@ -4689,7 +4689,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setKeepOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { static int32_t setKeepOption(SSqlCmd* pCmd, SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
const char* msg = "invalid number of options"; const char* msg = "invalid number of options";
pMsg->daysToKeep = htonl(-1); pMsg->daysToKeep = htonl(-1);
...@@ -4720,7 +4720,7 @@ static int32_t setKeepOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* p ...@@ -4720,7 +4720,7 @@ static int32_t setKeepOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDbInfo) { static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDbInfo) {
const char* msg = "invalid time precision"; const char* msg = "invalid time precision";
pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default
...@@ -4744,7 +4744,7 @@ static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreate ...@@ -4744,7 +4744,7 @@ static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreate
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->blocksPerTable = htons(pCreateDb->numOfBlocksPerTable); pMsg->blocksPerTable = htons(pCreateDb->numOfBlocksPerTable);
pMsg->compression = pCreateDb->compressionLevel; pMsg->compression = pCreateDb->compressionLevel;
...@@ -4759,7 +4759,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { ...@@ -4759,7 +4759,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
} }
int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) { int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {
SCreateDbMsg* pMsg = (SCreateDbMsg*)(pCmd->payload); SCMCreateDbMsg* pMsg = (SCMCreateDbMsg*)(pCmd->payload);
setCreateDBOption(pMsg, pCreateDbSql); setCreateDBOption(pMsg, pCreateDbSql);
if (setKeepOption(pCmd, pMsg, pCreateDbSql) != TSDB_CODE_SUCCESS) { if (setKeepOption(pCmd, pMsg, pCreateDbSql) != TSDB_CODE_SUCCESS) {
...@@ -5251,7 +5251,7 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -5251,7 +5251,7 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
} }
// can only perform the parameters based on the macro definitation // can only perform the parameters based on the macro definitation
int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) { int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
char msg[512] = {0}; char msg[512] = {0};
if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) { if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) {
......
...@@ -116,7 +116,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -116,7 +116,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (code == 0) { if (code == 0) {
SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
SRpcIpSet * pIpList = &pRsp->ipList; SRpcIpSet * pIpList = &pRsp->ipList;
tscSetMgmtIpList(pIpList); tscSetMgmtIpList(pIpList);
...@@ -942,7 +942,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -942,7 +942,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
SVPeerDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) { if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows); assert(pRes->numOfRows == numOfRows);
...@@ -1141,7 +1141,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1141,7 +1141,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
int32_t idx = pMeterMetaInfo->vnodeIndex; int32_t idx = pMeterMetaInfo->vnodeIndex;
SVnodeSidList *vnodeInfo = NULL; SVnodeSidList *vnodeInfo = NULL;
SVPeerDesc * pSvd = NULL; SVnodeDesc * pSvd = NULL;
if (pMeterMetaInfo->pMetricMeta != NULL) { if (pMeterMetaInfo->pMetricMeta != NULL) {
vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
...@@ -1684,7 +1684,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1684,7 +1684,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateDbMsg); pCmd->payloadLen = sizeof(SCMCreateDbMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB; pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
...@@ -1692,7 +1692,7 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1692,7 +1692,7 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg*)pCmd->payload; SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
...@@ -1703,13 +1703,13 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1703,13 +1703,13 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateDnodeMsg); pCmd->payloadLen = sizeof(SCMCreateDnodeMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload; SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n); strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE; pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
...@@ -1718,13 +1718,13 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1718,13 +1718,13 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateAcctMsg); pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload; SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
SSQLToken *pName = &pInfo->pDCLInfo->user.user; SSQLToken *pName = &pInfo->pDCLInfo->user.user;
SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd; SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
...@@ -1763,14 +1763,14 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1763,14 +1763,14 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateUserMsg); pCmd->payloadLen = sizeof(SCMCreateUserMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SCreateUserMsg *pAlterMsg = (SCreateUserMsg*)pCmd->payload; SCMCreateUserMsg *pAlterMsg = (SCMCreateUserMsg*)pCmd->payload;
SUserInfo *pUser = &pInfo->pDCLInfo->user; SUserInfo *pUser = &pInfo->pDCLInfo->user;
strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n); strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
...@@ -1808,14 +1808,14 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1808,14 +1808,14 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SDropDbMsg); pCmd->payloadLen = sizeof(SCMDropDbMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SDropDbMsg *pDropDbMsg = (SDropDbMsg*)pCmd->payload; SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db)); strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db));
...@@ -1827,14 +1827,14 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1827,14 +1827,14 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SDropTableMsg); pCmd->payloadLen = sizeof(SCMDropTableMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SDropTableMsg *pDropTableMsg = (SDropTableMsg*)pCmd->payload; SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name); strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name);
pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
...@@ -1845,13 +1845,13 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1845,13 +1845,13 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SDropDnodeMsg); pCmd->payloadLen = sizeof(SCMDropDnodeMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCmd->payload; SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDrop->ip, pMeterMetaInfo->name); strcpy(pDrop->ip, pMeterMetaInfo->name);
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE; pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
...@@ -1861,7 +1861,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1861,7 +1861,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SDropUserMsg); pCmd->payloadLen = sizeof(SCMDropUserMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER; pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
...@@ -1869,7 +1869,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1869,7 +1869,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SDropUserMsg *pDropMsg = (SDropUserMsg*)pCmd->payload; SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropMsg->user, pMeterMetaInfo->name); strcpy(pDropMsg->user, pMeterMetaInfo->name);
...@@ -1878,14 +1878,14 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1878,14 +1878,14 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SUseDbMsg); pCmd->payloadLen = sizeof(SCMUseDbMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SUseDbMsg *pUseDbMsg = (SUseDbMsg*)pCmd->payload; SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pUseDbMsg->db, pMeterMetaInfo->name); strcpy(pUseDbMsg->db, pMeterMetaInfo->name);
pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB; pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
...@@ -1897,14 +1897,14 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1897,14 +1897,14 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW; pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW;
pCmd->payloadLen = sizeof(SShowMsg) + 100; pCmd->payloadLen = sizeof(SCMShowMsg) + 100;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SShowMsg *pShowMsg = (SShowMsg*)pCmd->payload; SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
size_t nameLen = strlen(pMeterMetaInfo->name); size_t nameLen = strlen(pMeterMetaInfo->name);
...@@ -1931,20 +1931,20 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1931,20 +1931,20 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShowMsg->payloadLen = htons(pIpAddr->n); pShowMsg->payloadLen = htons(pIpAddr->n);
} }
pCmd->payloadLen = sizeof(SShowMsg) + pShowMsg->payloadLen; pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SKillQueryMsg); pCmd->payloadLen = sizeof(SCMKillQueryMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SKillQueryMsg *pKill = (SKillQueryMsg*)pCmd->payload; SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n); strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
switch (pCmd->command) { switch (pCmd->command) {
case TSDB_SQL_KILL_QUERY: case TSDB_SQL_KILL_QUERY:
...@@ -1963,7 +1963,7 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1963,7 +1963,7 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) { int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &(pSql->cmd); SSqlCmd *pCmd = &(pSql->cmd);
int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg); int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMCreateTableMsg);
SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo; SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) { if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
...@@ -1996,7 +1996,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1996,7 +1996,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
SCreateTableMsg *pCreateTableMsg = (SCreateTableMsg *)pCmd->payload; SCMCreateTableMsg *pCreateTableMsg = (SCMCreateTableMsg *)pCmd->payload;
strcpy(pCreateTableMsg->tableId, pMeterMetaInfo->name); strcpy(pCreateTableMsg->tableId, pMeterMetaInfo->name);
// use dbinfo from table id without modifying current db info // use dbinfo from table id without modifying current db info
...@@ -2051,12 +2051,12 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2051,12 +2051,12 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) { int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + return minMsgSize() + sizeof(SMgmtHead) + sizeof(SCMAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
TSDB_EXTRA_PAYLOAD_SIZE; TSDB_EXTRA_PAYLOAD_SIZE;
} }
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SAlterTableMsg *pAlterTableMsg; SCMAlterTableMsg *pAlterTableMsg;
char * pMsg; char * pMsg;
int msgLen = 0; int msgLen = 0;
int size = 0; int size = 0;
...@@ -2072,7 +2072,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2072,7 +2072,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return -1; return -1;
} }
pAlterTableMsg = (SAlterTableMsg *)pCmd->payload; pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pAlterTableMsg->db); tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pAlterTableMsg->db);
...@@ -2107,7 +2107,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2107,7 +2107,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SAlterDbMsg); pCmd->payloadLen = sizeof(SCMAlterDbMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB; pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
...@@ -2115,7 +2115,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2115,7 +2115,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg*)pCmd->payload; SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pAlterDbMsg->db, pMeterMetaInfo->name); strcpy(pAlterDbMsg->db, pMeterMetaInfo->name);
...@@ -2244,14 +2244,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2244,14 +2244,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT; pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
pCmd->payloadLen = sizeof(SConnectMsg); pCmd->payloadLen = sizeof(SCMConnectMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload; SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
char *db; // ugly code to move the space char *db; // ugly code to move the space
db = strstr(pObj->db, TS_PATH_DELIMITER); db = strstr(pObj->db, TS_PATH_DELIMITER);
...@@ -2264,7 +2264,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2264,7 +2264,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableInfoMsg *pInfoMsg; SCMTableInfoMsg *pInfoMsg;
char * pMsg; char * pMsg;
int msgLen = 0; int msgLen = 0;
...@@ -2284,11 +2284,11 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2284,11 +2284,11 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
pInfoMsg = (STableInfoMsg *)pCmd->payload; pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
strcpy(pInfoMsg->tableId, pMeterMetaInfo->name); strcpy(pInfoMsg->tableId, pMeterMetaInfo->name);
pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0); pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0);
pMsg = (char*)pInfoMsg + sizeof(STableInfoMsg); pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
if (pSql->cmd.createOnDemand) { if (pSql->cmd.createOnDemand) {
memcpy(pInfoMsg->tags, tmpData, sizeof(STagData)); memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
...@@ -2307,7 +2307,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2307,7 +2307,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
/** /**
* multi meter meta req pkg format: * multi meter meta req pkg format:
* | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ...... * | SMgmtHead | SCMMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
* no used 4B * no used 4B
**/ **/
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...@@ -2325,7 +2325,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2325,7 +2325,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize); SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db
SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead)); SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
pInfoMsg->numOfTables = htonl((int32_t)pCmd->count); pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
if (pCmd->payloadLen > 0) { if (pCmd->payloadLen > 0) {
...@@ -2334,7 +2334,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2334,7 +2334,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tfree(tmpData); tfree(tmpData);
pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg); pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SCMMultiTableInfoMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META; pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
...@@ -2651,7 +2651,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { ...@@ -2651,7 +2651,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
/** /**
* multi meter meta rsp pkg format: * multi meter meta rsp pkg format:
* | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 * | STaosRsp | ieType | SCMMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
* |...... 1B 1B 4B * |...... 1B 1B 4B
**/ **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
...@@ -2672,9 +2672,9 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { ...@@ -2672,9 +2672,9 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
rsp++; rsp++;
SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp; SCMMultiTableInfoMsg *pInfo = (SCMMultiTableInfoMsg *)rsp;
totalNum = htonl(pInfo->numOfTables); totalNum = htonl(pInfo->numOfTables);
rsp += sizeof(SMultiTableInfoMsg); rsp += sizeof(SCMMultiTableInfoMsg);
for (i = 0; i < totalNum; i++) { for (i = 0; i < totalNum; i++) {
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp; SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
...@@ -2887,7 +2887,7 @@ _error_clean: ...@@ -2887,7 +2887,7 @@ _error_clean:
*/ */
int tscProcessShowRsp(SSqlObj *pSql) { int tscProcessShowRsp(SSqlObj *pSql) {
STableMeta * pMeta; STableMeta * pMeta;
SShowRsp *pShow; SCMShowRsp *pShow;
SSchema * pSchema; SSchema * pSchema;
char key[20]; char key[20];
...@@ -2898,7 +2898,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -2898,7 +2898,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
pShow = (SShowRsp *)pRes->pRsp; pShow = (SCMShowRsp *)pRes->pRsp;
pShow->qhandle = htobe64(pShow->qhandle); pShow->qhandle = htobe64(pShow->qhandle);
pRes->qhandle = pShow->qhandle; pRes->qhandle = pShow->qhandle;
...@@ -2946,7 +2946,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2946,7 +2946,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp; SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
...@@ -2954,7 +2954,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2954,7 +2954,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
strncpy(pObj->db, temp, tListLen(pObj->db)); strncpy(pObj->db, temp, tListLen(pObj->db));
// SIpList * pIpList; // SIpList * pIpList;
// char *rsp = pRes->pRsp + sizeof(SConnectRsp); // char *rsp = pRes->pRsp + sizeof(SCMConnectRsp);
// pIpList = (SIpList *)rsp; // pIpList = (SIpList *)rsp;
// tscSetMgmtIpList(pIpList); // tscSetMgmtIpList(pIpList);
......
...@@ -769,8 +769,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi ...@@ -769,8 +769,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
void tscCloseTscObj(STscObj* pObj) { void tscCloseTscObj(STscObj* pObj) {
pObj->signature = NULL; pObj->signature = NULL;
SSqlObj* pSql = pObj->pSql; SSqlObj* pSql = pObj->pSql;
globalCode = pSql->res.code; if (pSql) {
globalCode = pSql->res.code;
}
taosTmrStopA(&(pObj->pTimer)); taosTmrStopA(&(pObj->pTimer));
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
......
...@@ -325,7 +325,7 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -325,7 +325,7 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
} }
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// SDAlterStreamMsg *pStream = pCont; // SMDAlterStreamMsg *pStream = pCont;
// pStream->uid = htobe64(pStream->uid); // pStream->uid = htobe64(pStream->uid);
// pStream->stime = htobe64(pStream->stime); // pStream->stime = htobe64(pStream->stime);
// pStream->vnode = htonl(pStream->vnode); // pStream->vnode = htonl(pStream->vnode);
...@@ -350,8 +350,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -350,8 +350,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return; return;
} }
// int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); // int32_t contLen = sizeof(SDMStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad);
// SStatusMsg *pStatus = rpcMallocCont(contLen); // SDMStatusMsg *pStatus = rpcMallocCont(contLen);
// if (pStatus == NULL) { // if (pStatus == NULL) {
// dError("Failed to malloc status message"); // dError("Failed to malloc status message");
// return; // return;
......
...@@ -25,8 +25,6 @@ extern "C" { ...@@ -25,8 +25,6 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
#include "sdb.h" #include "sdb.h"
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "thash.h" #include "thash.h"
...@@ -44,7 +42,6 @@ extern "C" { ...@@ -44,7 +42,6 @@ extern "C" {
// internal globals // internal globals
extern char version[]; extern char version[];
extern void *tsMgmtTmr; extern void *tsMgmtTmr;
extern void *tsMgmtTranQhandle;
extern char tsMgmtDirectory[]; extern char tsMgmtDirectory[];
typedef struct { typedef struct {
......
...@@ -240,7 +240,7 @@ typedef struct SSchema { ...@@ -240,7 +240,7 @@ typedef struct SSchema {
typedef struct { typedef struct {
int32_t vnode; //the index of vnode int32_t vnode; //the index of vnode
uint32_t ip; uint32_t ip;
} SVPeerDesc; } SVnodeDesc;
typedef struct { typedef struct {
int8_t tableType; int8_t tableType;
...@@ -255,7 +255,7 @@ typedef struct { ...@@ -255,7 +255,7 @@ typedef struct {
uint64_t uid; uint64_t uid;
uint64_t superTableUid; uint64_t superTableUid;
uint64_t createdTime; uint64_t createdTime;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1]; char superTableId[TSDB_TABLE_ID_LEN + 1];
char data[]; char data[];
...@@ -270,12 +270,12 @@ typedef struct { ...@@ -270,12 +270,12 @@ typedef struct {
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int16_t reserved[16]; int16_t reserved[16];
char schema[]; char schema[];
} SCreateTableMsg; } SCMCreateTableMsg;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t igNotExists; int8_t igNotExists;
} SDropTableMsg; } SCMDropTableMsg;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
...@@ -284,13 +284,13 @@ typedef struct { ...@@ -284,13 +284,13 @@ typedef struct {
char tagVal[TSDB_MAX_BYTES_PER_ROW]; char tagVal[TSDB_MAX_BYTES_PER_ROW];
int8_t numOfCols; /* number of schema */ int8_t numOfCols; /* number of schema */
SSchema schema[]; SSchema schema[];
} SAlterTableMsg; } SCMAlterTableMsg;
typedef struct { typedef struct {
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_ID_LEN + 1]; char db[TSDB_TABLE_ID_LEN + 1];
} SConnectMsg; } SCMConnectMsg;
typedef struct { typedef struct {
char acctId[TSDB_ACCT_LEN + 1]; char acctId[TSDB_ACCT_LEN + 1];
...@@ -298,7 +298,7 @@ typedef struct { ...@@ -298,7 +298,7 @@ typedef struct {
int8_t writeAuth; int8_t writeAuth;
int8_t superAuth; int8_t superAuth;
SRpcIpSet ipList; SRpcIpSet ipList;
} SConnectRsp; } SCMConnectRsp;
typedef struct { typedef struct {
int32_t maxUsers; int32_t maxUsers;
...@@ -318,18 +318,18 @@ typedef struct { ...@@ -318,18 +318,18 @@ typedef struct {
char user[TSDB_USER_LEN + 1]; char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1]; char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg; SAcctCfg cfg;
} SCreateAcctMsg, SAlterAcctMsg; } SCMCreateAcctMsg, SCMAlterAcctMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN + 1]; char user[TSDB_USER_LEN + 1];
} SDropUserMsg, SDropAcctMsg; } SCMDropUserMsg, SCMDropAcctMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN + 1]; char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1]; char pass[TSDB_KEY_LEN + 1];
int8_t privilege; int8_t privilege;
int8_t flag; int8_t flag;
} SCreateUserMsg, SAlterUserMsg; } SCMCreateUserMsg, SCMAlterUserMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_ID_LEN + 1]; char db[TSDB_TABLE_ID_LEN + 1];
...@@ -339,7 +339,7 @@ typedef struct { ...@@ -339,7 +339,7 @@ typedef struct {
int32_t sid; int32_t sid;
int32_t numOfVPeers; int32_t numOfVPeers;
uint64_t uid; uint64_t uid;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
} SMDDropTableMsg; } SMDDropTableMsg;
...@@ -562,12 +562,12 @@ typedef struct { ...@@ -562,12 +562,12 @@ typedef struct {
int8_t loadLatest; // load into mem or not int8_t loadLatest; // load into mem or not
uint8_t precision; // time resolution uint8_t precision; // time resolution
int8_t reserved[16]; int8_t reserved[16];
} SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg; } SVnodeCfg, SDbCfg, SCMCreateDbMsg, SCMAlterDbMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_ID_LEN + 1]; char db[TSDB_TABLE_ID_LEN + 1];
uint8_t ignoreNotExists; uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg; } SCMDropDbMsg, SCMUseDbMsg;
// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed // IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed
// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE // TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE
...@@ -598,52 +598,40 @@ typedef struct { ...@@ -598,52 +598,40 @@ typedef struct {
uint8_t alternativeRole; uint8_t alternativeRole;
uint8_t reserve[15]; uint8_t reserve[15];
SVnodeLoad load[]; SVnodeLoad load[];
} SStatusMsg; } SDMStatusMsg;
typedef struct { typedef struct {
int32_t code; int32_t code;
SDnodeState dnodeState; SDnodeState dnodeState;
SRpcIpSet ipList; SRpcIpSet ipList;
SVnodeAccess vnodeAccess[]; SVnodeAccess vnodeAccess[];
} SStatusRsp; } SDMStatusRsp;
typedef struct {
char spi;
char encrypt;
char secret[TSDB_KEY_LEN]; // key is changed if updated
char cipheringKey[TSDB_KEY_LEN];
} SSecIe;
typedef struct {
int32_t numOfVPeers;
SVPeerDesc vpeerDesc[];
} SVpeerDescArray;
typedef struct { typedef struct {
int32_t vnode; int32_t vnode;
SVnodeCfg cfg; SVnodeCfg cfg;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS];
} SMDCreateVnodeMsg; } SMDCreateVnodeMsg;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int16_t createFlag; int16_t createFlag;
char tags[]; char tags[];
} STableInfoMsg; } SCMTableInfoMsg;
typedef struct { typedef struct {
int32_t numOfTables; int32_t numOfTables;
char tableIds[]; char tableIds[];
} SMultiTableInfoMsg; } SCMMultiTableInfoMsg;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
} SSuperTableInfoMsg; } SCMSuperTableInfoMsg;
typedef struct { typedef struct {
int32_t numOfDnodes; int32_t numOfDnodes;
uint32_t dnodeIps[]; uint32_t dnodeIps[];
} SSuperTableInfoRsp; } SCMSuperTableInfoRsp;
typedef struct { typedef struct {
int16_t elemLen; int16_t elemLen;
...@@ -675,7 +663,7 @@ typedef struct { ...@@ -675,7 +663,7 @@ typedef struct {
} SSuperTableMetaMsg; } SSuperTableMetaMsg;
typedef struct { typedef struct {
SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int16_t index; // used locally int16_t index; // used locally
int32_t numOfSids; int32_t numOfSids;
int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo
...@@ -699,7 +687,7 @@ typedef struct STableMeta { ...@@ -699,7 +687,7 @@ typedef struct STableMeta {
int16_t rowSize; // used locally, calculated in client int16_t rowSize; // used locally, calculated in client
int16_t sversion; int16_t sversion;
int8_t numOfVpeers; int8_t numOfVpeers;
SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t sid; int32_t sid;
int32_t vgid; int32_t vgid;
uint64_t uid; uint64_t uid;
...@@ -727,27 +715,27 @@ typedef struct { ...@@ -727,27 +715,27 @@ typedef struct {
char db[TSDB_DB_NAME_LEN + 1]; char db[TSDB_DB_NAME_LEN + 1];
uint16_t payloadLen; uint16_t payloadLen;
char payload[]; char payload[];
} SShowMsg; } SCMShowMsg;
typedef struct { typedef struct {
uint64_t qhandle; uint64_t qhandle;
STableMeta tableMeta; STableMeta tableMeta;
} SShowRsp; } SCMShowRsp;
typedef struct { typedef struct {
char ip[32]; char ip[32];
} SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg; } SCMCreateDnodeMsg, SCMDropDnodeMsg;
typedef struct { typedef struct {
uint32_t dnode; uint32_t dnode;
int32_t vnode; int32_t vnode;
int32_t sid; int32_t sid;
} STableCfgMsg; } SDMConfigTableMsg;
typedef struct { typedef struct {
uint32_t dnode; uint32_t dnode;
int32_t vnode; int32_t vnode;
} SVpeerCfgMsg; } SDMConfigVnodeMsg;
typedef struct { typedef struct {
char ip[32]; char ip[32];
...@@ -785,18 +773,18 @@ typedef struct { ...@@ -785,18 +773,18 @@ typedef struct {
typedef struct { typedef struct {
SQqueryList qlist; SQqueryList qlist;
SStreamList slist; SStreamList slist;
} SHeartBeatMsg; } SCMHeartBeatMsg;
typedef struct { typedef struct {
uint32_t queryId; uint32_t queryId;
uint32_t streamId; uint32_t streamId;
int8_t killConnection; int8_t killConnection;
SRpcIpSet ipList; SRpcIpSet ipList;
} SHeartBeatRsp; } SCMHeartBeatRsp;
typedef struct { typedef struct {
char queryId[TSDB_KILL_MSG_LEN + 1]; char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnectionMsg; } SCMKillQueryMsg, SCMKillStreamMsg, SCMKillConnMsg;
typedef struct { typedef struct {
int32_t vnode; int32_t vnode;
...@@ -805,7 +793,7 @@ typedef struct { ...@@ -805,7 +793,7 @@ typedef struct {
uint64_t stime; // stream starting time uint64_t stime; // stream starting time
int32_t status; int32_t status;
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
} SDAlterStreamMsg; } SMDAlterStreamMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -84,7 +84,7 @@ void *rpcReallocCont(void *ptr, int contLen); ...@@ -84,7 +84,7 @@ void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg);
void rpcSendResponse(SRpcMsg *pMsg); void rpcSendResponse(SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -81,17 +81,6 @@ struct arguments args = { ...@@ -81,17 +81,6 @@ struct arguments args = {
*/ */
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
/*setlocale(LC_ALL, "en_US.UTF-8"); */ /*setlocale(LC_ALL, "en_US.UTF-8"); */
//
if (argc == 1)
{
printf("=== this a test for debug usage\n");
void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
taos_query(taos, "select * from d1.t6");
while (1) {
sleep(1000);
}
}
//
if (!checkVersion()) { if (!checkVersion()) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
......
...@@ -30,7 +30,7 @@ int32_t mgmtInitChildTables(); ...@@ -30,7 +30,7 @@ int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables(); void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId); void * mgmtGetChildTable(char *tableId);
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
......
...@@ -28,7 +28,7 @@ int32_t mgmtInitNormalTables(); ...@@ -28,7 +28,7 @@ int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables(); void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId); void * mgmtGetNormalTable(char *tableId);
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
......
...@@ -31,7 +31,7 @@ void mgmtCleanUpSuperTables(); ...@@ -31,7 +31,7 @@ void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId); void * mgmtGetSuperTable(char *tableId);
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate); int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MGMT_SYSTEM_H
#define TDENGINE_MGMT_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mgmtInitSystem();
int32_t mgmtStartSystem();
void mgmtCleanUpSystem();
void mgmtStopSystem();
#ifdef __cplusplus
}
#endif
#endif
...@@ -33,9 +33,9 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); ...@@ -33,9 +33,9 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter);
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
...@@ -45,8 +45,8 @@ SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); ...@@ -45,8 +45,8 @@ SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -314,7 +314,7 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou ...@@ -314,7 +314,7 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
return pCreateTable; return pCreateTable;
} }
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) { if (numOfTables >= tsMaxTables) {
...@@ -350,7 +350,7 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj * ...@@ -350,7 +350,7 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *
} }
pTagData += (TSDB_TABLE_ID_LEN + 1); pTagData += (TSDB_TABLE_ID_LEN + 1);
int32_t tagDataLen = contLen - sizeof(SCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; int32_t tagDataLen = contLen - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
*pDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen); *pDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen);
if (*pDCreateOut == NULL) { if (*pDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pCreate->tableId); mError("table:%s, failed to build create table message", pCreate->tableId);
......
...@@ -89,7 +89,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s ...@@ -89,7 +89,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// //
// //
//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) { //static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) {
// STableCfgMsg *pCfg = (STableCfgMsg *) pCont; // SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode); // pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode); // pCfg->vnode = htonl(pCfg->vnode);
// pCfg->sid = htonl(pCfg->sid); // pCfg->sid = htonl(pCfg->sid);
...@@ -121,7 +121,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s ...@@ -121,7 +121,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// return; // return;
// } // }
// //
// SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *) pCont; // SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode); // pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode); // pCfg->vnode = htonl(pCfg->vnode);
// //
...@@ -317,7 +317,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s ...@@ -317,7 +317,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
//} //}
// //
//void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { //void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
// SStatusMsg *pStatus = (SStatusMsg *)pCont; // SDMStatusMsg *pStatus = (SDMStatusMsg *)pCont;
// //
// SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp)); // SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp));
// if (pObj == NULL) { // if (pObj == NULL) {
......
...@@ -35,7 +35,7 @@ static void *tsDbSdb = NULL; ...@@ -35,7 +35,7 @@ static void *tsDbSdb = NULL;
static int32_t tsDbUpdateSize; static int32_t tsDbUpdateSize;
static int32_t mgmtUpdateDb(SDbObj *pDb); static int32_t mgmtUpdateDb(SDbObj *pDb);
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists); static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
static int32_t mgmtDropDb(SDbObj *pDb); static int32_t mgmtDropDb(SDbObj *pDb);
...@@ -81,7 +81,7 @@ int32_t mgmtInitDbs() { ...@@ -81,7 +81,7 @@ int32_t mgmtInitDbs() {
SDbObj tObj; SDbObj tObj;
tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; tsDbUpdateSize = tObj.updateEnd - (char *)&tObj;
tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "db", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction); tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction);
if (tsDbSdb == NULL) { if (tsDbSdb == NULL) {
mError("failed to init db data"); mError("failed to init db data");
return -1; return -1;
...@@ -133,7 +133,7 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { ...@@ -133,7 +133,7 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
return (SDbObj *)sdbGetRow(tsDbSdb, db); return (SDbObj *)sdbGetRow(tsDbSdb, db);
} }
static int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) { static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) {
if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { if (pCreate->commitLog < 0 || pCreate->commitLog > 1) {
mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
...@@ -206,7 +206,7 @@ static int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) { ...@@ -206,7 +206,7 @@ static int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) { static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) {
// assign default parameters // assign default parameters
if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; // if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; //
if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; // if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; //
...@@ -251,7 +251,7 @@ static int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) { ...@@ -251,7 +251,7 @@ static int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) { static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb); int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb);
if (numOfDbs >= tsMaxDbs) { if (numOfDbs >= tsMaxDbs) {
mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs);
...@@ -442,7 +442,7 @@ static void mgmtMonitorDbDrop(void *unused, void *unusedt) { ...@@ -442,7 +442,7 @@ static void mgmtMonitorDbDrop(void *unused, void *unusedt) {
} }
} }
static int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) {
return 0; return 0;
// int32_t code = TSDB_CODE_SUCCESS; // int32_t code = TSDB_CODE_SUCCESS;
// //
...@@ -915,7 +915,7 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { ...@@ -915,7 +915,7 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SCreateDbMsg *pCreate = (SCreateDbMsg *) rpcMsg->pCont; SCMCreateDbMsg *pCreate = (SCMCreateDbMsg *) rpcMsg->pCont;
pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->maxSessions = htonl(pCreate->maxSessions);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
...@@ -953,7 +953,7 @@ static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) { ...@@ -953,7 +953,7 @@ static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SAlterDbMsg *pAlter = (SAlterDbMsg *) rpcMsg->pCont; SCMAlterDbMsg *pAlter = (SCMAlterDbMsg *) rpcMsg->pCont;
pAlter->daysPerFile = htonl(pAlter->daysPerFile); pAlter->daysPerFile = htonl(pAlter->daysPerFile);
pAlter->daysToKeep = htonl(pAlter->daysToKeep); pAlter->daysToKeep = htonl(pAlter->daysToKeep);
pAlter->maxSessions = htonl(pAlter->maxSessions) + 1; pAlter->maxSessions = htonl(pAlter->maxSessions) + 1;
...@@ -983,7 +983,7 @@ static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) { ...@@ -983,7 +983,7 @@ static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) {
} }
if (pUser->superAuth) { if (pUser->superAuth) {
SDropDbMsg *pDrop = rpcMsg->pCont; SCMDropDbMsg *pDrop = rpcMsg->pCont;
rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
......
...@@ -27,41 +27,28 @@ ...@@ -27,41 +27,28 @@
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtSystem.h"
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtShell.h" #include "mgmtShell.h"
static int32_t mgmtCheckMgmtRunning();
char tsMgmtDirectory[128] = {0}; char tsMgmtDirectory[128] = {0};
void *tsMgmtTmr = NULL; void *tsMgmtTmr = NULL;
void *tsMgmtTranQhandle = NULL;
int32_t mgmtInitSystem() {
void mgmtCleanUpSystem() { if (mgmtInitShell() != 0) {
mPrint("starting to clean up mgmt"); mError("failed to init shell");
sdbCleanUpPeers();
mgmtCleanupBalance();
mgmtCleanupDClient();
mgmtCleanupDServer();
mgmtCleanUpShell();
mgmtCleanUpTables();
mgmtCleanUpVgroups();
mgmtCleanUpDbs();
mgmtCleanUpDnodes();
mgmtCleanUpUsers();
mgmtCleanUpAccts();
taosTmrCleanUp(tsMgmtTmr);
taosCleanUpScheduler(tsMgmtTranQhandle);
mPrint("mgmt is cleaned up");
}
int32_t mgmtCheckMgmtRunning() {
if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) {
return -1; return -1;
} }
tsetModuleStatus(TSDB_MOD_MGMT); struct stat dirstat;
bool fileExist = (stat(tsMgmtDirectory, &dirstat) == 0);
bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
if (asMaster || fileExist) {
if (mgmtStartSystem() != 0) {
return -1;
}
}
return 0; return 0;
} }
...@@ -79,11 +66,9 @@ int32_t mgmtStartSystem() { ...@@ -79,11 +66,9 @@ int32_t mgmtStartSystem() {
return 0; return 0;
} }
tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT");
tsMgmtTmr = taosTmrInit((tsMaxDnodes + tsMaxShellConns) * 3, 200, 3600000, "MND"); tsMgmtTmr = taosTmrInit((tsMaxDnodes + tsMaxShellConns) * 3, 200, 3600000, "MND");
if (tsMgmtTmr == NULL) { if (tsMgmtTmr == NULL) {
mError("failed to init timer, exit"); mError("failed to init timer");
return -1; return -1;
} }
...@@ -125,11 +110,6 @@ int32_t mgmtStartSystem() { ...@@ -125,11 +110,6 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (mgmtInitShell() < 0) {
mError("failed to init shell");
return -1;
}
if (sdbInitPeers(tsMgmtDirectory) < 0) { if (sdbInitPeers(tsMgmtDirectory) < 0) {
mError("failed to init peers"); mError("failed to init peers");
return -1; return -1;
...@@ -139,30 +119,11 @@ int32_t mgmtStartSystem() { ...@@ -139,30 +119,11 @@ int32_t mgmtStartSystem() {
mError("failed to init dnode balance") mError("failed to init dnode balance")
} }
mPrint("TDengine mgmt is initialized successfully"); mPrint("TDengine mgmt is initialized successfully");
return 0; return 0;
} }
int32_t mgmtInitSystem() {
struct stat dirstat;
bool directoryExist = (stat(tsMgmtDirectory, &dirstat) == 0);
bool equalWithMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
if (equalWithMaster || directoryExist) {
if (mgmtStartSystem() != 0) {
return -1;
}
}
if (mgmtInitShell() < 0) {
mError("failed to init shell");
return -1;
}
return 0;
}
void mgmtStopSystem() { void mgmtStopSystem() {
if (sdbMaster) { if (sdbMaster) {
...@@ -172,5 +133,30 @@ void mgmtStopSystem() { ...@@ -172,5 +133,30 @@ void mgmtStopSystem() {
mgmtCleanUpSystem(); mgmtCleanUpSystem();
remove(tsMgmtDirectory); remove(tsMgmtDirectory);
// mgmtInitRedirect();
} }
void mgmtCleanUpSystem() {
mPrint("starting to clean up mgmt");
sdbCleanUpPeers();
mgmtCleanupBalance();
mgmtCleanUpShell();
mgmtCleanupDClient();
mgmtCleanupDServer();
mgmtCleanUpTables();
mgmtCleanUpVgroups();
mgmtCleanUpDbs();
mgmtCleanUpDnodes();
mgmtCleanUpUsers();
mgmtCleanUpAccts();
taosTmrCleanUp(tsMgmtTmr);
mPrint("mgmt is cleaned up");
}
static int32_t mgmtCheckMgmtRunning() {
if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) {
return -1;
}
tsetModuleStatus(TSDB_MOD_MGMT);
return 0;
}
\ No newline at end of file
...@@ -18,5 +18,5 @@ ...@@ -18,5 +18,5 @@
#include "mgmtMnode.h" #include "mgmtMnode.h"
bool mgmtCheckRedirect(void *handle) { bool mgmtCheckRedirect(void *handle) {
return true; return false;
} }
\ No newline at end of file
...@@ -328,7 +328,7 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr ...@@ -328,7 +328,7 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
return pCreateTable; return pCreateTable;
} }
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb); int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) { if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtUser.h" #include "mgmtUser.h"
int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg);
int32_t mgmtKillQuery(char *qidstr, void *pConn); int32_t mgmtKillQuery(char *qidstr, void *pConn);
int32_t mgmtKillStream(char *qidstr, void *pConn); int32_t mgmtKillStream(char *qidstr, void *pConn);
...@@ -63,7 +63,7 @@ typedef struct { ...@@ -63,7 +63,7 @@ typedef struct {
SStreamDesc sdesc[]; SStreamDesc sdesc[];
} SStreamShow; } SStreamShow;
int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg) { int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) {
// SAcctObj *pAcct = pConn->pAcct; // SAcctObj *pAcct = pConn->pAcct;
// //
// if (contLen <= 0 || pAcct == NULL) { // if (contLen <= 0 || pAcct == NULL) {
...@@ -684,7 +684,7 @@ void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) { ...@@ -684,7 +684,7 @@ void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SKillQueryMsg *pKill = (SKillQueryMsg *) rpcMsg->pCont; SCMKillQueryMsg *pKill = (SCMKillQueryMsg *) rpcMsg->pCont;
int32_t code; int32_t code;
if (!pUser->writeAuth) { if (!pUser->writeAuth) {
...@@ -708,7 +708,7 @@ void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) { ...@@ -708,7 +708,7 @@ void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SKillStreamMsg *pKill = (SKillStreamMsg *) rpcMsg->pCont; SCMKillStreamMsg *pKill = (SCMKillStreamMsg *) rpcMsg->pCont;
int32_t code; int32_t code;
if (!pUser->writeAuth) { if (!pUser->writeAuth) {
...@@ -732,7 +732,7 @@ void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) { ...@@ -732,7 +732,7 @@ void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SKillConnectionMsg *pKill = (SKillConnectionMsg *) rpcMsg->pCont; SCMKillConnMsg *pKill = (SCMKillConnMsg *) rpcMsg->pCont;
int32_t code; int32_t code;
if (!pUser->writeAuth) { if (!pUser->writeAuth) {
......
...@@ -51,6 +51,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg); ...@@ -51,6 +51,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg);
static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg); static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg);
static void *tsMgmtShellRpc = NULL; static void *tsMgmtShellRpc = NULL;
static void *tsMgmtTranQhandle = NULL;
static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *) = {0}; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *) = {0};
static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
...@@ -59,6 +60,8 @@ int32_t mgmtInitShell() { ...@@ -59,6 +60,8 @@ int32_t mgmtInitShell() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg);
tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT");
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0;
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
...@@ -89,6 +92,11 @@ int32_t mgmtInitShell() { ...@@ -89,6 +92,11 @@ int32_t mgmtInitShell() {
} }
void mgmtCleanUpShell() { void mgmtCleanUpShell() {
if (tsMgmtTranQhandle) {
taosCleanUpScheduler(tsMgmtTranQhandle);
tsMgmtTranQhandle = NULL;
}
if (tsMgmtShellRpc) { if (tsMgmtShellRpc) {
rpcClose(tsMgmtShellRpc); rpcClose(tsMgmtShellRpc);
tsMgmtShellRpc = NULL; tsMgmtShellRpc = NULL;
...@@ -112,12 +120,16 @@ void mgmtProcessTranRequest(SSchedMsg *sched) { ...@@ -112,12 +120,16 @@ void mgmtProcessTranRequest(SSchedMsg *sched) {
SRpcMsg *rpcMsg = sched->msg; SRpcMsg *rpcMsg = sched->msg;
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
} }
void mgmtAddToTranRequest(SRpcMsg *rpcMsg) { void mgmtAddToTranRequest(SRpcMsg *rpcMsg) {
SRpcMsg *queuedRpcMsg = malloc(sizeof(SRpcMsg));
memcpy(queuedRpcMsg, rpcMsg, sizeof(SRpcMsg));
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.msg = rpcMsg; schedMsg.msg = queuedRpcMsg;
schedMsg.fp = mgmtProcessTranRequest; schedMsg.fp = mgmtProcessTranRequest;
taosScheduleTask(tsMgmtTranQhandle, &schedMsg); taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
} }
...@@ -130,6 +142,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -130,6 +142,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return; return;
} }
mTrace("%s is received", taosMsg[rpcMsg->msgType]);
if (tsMgmtProcessShellMsgFp[rpcMsg->msgType]) { if (tsMgmtProcessShellMsgFp[rpcMsg->msgType]) {
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) {
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg);
...@@ -147,15 +160,15 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -147,15 +160,15 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SShowMsg *pShowMsg = rpcMsg->pCont; SCMShowMsg *pShowMsg = rpcMsg->pCont;
if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) {
if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return; return;
} }
} }
int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SShowRsp *pShowRsp = rpcMallocCont(size); SCMShowRsp *pShowRsp = rpcMallocCont(size);
if (pShowRsp == NULL) { if (pShowRsp == NULL) {
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
...@@ -179,9 +192,9 @@ static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { ...@@ -179,9 +192,9 @@ static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) {
mgmtSaveQhandle(pShow); mgmtSaveQhandle(pShow);
pShowRsp->qhandle = htobe64((uint64_t) pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow);
if (tsMgmtShowMetaFp[pShowMsg->type]) { if (tsMgmtShowMetaFp[pShowMsg->type]) {
code = (*tsMgmtShowMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle); code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle);
if (code == 0) { if (code == 0) {
size = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; size = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
} else { } else {
mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type,
taosMsg[(uint8_t) pShowMsg->type], code); taosMsg[(uint8_t) pShowMsg->type], code);
...@@ -245,7 +258,7 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { ...@@ -245,7 +258,7 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
// if free flag is set, client wants to clean the resources // if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*tsMgmtShowRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle); rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle);
if (rowsRead < 0) { if (rowsRead < 0) {
rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS; rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS;
...@@ -267,10 +280,10 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { ...@@ -267,10 +280,10 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
//SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) rpcMsg->pCont; //SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont;
//mgmtSaveQueryStreamList(pHBMsg); //mgmtSaveQueryStreamList(pHBMsg);
SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *) rpcMallocCont(sizeof(SHeartBeatRsp)); SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
if (pHBRsp == NULL) { if (pHBRsp == NULL) {
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
...@@ -278,7 +291,10 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { ...@@ -278,7 +291,10 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) {
} }
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo); if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) {
mError("conn:%p is already released while process heart beat msg", rpcMsg->handle);
return;
}
pHBRsp->ipList.inUse = 0; pHBRsp->ipList.inUse = 0;
pHBRsp->ipList.port = htons(tsMnodeShellPort); pHBRsp->ipList.port = htons(tsMnodeShellPort);
...@@ -305,7 +321,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { ...@@ -305,7 +321,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) {
pHBRsp->killConnection = 0; pHBRsp->killConnection = 0;
rpcRsp.pCont = pHBRsp; rpcRsp.pCont = pHBRsp;
rpcRsp.contLen = sizeof(SHeartBeatRsp); rpcRsp.contLen = sizeof(SCMHeartBeatRsp);
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -326,12 +342,15 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr ...@@ -326,12 +342,15 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SConnectMsg *pConnectMsg = (SConnectMsg *) rpcMsg->pCont; SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) rpcMsg->pCont;
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo); if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) {
int32_t code; mError("conn:%p is already released while process connect msg", rpcMsg->handle);
return;
}
int32_t code;
SUserObj *pUser = mgmtGetUser(connInfo.user); SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) { if (pUser == NULL) {
code = TSDB_CODE_INVALID_USER; code = TSDB_CODE_INVALID_USER;
...@@ -364,7 +383,7 @@ static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { ...@@ -364,7 +383,7 @@ static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) {
} }
} }
SConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SConnectRsp)); SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp));
if (pConnectRsp == NULL) { if (pConnectRsp == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY; code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto connect_over; goto connect_over;
...@@ -397,7 +416,7 @@ connect_over: ...@@ -397,7 +416,7 @@ connect_over:
} else { } else {
mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
rpcRsp.pCont = pConnectRsp; rpcRsp.pCont = pConnectRsp;
rpcRsp.contLen = sizeof(SConnectRsp); rpcRsp.contLen = sizeof(SCMConnectRsp);
} }
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -406,7 +425,7 @@ connect_over: ...@@ -406,7 +425,7 @@ connect_over:
* check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one. * check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one.
*/ */
static bool mgmtCheckMeterMetaMsgType(void *pMsg) { static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
STableInfoMsg *pInfo = (STableInfoMsg *) pMsg; SCMTableInfoMsg *pInfo = (SCMTableInfoMsg *) pMsg;
int16_t autoCreate = htons(pInfo->createFlag); int16_t autoCreate = htons(pInfo->createFlag);
STableInfo *pTable = mgmtGetTable(pInfo->tableId); STableInfo *pTable = mgmtGetTable(pInfo->tableId);
......
...@@ -200,7 +200,7 @@ void mgmtCleanUpSuperTables() { ...@@ -200,7 +200,7 @@ void mgmtCleanUpSuperTables() {
sdbCloseTable(tsSuperTableSdb); sdbCloseTable(tsSuperTableSdb);
} }
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) {
int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb); int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb);
if (numOfTables >= TSDB_MAX_SUPER_TABLES) { if (numOfTables >= TSDB_MAX_SUPER_TABLES) {
mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES); mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES);
...@@ -259,7 +259,7 @@ void* mgmtGetSuperTable(char *tableId) { ...@@ -259,7 +259,7 @@ void* mgmtGetSuperTable(char *tableId) {
void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) {
//TODO get vgroup of dnodes //TODO get vgroup of dnodes
SSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum()); SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum());
rsp->numOfDnodes = 1; rsp->numOfDnodes = 1;
rsp->dnodeIps[0] = 0; rsp->dnodeIps[0] = 0;
return rsp; return rsp;
......
...@@ -133,7 +133,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo ...@@ -133,7 +133,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDbObj *pDb = mgmtGetDb(pCreate->db); SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) { if (pDb == NULL) {
...@@ -189,7 +189,7 @@ void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *th ...@@ -189,7 +189,7 @@ void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *th
// //
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
assert(pVgroup != NULL); assert(pVgroup != NULL);
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
...@@ -239,7 +239,7 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c ...@@ -239,7 +239,7 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c
mgmtSendMsgToDnode(&ipSet, &rpcMsg); mgmtSendMsgToDnode(&ipSet, &rpcMsg);
} }
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SDbObj *pDb = mgmtGetDb(pCreate->db); SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) { if (pDb == NULL) {
mError("table:%s, failed to create table, db not selected", pCreate->tableId); mError("table:%s, failed to create table, db not selected", pCreate->tableId);
...@@ -331,7 +331,7 @@ int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { ...@@ -331,7 +331,7 @@ int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) {
} }
} }
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) {
STableInfo *pTable = mgmtGetTable(pAlter->tableId); STableInfo *pTable = mgmtGetTable(pAlter->tableId);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_INVALID_TABLE; return TSDB_CODE_INVALID_TABLE;
...@@ -553,7 +553,7 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { ...@@ -553,7 +553,7 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) {
void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) { void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCreateTableMsg *pCreate = (SCreateTableMsg *) rpcMsg->pCont; SCMCreateTableMsg *pCreate = (SCMCreateTableMsg *) rpcMsg->pCont;
pCreate->numOfColumns = htons(pCreate->numOfColumns); pCreate->numOfColumns = htons(pCreate->numOfColumns);
pCreate->numOfTags = htons(pCreate->numOfTags); pCreate->numOfTags = htons(pCreate->numOfTags);
pCreate->sqlLen = htons(pCreate->sqlLen); pCreate->sqlLen = htons(pCreate->sqlLen);
...@@ -594,7 +594,7 @@ void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) { ...@@ -594,7 +594,7 @@ void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) {
void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) { void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDropTableMsg *pDrop = (SDropTableMsg *) rpcMsg->pCont; SCMDropTableMsg *pDrop = (SCMDropTableMsg *) rpcMsg->pCont;
if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to drop table, need redirect message", pDrop->tableId); mError("table:%s, failed to drop table, need redirect message", pDrop->tableId);
...@@ -644,7 +644,7 @@ void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) { ...@@ -644,7 +644,7 @@ void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SAlterTableMsg *pAlter = (SAlterTableMsg *) rpcMsg->pCont; SCMAlterTableMsg *pAlter = (SCMAlterTableMsg *) rpcMsg->pCont;
if (!pUser->writeAuth) { if (!pUser->writeAuth) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; rpcRsp.code = TSDB_CODE_NO_RIGHTS;
...@@ -686,7 +686,11 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { ...@@ -686,7 +686,11 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) {
} }
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(thandle, &connInfo); if (rpcGetConnInfo(thandle, &connInfo) != 0) {
mError("conn:%p is already released while get table meta", thandle);
return;
}
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
...@@ -710,7 +714,7 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { ...@@ -710,7 +714,7 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) {
rpcRsp.pCont = NULL; rpcRsp.pCont = NULL;
rpcRsp.contLen = 0; rpcRsp.contLen = 0;
STableInfoMsg *pInfo = rpcMsg->pCont; SCMTableInfoMsg *pInfo = rpcMsg->pCont;
pInfo->createFlag = htons(pInfo->createFlag); pInfo->createFlag = htons(pInfo->createFlag);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
...@@ -735,8 +739,8 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { ...@@ -735,8 +739,8 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) {
return; return;
} }
int32_t contLen = sizeof(SCreateTableMsg) + sizeof(STagData); int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(STagData);
SCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
if (pCreateMsg == NULL) { if (pCreateMsg == NULL) {
mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
...@@ -762,7 +766,10 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { ...@@ -762,7 +766,10 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) {
rpcRsp.contLen = 0; rpcRsp.contLen = 0;
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo); if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) {
mError("conn:%p is already released while get mulit table meta", rpcMsg->handle);
return;
}
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SUserObj *pUser = mgmtGetUser(connInfo.user); SUserObj *pUser = mgmtGetUser(connInfo.user);
...@@ -772,7 +779,7 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { ...@@ -772,7 +779,7 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SMultiTableInfoMsg *pInfo = rpcMsg->pCont; SCMMultiTableInfoMsg *pInfo = rpcMsg->pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables); pInfo->numOfTables = htonl(pInfo->numOfTables);
int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice
...@@ -823,7 +830,7 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { ...@@ -823,7 +830,7 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) {
void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) { void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SSuperTableInfoMsg *pInfo = rpcMsg->pCont; SCMSuperTableInfoMsg *pInfo = rpcMsg->pCont;
STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId);
if (pTable == NULL) { if (pTable == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_TABLE; rpcRsp.code = TSDB_CODE_INVALID_TABLE;
...@@ -831,7 +838,7 @@ void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) { ...@@ -831,7 +838,7 @@ void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable);
if (pRsp != NULL) { if (pRsp != NULL) {
int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t);
rpcRsp.pCont = pRsp; rpcRsp.pCont = pRsp;
......
...@@ -59,7 +59,7 @@ int32_t mgmtInitUsers() { ...@@ -59,7 +59,7 @@ int32_t mgmtInitUsers() {
SUserObj tObj; SUserObj tObj;
tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "user", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction); tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction);
if (tsUserSdb == NULL) { if (tsUserSdb == NULL) {
mError("failed to init user data"); mError("failed to init user data");
return -1; return -1;
...@@ -123,7 +123,7 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { ...@@ -123,7 +123,7 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
SUserObj *pUser = (SUserObj *)sdbGetRow(tsUserSdb, name); SUserObj *pUser = (SUserObj *)sdbGetRow(tsUserSdb, name);
if (pUser != NULL) { if (pUser != NULL) {
mWarn("user:%s is already there", name); mTrace("user:%s is already there", name);
return TSDB_CODE_USER_ALREADY_EXIST; return TSDB_CODE_USER_ALREADY_EXIST;
} }
...@@ -330,9 +330,11 @@ static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t * ...@@ -330,9 +330,11 @@ static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *
SUserObj *mgmtGetUserFromConn(void *pConn) { SUserObj *mgmtGetUserFromConn(void *pConn) {
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(pConn, &connInfo); if (rpcGetConnInfo(pConn, &connInfo) == 0) {
return mgmtGetUser(connInfo.user);
}
return mgmtGetUser(connInfo.user); return NULL;
} }
static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) {
...@@ -347,7 +349,7 @@ static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { ...@@ -347,7 +349,7 @@ static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) {
} }
if (pUser->superAuth) { if (pUser->superAuth) {
SCreateUserMsg *pCreate = rpcMsg->pCont; SCMCreateUserMsg *pCreate = rpcMsg->pCont;
rpcRsp.code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); rpcRsp.code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass);
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s is created by %s", pCreate->user, pUser->user); mLPrint("user:%s is created by %s", pCreate->user, pUser->user);
...@@ -370,7 +372,7 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { ...@@ -370,7 +372,7 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) {
return; return;
} }
SAlterUserMsg *pAlter = rpcMsg->pCont; SCMAlterUserMsg *pAlter = rpcMsg->pCont;
SUserObj *pUser = mgmtGetUser(pAlter->user); SUserObj *pUser = mgmtGetUser(pAlter->user);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
...@@ -477,7 +479,7 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { ...@@ -477,7 +479,7 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) {
return ; return ;
} }
SDropUserMsg *pDrop = rpcMsg->pCont; SCMDropUserMsg *pDrop = rpcMsg->pCont;
SUserObj *pUser = mgmtGetUser(pDrop->user); SUserObj *pUser = mgmtGetUser(pDrop->user);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
......
...@@ -534,7 +534,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { ...@@ -534,7 +534,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->replications = (char) pVgroup->numOfVnodes;
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
SVPeerDesc *vpeerDesc = pVPeers->vpeerDesc; SVnodeDesc *vpeerDesc = pVPeers->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip);
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
......
...@@ -441,15 +441,16 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { ...@@ -441,15 +441,16 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
return; return;
} }
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn->user[0] == 0) return -1;
pInfo->clientIp = pConn->peerIp; pInfo->clientIp = pConn->peerIp;
pInfo->clientPort = pConn->peerPort; pInfo->clientPort = pConn->peerPort;
pInfo->serverIp = pConn->destIp; pInfo->serverIp = pConn->destIp;
assert(pConn->user[0]);
strcpy(pInfo->user, pConn->user); strcpy(pInfo->user, pConn->user);
return 0;
} }
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册