提交 55a9a5db 编写于 作者: S slguan

message for drop vgroup

上级 683ef75f
......@@ -1804,6 +1804,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SDropTableMsg *pDropTableMsg = (SDropTableMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name);
pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
pCmd->msgType = TSDB_MSG_TYPE_DROP_TABLE;
return TSDB_CODE_SUCCESS;
......
......@@ -45,7 +45,7 @@ bool dnodeCheckVnodeExist(int32_t vid);
* Create vnode with specified configuration and open it
* if exist, config it
*/
int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg);
int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode);
/*
* Remove vnode from local repository
......
......@@ -177,9 +177,9 @@ void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType
void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDRemoveTableMsg *pTable = pCont;
pTable->sid = htonl(pTable->sid);
pTable->sid = htonl(pTable->sid);
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
pTable->uid = htobe64(pTable->uid);
pTable->uid = htobe64(pTable->uid);
for (int i = 0; i < pTable->numOfVPeers; ++i) {
pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
......@@ -194,9 +194,8 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
SVPeersMsg *vpeer = (SVPeersMsg *) (pCont + sizeof(int32_t));
int32_t vnode = htonl(vpeer->vnode);
dnodeCreateVnode(vnode, vpeer);
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) (pCont + sizeof(int32_t));
dnodeCreateVnode(pVnode);
} else if (code == TSDB_CODE_INVALID_VNODE_ID) {
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t));
int32_t vnode = htonl(vpeer->vnode);
......@@ -207,22 +206,15 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void
}
}
void dnodeProcessVPeersMsg(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SVPeersMsg *vpeer = (SVPeersMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
dPrint("vnode:%d, start to config", vnode);
int32_t code = dnodeCreateVnode(vnode, vpeer);
void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont;
int32_t code = dnodeCreateVnode(pVnode);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
dPrint("vnode:%d, remove it", vnode);
SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(pVnode->vnode);
int32_t code = dnodeDropVnode(vnode);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
......@@ -256,7 +248,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
void dnodeInitProcessShellMsg() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
......
......@@ -31,11 +31,13 @@ bool dnodeCheckVnodeExist(int32_t vnode) {
return true;
}
int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg) {
int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) {
dPrint("vnode:%d, is created", htonl(pVnode->vnode));
return 0;
}
int32_t dnodeDropVnode(int32_t vnode) {
dPrint("vnode:%d, is dropped", vnode);
return 0;
}
......
......@@ -70,7 +70,7 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable) {
* Remove table from local repository
*/
int32_t dnodeDropTable(SDRemoveTableMsg *pTable) {
dPrint("table:%s, sid:%d will be removed", pTable->tableId, pTable->sid);
dPrint("table:%s, sid:%d is removed", pTable->tableId, pTable->sid);
return TSDB_CODE_SUCCESS;
}
......
......@@ -41,7 +41,7 @@ extern "C" {
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_VPEERS 13
#define TSDB_MSG_TYPE_DNODE_CREATE_VNODE 13
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16
......@@ -266,7 +266,6 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
int8_t igNotExists;
} SDropTableMsg;
......@@ -623,7 +622,7 @@ typedef struct {
int32_t vnode;
SVnodeCfg cfg;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
} SVPeersMsg;
} SCreateVnodeMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
......
......@@ -32,7 +32,7 @@ void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle)
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle);
extern int32_t (*mgmtInitDnodeInt)();
extern void (*mgmtCleanUpDnodeInt)();
......
......@@ -39,11 +39,10 @@ int32_t mgmtKillStream(char *qidstr, void *pConn);
int32_t mgmtKillConnection(char *qidstr, void *pConn);
enum {
TSDB_PROCESS_CREATE_TABLE,
TSDB_PROCESS_CREATE_VGROUP,
TSDB_PROCESS_CREATE_VGROUP_AND_TABLE,
TSDB_PROCESS_CREATE_VNODE,
TSDB_PROCESS_TABLE_CFG,
TSDB_PROCESS_CREATE_VGROUP_GET_META,
TSDB_PROCESS_CREATE_TABLE,
TSDB_PROCESS_CREATE_TABLE_GET_META,
};
typedef struct {
......
......@@ -36,6 +36,21 @@ extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahand
extern void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
/*
* If table not exist, will create it
*/
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
/*
* If vgroup not exist, will create vgroup
*/
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
/*
* If vgroup create returned, will then create table
*/
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
#ifdef __cplusplus
}
#endif
......
......@@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle);
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
......@@ -45,7 +45,6 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle);
#ifdef __cplusplus
}
......
......@@ -42,7 +42,7 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode);
SCreateVnodeMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode);
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
......
......@@ -311,7 +311,7 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
pSchema++;
}
memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
memcpy(pCreateTable + sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
return pCreateTable;
}
......@@ -380,6 +380,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pRemove->tableId, pTable->tableId);
pRemove->sid = htonl(pTable->sid);
pRemove->uid = htobe64(pTable->uid);
......
......@@ -295,7 +295,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) {
}
}
}
// mgmtSendFreeVnodesMsg(pVgroup);
// mgmtSendRemoveVgroupMsg(pVgroup);
pVgroup = pVgroup->next;
}
......@@ -355,7 +355,7 @@ int32_t mgmtDropDb(SDbObj *pDb) {
if (!finished) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
mgmtSendFreeVnodesMsg(pVgroup);
mgmtSendRemoveVgroupMsg(pVgroup, NULL);
pVgroup = pVgroup->next;
}
return TSDB_CODE_ACTION_IN_PROGRESS;
......
......@@ -27,6 +27,7 @@
#include "mgmtDnode.h"
#include "mgmtDnodeInt.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
......@@ -137,7 +138,7 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con
if (thandle == NULL) return;
SProcessInfo *info = thandle;
assert(info->type == TSDB_PROCESS_CREATE_TABLE);
assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META);
STableInfo *pTable = info->ahandle;
if (code != TSDB_CODE_SUCCESS) {
......@@ -148,7 +149,17 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con
mgmtSetTableDirty(pTable, false);
}
rpcSendResponse(info->thandle, code, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(info->thandle, code, NULL, 0);
} else {
if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
mTrace("table:%s, start to process get meta", pTable->tableId);
mgmtProcessGetTableMeta(pTable, thandle);
} else {
rpcSendResponse(info->thandle, code, NULL, 0);
}
}
free(info);
}
......@@ -169,7 +180,7 @@ void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *a
}
static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
mTrace("free vnode rsp received, handle:%p code:%d", thandle, code);
mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code);
}
static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
......@@ -177,13 +188,18 @@ static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t con
if (thandle == NULL) return;
SProcessInfo *info = thandle;
assert(info->type == TSDB_PROCESS_CREATE_VGROUP);
assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META);
info->received++;
SVgObj *pVgroup = info->ahandle;
bool isGetMeta = false;
if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) {
isGetMeta = true;
}
mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
if (info->received == pVgroup->numOfVnodes) {
mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle);
mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta);
free(info);
}
}
......@@ -198,9 +214,9 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
SCreateVnodeMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
if (pVpeer != NULL) {
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), ahandle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_VNODE, pVpeer, sizeof(SCreateVnodeMsg), ahandle);
}
}
......@@ -236,23 +252,25 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid);
mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
}
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle) {
mTrace("vnode:%d send free vnode msg, handle:%p", vnode, handle);
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle);
SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg));
if (pFreeVnode != NULL) {
pFreeVnode->vnode = htonl(vnode);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle);
}
}
void mgmtSendFreeVnodesMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *handle) {
void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, handle);
mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
}
}
......
......@@ -416,6 +416,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pRemove->tableId, pTable->tableId);
pRemove->sid = htonl(pTable->sid);
pRemove->uid = htobe64(pTable->uid);
......
......@@ -28,6 +28,7 @@
#include "mgmtConn.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
......@@ -38,23 +39,20 @@
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#define MAX_LEN_OF_METER_META (sizeof(SMultiTableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN)
typedef int32_t (*GetMateFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*RetrieveMetaFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static GetMateFp* mgmtGetMetaFp;
static RetrieveMetaFp* mgmtRetrieveFp;
static void mgmtInitShowMsgFp();
void *tsShellConnServer = NULL;
static GetMateFp mgmtGetMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
static RetrieveMetaFp mgmtRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
static void mgmtInitShowMsgFp();
static void mgmtInitProcessShellMsg();
static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code);
static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle);
static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle);
static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void *tsShellConnServer = NULL;
void mgmtProcessTranRequest(SSchedMsg *sched) {
int8_t msgType = *(int8_t *) (sched->msg);
int32_t contLen = *(int32_t *) (sched->msg + sizeof(int8_t));
......@@ -118,68 +116,44 @@ void mgmtCleanUpShell() {
}
void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return;
}
STableInfoMsg *pInfo = pCont;
pInfo->createFlag = htons(pInfo->createFlag);
SDbObj* pDb = mgmtGetDbByTableId(pInfo->tableId);
if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0);
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
if (pUser == NULL) {
mError("table:%s, failed to get table meta, invalid user", pInfo->tableId);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return;
}
STableInfo *pTable = mgmtGetTable(pInfo->tableId);
// on demand create table from super table if meter does not exists
if (pTable == NULL && pInfo->createFlag == 1) {
// write operation needs to redirect to master mnode
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
return;
}
SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData));
if (pCreateMsg == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
if (pTable == NULL) {
if (pInfo->createFlag != 1) {
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0);
return;
}
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
strcpy(pCreateMsg->tableId, pInfo->tableId);
mgmtCreateTable(pCreateMsg, contLen, NULL);
char stableName[TSDB_TABLE_ID_LEN] = {0};
strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN);
mTrace("table:%s is auto created by %s from %s", pCreateMsg->tableId, pUser->user, stableName);
tfree(pCreateMsg);
pTable = mgmtGetTable(pInfo->tableId);
}
} else {
// on demand create table from super table if table does not exists
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId);
return;
}
if (pTable == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0);
return;
}
SCreateTableMsg *pCreateMsg = rpcMallocCont(sizeof(SCreateTableMsg) + sizeof(STagData));
if (pCreateMsg == NULL) {
mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId);
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
strcpy(pCreateMsg->tableId, pInfo->tableId);
if (code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMeta);
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0);
mError("table:%s, start to create table while get meta info", pInfo->tableId);
mgmtCreateTable(pCreateMsg, contLen, ahandle, true);
}
} else {
pMeta->contLen = htons(pMeta->contLen);
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen);
mgmtProcessGetTableMeta(pTable, ahandle);
}
}
......@@ -609,7 +583,6 @@ void mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) {
}
static void mgmtInitShowMsgFp() {
mgmtGetMetaFp = (GetMateFp *)malloc(TSDB_MGMT_TABLE_MAX * sizeof(GetMateFp));
mgmtGetMetaFp[TSDB_MGMT_TABLE_ACCT] = mgmtGetAcctMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_USER] = mgmtGetUserMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_DB] = mgmtGetDbMeta;
......@@ -627,7 +600,6 @@ static void mgmtInitShowMsgFp() {
mgmtGetMetaFp[TSDB_MGMT_TABLE_GRANTS] = mgmtGetGrantsMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_VNODES] = mgmtGetVnodeMeta;
mgmtRetrieveFp = (RetrieveMetaFp *)malloc(TSDB_MGMT_TABLE_MAX * sizeof(RetrieveMetaFp));
mgmtRetrieveFp[TSDB_MGMT_TABLE_ACCT] = mgmtRetrieveAccts;
mgmtRetrieveFp[TSDB_MGMT_TABLE_USER] = mgmtRetrieveUsers;
mgmtRetrieveFp[TSDB_MGMT_TABLE_DB] = mgmtRetrieveDbs;
......@@ -782,7 +754,7 @@ void mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
return;
}
int32_t code = mgmtCreateTable(pCreate, contLen, ahandle);
int32_t code = mgmtCreateTable(pCreate, contLen, ahandle, false);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
rpcSendResponse(ahandle, code, NULL, 0);
}
......@@ -809,9 +781,9 @@ void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) {
return;
}
SDbObj *pDb = mgmtGetDb(pDrop->db);
SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId);
if (pDb == NULL) {
mError("table:%s, failed to drop table, db:%s not selected", pDrop->tableId, pDrop->db);
mError("table:%s, failed to drop table, db not selected", pDrop->tableId);
rpcSendResponse(ahandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0);
return;
}
......@@ -1107,6 +1079,111 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessSuperTableMetaMsg;
}
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
mError("table:%s, failed to create vgroup, db not found", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0);
return;
}
SVgObj *pVgroup = mgmtCreateVgroup(pDb);
if (pVgroup == NULL) {
mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0);
return;
}
void *cont = rpcMallocCont(contLen);
if (cont == NULL) {
mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
memcpy(cont, pCreate, contLen);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_VGROUP;
info->thandle = thandle;
info->ahandle = pVgroup;
info->cont = cont;
info->contLen = contLen;
if (isGetMeta) {
info->type = TSDB_PROCESS_CREATE_VGROUP_GET_META;
}
mgmtSendCreateVgroupMsg(pVgroup, info);
}
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
assert(pVgroup != NULL);
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId);
mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta);
return;
}
int32_t code;
STableInfo *pTable;
SDCreateTableMsg *pDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
} else {
mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid);
rpcSendResponse(thandle, code, NULL, 0);
return;
}
assert(pDCreate != NULL);
assert(pTable != NULL);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->thandle = thandle;
info->ahandle = pTable;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
if (isGetMeta) {
info->type = TSDB_PROCESS_CREATE_TABLE_GET_META;
}
mgmtSendCreateTableMsg(pDCreate, &ipSet, info);
}
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) {
SDbObj* pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to get table meta, db not selected", pTable->tableId);
rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0);
return;
}
SRpcConnInfo connInfo;
rpcGetConnInfo(thandle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
if (code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMeta);
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0);
} else {
pMeta->contLen = htons(pMeta->contLen);
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen);
}
}
static int32_t mgmtCheckRedirectMsgImp(void *pConn) {
return 0;
}
......
......@@ -34,6 +34,7 @@
#include "mgmtGrant.h"
#include "mgmtNormalTable.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
......@@ -111,81 +112,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
return TSDB_CODE_SUCCESS;
}
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
mError("table:%s, failed to create vgroup, db not found", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0);
return;
}
SVgObj *pVgroup = mgmtCreateVgroup(pDb);
if (pVgroup == NULL) {
mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0);
return;
}
void *cont = rpcMallocCont(contLen);
if (cont == NULL) {
mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
memcpy(cont, pCreate, contLen);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_VGROUP;
info->thandle = thandle;
info->ahandle = pVgroup;
info->cont = cont;
info->contLen = contLen;
mgmtSendCreateVgroupMsg(pVgroup, info);
}
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
assert(pVgroup != NULL);
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId);
mgmtProcessCreateVgroup(pCreate, contLen, thandle);
return;
}
int32_t code;
STableInfo *pTable;
SDCreateTableMsg *pDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
} else {
mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid);
rpcSendResponse(thandle, code, NULL, 0);
return;
}
assert(pDCreate != NULL);
assert(pTable != NULL);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->thandle = thandle;
info->ahandle = pTable;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateTableMsg(pDCreate, &ipSet, info);
}
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
mError("table:%s, failed to create table, db not selected", pCreate->tableId);
......@@ -232,10 +159,10 @@ int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle
SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb);
if (pVgroup == NULL) {
mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId);
mgmtProcessCreateVgroup(pCreate, contLen, thandle);
mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta);
} else {
mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId);
mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle);
mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle, isGetMeta);
}
return TSDB_CODE_ACTION_IN_PROGRESS;
......@@ -494,4 +421,5 @@ SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) {
pTable->dirty = isDirty;
}
\ No newline at end of file
}
......@@ -192,10 +192,12 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
}
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
// mgmtSendFreeVnodesMsg(pVgroup);
mgmtSendRemoveVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup);
return 0;
return TSDB_CODE_SUCCESS;
}
void mgmtSetVgroupIdPool() {
......@@ -480,11 +482,11 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
taosFreeId(pVgroup->idPool, pTable->sid);
}
SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode) {
SCreateVnodeMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode) {
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) return NULL;
SVPeersMsg *pVPeers = rpcMallocCont(sizeof(SVPeersMsg));
SCreateVnodeMsg *pVPeers = rpcMallocCont(sizeof(SCreateVnodeMsg));
if (pVPeers == NULL) return NULL;
pVPeers->vnode = htonl(vnode);
......
......@@ -28,8 +28,8 @@ char *taosMsg[] = {
"remove-table",
"remove-table-rsp",
"vpeers",
"vpeers-rsp",
"create-vnode",
"create-vnode-rsp",
"free-vnode",
"free-vnode-rsp",
"cfg-dnode",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册