提交 151fd310 编写于 作者: Y yihaoDeng

support compact vnode

上级 432f961e
...@@ -106,6 +106,7 @@ static int32_t validateDNodeConfig(SMiscInfo* pOptions); ...@@ -106,6 +106,7 @@ static int32_t validateDNodeConfig(SMiscInfo* pOptions);
static int32_t validateLocalConfig(SMiscInfo* pOptions); static int32_t validateLocalConfig(SMiscInfo* pOptions);
static int32_t validateColumnName(char* name); static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType); static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType);
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
...@@ -377,7 +378,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -377,7 +378,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
break; break;
} }
case TSDB_SQL_CREATE_DNODE: { case TSDB_SQL_CREATE_DNODE: {
const char* msg = "invalid host name (ip address)"; const char* msg = "invalid host name (ip address)";
...@@ -686,7 +687,13 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -686,7 +687,13 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
break; break;
} }
case TSDB_SQL_COMPACT_VNODE:{
const char* msg = "invalid compact";
if (setCompactVnodeInfo(pSql, pInfo) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
break;
}
default: default:
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression"); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
} }
...@@ -2789,7 +2796,13 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -2789,7 +2796,13 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
//STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
pCmd->command = pInfo->type;
return TSDB_CODE_SUCCESS;
}
bool validateIpAddress(const char* ip, size_t size) { bool validateIpAddress(const char* ip, size_t size) {
char tmp[128] = {0}; // buffer to build null-terminated string char tmp[128] = {0}; // buffer to build null-terminated string
assert(size < 128); assert(size < 128);
......
...@@ -47,6 +47,31 @@ static int32_t getWaitingTimeInterval(int32_t count) { ...@@ -47,6 +47,31 @@ static int32_t getWaitingTimeInterval(int32_t count) {
return initial * ((2u)<<(count - 2)); return initial * ((2u)<<(count - 2));
} }
static int32_t vgIdCompare(const void *lhs, const void *rhs) {
int32_t left = *(int32_t *)lhs;
int32_t right = *(int32_t *)rhs;
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
static int32_t removeDupVgid(int32_t *src, int32_t sz) {
if (src == NULL || sz <= 0) {
return 0;
}
qsort(src, sz, sizeof(src[0]), vgIdCompare);
int32_t ret = 1;
for (int i = 1; i < sz; i++) {
if (src[i] != src[i - 1]) {
src[ret++] = src[i];
}
}
return ret;
}
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
...@@ -1532,6 +1557,61 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1532,6 +1557,61 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pInfo->list == NULL || taosArrayGetSize(pInfo->list) <= 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
//const char *msg = "invalid compact param";
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
SArray *pList = pInfo->list;
int32_t size = taosArrayGetSize(pList);
int32_t *result = malloc(sizeof(int32_t) * size);
if (result == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pSub = taosArrayGet(pList, i);
tVariant* pVar = &pSub->pNode->value;
if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_INT) {
result[i] = (int32_t)(pVar->i64);
} else {
free(result);
return TSDB_CODE_TSC_INVALID_SQL;
}
}
int count = removeDupVgid(result, size);
pCmd->payloadLen = sizeof(SCompactMsg) + count * sizeof(int32_t);
pCmd->msgType = TSDB_MSG_TYPE_CM_COMPACT_VNODE;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
free(result);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SCompactMsg *pCompactMsg = (SCompactMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (tNameIsEmpty(&pTableMetaInfo->name)) {
pthread_mutex_lock(&pObj->mutex);
tstrncpy(pCompactMsg->db, pObj->db, sizeof(pCompactMsg->db));
pthread_mutex_unlock(&pObj->mutex);
} else {
tNameGetFullDbName(&pTableMetaInfo->name, pCompactMsg->db);
}
pCompactMsg->numOfVgroup = htons(count);
for (int32_t i = 0; i < count; i++) {
pCompactMsg->vgid[i] = htons(result[i]);
}
free(result);
return TSDB_CODE_SUCCESS;
}
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -2302,6 +2382,10 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) { ...@@ -2302,6 +2382,10 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
UNUSED(pSql); UNUSED(pSql);
return 0; return 0;
} }
int tscProcessCompactRsp(SSqlObj *pSql) {
UNUSED(pSql);
return TSDB_CODE_SUCCESS;
}
int tscProcessShowCreateRsp(SSqlObj *pSql) { int tscProcessShowCreateRsp(SSqlObj *pSql) {
return tscLocalResultCommonBuilder(pSql, 1); return tscLocalResultCommonBuilder(pSql, 1);
...@@ -2614,6 +2698,7 @@ void tscInitMsgsFp() { ...@@ -2614,6 +2698,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg; tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
...@@ -2655,6 +2740,7 @@ void tscInitMsgsFp() { ...@@ -2655,6 +2740,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp; tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
......
...@@ -51,6 +51,7 @@ enum { ...@@ -51,6 +51,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" )
TSDB_DEFINE_SQL_TYPE(TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica") TSDB_DEFINE_SQL_TYPE(TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" )
...@@ -63,6 +64,7 @@ enum { ...@@ -63,6 +64,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_COMPACT_VNODE, "compact-vnode" )
// SQL below is for read operation // SQL below is for read operation
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_READ, "read" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_READ, "read" )
......
...@@ -47,7 +47,8 @@ int32_t dnodeInitServer() { ...@@ -47,7 +47,8 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;
......
...@@ -31,6 +31,7 @@ static void * dnodeProcessMgmtQueue(void *param); ...@@ -31,6 +31,7 @@ static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
...@@ -40,7 +41,8 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); ...@@ -40,7 +41,8 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitVMgmt() { int32_t dnodeInitVMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE]= dnodeProcessCompactVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
...@@ -188,6 +190,15 @@ static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -188,6 +190,15 @@ static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
return vnodeSync(pSyncVnode->vgId); return vnodeSync(pSyncVnode->vgId);
} }
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompactVnode = rpcMsg->pCont;
pCompactVnode->vgId = htonl(pCompactVnode->vgId);
//do nothing
dDebug("trige compact at vgid: %d", pCompactVnode->vgId);
return TSDB_CODE_SUCCESS;
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont; SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId); pDrop->vgId = htonl(pDrop->vgId);
......
...@@ -61,9 +61,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) ...@@ -61,9 +61,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" )
...@@ -84,6 +86,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" ) ...@@ -84,6 +86,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "tables-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "tables-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
...@@ -390,7 +394,7 @@ typedef struct { ...@@ -390,7 +394,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg; } SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct SColIndex { typedef struct SColIndex {
int16_t colId; // column id int16_t colId; // column id
...@@ -775,6 +779,12 @@ typedef struct { ...@@ -775,6 +779,12 @@ typedef struct {
char payload[]; char payload[];
} SShowMsg; } SShowMsg;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t numOfVgroup;
int32_t vgid[];
} SCompactMsg;
typedef struct SShowRsp { typedef struct SShowRsp {
uint64_t qhandle; uint64_t qhandle;
STableMetaMsg tableMeta; STableMetaMsg tableMeta;
......
...@@ -17,192 +17,193 @@ ...@@ -17,192 +17,193 @@
#define TDENGINE_TTOKENDEF_H #define TDENGINE_TTOKENDEF_H
#define TK_ID 1 #define TK_ID 1
#define TK_BOOL 2 #define TK_BOOL 2
#define TK_TINYINT 3 #define TK_TINYINT 3
#define TK_SMALLINT 4 #define TK_SMALLINT 4
#define TK_INTEGER 5 #define TK_INTEGER 5
#define TK_BIGINT 6 #define TK_BIGINT 6
#define TK_FLOAT 7 #define TK_FLOAT 7
#define TK_DOUBLE 8 #define TK_DOUBLE 8
#define TK_STRING 9 #define TK_STRING 9
#define TK_TIMESTAMP 10 #define TK_TIMESTAMP 10
#define TK_BINARY 11 #define TK_BINARY 11
#define TK_NCHAR 12 #define TK_NCHAR 12
#define TK_OR 13 #define TK_OR 13
#define TK_AND 14 #define TK_AND 14
#define TK_NOT 15 #define TK_NOT 15
#define TK_EQ 16 #define TK_EQ 16
#define TK_NE 17 #define TK_NE 17
#define TK_ISNULL 18 #define TK_ISNULL 18
#define TK_NOTNULL 19 #define TK_NOTNULL 19
#define TK_IS 20 #define TK_IS 20
#define TK_LIKE 21 #define TK_LIKE 21
#define TK_GLOB 22 #define TK_GLOB 22
#define TK_BETWEEN 23 #define TK_BETWEEN 23
#define TK_IN 24 #define TK_IN 24
#define TK_GT 25 #define TK_GT 25
#define TK_GE 26 #define TK_GE 26
#define TK_LT 27 #define TK_LT 27
#define TK_LE 28 #define TK_LE 28
#define TK_BITAND 29 #define TK_BITAND 29
#define TK_BITOR 30 #define TK_BITOR 30
#define TK_LSHIFT 31 #define TK_LSHIFT 31
#define TK_RSHIFT 32 #define TK_RSHIFT 32
#define TK_PLUS 33 #define TK_PLUS 33
#define TK_MINUS 34 #define TK_MINUS 34
#define TK_DIVIDE 35 #define TK_DIVIDE 35
#define TK_TIMES 36 #define TK_TIMES 36
#define TK_STAR 37 #define TK_STAR 37
#define TK_SLASH 38 #define TK_SLASH 38
#define TK_REM 39 #define TK_REM 39
#define TK_CONCAT 40 #define TK_CONCAT 40
#define TK_UMINUS 41 #define TK_UMINUS 41
#define TK_UPLUS 42 #define TK_UPLUS 42
#define TK_BITNOT 43 #define TK_BITNOT 43
#define TK_SHOW 44 #define TK_SHOW 44
#define TK_DATABASES 45 #define TK_DATABASES 45
#define TK_TOPICS 46 #define TK_TOPICS 46
#define TK_MNODES 47 #define TK_MNODES 47
#define TK_DNODES 48 #define TK_DNODES 48
#define TK_ACCOUNTS 49 #define TK_ACCOUNTS 49
#define TK_USERS 50 #define TK_USERS 50
#define TK_MODULES 51 #define TK_MODULES 51
#define TK_QUERIES 52 #define TK_QUERIES 52
#define TK_CONNECTIONS 53 #define TK_CONNECTIONS 53
#define TK_STREAMS 54 #define TK_STREAMS 54
#define TK_VARIABLES 55 #define TK_VARIABLES 55
#define TK_SCORES 56 #define TK_SCORES 56
#define TK_GRANTS 57 #define TK_GRANTS 57
#define TK_VNODES 58 #define TK_VNODES 58
#define TK_IPTOKEN 59 #define TK_IPTOKEN 59
#define TK_DOT 60 #define TK_DOT 60
#define TK_CREATE 61 #define TK_CREATE 61
#define TK_TABLE 62 #define TK_TABLE 62
#define TK_STABLE 63 #define TK_STABLE 63
#define TK_DATABASE 64 #define TK_DATABASE 64
#define TK_TABLES 65 #define TK_TABLES 65
#define TK_STABLES 66 #define TK_STABLES 66
#define TK_VGROUPS 67 #define TK_VGROUPS 67
#define TK_DROP 68 #define TK_DROP 68
#define TK_TOPIC 69 #define TK_TOPIC 69
#define TK_DNODE 70 #define TK_DNODE 70
#define TK_USER 71 #define TK_USER 71
#define TK_ACCOUNT 72 #define TK_ACCOUNT 72
#define TK_USE 73 #define TK_USE 73
#define TK_DESCRIBE 74 #define TK_DESCRIBE 74
#define TK_ALTER 75 #define TK_ALTER 75
#define TK_PASS 76 #define TK_PASS 76
#define TK_PRIVILEGE 77 #define TK_PRIVILEGE 77
#define TK_LOCAL 78 #define TK_LOCAL 78
#define TK_IF 79 #define TK_COMPACT 79
#define TK_EXISTS 80 #define TK_LP 80
#define TK_PPS 81 #define TK_RP 81
#define TK_TSERIES 82 #define TK_IF 82
#define TK_DBS 83 #define TK_EXISTS 83
#define TK_STORAGE 84 #define TK_PPS 84
#define TK_QTIME 85 #define TK_TSERIES 85
#define TK_CONNS 86 #define TK_DBS 86
#define TK_STATE 87 #define TK_STORAGE 87
#define TK_KEEP 88 #define TK_QTIME 88
#define TK_CACHE 89 #define TK_CONNS 89
#define TK_REPLICA 90 #define TK_STATE 90
#define TK_QUORUM 91 #define TK_KEEP 91
#define TK_DAYS 92 #define TK_CACHE 92
#define TK_MINROWS 93 #define TK_REPLICA 93
#define TK_MAXROWS 94 #define TK_QUORUM 94
#define TK_BLOCKS 95 #define TK_DAYS 95
#define TK_CTIME 96 #define TK_MINROWS 96
#define TK_WAL 97 #define TK_MAXROWS 97
#define TK_FSYNC 98 #define TK_BLOCKS 98
#define TK_COMP 99 #define TK_CTIME 99
#define TK_PRECISION 100 #define TK_WAL 100
#define TK_UPDATE 101 #define TK_FSYNC 101
#define TK_CACHELAST 102 #define TK_COMP 102
#define TK_PARTITIONS 103 #define TK_PRECISION 103
#define TK_LP 104 #define TK_UPDATE 104
#define TK_RP 105 #define TK_CACHELAST 105
#define TK_UNSIGNED 106 #define TK_PARTITIONS 106
#define TK_TAGS 107 #define TK_UNSIGNED 107
#define TK_USING 108 #define TK_TAGS 108
#define TK_COMMA 109 #define TK_USING 109
#define TK_AS 110 #define TK_COMMA 110
#define TK_NULL 111 #define TK_AS 111
#define TK_SELECT 112 #define TK_NULL 112
#define TK_UNION 113 #define TK_SELECT 113
#define TK_ALL 114 #define TK_UNION 114
#define TK_DISTINCT 115 #define TK_ALL 115
#define TK_FROM 116 #define TK_DISTINCT 116
#define TK_VARIABLE 117 #define TK_FROM 117
#define TK_INTERVAL 118 #define TK_VARIABLE 118
#define TK_SESSION 119 #define TK_INTERVAL 119
#define TK_FILL 120 #define TK_SESSION 120
#define TK_SLIDING 121 #define TK_FILL 121
#define TK_ORDER 122 #define TK_SLIDING 122
#define TK_BY 123 #define TK_ORDER 123
#define TK_ASC 124 #define TK_BY 124
#define TK_DESC 125 #define TK_ASC 125
#define TK_GROUP 126 #define TK_DESC 126
#define TK_HAVING 127 #define TK_GROUP 127
#define TK_LIMIT 128 #define TK_HAVING 128
#define TK_OFFSET 129 #define TK_LIMIT 129
#define TK_SLIMIT 130 #define TK_OFFSET 130
#define TK_SOFFSET 131 #define TK_SLIMIT 131
#define TK_WHERE 132 #define TK_SOFFSET 132
#define TK_NOW 133 #define TK_WHERE 133
#define TK_RESET 134 #define TK_NOW 134
#define TK_QUERY 135 #define TK_RESET 135
#define TK_SYNCDB 136 #define TK_QUERY 136
#define TK_ADD 137 #define TK_SYNCDB 137
#define TK_COLUMN 138 #define TK_ADD 138
#define TK_TAG 139 #define TK_COLUMN 139
#define TK_CHANGE 140 #define TK_TAG 140
#define TK_SET 141 #define TK_CHANGE 141
#define TK_KILL 142 #define TK_SET 142
#define TK_CONNECTION 143 #define TK_KILL 143
#define TK_STREAM 144 #define TK_CONNECTION 144
#define TK_COLON 145 #define TK_STREAM 145
#define TK_ABORT 146 #define TK_COLON 146
#define TK_AFTER 147 #define TK_ABORT 147
#define TK_ATTACH 148 #define TK_AFTER 148
#define TK_BEFORE 149 #define TK_ATTACH 149
#define TK_BEGIN 150 #define TK_BEFORE 150
#define TK_CASCADE 151 #define TK_BEGIN 151
#define TK_CLUSTER 152 #define TK_CASCADE 152
#define TK_CONFLICT 153 #define TK_CLUSTER 153
#define TK_COPY 154 #define TK_CONFLICT 154
#define TK_DEFERRED 155 #define TK_COPY 155
#define TK_DELIMITERS 156 #define TK_DEFERRED 156
#define TK_DETACH 157 #define TK_DELIMITERS 157
#define TK_EACH 158 #define TK_DETACH 158
#define TK_END 159 #define TK_EACH 159
#define TK_EXPLAIN 160 #define TK_END 160
#define TK_FAIL 161 #define TK_EXPLAIN 161
#define TK_FOR 162 #define TK_FAIL 162
#define TK_IGNORE 163 #define TK_FOR 163
#define TK_IMMEDIATE 164 #define TK_IGNORE 164
#define TK_INITIALLY 165 #define TK_IMMEDIATE 165
#define TK_INSTEAD 166 #define TK_INITIALLY 166
#define TK_MATCH 167 #define TK_INSTEAD 167
#define TK_KEY 168 #define TK_MATCH 168
#define TK_OF 169 #define TK_KEY 169
#define TK_RAISE 170 #define TK_OF 170
#define TK_REPLACE 171 #define TK_RAISE 171
#define TK_RESTRICT 172 #define TK_REPLACE 172
#define TK_ROW 173 #define TK_RESTRICT 173
#define TK_STATEMENT 174 #define TK_ROW 174
#define TK_TRIGGER 175 #define TK_STATEMENT 175
#define TK_VIEW 176 #define TK_TRIGGER 176
#define TK_SEMI 177 #define TK_VIEW 177
#define TK_NONE 178 #define TK_SEMI 178
#define TK_PREV 179 #define TK_NONE 179
#define TK_LINEAR 180 #define TK_PREV 180
#define TK_IMPORT 181 #define TK_LINEAR 181
#define TK_TBNAME 182 #define TK_IMPORT 182
#define TK_JOIN 183 #define TK_TBNAME 183
#define TK_INSERT 184 #define TK_JOIN 184
#define TK_INTO 185 #define TK_INSERT 185
#define TK_VALUES 186 #define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300 #define TK_SPACE 300
......
...@@ -51,6 +51,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); ...@@ -51,6 +51,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup);
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup); void mnodeSendSyncVgroupMsg(SVgObj *pVgroup);
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromIp(char *ep); SRpcEpSet mnodeGetEpSetFromIp(char *ep);
......
...@@ -51,6 +51,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -51,6 +51,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC #ifndef _TOPIC
...@@ -198,10 +199,12 @@ int32_t mnodeInitDbs() { ...@@ -198,10 +199,12 @@ int32_t mnodeInitDbs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_COMPACT_VNODE, mnodeProcessCompactMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
mDebug("table:dbs table is created"); mDebug("table:dbs table is created");
return tpInit(); return tpInit();
} }
...@@ -1207,7 +1210,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { ...@@ -1207,7 +1210,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) { static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
void *pIter = NULL; void *pIter = NULL;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
while (1) { while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup); pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break; if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) { if (pVgroup->pDb == pDb) {
...@@ -1221,6 +1224,34 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) { ...@@ -1221,6 +1224,34 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
int32_t count = htonl(pCompactMsg->numOfVgroup);
int32_t *buf = malloc(sizeof(int32_t) * count);
if (buf == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < count; i++) {
buf[i] = htonl(pCompactMsg->vgid[i]);
}
// copy from mnodeSyncDb, so ugly
void *pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) {
mnodeSendCompactVgroupMsg(pVgroup);
}
mnodeDecVgroupRef(pVgroup);
}
free(buf);
mLInfo("db:%s, trigger compact", pDb->name);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont; SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont;
mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists); mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists);
...@@ -1243,6 +1274,19 @@ static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) { ...@@ -1243,6 +1274,19 @@ static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
return mnodeSyncDb(pMsg->pDb, pMsg); return mnodeSyncDb(pMsg->pDb, pMsg);
} }
static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg) {
SCompactMsg *pCompact = pMsg->rpcMsg.pCont;
mDebug("db:%s, compact is received from thandle:%p", pCompact->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCompact->db);
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping, ignore compact request", pCompact->db, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
return mnodeCompact(pMsg->pDb, pCompact);
}
void mnodeDropAllDbs(SAcctObj *pAcct) { void mnodeDropAllDbs(SAcctObj *pAcct) {
int32_t numOfDbs = 0; int32_t numOfDbs = 0;
...@@ -1297,4 +1341,4 @@ int32_t mnodeCompactDbs() { ...@@ -1297,4 +1341,4 @@ int32_t mnodeCompactDbs() {
mInfo("end to compact dbs table..."); mInfo("end to compact dbs table...");
return 0; return 0;
} }
\ No newline at end of file
...@@ -1193,4 +1193,4 @@ int32_t mnodeCompactWal() { ...@@ -1193,4 +1193,4 @@ int32_t mnodeCompactWal() {
sdbInfo("vgId:1, compact mnode wal success"); sdbInfo("vgId:1, compact mnode wal success");
return 0; return 0;
} }
\ No newline at end of file
...@@ -61,6 +61,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v ...@@ -61,6 +61,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessCompactVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ;
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
...@@ -238,6 +239,7 @@ int32_t mnodeInitVgroups() { ...@@ -238,6 +239,7 @@ int32_t mnodeInitVgroups() {
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessSyncVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP, mnodeProcessCompactVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp);
mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg);
...@@ -977,6 +979,7 @@ static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) { ...@@ -977,6 +979,7 @@ static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) {
return pSyncVnode; return pSyncVnode;
} }
static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId); SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -989,6 +992,18 @@ static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { ...@@ -989,6 +992,18 @@ static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
dnodeSendMsgToDnode(epSet, &rpcMsg); dnodeSendMsgToDnode(epSet, &rpcMsg);
} }
static void mnodeSendCompactVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
SRpcMsg rpcMsg = {
.ahandle = NULL,
.pCont = pSyncVnode,
.contLen = pSyncVnode ? sizeof(SCompactVnodeMsg) : 0,
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_COMPACT_VNODE
};
dnodeSendMsgToDnode(epSet, &rpcMsg);
}
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) { void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
...@@ -1002,6 +1017,17 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) { ...@@ -1002,6 +1017,17 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
} }
} }
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send compact all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
if (pVgroup->vnodeGid[i].role != TAOS_SYNC_ROLE_SLAVE) continue; //TODO(yihaoDeng): compact slave or not ?
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send compact vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp);
mnodeSendCompactVnodeMsg(pVgroup, &epSet);
}
}
static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) { static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) {
SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -1032,6 +1058,9 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -1032,6 +1058,9 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) { static void mnodeProcessSyncVnodeRsp(SRpcMsg *rpcMsg) {
mDebug("sync vnode rsp received"); mDebug("sync vnode rsp received");
} }
static void mnodeProcessCompactVnodeRsp(SRpcMsg *rpcMsg) {
mDebug("compact vnode rsp received");
}
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return; if (rpcMsg->ahandle == NULL) return;
...@@ -1328,4 +1357,4 @@ int32_t mnodeCompactVgroups() { ...@@ -1328,4 +1357,4 @@ int32_t mnodeCompactVgroups() {
mInfo("end to compact vgroups table..."); mInfo("end to compact vgroups table...");
return 0; return 0;
} }
\ No newline at end of file
...@@ -303,6 +303,8 @@ void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd); ...@@ -303,6 +303,8 @@ void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd);
void setKillSql(SSqlInfo *pInfo, int32_t type, SStrToken *ip); void setKillSql(SSqlInfo *pInfo, int32_t type, SStrToken *ip);
void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege); void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege);
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam);
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo); void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo);
void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo); void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo);
......
...@@ -174,6 +174,10 @@ cmd ::= ALTER TOPIC ids(X) alter_topic_optr(Y). { SStrToken t = {0}; setCreateD ...@@ -174,6 +174,10 @@ cmd ::= ALTER TOPIC ids(X) alter_topic_optr(Y). { SStrToken t = {0}; setCreateD
cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);} cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);}
cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);} cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);}
////////////////////////////// COMPACT STATEMENT //////////////////////////////////////////////
cmd ::= COMPACT VNODES IN LP exprlist(Y) RP. { setCompactVnodeSql(pInfo, TSDB_SQL_COMPACT_VNODE, Y);}
// An IDENTIFIER can be a generic identifier, or one of several keywords. // An IDENTIFIER can be a generic identifier, or one of several keywords.
// Any non-standard keyword can also be an identifier. // Any non-standard keyword can also be an identifier.
// And "ids" is an identifer-or-string. // And "ids" is an identifer-or-string.
......
...@@ -1058,6 +1058,10 @@ void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken ...@@ -1058,6 +1058,10 @@ void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken
pInfo->pMiscInfo->user.passwd = *pPwd; pInfo->pMiscInfo->user.passwd = *pPwd;
} }
} }
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam) {
pInfo->type = type;
pInfo->list = pParam;
}
void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd) { void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd) {
pInfo->type = TSDB_SQL_CREATE_USER; pInfo->type = TSDB_SQL_CREATE_USER;
......
此差异已折叠。
...@@ -217,7 +217,8 @@ static SKeyword keywordTable[] = { ...@@ -217,7 +217,8 @@ static SKeyword keywordTable[] = {
{"DISTINCT", TK_DISTINCT}, {"DISTINCT", TK_DISTINCT},
{"PARTITIONS", TK_PARTITIONS}, {"PARTITIONS", TK_PARTITIONS},
{"TOPIC", TK_TOPIC}, {"TOPIC", TK_TOPIC},
{"TOPICS", TK_TOPICS} {"TOPICS", TK_TOPICS},
{"COMPACT", TK_COMPACT}
}; };
static const char isIdChar[] = { static const char isIdChar[] = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册