提交 bfbd2707 编写于 作者: S slguan

fix compile error in mnode while rpc interface changed

上级 d01a0c59
......@@ -28,8 +28,6 @@ int32_t mgmtInitShell();
void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
/*
* If table not exist, will create it
......
......@@ -157,13 +157,19 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con
}
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(info->thandle, code, NULL, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.code = code;
rpcMsg.handle = info->thandle;
rpcSendResponse(&rpcMsg);
} else {
if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
mTrace("table:%s, start to process get meta", pTable->tableId);
mgmtProcessGetTableMeta(pTable, thandle);
} else {
rpcSendResponse(info->thandle, code, NULL, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.code = code;
rpcMsg.handle = info->thandle;
rpcSendResponse(&rpcMsg);
}
}
......@@ -236,7 +242,11 @@ static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) {
mgmtUpdateGrantInfoFp(pCont);
mTrace("grant info is updated");
}
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.code = TSDB_CODE_SUCCESS;
rpcMsg.handle = thandle;
rpcSendResponse(&rpcMsg);
}
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
......@@ -368,7 +378,7 @@ int32_t mgmtSendCfgDnodeMsg(char *cont) {
//#else
// (void)tsCfgDynamicOptions(pCfg->config);
//#endif
// return 0;
return 0;
}
int32_t mgmtInitDnodeInt() {
......
......@@ -46,34 +46,36 @@ static RetrieveMetaFp mgmtRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
static void mgmtInitShowMsgFp();
static void mgmtInitProcessShellMsg();
static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code);
static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle);
static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle);
static void mgmtProcessMsgFromShell(SRpcMsg *msg);
static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(SRpcMsg *msg);
static void mgmtProcessUnSupportMsg(SRpcMsg *msg);
static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void *tsShellConnServer = NULL;
void mgmtProcessTranRequest(SSchedMsg *sched) {
int8_t msgType = *(int8_t *) (sched->msg);
int32_t contLen = *(int32_t *) (sched->msg + sizeof(int8_t));
int8_t *pCont = sched->msg + sizeof(int32_t) + sizeof(int8_t);
void *pConn = sched->thandle;
(*mgmtProcessShellMsg[msgType])(pCont, contLen, pConn);
SRpcMsg rpcMsg;
rpcMsg.msgType = *(int8_t *) (sched->msg);
rpcMsg.contLen = *(int32_t *) (sched->msg + sizeof(int8_t));
rpcMsg.pCont = sched->msg + sizeof(int32_t) + sizeof(int8_t);
rpcMsg.handle = sched->thandle;
rpcMsg.code = TSDB_CODE_SUCCESS;
(*mgmtProcessShellMsg[rpcMsg.msgType])(&rpcMsg);
if (sched->msg) {
free(sched->msg);
}
}
void mgmtAddToTranRequest(int8_t type, void *pCont, int contLen, void *ahandle) {
void mgmtAddToTranRequest(SRpcMsg *rpcMsg) {
SSchedMsg schedMsg;
schedMsg.msg = malloc(contLen + sizeof(int32_t) + sizeof(int8_t));
schedMsg.msg = malloc(rpcMsg->contLen + sizeof(int32_t) + sizeof(int8_t));
schedMsg.fp = mgmtProcessTranRequest;
schedMsg.tfp = NULL;
schedMsg.thandle = ahandle;
*(int8_t *) (schedMsg.msg) = type;
*(int32_t *) (schedMsg.msg + sizeof(int8_t)) = contLen;
memcpy(schedMsg.msg + sizeof(int32_t) + sizeof(int8_t), pCont, contLen);
schedMsg.thandle = rpcMsg->handle;
*(int8_t *) (schedMsg.msg) = rpcMsg->msgType;
*(int32_t *) (schedMsg.msg + sizeof(int8_t)) = rpcMsg->contLen;
memcpy(schedMsg.msg + sizeof(int32_t) + sizeof(int8_t), rpcMsg->pCont, rpcMsg->contLen);
taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
}
......@@ -115,14 +117,20 @@ void mgmtCleanUpShell() {
}
}
void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
STableInfoMsg *pInfo = pCont;
void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp;
rpcRsp.handle = rpcMsg->handle;
rpcRsp.pCont = NULL;
rpcRsp.contLen = 0;
STableInfoMsg *pInfo = rpcMsg->pCont;
pInfo->createFlag = htons(pInfo->createFlag);
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("table:%s, failed to get table meta, invalid user", pInfo->tableId);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -130,19 +138,22 @@ void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
if (pTable == NULL) {
if (pInfo->createFlag != 1) {
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_TABLE;
rpcSendResponse(&rpcRsp);
return;
} else {
// on demand create table from super table if table does not exists
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId);
return;
}
SCreateTableMsg *pCreateMsg = rpcMallocCont(sizeof(SCreateTableMsg) + sizeof(STagData));
int32_t contLen = sizeof(SCreateTableMsg) + sizeof(STagData);
SCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
if (pCreateMsg == NULL) {
mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId);
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -150,31 +161,38 @@ void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
strcpy(pCreateMsg->tableId, pInfo->tableId);
mError("table:%s, start to create table while get meta info", pInfo->tableId);
mgmtCreateTable(pCreateMsg, contLen, ahandle, true);
mgmtCreateTable(pCreateMsg, contLen, rpcMsg->handle, true);
}
} else {
mgmtProcessGetTableMeta(pTable, ahandle);
mgmtProcessGetTableMeta(pTable, rpcMsg->handle);
}
}
void mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp;
rpcRsp.handle = rpcMsg->handle;
rpcRsp.pCont = NULL;
rpcRsp.contLen = 0;
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
rpcGetConnInfo(rpcMsg->handle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SMultiTableInfoMsg *pInfo = pCont;
SMultiTableInfoMsg *pInfo = rpcMsg->pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables);
int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
if (pMultiMeta == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -211,43 +229,48 @@ void mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
}
}
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMultiMeta, pMultiMeta->contLen);
rpcRsp.pCont = pMultiMeta;
rpcRsp.contLen = pMultiMeta->contLen;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessSuperTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
// bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SSuperTableInfoMsg *pInfo = pCont;
void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SSuperTableInfoMsg *pInfo = rpcMsg->pCont;
STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId);
if (pTable == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_TABLE;
rpcSendResponse(&rpcRsp);
return;
}
SSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable);
if (pRsp != NULL) {
int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t);
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pRsp, msgLen);
rpcRsp.pCont = pRsp;
rpcRsp.contLen = msgLen;
rpcSendResponse(&rpcRsp);
} else {
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_TABLE;
rpcSendResponse(&rpcRsp);
}
}
void mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SCreateDbMsg *pCreate = (SCreateDbMsg *) pCont;
SCreateDbMsg *pCreate = (SCreateDbMsg *) rpcMsg->pCont;
pCreate->maxSessions = htonl(pCreate->maxSessions);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
......@@ -272,21 +295,24 @@ void mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) {
}
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SAlterDbMsg *pAlter = (SAlterDbMsg *) pCont;
SAlterDbMsg *pAlter = (SAlterDbMsg *) rpcMsg->pCont;
pAlter->daysPerFile = htonl(pAlter->daysPerFile);
pAlter->daysToKeep = htonl(pAlter->daysToKeep);
pAlter->maxSessions = htonl(pAlter->maxSessions) + 1;
......@@ -301,92 +327,104 @@ void mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) {
}
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessKillQueryMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SKillQueryMsg *pKill = (SKillQueryMsg *) pCont;
SKillQueryMsg *pKill = (SKillQueryMsg *) rpcMsg->pCont;
int32_t code;
if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtKillQuery(pKill->queryId, ahandle);
code = mgmtKillQuery(pKill->queryId, rpcMsg->handle);
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessKillStreamMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SKillStreamMsg *pKill = (SKillStreamMsg *) pCont;
SKillStreamMsg *pKill = (SKillStreamMsg *) rpcMsg->pCont;
int32_t code;
if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtKillStream(pKill->queryId, ahandle);
code = mgmtKillStream(pKill->queryId, rpcMsg->handle);
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SKillConnectionMsg *pKill = (SKillConnectionMsg *) pCont;
SKillConnectionMsg *pKill = (SKillConnectionMsg *) rpcMsg->pCont;
int32_t code;
if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtKillConnection(pKill->queryId, ahandle);
code = mgmtKillConnection(pKill->queryId, rpcMsg->handle);
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
int32_t code;
if (pUser->superAuth) {
SCreateUserMsg *pCreate = pCont;
SCreateUserMsg *pCreate = rpcMsg->pCont;
code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s is created by %s", pCreate->user, pUser->user);
......@@ -395,29 +433,34 @@ void mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) {
code = TSDB_CODE_NO_RIGHTS;
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pOperUser = mgmtGetUserFromConn(ahandle);
SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pOperUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SAlterUserMsg *pAlter = pCont;
SAlterUserMsg *pAlter = rpcMsg->pCont;
SUserObj *pUser = mgmtGetUser(pAlter->user);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -447,7 +490,9 @@ void mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) {
code = TSDB_CODE_NO_RIGHTS;
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -496,34 +541,40 @@ void mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) {
code = TSDB_CODE_NO_RIGHTS;
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
return;
}
code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return ;
}
SUserObj *pOperUser = mgmtGetUserFromConn(ahandle);
SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pOperUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return ;
}
SDropUserMsg *pDrop = pCont;
SDropUserMsg *pDrop = rpcMsg->pCont;
SUserObj *pUser = mgmtGetUser(pDrop->user);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return ;
}
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return ;
}
......@@ -554,23 +605,26 @@ void mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) {
code = TSDB_CODE_NO_RIGHTS;
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return ;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return ;
}
int32_t code;
if (pUser->superAuth) {
SDropDbMsg *pDrop = pCont;
SDropDbMsg *pDrop = rpcMsg->pCont;
code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
......@@ -579,7 +633,8 @@ void mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) {
code = TSDB_CODE_NO_RIGHTS;
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
static void mgmtInitShowMsgFp() {
......@@ -618,10 +673,12 @@ static void mgmtInitShowMsgFp() {
mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes;
}
void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) {
SShowMsg *pShowMsg = pCont;
void mgmtProcessShowMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SShowMsg *pShowMsg = rpcMsg->pCont;
if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
}
......@@ -629,7 +686,8 @@ void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) {
int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SShowRsp *pShowRsp = rpcMallocCont(size);
if (pShowRsp == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -649,7 +707,7 @@ void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) {
mgmtSaveQhandle(pShow);
pShowRsp->qhandle = htobe64((uint64_t) pShow);
code = (*mgmtGetMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->tableMeta, pShow, ahandle);
code = (*mgmtGetMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle);
if (code == 0) {
size = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
} else {
......@@ -659,14 +717,18 @@ void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) {
}
}
rpcSendResponse(ahandle, code, pShowRsp, size);
rpcRsp.pCont = pShowRsp;
rpcRsp.contLen = size;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) {
void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
int32_t rowsToRead = 0;
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *)pCont;
SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) rpcMsg->pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
/*
......@@ -675,14 +737,16 @@ void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) {
*/
if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_QHANDLE, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_QHANDLE;
rpcSendResponse(&rpcRsp);
return;
}
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
if (pShow->signature != (void *)pShow) {
mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature);
rpcSendResponse(ahandle, TSDB_CODE_MEMORY_CORRUPTED, NULL, 0);
rpcRsp.code = TSDB_CODE_MEMORY_CORRUPTED;
rpcSendResponse(&rpcRsp);
return;
} else {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
......@@ -705,7 +769,7 @@ void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) {
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*mgmtRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, ahandle);
rowsRead = (*mgmtRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle);
if (rowsRead < 0) {
rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS;
......@@ -716,107 +780,120 @@ void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) {
pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pRsp, size);
rpcRsp.pCont = pRsp;
rpcRsp.contLen = size;
rpcSendResponse(&rpcRsp);
if (rowsToRead == 0) {
mgmtFreeQhandle(pShow);
}
}
void mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
SCreateTableMsg *pCreate = (SCreateTableMsg *) pCont;
void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCreateTableMsg *pCreate = (SCreateTableMsg *) rpcMsg->pCont;
pCreate->numOfColumns = htons(pCreate->numOfColumns);
pCreate->numOfTags = htons(pCreate->numOfTags);
pCreate->sqlLen = htons(pCreate->sqlLen);
SSchema *pSchema = pCreate->schema;
SSchema *pSchema = (SSchema*) pCreate->schema;
for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = i;
pSchema++;
}
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table, need redirect message", pCreate->tableId);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("table:%s, failed to create table, invalid user", pCreate->tableId);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (!pUser->writeAuth) {
mError("table:%s, failed to create table, no rights", pCreate->tableId);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
int32_t code = mgmtCreateTable(pCreate, contLen, ahandle, false);
int32_t code = mgmtCreateTable(pCreate, rpcMsg->contLen, rpcMsg->handle, false);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
}
void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) {
SDropTableMsg *pDrop = (SDropTableMsg *) pCont;
void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDropTableMsg *pDrop = (SDropTableMsg *) rpcMsg->pCont;
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to drop table, need redirect message", pDrop->tableId);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("table:%s, failed to drop table, invalid user", pDrop->tableId);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (!pUser->writeAuth) {
mError("table:%s, failed to drop table, no rights", pDrop->tableId);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId);
if (pDb == NULL) {
mError("table:%s, failed to drop table, db not selected", pDrop->tableId);
rpcSendResponse(ahandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0);
rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED;
rpcSendResponse(&rpcRsp);
return;
}
int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
}
void mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SAlterTableMsg *pAlter = (SAlterTableMsg *) pCont;
int32_t code;
SAlterTableMsg *pAlter = (SAlterTableMsg *) rpcMsg->pCont;
if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
pAlter->type = htons(pAlter->type);
pAlter->numOfCols = htons(pAlter->numOfCols);
if (pAlter->numOfCols > 2) {
mError("table:%s error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
code = TSDB_CODE_APP_ERROR;
rpcRsp.code = TSDB_CODE_APP_ERROR;
} else {
SDbObj *pDb = mgmtGetDb(pAlter->db);
if (pDb) {
......@@ -824,61 +901,61 @@ void mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) {
pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes);
}
code = mgmtAlterTable(pDb, pAlter);
if (code == 0) {
rpcRsp.code = mgmtAlterTable(pDb, pAlter);
if (rpcRsp.code == 0) {
mLPrint("table:%s is altered by %s", pAlter->tableId, pUser->user);
}
} else {
code = TSDB_CODE_DB_NOT_SELECTED;
rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED;
}
}
}
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(ahandle, code, NULL, 0);
}
rpcSendResponse(&rpcRsp);
}
void mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
int32_t code;
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *) rpcMsg->pCont;
if (strcmp(pUser->pAcct->user, "root") != 0) {
code = TSDB_CODE_NO_RIGHTS;
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtSendCfgDnodeMsg(pCont);
rpcRsp.code = mgmtSendCfgDnodeMsg(rpcMsg->pCont);
}
if (code == TSDB_CODE_SUCCESS) {
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mTrace("dnode:%s is configured by %s", pCfg->ip, pUser->user);
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcSendResponse(&rpcRsp);
}
void mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) {
SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) pCont;
void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) rpcMsg->pCont;
mgmtSaveQueryStreamList(pHBMsg);
SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *) rpcMallocCont(contLen);
SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *) rpcMallocCont(sizeof(SHeartBeatRsp));
if (pHBRsp == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
rpcFreeCont(pCont);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
rpcGetConnInfo(rpcMsg->handle, &connInfo);
pHBRsp->ipList.inUse = 0;
pHBRsp->ipList.port = htons(tsMgmtShellPort);
......@@ -904,7 +981,9 @@ void mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) {
pHBRsp->streamId = 0;
pHBRsp->killConnection = 0;
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pHBRsp, sizeof(SHeartBeatMsg));
rpcRsp.pCont = pHBRsp;
rpcRsp.contLen = sizeof(SHeartBeatRsp);
rpcSendResponse(&rpcRsp);
}
int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
......@@ -922,10 +1001,12 @@ int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret,
}
}
static void mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) {
SConnectMsg *pConnectMsg = (SConnectMsg *) pCont;
static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SConnectMsg *pConnectMsg = (SConnectMsg *) rpcMsg->pCont;
SRpcConnInfo connInfo;
rpcGetConnInfo(thandle, &connInfo);
rpcGetConnInfo(rpcMsg->handle, &connInfo);
int32_t code;
SUserObj *pUser = mgmtGetUser(connInfo.user);
......@@ -987,13 +1068,15 @@ static void mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) {
}
connect_over:
rpcRsp.code = code;
if (code != TSDB_CODE_SUCCESS) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
rpcSendResponse(thandle, code, NULL, 0);
} else {
mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
rpcSendResponse(thandle, code, pConnectRsp, sizeof(SConnectRsp));
rpcRsp.pCont = pConnectRsp;
rpcRsp.contLen = sizeof(SConnectRsp);
}
rpcSendResponse(&rpcRsp);
}
/**
......@@ -1024,48 +1107,51 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
return false;
}
static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (sdbGetRunStatus() != SDB_STATUS_SERVING) {
mTrace("shell msg is ignored since SDB is not ready");
rpcSendResponse(ahandle, TSDB_CODE_NOT_READY, NULL, 0);
rpcFreeCont(pCont);
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = TSDB_CODE_NOT_READY, .msgType = 0};
rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (mgmtCheckMsgReadOnly(type, pCont)) {
(*mgmtProcessShellMsg[(int8_t)type])(pCont, contLen, ahandle);
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) {
(*mgmtProcessShellMsg[rpcMsg->msgType])(rpcMsg);
} else {
if (mgmtProcessShellMsg[(int8_t)type]) {
mgmtAddToTranRequest((int8_t)type, pCont, contLen, ahandle);
if (mgmtProcessShellMsg[rpcMsg->msgType]) {
mgmtAddToTranRequest(rpcMsg);
} else {
mError("%s from shell is not processed", taosMsg[(int8_t)type]);
mError("%s from shell is not processed", taosMsg[rpcMsg->msgType]);
}
}
//TODO free may be cause segment fault
//
// rpcFreeCont(pCont);
rpcFreeCont(rpcMsg->pCont);
}
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
mError("table:%s, failed to create vgroup, db not found", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_DB;
rpcSendResponse(&rpcRsp);
return;
}
SVgObj *pVgroup = mgmtCreateVgroup(pDb);
if (pVgroup == NULL) {
mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_ENOUGH_DNODES;
rpcSendResponse(&rpcRsp);
return;
}
void *cont = rpcMallocCont(contLen);
if (cont == NULL) {
mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1087,6 +1173,7 @@ void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *th
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
assert(pVgroup != NULL);
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
......@@ -1095,21 +1182,20 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c
return;
}
int32_t code;
STableInfo *pTable;
SDCreateTableMsg *pDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
rpcRsp.code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
} else {
mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
rpcRsp.code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid);
rpcSendResponse(thandle, code, NULL, 0);
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1129,10 +1215,12 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c
}
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) {
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDbObj* pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to get table meta, db not selected", pTable->tableId);
rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0);
rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1141,15 +1229,17 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) {
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
rpcRsp.code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
if (code != TSDB_CODE_SUCCESS) {
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMeta);
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0);
} else {
pMeta->contLen = htons(pMeta->contLen);
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen);
rpcRsp.pCont = pMeta;
rpcRsp.contLen = pMeta->contLen;
}
rpcSendResponse(&rpcRsp);
}
static int32_t mgmtCheckRedirectMsgImp(void *pConn) {
......@@ -1158,20 +1248,26 @@ static int32_t mgmtCheckRedirectMsgImp(void *pConn) {
int32_t (*mgmtCheckRedirectMsg)(void *pConn) = mgmtCheckRedirectMsgImp;
static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {
.msgType = 0,
.pCont = 0,
.contLen = 0,
.code = TSDB_CODE_OPS_NOT_SUPPORT,
.handle = rpcMsg->handle
};
rpcSendResponse(&rpcRsp);
}
void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
static void mgmtProcessAlterAcctMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (!mgmtAlterAcctFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT;
rpcSendResponse(&rpcRsp);
return;
}
SAlterAcctMsg *pAlter = pCont;
SAlterAcctMsg *pAlter = rpcMsg->pCont;
pAlter->cfg.maxUsers = htonl(pAlter->cfg.maxUsers);
pAlter->cfg.maxDbs = htonl(pAlter->cfg.maxDbs);
pAlter->cfg.maxTimeSeries = htonl(pAlter->cfg.maxTimeSeries);
......@@ -1183,21 +1279,23 @@ static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle)
pAlter->cfg.maxInbound = htobe64(pAlter->cfg.maxInbound);
pAlter->cfg.maxOutbound = htobe64(pAlter->cfg.maxOutbound);
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("account:%s, failed to alter account, need redirect message", pAlter->user);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("account:%s, failed to alter account, invalid user", pAlter->user);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("account:%s, failed to alter account, no rights", pAlter->user);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1208,32 +1306,36 @@ static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle)
mError("account:%s, failed to alter account, reason:%s", pAlter->user, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
static void mgmtProcessDropAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
static void mgmtProcessDropAcctMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (!mgmtDropAcctFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT;
rpcSendResponse(&rpcRsp);
return;
}
SDropAcctMsg *pDrop = (SDropAcctMsg *) pCont;
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
SDropAcctMsg *pDrop = (SDropAcctMsg *) rpcMsg->pCont;
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("account:%s, failed to drop account, need redirect message", pDrop->user);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("account:%s, failed to drop account, invalid user", pDrop->user);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("account:%s, failed to drop account, no rights", pDrop->user);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1244,16 +1346,19 @@ static void mgmtProcessDropAcctMsg(void *pCont, int32_t contLen, void *ahandle)
mError("account:%s, failed to drop account, reason:%s", pDrop->user, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
static void mgmtProcessCreateAcctMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (!mgmtCreateAcctFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT;
rpcSendResponse(&rpcRsp);
return;
}
SCreateAcctMsg *pCreate = (SCreateAcctMsg *) pCont;
SCreateAcctMsg *pCreate = (SCreateAcctMsg *) rpcMsg->pCont;
pCreate->cfg.maxUsers = htonl(pCreate->cfg.maxUsers);
pCreate->cfg.maxDbs = htonl(pCreate->cfg.maxDbs);
pCreate->cfg.maxTimeSeries = htonl(pCreate->cfg.maxTimeSeries);
......@@ -1265,21 +1370,23 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle
pCreate->cfg.maxInbound = htobe64(pCreate->cfg.maxInbound);
pCreate->cfg.maxOutbound = htobe64(pCreate->cfg.maxOutbound);
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("account:%s, failed to create account, need redirect message", pCreate->user);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("account:%s, failed to create account, invalid user", pCreate->user);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("account:%s, failed to create account, no rights", pCreate->user);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1290,31 +1397,36 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle
mError("account:%s, failed to create account, reason:%s", pCreate->user, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
static void mgmtProcessCreateDnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (!mgmtCreateDnodeFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT;
rpcSendResponse(&rpcRsp);
return;
}
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCont;
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *) rpcMsg->pCont;
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("failed to create dnode:%s, redirect this message", pCreate->ip);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_INVALID_USER));
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_NO_RIGHTS));
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1325,31 +1437,36 @@ static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandl
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
static void mgmtProcessDropDnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (!mgmtDropDnodeByIpFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT;
rpcSendResponse(&rpcRsp);
return;
}
SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCont;
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
SDropDnodeMsg *pDrop = (SDropDnodeMsg *) rpcMsg->pCont;
if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("failed to drop dnode:%s, redirect this message", pDrop->ip);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_INVALID_USER));
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_NO_RIGHTS));
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
......@@ -1360,7 +1477,8 @@ static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle)
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtInitProcessShellMsg() {
......@@ -1383,8 +1501,8 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessUnSupportMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessUnSupportMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessUnSupportMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg;
......
......@@ -133,7 +133,7 @@ int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
assert(pAcct != NULL);
int32_t code = mgmtCheckTableLimit(pAcct, pCreate);
int32_t code = mgmtCheckTableLimit(pAcct, pCreate->numOfColumns);
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId);
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册