提交 5cb76602 编写于 作者: S slguan

communication from dnode to mgmt

上级 86402cc9
......@@ -31,44 +31,49 @@ void (*dnodeInitMgmtIpFp)() = NULL;
int32_t (*dnodeInitMgmtFp)() = NULL;
void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void dnodeInitProcessShellMsg();
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t));
int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t));
int8_t *pCont = sched->msg;
void *pConn = NULL;
int32_t contLen = *(int32_t *) (sched->msg - 4);
int32_t code = *(int32_t *) (sched->msg - 8);
int8_t msgType = *(int8_t *) (sched->msg - 9);
void *handle = sched->ahandle;
int8_t *pCont = sched->msg;
mgmtProcessMsgFromDnode(pCont, contLen, msgType, pConn);
mgmtProcessMsgFromDnode(pCont, contLen, handle, code);
rpcFreeCont(sched->msg);
}
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) {
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
dTrace("msg:%s is sent to mnode", taosMsg[msgType]);
if (dnodeSendMsgToMnodeFp) {
dnodeSendMsgToMnodeFp(msgType, pCont, contLen);
} else {
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
schedMsg.ahandle = ahandle;
*(int32_t *) (pCont - 4) = contLen;
*(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS;
*(int8_t *) (pCont - 9) = msgType;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
}
}
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) {
dTrace("rsp:%s is sent to mnode", taosMsg[msgType]);
if (tsIsCluster) {
rpcSendResponse(pConn, code, pCont, contLen);
if (dnodeSendRspToMnodeFp) {
dnodeSendRspToMnodeFp(pConn, code, pCont, contLen);
} else {
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeFp;
schedMsg.msg = pCont;
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
*(int32_t *) (pCont - 4) = contLen;
*(int32_t *) (pCont - 8) = code;
*(int8_t *) (pCont - 9) = msgType;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
}
}
......@@ -88,7 +93,7 @@ void dnodeInitMgmtIp() {
}
}
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code) {
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *handle, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType);
} else {
......
......@@ -46,6 +46,11 @@ extern void (*dnodeParseParameterK)();
extern int32_t (*dnodeCheckSystem)();
// dnodeSystem
extern void *tsDnodeMgmtQhandle;
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code);
// dnodeModule
extern void (*dnodeStartModules)();
......
......@@ -258,11 +258,13 @@ typedef struct {
//mgmtSystem
int32_t mgmtStartSystem();
void mgmtCleanUpSystem();
void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int contLen, void *pConn, int32_t code);
extern int32_t (*mgmtInitSystem)();
extern void (*mgmtStopSystem)();
extern void (*mgmtCleanUpRedirect)();
#ifdef __cplusplus
}
#endif
......
......@@ -162,6 +162,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee")
#ifdef TAOS_ERROR_C
};
......
......@@ -240,10 +240,11 @@ typedef struct {
int16_t numOfTags;
int32_t tagDataLen;
int32_t sqlDataLen;
int32_t contLen;
uint64_t createdTime;
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
int8_t data[];
char data[];
} SDCreateTableMsg;
typedef struct {
......@@ -325,9 +326,13 @@ typedef struct {
short vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
} SDRemoveTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
} SDRemoveSuperTableMsg;
typedef struct {
int32_t vnode;
} SFreeVnodeMsg;
......@@ -603,8 +608,8 @@ typedef struct {
} SSecIe;
typedef struct {
int32_t dnode; //the ID of dnode
int32_t vnode; //the index of vnode
uint32_t dnode; //the ip of dnode
int32_t vnode; //the index of vnode
uint32_t ip;
} SVPeerDesc;
......@@ -616,7 +621,7 @@ typedef struct {
typedef struct {
int32_t vnode;
SVnodeCfg cfg;
SVPeerDesc vpeerDesc[];
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
} SVPeersMsg;
typedef struct {
......@@ -732,11 +737,13 @@ typedef struct {
} SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint32_t dnode;
int32_t vnode;
int32_t sid;
} STableCfgMsg;
typedef struct {
uint32_t dnode;
int32_t vnode;
} SVpeerCfgMsg;
......
......@@ -35,7 +35,7 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup);
SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable);
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
......
......@@ -57,6 +57,7 @@ extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg);
extern SDnodeObj tsDnodeObj;
#ifdef __cplusplus
}
#endif
......
......@@ -26,23 +26,19 @@ extern "C" {
extern void *mgmtStatusTimer;
int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup);
void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle);
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle);
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle);
void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle);
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle);
int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup);
int mgmtSendVPeersMsg(SVgObj *pVgroup);
int mgmtSendFreeVnodeMsg(SVgObj *pVgroup);
int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid);
char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type);
char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type);
extern int32_t (*mgmtSendSimpleRspToDnode)(void *pConn, int32_t msgType, int32_t code);
extern int32_t (*mgmtSendMsgToDnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*mgmtInitDnodeInt)();
extern void (*mgmtCleanUpDnodeInt)();
extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId);
void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle);
void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
#ifdef __cplusplus
}
......
......@@ -41,6 +41,9 @@ void mgmtCleanUpMeters();
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable);
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
#ifdef __cplusplus
}
......
......@@ -27,6 +27,7 @@ extern "C" {
int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
SVgObj *mgmtCreateVgroup(SDbObj *pDb);
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
......@@ -41,6 +42,11 @@ SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode);
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
#ifdef __cplusplus
}
#endif
......
......@@ -273,7 +273,7 @@ void mgmtCleanUpChildTables() {
sdbCloseTable(tsChildTableSdb);
}
int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) {
SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable) {
// SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg;
// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
// memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
......@@ -360,7 +360,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
return TSDB_CODE_OTHERS;
}
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
......
......@@ -295,7 +295,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) {
}
}
}
mgmtSendFreeVnodeMsg(pVgroup);
mgmtSendFreeVnodesMsg(pVgroup);
pVgroup = pVgroup->next;
}
......@@ -355,7 +355,7 @@ int32_t mgmtDropDb(SDbObj *pDb) {
if (!finished) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
mgmtSendFreeVnodeMsg(pVgroup);
mgmtSendFreeVnodesMsg(pVgroup);
pVgroup = pVgroup->next;
}
return TSDB_CODE_ACTION_IN_PROGRESS;
......
......@@ -22,6 +22,7 @@
#include "mgmtDnode.h"
#include "mgmtBalance.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
SDnodeObj tsDnodeObj;
......@@ -596,4 +597,5 @@ bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) {
return true;
}
bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp;
\ No newline at end of file
bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp;
此差异已折叠。
......@@ -387,7 +387,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
return TSDB_CODE_OTHERS;
}
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsNormalTableSdb, pTable);
......
......@@ -390,3 +390,31 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
return numOfRows;
}
SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable) {
SDCreateTableMsg *pCreate = NULL;
if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
pCreate = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else {
}
return pCreate;
}
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
SDRemoveTableMsg *pRemove = NULL;
if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
pRemove = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else {
}
return pRemove;
}
\ No newline at end of file
......@@ -234,7 +234,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
}
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
mgmtSendFreeVnodeMsg(pVgroup);
mgmtSendFreeVnodesMsg(pVgroup);
sdbDeleteRow(tsVgroupSdb, pVgroup);
return 0;
......@@ -521,3 +521,63 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
pVgroup->tableList[pTable->sid] = NULL;
taosFreeId(pVgroup->idPool, pTable->sid);
}
SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode) {
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) return NULL;
SVPeersMsg *pVPeers = rpcMallocCont(sizeof(SVPeersMsg));
if (pVPeers == NULL) return NULL;
pVPeers->vnode = htonl(vnode);
pVPeers->cfg = pDb->cfg;
SVnodeCfg *pCfg = &pVPeers->cfg;
pCfg->vgId = htonl(pVgroup->vgId);
pCfg->maxSessions = htonl(pCfg->maxSessions);
pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize);
pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks);
pCfg->daysPerFile = htonl(pCfg->daysPerFile);
pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1);
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
pCfg->commitTime = htonl(pCfg->commitTime);
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (char) pVgroup->numOfVnodes;
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
SVPeerDesc *vpeerDesc = pVPeers->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip);
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
}
return pVPeers;
}
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
if (vnode < 0 || vnode >= TSDB_MAX_VNODES) {
return NULL;
}
SDnodeObj *pDnode = mgmtGetDnode(dnode);
if (pDnode == NULL) {
return NULL;
}
int32_t vgId = pDnode->vload[vnode].vgId;
return mgmtGetVgroup(vgId);
}
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
SRpcIpSet ipSet = {.numOfIps = pVgroup->numOfVnodes, .inUse = 0, .port = tsMgmtDnodePort + 1};
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
ipSet.ip[i] = pVgroup->vnodeGid[i].ip;
}
return ipSet;
}
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
SRpcIpSet ipSet = {.ip = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1};
return ipSet;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册