提交 10f78bb5 编写于 作者: S slguan

dnodeMgmt.c

上级 7fb3fd98
...@@ -1034,7 +1034,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t ...@@ -1034,7 +1034,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
pCmd->command = TSDB_SQL_MULTI_META; pCmd->command = TSDB_SQL_MULTI_META;
pCmd->count = 0; pCmd->count = 0;
int code = TSDB_CODE_INVALID_METER_ID; int code = TSDB_CODE_INVALID_TABLE_ID;
char *str = (char *)tblNameList; char *str = (char *)tblNameList;
SQueryInfo *pQueryInfo = NULL; SQueryInfo *pQueryInfo = NULL;
...@@ -1070,7 +1070,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t ...@@ -1070,7 +1070,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
// Check if the table name available or not // Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) { if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_INVALID_METER_ID; code = TSDB_CODE_INVALID_TABLE_ID;
sprintf(pCmd->payload, "table name is invalid"); sprintf(pCmd->payload, "table name is invalid");
return code; return code;
} }
...@@ -1080,7 +1080,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t ...@@ -1080,7 +1080,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
} }
if (++pCmd->count > TSDB_MULTI_METERMETA_MAX_NUM) { if (++pCmd->count > TSDB_MULTI_METERMETA_MAX_NUM) {
code = TSDB_CODE_INVALID_METER_ID; code = TSDB_CODE_INVALID_TABLE_ID;
sprintf(pCmd->payload, "tables over the max number"); sprintf(pCmd->payload, "tables over the max number");
return code; return code;
} }
......
...@@ -48,7 +48,7 @@ public enum TSDBError { ...@@ -48,7 +48,7 @@ public enum TSDBError {
TSDB_CODE_INVALID_VALUE(24, "invalid value"), TSDB_CODE_INVALID_VALUE(24, "invalid value"),
TSDB_CODE_REDIRECT(25, "service not available"), TSDB_CODE_REDIRECT(25, "service not available"),
TSDB_CODE_ALREADY_THERE(26, "already there"), TSDB_CODE_ALREADY_THERE(26, "already there"),
TSDB_CODE_INVALID_METER_ID(27, "invalid meter ID"), TSDB_CODE_INVALID_TABLE_ID(27, "invalid meter ID"),
TSDB_CODE_INVALID_SQL(28, "invalid SQL"), // this message often comes with additional info which will vary based on the specific error situation TSDB_CODE_INVALID_SQL(28, "invalid SQL"), // this message often comes with additional info which will vary based on the specific error situation
TSDB_CODE_NETWORK_UNAVAIL(29, "failed to connect to server"), TSDB_CODE_NETWORK_UNAVAIL(29, "failed to connect to server"),
TSDB_CODE_INVALID_MSG_LEN(30, "invalid msg len"), TSDB_CODE_INVALID_MSG_LEN(30, "invalid msg len"),
......
...@@ -25,12 +25,10 @@ extern "C" { ...@@ -25,12 +25,10 @@ extern "C" {
#include "tsched.h" #include "tsched.h"
#include "dnode.h" #include "dnode.h"
int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern void *dmQhandle; extern void *tsDnodeMgmtQhandle;
void dnodeSendVpeerCfgMsg(int32_t vnode); void dnodeSendVpeerCfgMsg(int32_t vnode);
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid); void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_UTIL_H
#define TDENGINE_DNODE_UTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tstatus.h"
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode);
bool dnodeCheckVnodeExist(int32_t vnode);
void *dnodeGetVnodeObj(int32_t vnode);
#ifdef __cplusplus
}
#endif
#endif
...@@ -21,9 +21,10 @@ extern "C" { ...@@ -21,9 +21,10 @@ extern "C" {
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <stdbool.h>
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tstatus.h"
/* /*
* Open all Vnodes in the local data directory * Open all Vnodes in the local data directory
...@@ -38,34 +39,34 @@ int32_t dnodeCleanupVnodes(); ...@@ -38,34 +39,34 @@ int32_t dnodeCleanupVnodes();
/* /*
* Check if vnode already exists * Check if vnode already exists
*/ */
int32_t dnodeCheckVnodeExist(int vid); bool dnodeCheckVnodeExist(int32_t vid);
/* /*
* Create vnode with specified configuration and open it * Create vnode with specified configuration and open it
* if exist, config it
*/ */
//tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg); void* dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg);
void* dnodeCreateVnode(int vid, SVnodeCfg *cfg);
/* /*
* Modify vnode configuration information * Remove vnode from local repository
*/ */
int32_t dnodeConfigVnode(int vid, SVnodeCfg *cfg); int32_t dnodeDropVnode(int32_t vnode);
/* /*
* Modify vnode replication information * Get the vnode object that has been opened
*/ */
int32_t dnodeConfigVnodePeers(int vid/*, SVpeerCfgMsg *cfg*/); //tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int vid);
/* /*
* Remove vnode from local repository * get the status of vnode
*/ */
int32_t dnodeDropVnode(int vid); EVnodeStatus dnodeGetVnodeStatus(int32_t vnode);
/* /*
* Get the vnode object that has been opened * Check if vnode already exists, and table exist in this vnode
*/ */
//tsdb_repo_t* dnodeGetVnode(int vid); bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid);
void* dnodeGetVnode(int vid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -35,45 +35,26 @@ extern "C" { ...@@ -35,45 +35,26 @@ extern "C" {
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)); void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn));
/* /*
* Create noraml table with specified configuration and open it * Create table with specified configuration and open it
* if table already exist, update its schema and tag
*/ */
int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table); int32_t dnodeCreateTable(SDCreateTableMsg *table);
/* /*
* Create stream table with specified configuration and open it * Remove table from local repository
*/
int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table);
/*
* Create child table with specified configuration and open it
*/
int32_t dnodeCreateChildTable(SCreateChildTableMsg *table);
/*
* Modify normal table configuration information
*
*/
int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table);
/*
* Modify stream table configuration information
*/ */
int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table); int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid);
/* /*
* Modify child table configuration information * Create stream
* if stream already exist, update it
*/ */
int32_t dnodeAlterChildTable(SCreateChildTableMsg *table); int32_t dnodeCreateStream(SAlterStreamMsg *stream);
/* /*
* Remove all child tables of supertable from local repository * Remove all child tables of supertable from local repository
*/ */
int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid); int32_t dnodeDropSuperTable(uint64_t stableUid);
/*
* Remove table from local repository
*/
int32_t dnodeDropTable(int vid, int sid, int64_t uid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -16,84 +16,47 @@ ...@@ -16,84 +16,47 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "dnode.h"
#include "dnodeSystem.h"
#include "dnodeMgmt.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h" #include "tsched.h"
#include "tsystem.h" #include "tsystem.h"
#include "mnode.h"
SMgmtObj mgmtObj; #include "dnode.h"
extern uint64_t tsCreatedTime; #include "dnodeSystem.h"
#include "dnodeMgmt.h"
int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj); #include "dnodeWrite.h"
int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen); #include "dnodeVnodeMgmt.h"
int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessTableCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj);
int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj);
void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables);
void vnodeOpenVnode(int vnode);
void vnodeCleanUpOneVnode(int vnode);
static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn); static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeInitProcessShellMsg(); static void dnodeInitProcessShellMsg();
char *taosBuildRspMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) { void taosSendMsgToMnodeImpFp(SSchedMsg *sched) {
char *pStart = (char *)malloc(size); char msgType = *sched->msg;
if (pStart == NULL) { char *content = sched->msg + sizeof(int32_t);
return NULL;
}
*pStart = type;
return pStart + 1;
}
char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = taosBuildRspMsgToMnodeWithSizeImp;
char *taosBuildReqMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
*pStart = type;
return pStart + 1;
}
char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = taosBuildReqMsgToMnodeWithSizeImp;
char *taosBuildRspMsgToMnodeImp(SMgmtObj *pObj, char type) { mgmtProcessMsgFromDnode(content, 0, msgType, NULL);
return taosBuildRspMsgToMnodeWithSizeImp(pObj, type, 256); rpcFreeCont(sched->msg);
} }
char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type) = taosBuildRspMsgToMnodeImp;
char *taosBuildReqMsgToMnodeImp(SMgmtObj *pObj, char type) { int32_t taosSendMsgToMnodeImp(int8_t *msg, int32_t msgLen) {
return taosBuildReqMsgToMnodeWithSizeImp(pObj, type, 256); dTrace("msg:%s is sent to mnode", taosMsg[(int32_t)(*(msg-sizeof(int32_t)))]);
}
char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type) = taosBuildReqMsgToMnodeImp;
int taosSendMsgToMnodeImp(SMgmtObj *pObj, char *msg, int msgLen) {
dTrace("msg:%s is sent to mnode", taosMsg[(uint8_t)(*(msg-1))]);
/* /*
* Lite version has no message header, so minus one * Lite version has no message header, so minus one
*/ */
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.fp = mgmtProcessMsgFromDnodeSpec; schedMsg.fp = taosSendMsgToMnodeImpFp;
schedMsg.msg = msg - 1; schedMsg.msg = msg - sizeof(int32_t);
schedMsg.ahandle = NULL; schedMsg.ahandle = NULL;
schedMsg.thandle = NULL; schedMsg.thandle = NULL;
taosScheduleTask(dmQhandle, &schedMsg); taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
return 0; return 0;
} }
int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen) = taosSendMsgToMnodeImp; int32_t (*taosSendMsgToMnode)(char *msg, int32_t msgLen) = taosSendMsgToMnodeImp;
int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) { int32_t taosSendSimpleRspToMnodeImp(int32_t rsptype, int32_t code) {
char *pStart = taosBuildRspMsgToMnode(0, rsptype); char *pStart = taosBuildRspMsgToMnode(0, rsptype);
if (pStart == NULL) { if (pStart == NULL) {
return 0; return 0;
...@@ -104,7 +67,7 @@ int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) { ...@@ -104,7 +67,7 @@ int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) {
return 0; return 0;
} }
int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = taosSendSimpleRspToMnodeImp; int (*taosSendSimpleRspToMnode)(int32_t rsptype, int32_t code) = taosSendSimpleRspToMnodeImp;
int32_t dnodeInitMgmtImp() { int32_t dnodeInitMgmtImp() {
dnodeInitProcessShellMsg(); dnodeInitProcessShellMsg();
...@@ -121,7 +84,7 @@ void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) { ...@@ -121,7 +84,7 @@ void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) {
int32_t msgType = *(int32_t*)(sched->msg); int32_t msgType = *(int32_t*)(sched->msg);
int8_t *content = sched->msg + sizeof(int32_t); int8_t *content = sched->msg + sizeof(int32_t);
dTrace("msg:%s is received from mgmt", taosMsg[msgType]); dTrace("msg:%s is received from mnode", taosMsg[msgType]);
dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL); dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL);
free(sched->msg); free(sched->msg);
...@@ -139,444 +102,118 @@ void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, ...@@ -139,444 +102,118 @@ void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType,
} }
} }
int dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { int32_t dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
int code = *pMsg; int32_t code = htonl(*((int32_t *) pMsg));
if (code == 0) { if (code == TSDB_CODE_SUCCESS) {
vnodeProcessCreateMeterMsg(pMsg + 1, msgLen - 1); SDCreateTableMsg *table = (SDCreateTableMsg *) (pMsg + sizeof(int32_t));
return dnodeCreateTable(table);
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pMsg + sizeof(int32_t));
int32_t vnode = htonl(table->vnode);
int32_t sid = htonl(table->sid);
uint64_t uid = htobe64(table->uid);
dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
return dnodeDropTable(vnode, sid, uid);
} else { } else {
STaosRsp *pRsp; dError("code:%d invalid message", code);
pRsp = (STaosRsp *)pMsg; return TSDB_CODE_INVALID_MSG;
int32_t *pint = (int32_t *)pRsp->more;
int vnode = htonl(*pint);
int sid = htonl(*(pint + 1));
dError("vid:%d, sid:%d, code:%d, meter is not configured, remove it", vnode, sid, code);
int ret = vnodeRemoveMeterObj(vnode, sid);
dTrace("vid:%d, sid:%d, meter delete ret:%d", vnode, sid, ret);
} }
return 0;
} }
int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SCreateMsg *pCreate; SDCreateTableMsg *table = (SDCreateTableMsg *) pCont;
int code = 0; int32_t code = dnodeCreateTable(table);
int vid; rpcSendSimpleRsp(pConn, code);
SVnodeObj * pVnode;
pCreate = (SCreateMsg *)pMsg;
vid = htons(pCreate->vnode);
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0) {
dError("vid:%d, not activated", vid);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
// if (pVnode->syncStatus == TSDB_VN_SYNC_STATUS_SYNCING) {
// code = vnodeSaveCreateMsgIntoQueue(pVnode, pMsg, msgLen);
// dTrace("vid:%d, create msg is saved into sync queue", vid);
// } else {
code = vnodeProcessCreateMeterMsg(pMsg, msgLen);
// }
_over:
taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP, code);
return code; return code;
} }
int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SAlterStreamMsg *pAlter; SDCreateTableMsg *table = (SDCreateTableMsg *) pCont;
int code = 0; int32_t code = dnodeCreateTable(table);
int vid, sid; rpcSendSimpleRsp(pConn, code);
SVnodeObj * pVnode;
pAlter = (SAlterStreamMsg *)pMsg;
vid = htons(pAlter->vnode);
sid = htonl(pAlter->sid);
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0 || pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pAlter->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
if (pAlter->sid >= pVnode->cfg.maxSessions || pAlter->sid < 0) {
dError("vid:%d sid:%d uid:%" PRIu64 ", sid is out of range", pAlter->vnode, pAlter->sid, pAlter->uid);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _over;
}
SMeterObj *pMeterObj = vnodeList[vid].meterList[sid];
if (pMeterObj == NULL || sid != pMeterObj->sid || vid != pMeterObj->vnode) {
dError("vid:%d sid:%d, not active table", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _over;
}
pMeterObj->status = pAlter->status;
if (pMeterObj->status == 1) {
if (pAlter->stime > pMeterObj->lastKey) // starting time can be specified
pMeterObj->lastKey = pAlter->stime;
vnodeCreateStream(pMeterObj);
} else {
vnodeRemoveStream(pMeterObj);
}
vnodeSaveMeterObjToFile(pMeterObj);
_over:
taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_ALTER_STREAM_RSP, code);
return code; return code;
} }
int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) { int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, void *pConn) {
int code; SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont;
SMeterObj * pObj = NULL; int32_t vnode = htonl(table->vnode);
SConnSec connSec; int32_t sid = htonl(table->sid);
SCreateMsg *pCreate = (SCreateMsg *)pMsg; uint64_t uid = htobe64(table->uid);
pCreate->vnode = htons(pCreate->vnode);
pCreate->sid = htonl(pCreate->sid);
pCreate->lastCreate = htobe64(pCreate->lastCreate);
pCreate->timeStamp = htobe64(pCreate->timeStamp);
if (pCreate->vnode >= TSDB_MAX_VNODES || pCreate->vnode < 0) {
dError("vid:%d is out of range", pCreate->vnode);
code = TSDB_CODE_INVALID_VNODE_ID;
goto _create_over;
}
SVnodeObj *pVnode = vnodeList + pCreate->vnode;
if (pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pCreate->vnode);
vnodeSendVpeerCfgMsg(pCreate->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _create_over;
}
if (pCreate->sid >= pVnode->cfg.maxSessions || pCreate->sid < 0) {
dError("vid:%d sid:%d id:%s, sid is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _create_over;
}
pCreate->numOfColumns = htons(pCreate->numOfColumns);
if (pCreate->numOfColumns <= 0) {
dTrace("vid:%d sid:%d id:%s, numOfColumns is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId);
code = TSDB_CODE_OTHERS;
goto _create_over;
}
pCreate->sqlLen = htons(pCreate->sqlLen);
pObj = (SMeterObj *)calloc(1, sizeof(SMeterObj) + pCreate->sqlLen + 1);
if (pObj == NULL) {
dError("vid:%d sid:%d id:%s, no memory to allocate meterObj", pCreate->vnode, pCreate->sid, pCreate->meterId);
code = TSDB_CODE_NO_RESOURCE;
goto _create_over;
}
/*
* memory alignment may cause holes in SColumn struct which are not assigned any value
* therefore, we could not use memcmp to compare whether two SColumns are equal or not.
* So, we need to set the memory to 0 when allocating memory.
*/
pObj->schema = (SColumn *)calloc(1, pCreate->numOfColumns * sizeof(SColumn));
pObj->vnode = pCreate->vnode;
pObj->sid = pCreate->sid;
pObj->uid = pCreate->uid;
memcpy(pObj->meterId, pCreate->meterId, TSDB_TABLE_ID_LEN);
pObj->numOfColumns = pCreate->numOfColumns;
pObj->timeStamp = pCreate->timeStamp;
pObj->sversion = htonl(pCreate->sversion);
pObj->maxBytes = 0;
for (int i = 0; i < pObj->numOfColumns; ++i) {
pObj->schema[i].type = pCreate->schema[i].type;
pObj->schema[i].bytes = htons(pCreate->schema[i].bytes);
pObj->schema[i].colId = htons(pCreate->schema[i].colId);
pObj->bytesPerPoint += pObj->schema[i].bytes;
if (pObj->maxBytes < pObj->schema[i].bytes) pObj->maxBytes = pObj->schema[i].bytes;
}
if (pCreate->sqlLen > 0) {
pObj->sqlLen = pCreate->sqlLen;
pObj->pSql = ((char *)pObj) + sizeof(SMeterObj);
memcpy(pObj->pSql, (char *)pCreate->schema + pCreate->numOfColumns * sizeof(SMColumn), pCreate->sqlLen);
pObj->pSql[pCreate->sqlLen] = 0;
}
pObj->pointsPerFileBlock = pVnode->cfg.rowsInFileBlock;
if (sizeof(TSKEY) != pObj->schema[0].bytes) {
dError("key length is not matched, required key length:%d", sizeof(TSKEY));
code = TSDB_CODE_OTHERS;
goto _create_over;
}
// security info shall be saved here
connSec.spi = pCreate->spi;
connSec.encrypt = pCreate->encrypt;
memcpy(connSec.secret, pCreate->secret, TSDB_KEY_LEN);
memcpy(connSec.cipheringKey, pCreate->cipheringKey, TSDB_KEY_LEN);
code = vnodeCreateMeterObj(pObj, &connSec);
_create_over:
if (code != TSDB_CODE_SUCCESS) {
dTrace("vid:%d sid:%d id:%s, failed to create meterObj", pCreate->vnode, pCreate->sid, pCreate->meterId);
tfree(pObj);
}
dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
int32_t code = dnodeDropTable(vnode, sid, uid);
rpcSendSimpleRsp(pConn, code);
return code; return code;
} }
int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, void *pConn) {
SMeterObj * pObj; int32_t code = htonl(*((int32_t *) pCont));
SRemoveMeterMsg *pRemove;
int code = 0;
pRemove = (SRemoveMeterMsg *)pMsg; if (code == TSDB_CODE_SUCCESS) {
pRemove->vnode = htons(pRemove->vnode); SVPeersMsg *vpeer = (SVPeersMsg *) (pCont + sizeof(int32_t));
pRemove->sid = htonl(pRemove->sid); int32_t vnode = htonl(vpeer->vnode);
return dnodeCreateVnode(vnode, vpeer);
if (pRemove->vnode < 0 || pRemove->vnode >= TSDB_MAX_VNODES) { } else if (code == TSDB_CODE_INVALID_VNODE_ID) {
dWarn("vid:%d sid:%d, already removed", pRemove->vnode, pRemove->sid); SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t));
goto _remove_over; int32_t vnode = htonl(vpeer->vnode);
} dError("vnode:%d, not exist, remove it", vnode);
return dnodeDropVnode(vnode);
if (vnodeList[pRemove->vnode].meterList == NULL) goto _remove_over;
pObj = vnodeList[pRemove->vnode].meterList[pRemove->sid];
if (pObj == NULL) goto _remove_over;
if (memcmp(pObj->meterId, pRemove->meterId, TSDB_TABLE_ID_LEN) != 0) {
dWarn("vid:%d sid:%d id:%s, remove ID:%s, meter ID not matched", pObj->vnode, pObj->sid, pObj->meterId,
pRemove->meterId);
goto _remove_over;
}
if (vnodeRemoveMeterObj(pRemove->vnode, pRemove->sid) == TSDB_CODE_ACTION_IN_PROGRESS) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
goto _remove_over;
}
dTrace("vid:%d sid:%d id:%s, meterObj is removed", pRemove->vnode, pRemove->sid, pRemove->meterId);
_remove_over:
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_REMOVE_RSP, code);
return 0;
}
int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
SVPeersMsg *pMsg = (SVPeersMsg *)msg;
int i, vnode;
vnode = htonl(pMsg->vnode);
if (vnode >= TSDB_MAX_VNODES) {
dError("vid:%d, vnode is out of range", vnode);
return -1;
}
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_CREATING) {
dTrace("vid:%d, vnode is still under creating", vnode);
return 0;
}
SVnodeCfg *pCfg = &pMsg->cfg;
pCfg->vgId = htonl(pCfg->vgId);
pCfg->maxSessions = htonl(pCfg->maxSessions);
pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize);
pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks);
pCfg->daysPerFile = htonl(pCfg->daysPerFile);
pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1);
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
pCfg->commitTime = htonl(pCfg->commitTime);
pCfg->blocksPerMeter = htons(pCfg->blocksPerMeter);
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
if (pCfg->replications > 0) {
dPrint("vid:%d, vpeer cfg received, replica:%d session:%d, vnodeList replica:%d session:%d, acct:%s db:%s",
vnode, pCfg->replications, pCfg->maxSessions, vnodeList[vnode].cfg.replications, vnodeList[vnode].cfg.maxSessions,
pCfg->acct, pCfg->db);
for (i = 0; i < pCfg->replications; ++i) {
pMsg->vpeerDesc[i].vnode = htonl(pMsg->vpeerDesc[i].vnode);
pMsg->vpeerDesc[i].ip = htonl(pMsg->vpeerDesc[i].ip);
dPrint("vid:%d, vpeer:%d ip:0x%x vid:%d ", vnode, i, pMsg->vpeerDesc[i].ip, pMsg->vpeerDesc[i].vnode);
}
}
if (vnodeList[vnode].cfg.maxSessions == 0) {
dPrint("vid:%d, vnode is empty", vnode);
if (pCfg->maxSessions > 0) {
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_OFFLINE) {
dPrint("vid:%d, status:%s, start to create vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc);
} else {
dPrint("vid:%d, status:%s, cannot preform create vnode operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return TSDB_CODE_INVALID_VNODE_STATUS;
}
}
} else {
dPrint("vid:%d, vnode is not empty", vnode);
if (pCfg->maxSessions > 0) {
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_DELETING) {
dPrint("vid:%d, status:%s, wait vnode delete finished", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
} else {
dPrint("vid:%d, status:%s, start to update vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
vnodeCleanUpOneVnode(vnode);
}
vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc);
vnodeSaveVnodeCfg(vnode, pCfg, pMsg->vpeerDesc);
/*
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
vnodeUpdateHeadFile(vnode, vnodeList[vnode].cfg.maxSessions, pCfg->maxSessions);
vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions;
vnodeOpenVnode(vnode);
}
*/
}
return 0;
} else { } else {
dPrint("vid:%d, status:%s, start to delete vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); dError("code:%d invalid message", code);
vnodeRemoveVnode(vnode); return TSDB_CODE_INVALID_MSG;
}
} }
return 0;
} }
int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, void *pConn) {
STaosRsp *pRsp; SVPeersMsg *vpeer = (SVPeersMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
pRsp = (STaosRsp *)msg;
if (pRsp->code == 0) {
vnodeProcessVPeerCfg(pRsp->more, msgLen - sizeof(STaosRsp), pMgmtObj);
} else {
int32_t *pint = (int32_t *)pRsp->more;
int vnode = htonl(*pint);
if (vnode < TSDB_MAX_VNODES && vnodeList[vnode].lastKey != 0) {
dError("vnode:%d not configured, it shall be empty, code:%d", vnode, pRsp->code);
vnodeRemoveVnode(vnode);
} else {
dError("vnode:%d is invalid, code:%d", vnode, pRsp->code);
}
}
return 0;
}
int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
int code = 0;
code = vnodeProcessVPeerCfg(msg, msgLen, pMgmtObj);
char * pStart;
STaosRsp * pRsp;
SVPeersMsg *pVPeersMsg = (SVPeersMsg *)msg;
pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP); dPrint("vnode:%d, start to config", vnode);
if (pStart == NULL) return -1;
pRsp = (STaosRsp *)pStart;
pRsp->code = code;
memcpy(pRsp->more, pVPeersMsg->cfg.db, TSDB_DB_NAME_LEN);
msgLen = sizeof(STaosRsp) + TSDB_DB_NAME_LEN;
taosSendMsgToMnode(pMgmtObj, pStart, msgLen);
int32_t code = dnodeCreateVnode(vnode, vpeer);
rpcSendSimpleRsp(pConn, code);
return code; return code;
} }
int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SFreeVnodeMsg *pFree; SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
pFree = (SFreeVnodeMsg *)pMsg; dPrint("vnode:%d, remove it", vnode);
pFree->vnode = htons(pFree->vnode);
if (pFree->vnode < 0 || pFree->vnode >= TSDB_MAX_VNODES) { int32_t code = dnodeDropVnode(vnode);
dWarn("vid:%d, out of range", pFree->vnode); rpcSendSimpleRsp(pConn, code);
return -1;
}
dTrace("vid:%d, receive free vnode message", pFree->vnode); return code;
int32_t code = vnodeRemoveVnode(pFree->vnode);
assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS);
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP, code);
return 0;
} }
int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) { int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SCfgMsg *pCfg = (SCfgMsg *)cont; SCfgMsg *pCfg = (SCfgMsg *)pCont;
int32_t code = tsCfgDynamicOptions(pCfg->config);
int code = tsCfgDynamicOptions(pCfg->config); rpcSendSimpleRsp(pConn, code);
return code;
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_CFG_RSP, code);
return 0;
} }
void dnodeSendVpeerCfgMsg(int32_t vnode) { void dnodeSendVpeerCfgMsg(int32_t vnode) {
char * pMsg, *pStart; SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg));
int msgLen; if (cfg == NULL) {
SVpeerCfgMsg *pCfg; return;
SMgmtObj * pObj = &mgmtObj; }
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VNODE_CFG);
if (pStart == NULL) return;
pMsg = pStart;
pCfg = (SVpeerCfgMsg *)pMsg;
pCfg->vnode = htonl(vnode);
pMsg += sizeof(SVpeerCfgMsg);
msgLen = pMsg - pStart; cfg->vnode = htonl(vnode);
taosSendMsgToMnode(pObj, pStart, msgLen); taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg));
} }
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
char * pMsg, *pStart; SMeterCfgMsg *cfg = (SMeterCfgMsg *) rpcMallocCont(sizeof(SMeterCfgMsg));
int msgLen; if (cfg == NULL) {
SMeterCfgMsg *pCfg; return;
SMgmtObj * pObj = &mgmtObj; }
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_TABLE_CFG);
if (pStart == NULL) return -1;
pMsg = pStart;
pCfg = (SMeterCfgMsg *)pMsg;
pCfg->vnode = htonl(vnode);
pCfg->sid = htonl(sid);
pMsg += sizeof(SMeterCfgMsg);
msgLen = pMsg - pStart;
return taosSendMsgToMnode(pObj, pStart, msgLen);
}
cfg->vnode = htonl(vnode);
taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg));
}
void dnodeInitProcessShellMsg() { void dnodeInitProcessShellMsg() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest;
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeUtil.h" #include "dnodeVnodeMgmt.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn); static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn);
......
...@@ -53,7 +53,7 @@ static int32_t dnodeInitTmrCtl(); ...@@ -53,7 +53,7 @@ static int32_t dnodeInitTmrCtl();
void *tsStatusTimer = NULL; void *tsStatusTimer = NULL;
void *vnodeTmrCtrl; void *vnodeTmrCtrl;
void **tsRpcQhandle; void **tsRpcQhandle;
void *dmQhandle; void *tsDnodeMgmtQhandle;
void *tsQueryQhandle; void *tsQueryQhandle;
int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1; int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
int32_t tsMaxQueues; int32_t tsMaxQueues;
...@@ -298,7 +298,7 @@ static int32_t dnodeInitRpcQHandle() { ...@@ -298,7 +298,7 @@ static int32_t dnodeInitRpcQHandle() {
tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
} }
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); tsDnodeMgmtQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
return 0; return 0;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeUtil.h"
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
return TSDB_VN_STATUS_MASTER;
}
bool dnodeCheckVnodeExist(int32_t vnode) {
return true;
}
...@@ -14,4 +14,20 @@ ...@@ -14,4 +14,20 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "taoserror.h"
#include "dnodeVnodeMgmt.h"
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
return TSDB_VN_STATUS_MASTER;
}
bool dnodeCheckVnodeExist(int32_t vnode) {
return true;
}
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
return true;
}
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h"
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) {
SShellSubmitRspMsg result = {0}; SShellSubmitRspMsg result = {0};
...@@ -32,35 +33,38 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe ...@@ -32,35 +33,38 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe
//TODO: submit implementation //TODO: submit implementation
} }
int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) { int32_t dnodeCreateTable(SDCreateTableMsg *table) {
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table) {
return 0;
}
int32_t dnodeCreateChildTable(SCreateChildTableMsg *table) {
return 0;
}
int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table) { /*
return 0; * Remove table from local repository
*/
int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid) {
return TSDB_CODE_SUCCESS;
} }
int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table) { /*
return 0; * Create stream
} * if stream already exist, update it
*/
int32_t dnodeCreateStream(SAlterStreamMsg *stream) {
int32_t vnode = htonl(stream->vnode);
int32_t sid = htonl(stream->sid);
uint64_t uid = htobe64(stream->uid);
int32_t dnodeAlterChildTable(SCreateChildTableMsg *table) { if (!dnodeCheckTableExist(vnode, sid, uid)) {
return 0; return TSDB_CODE_INVALID_TABLE;
} }
int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid) { //TODO create or remove stream
return 0;
} }
int32_t dnodeDropTable(int vid, int sid, int64_t uid) { /*
return 0; * Remove all child tables of supertable from local repository
*/
int32_t dnodeDropSuperTable(uint64_t stableUid) {
return TSDB_CODE_SUCCESS;
} }
...@@ -49,7 +49,6 @@ extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int si ...@@ -49,7 +49,6 @@ extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int si
extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size); extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type); extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type);
extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type); extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type);
extern int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen);
extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code); extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code);
extern void (*dnodeInitMgmtIp)(); extern void (*dnodeInitMgmtIp)();
extern int (*dnodeInitMgmt)(); extern int (*dnodeInitMgmt)();
......
...@@ -335,6 +335,7 @@ typedef struct { ...@@ -335,6 +335,7 @@ typedef struct {
char payload[]; /* payload for wildcard match in show tables */ char payload[]; /* payload for wildcard match in show tables */
} SShowObj; } SShowObj;
void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -161,6 +161,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode s ...@@ -161,6 +161,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode s
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources") TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources")
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
......
...@@ -313,72 +313,28 @@ typedef struct SSchema { ...@@ -313,72 +313,28 @@ typedef struct SSchema {
short bytes; short bytes;
} SSchema; } SSchema;
typedef struct SMColumn { typedef struct {
int8_t type; int8_t type;
int16_t colId; int16_t colId;
int16_t bytes; int16_t bytes;
} SMColumn; } SDTableColumn;
typedef struct {
int32_t size;
int8_t* data;
} SVariableMsg;
typedef struct {
short vnode;
int32_t sid;
uint64_t uid;
char spi;
char encrypt;
char meterId[TSDB_TABLE_ID_LEN];
char secret[TSDB_KEY_LEN];
char cipheringKey[TSDB_KEY_LEN];
uint64_t timeStamp;
uint64_t lastCreate;
short numOfColumns;
short sqlLen; // SQL string is after schema
char reserved[16];
int32_t sversion;
SMColumn schema[];
} SCreateMsg;
typedef struct { typedef struct {
int32_t vnode; int32_t vnode;
int32_t sid; int32_t sid;
uint64_t uid; uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1]; uint64_t superTableUid;
char superTableId[TSDB_TABLE_ID_LEN + 1]; int32_t tableType;
uint64_t createdTime;
int32_t sversion; int32_t sversion;
int16_t numOfColumns; int16_t numOfColumns;
int16_t numOfTags; int16_t numOfTags;
int32_t tagDataLen; int32_t tagDataLen;
int8_t data[]; int32_t sqlDataLen;
} SCreateChildTableMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t createdTime; uint64_t createdTime;
int32_t sversion;
int16_t numOfColumns;
int8_t data[];
} SCreateNormalTableMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t createdTime; char superTableId[TSDB_TABLE_ID_LEN + 1];
int32_t sversion;
int16_t numOfColumns;
int32_t sqlLen;
int8_t data[]; int8_t data[];
} SCreateStreamTableMsg; } SDCreateTableMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_ID_LEN]; char db[TSDB_TABLE_ID_LEN];
...@@ -468,10 +424,10 @@ typedef struct { ...@@ -468,10 +424,10 @@ typedef struct {
int32_t sid; int32_t sid;
uint64_t uid; uint64_t uid;
char meterId[TSDB_TABLE_ID_LEN]; char meterId[TSDB_TABLE_ID_LEN];
} SRemoveMeterMsg; } SDRemoveTableMsg;
typedef struct { typedef struct {
short vnode; int32_t vnode;
} SFreeVnodeMsg; } SFreeVnodeMsg;
typedef struct SColIndexEx { typedef struct SColIndexEx {
...@@ -923,11 +879,11 @@ typedef struct { ...@@ -923,11 +879,11 @@ typedef struct {
} SKillQuery, SKillStream, SKillConnection; } SKillQuery, SKillStream, SKillConnection;
typedef struct { typedef struct {
short vnode; int32_t vnode;
int32_t sid; int32_t sid;
uint64_t uid; uint64_t uid;
uint64_t stime; // stream starting time uint64_t stime; // stream starting time
char status; int32_t status;
} SAlterStreamMsg; } SAlterStreamMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -78,7 +78,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { ...@@ -78,7 +78,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
pMsg = mgmtBuildCreateMeterIe(pTable, pMsg, vnode); pMsg = mgmtBuildCreateMeterIe(pTable, pMsg, vnode);
} else { } else {
mTrace("dnode:%s, vnode:%d sid:%d, meter not there", taosIpStr(pObj->privateIp), vnode, sid); mTrace("dnode:%s, vnode:%d sid:%d, meter not there", taosIpStr(pObj->privateIp), vnode, sid);
*pMsg = TSDB_CODE_INVALID_METER_ID; *pMsg = TSDB_CODE_INVALID_TABLE_ID;
pMsg++; pMsg++;
*(int32_t *)pMsg = htonl(vnode); *(int32_t *)pMsg = htonl(vnode);
...@@ -307,7 +307,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { ...@@ -307,7 +307,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
} }
int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
SRemoveMeterMsg *pRemove; SDRemoveTableMsg *pRemove;
char * pMsg, *pStart; char * pMsg, *pStart;
int i, msgLen = 0; int i, msgLen = 0;
SDnodeObj * pObj; SDnodeObj * pObj;
...@@ -326,12 +326,12 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { ...@@ -326,12 +326,12 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
if (pStart == NULL) continue; if (pStart == NULL) continue;
pMsg = pStart; pMsg = pStart;
pRemove = (SRemoveMeterMsg *)pMsg; pRemove = (SDRemoveTableMsg *)pMsg;
pRemove->vnode = htons(pVgroup->vnodeGid[i].vnode); pRemove->vnode = htons(pVgroup->vnodeGid[i].vnode);
pRemove->sid = htonl(pTable->gid.sid); pRemove->sid = htonl(pTable->gid.sid);
memcpy(pRemove->meterId, pTable->meterId, TSDB_TABLE_ID_LEN); memcpy(pRemove->meterId, pTable->meterId, TSDB_TABLE_ID_LEN);
pMsg += sizeof(SRemoveMeterMsg); pMsg += sizeof(SDRemoveTableMsg);
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen); taosSendMsgToDnode(pObj, pStart, msgLen);
...@@ -559,7 +559,7 @@ int mgmtSendCfgDnodeMsg(char *cont) { ...@@ -559,7 +559,7 @@ int mgmtSendCfgDnodeMsg(char *cont) {
* functions for communicate between dnode and mnode * functions for communicate between dnode and mnode
*/ */
extern void *dmQhandle; extern void *tsDnodeMgmtQhandle;
void * mgmtStatusTimer = NULL; void * mgmtStatusTimer = NULL;
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj); void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj);
...@@ -608,7 +608,7 @@ int32_t taosSendMsgToDnodeImp(SDnodeObj *pObj, char *msg, int32_t msgLen) { ...@@ -608,7 +608,7 @@ int32_t taosSendMsgToDnodeImp(SDnodeObj *pObj, char *msg, int32_t msgLen) {
schedMsg.msg = msg - 1; schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL; schedMsg.ahandle = NULL;
schedMsg.thandle = NULL; schedMsg.thandle = NULL;
taosScheduleTask(dmQhandle, &schedMsg); taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
return 0; return 0;
} }
...@@ -682,13 +682,3 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { ...@@ -682,13 +682,3 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) {
*/ */
} }
void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp; void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp;
void mgmtProcessMsgFromDnodeSpecImp(SSchedMsg *sched) {
char msgType = *sched->msg;
char *content = sched->msg + 1;
mTrace("msg:%s is received from dnode", taosMsg[(uint8_t)msgType]);
mgmtProcessMsgFromDnode(content, 0, msgType, mgmtGetDnode(0));
free(sched->msg);
}
void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched) = mgmtProcessMsgFromDnodeSpecImp;
...@@ -296,11 +296,11 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { ...@@ -296,11 +296,11 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
if (pShow->payloadLen > 0 ) { if (pShow->payloadLen > 0 ) {
pTable = mgmtGetTable(pShow->payload); pTable = mgmtGetTable(pShow->payload);
if (NULL == pTable) { if (NULL == pTable) {
return TSDB_CODE_INVALID_METER_ID; return TSDB_CODE_INVALID_TABLE_ID;
} }
pVgroup = mgmtGetVgroup(pTable->gid.vgId); pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (NULL == pVgroup) return TSDB_CODE_INVALID_METER_ID; if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册