提交 aa84e61e 编写于 作者: S slguan

communication from dnode to mgmt

上级 5cb76602
......@@ -26,7 +26,7 @@ extern "C" {
int32_t dnodeInitMgmt();
void dnodeInitMgmtIp();
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code);
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen);
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
......
......@@ -43,11 +43,11 @@ static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
void *handle = sched->ahandle;
int8_t *pCont = sched->msg;
mgmtProcessMsgFromDnode(pCont, contLen, handle, code);
mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code);
rpcFreeCont(sched->msg);
}
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) {
dTrace("msg:%s is sent to mnode", taosMsg[msgType]);
if (dnodeSendMsgToMnodeFp) {
dnodeSendMsgToMnodeFp(msgType, pCont, contLen);
......@@ -55,7 +55,6 @@ void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *aha
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
schedMsg.ahandle = ahandle;
*(int32_t *) (pCont - 4) = contLen;
*(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS;
*(int8_t *) (pCont - 9) = msgType;
......@@ -69,7 +68,7 @@ void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont,
dnodeSendRspToMnodeFp(pConn, code, pCont, contLen);
} else {
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeFp;
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
*(int32_t *) (pCont - 4) = contLen;
*(int32_t *) (pCont - 8) = code;
......@@ -93,7 +92,7 @@ void dnodeInitMgmtIp() {
}
}
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *handle, int32_t code) {
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType);
} else {
......@@ -221,7 +220,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) {
}
cfg->vnode = htonl(vnode);
dnodeSendMsgToMnode((int8_t*)cfg, sizeof(SVpeerCfgMsg), TSDB_MSG_TYPE_VNODE_CFG);
dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg));
}
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
......@@ -231,7 +230,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
}
cfg->vnode = htonl(vnode);
dnodeSendMsgToMnode((int8_t*)cfg, sizeof(STableCfgMsg), TSDB_MSG_TYPE_TABLE_CFG);
dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg));
}
void dnodeInitProcessShellMsg() {
......
......@@ -49,7 +49,7 @@ extern int32_t (*dnodeCheckSystem)();
// dnodeSystem
extern void *tsDnodeMgmtQhandle;
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code);
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
// dnodeModule
extern void (*dnodeStartModules)();
......
......@@ -26,18 +26,19 @@ extern "C" {
extern void *mgmtStatusTimer;
void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle);
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle);
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle);
void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle);
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle);
void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
extern int32_t (*mgmtInitDnodeInt)();
extern void (*mgmtCleanUpDnodeInt)();
extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId);
void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle);
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle);
void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
#ifdef __cplusplus
......
......@@ -38,6 +38,23 @@ int32_t mgmtKillStream(char *qidstr, void *pConn);
int32_t mgmtKillConnection(char *qidstr, void *pConn);
enum {
TSDB_PROCESS_CREATE_TABLE,
TSDB_PROCESS_CREATE_VGROUP,
TSDB_PROCESS_CREATE_VGROUP_AND_TABLE,
TSDB_PROCESS_CREATE_VNODE,
TSDB_PROCESS_TABLE_CFG,
};
typedef struct {
void *thandle; // come from uplayer
void *ahandle; // object to process
void *cont; // additional information of object to process
int32_t type; // the type of sync process
int32_t received; // num of received, such as numOfVnodes
int32_t contLen; // the length of additional information
} SProcessInfo;
#ifdef __cplusplus
}
#endif
......
......@@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
......
......@@ -31,6 +31,7 @@
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtProfile.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
......@@ -339,19 +340,16 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr
mError("table:%s, corresponding super table schema is null", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
}
// memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size);
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
return TSDB_CODE_SDB_ERROR;
}
mgmtSendCreateTableMsg(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
......@@ -360,7 +358,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
return TSDB_CODE_OTHERS;
}
mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
......
......@@ -295,7 +295,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) {
}
}
}
mgmtSendFreeVnodesMsg(pVgroup);
// mgmtSendFreeVnodesMsg(pVgroup);
pVgroup = pVgroup->next;
}
......@@ -465,7 +465,7 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
//rebuild meterList in mgmtVgroup.c
mgmtUpdateVgroup(pVgroup);
}
mgmtSendVPeersMsg(pVgroup);
// mgmtSendCreateVnodeMsg(pVgroup);
pVgroup = pVgroup->next;
}
mgmtStartBalanceTimer(10);
......
......@@ -26,10 +26,11 @@
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDnodeInt.h"
#include "mgmtProfile.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
void (*mgmtSendMsgToDnodeFp)(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL;
void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL;
void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL;
void *mgmtStatusTimer = NULL;
......@@ -44,10 +45,10 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) {
rpcFreeCont(sched->msg);
}
void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
mTrace("msg:%s is sent to dnode", taosMsg[msgType]);
if (mgmtSendMsgToDnodeFp) {
mgmtSendMsgToDnodeFp(msgType, pCont, contLen, ahandle);
mgmtSendMsgToDnodeFp(ipSet, msgType, pCont, contLen, ahandle);
} else {
SSchedMsg schedMsg = {0};
schedMsg.fp = mgmtSendMsgToDnodeQueueFp;
......@@ -75,7 +76,7 @@ void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont,
}
}
static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) {
STableCfgMsg *pCfg = (STableCfgMsg *) pCont;
pCfg->dnode = htonl(pCfg->dnode);
pCfg->vnode = htonl(pCfg->vnode);
......@@ -84,19 +85,21 @@ static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe
if (!sdbMaster) {
mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
return;
}
STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
if (pTable == NULL) {
mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0);
mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0);
return;
}
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
mgmtSendCreateTableMsg(pTable, NULL);
mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
mgmtSendCreateTableMsg(pTable, &ipSet, NULL);
}
static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
......@@ -117,11 +120,22 @@ static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe
}
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
mgmtSendVPeersMsg(pVgroup, pCfg->vnode, NULL);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL);
}
static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
mTrace("create table rsp received, handle:%p code:%d", thandle, code);
if (thandle == NULL) return;
SProcessInfo *info = thandle;
if (info->type == TSDB_PROCESS_CREATE_TABLE) {
rpcSendResponse(info->thandle, code, NULL, 0);
} else {
mError("create table rsp received, handle:%p code:%d, invalid type:%d", info->type);
rpcSendResponse(info->thandle, TSDB_CODE_INVALID_MSG, NULL, 0);
}
}
static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
......@@ -132,8 +146,59 @@ static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contL
mTrace("free vnode rsp received, handle:%p code:%d", thandle, code);
}
static void mgmtProcessVPeersRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
mTrace("vpeers rsp received, handle:%p code:%d", thandle, code);
static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *ahandle, int32_t code) {
mTrace("create vnode received, vgroup_info:%p code:%d", ahandle, code);
if (ahandle == NULL) {
rpcFreeCont(pCont);
return;
}
SProcessInfo *vgroup_info = ahandle;
assert(vgroup_info->type == TSDB_PROCESS_CREATE_VGROUP);
vgroup_info->received++;
SVgObj *pVgroup = vgroup_info->ahandle;
mTrace("vgroup:%d, received:%d numOfVnodes:%d table_info:%p",
pVgroup->vgId, vgroup_info->received, pVgroup->numOfVnodes, vgroup_info->thandle);
if (vgroup_info->received == pVgroup->numOfVnodes) {
SProcessInfo *table_info = vgroup_info->thandle;
assert(table_info->type == TSDB_PROCESS_CREATE_VGROUP_AND_TABLE);
STableInfo *pTable = table_info->ahandle;
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->thandle = table_info->thandle;
info->ahandle = pTable;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateTableMsg(pTable, &ipSet, info);
free(vgroup_info);
free(table_info);
}
rpcFreeCont(pCont);
}
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *table_info) {
SProcessInfo *vgroup_info = calloc(1, sizeof(SProcessInfo));
vgroup_info->type = TSDB_PROCESS_CREATE_VGROUP;
vgroup_info->thandle = table_info;
vgroup_info->ahandle = pVgroup;
mTrace("vgroup:%d, send create all vnodes msg, table_info:%p vgroup_info:%p ", pVgroup->vgId, table_info, vgroup_info);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, vgroup_info);
}
}
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *vgroup_info) {
mTrace("vgroup:%d, send create vnode:%d msg, vgroup_info:%p", pVgroup->vgId, vnode, vgroup_info);
SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
if (pVpeer != NULL) {
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), vgroup_info);
}
}
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *handle, int32_t code) {
......@@ -146,61 +211,52 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *h
} else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) {
mgmtProcessRemoveTableRsp(msgType, pCont, contLen, handle, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) {
mgmtProcessVPeersRsp(msgType, pCont, contLen, handle, code);
mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, handle, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) {
mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, handle, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
} else {
mError("%s from dnode is not processed", taosMsg[msgType]);
mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
}
}
void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) {
void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, sid:%d send create table msg, handle:%p", pTable->tableId, pTable->sid);
SDCreateTableMsg *pCreate = mgmtBuildCreateTableMsg(pTable);
if (pCreate != NULL) {
mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), handle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle);
}
}
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) {
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid);
SDRemoveTableMsg *pRemove = mgmtBuildRemoveTableMsg(pTable);
if (pRemove != NULL) {
mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), handle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle);
}
}
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) {
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid);
}
void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle) {
mTrace("vgroup:%d, vnode:%d send vpeer msg, handle:%p", pVgroup->vgId, vnode, handle);
SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
if (pVpeer != NULL) {
mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), handle);
}
}
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle) {
mTrace("vnode:%d send free vnode msg, handle:%p", vnode, handle);
SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg));
if (pFreeVnode != NULL) {
pFreeVnode->vnode = htonl(vnode);
mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle);
}
}
void mgmtSendFreeVnodesMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *handle) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid + i, &ipSet, handle);
mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, handle);
}
}
......
......@@ -273,7 +273,7 @@ int32_t mgmtInitNormalTables() {
continue;
}
mgmtAddTableIntoVgroup(pVgroup, pTable);
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *)pTable);
//pVgroup->tableList[pTable->sid] = (STableInfo*)pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1);
......@@ -373,12 +373,10 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
return TSDB_CODE_SDB_ERROR;
}
mgmtSendCreateNormalTableMsg(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
......@@ -387,7 +385,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
return TSDB_CODE_OTHERS;
}
mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsNormalTableSdb, pTable);
......
......@@ -155,7 +155,7 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
strcpy(pCreateMsg->tableId, pInfo->tableId);
int32_t code = mgmtCreateTable(pDb, pCreateMsg);
int32_t code = mgmtCreateTable(pDb, pCreateMsg, NULL);
char stableName[TSDB_TABLE_ID_LEN] = {0};
strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN);
......@@ -803,7 +803,7 @@ int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb) {
code = mgmtCreateTable(pDb, pCreate);
code = mgmtCreateTable(pDb, pCreate, ahandle);
} else {
code = TSDB_CODE_DB_NOT_SELECTED;
}
......
......@@ -33,6 +33,7 @@
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtNormalTable.h"
#include "mgmtProfile.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
......@@ -110,7 +111,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
return TSDB_CODE_SUCCESS;
}
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle) {
STableInfo *pTable = mgmtGetTable(pCreate->tableId);
if (pTable != NULL) {
if (pCreate->igExists) {
......@@ -143,13 +144,30 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
int32_t sid;
SVgObj *pVgroup = mgmtGetAvailVgroup(pDb, &sid);
if (pVgroup == NULL) {
// process it in a callback function
return TSDB_CODE_ACTION_IN_PROGRESS;
code = mgmtCreateVgroup(pDb);
if (code == TSDB_CODE_SUCCESS) {
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_VGROUP_AND_TABLE;
info->ahandle = pTable;
info->thandle = thandle;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateVgroupMsg(pVgroup, &ipSet, info);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
} else {
if (pCreate->numOfColumns == 0) {
return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid);
code = mgmtCreateChildTable(pDb, pCreate, pVgroup, sid);
} else {
return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid);
code = mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid);
}
if (code == TSDB_CODE_SUCCESS) {
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->ahandle = pTable;
info->thandle = thandle;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateTableMsg(pTable, &ipSet, info);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
}
} else {
......
......@@ -216,8 +216,6 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i)
mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
mgmtSendVPeersMsg(pVgroup);
return pVgroup;
}
......@@ -234,7 +232,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
}
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
mgmtSendFreeVnodesMsg(pVgroup);
// mgmtSendFreeVnodesMsg(pVgroup);
sdbDeleteRow(tsVgroupSdb, pVgroup);
return 0;
......@@ -578,6 +576,6 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
}
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
SRpcIpSet ipSet = {.ip = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1};
SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1};
return ipSet;
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ typedef struct _sched_msg {
void (*tfp)(void *, void *);
int8_t *msg;
void *msg;
void *ahandle;
void *thandle;
} SSchedMsg;
......
......@@ -23,7 +23,7 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
extern char dataDir[TSDB_FILENAME_LEN];
extern char dataDir[];
bool taosGetSysMemory(float *memoryUsedMB);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册