提交 86bbd50e 编写于 作者: Y yihaoDeng

[TD-3086]<feature> merge develop

...@@ -951,7 +951,10 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c ...@@ -951,7 +951,10 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c
} }
} }
assert(colIdx == -1 || (colIdx >= 0 && colIdx < pBlock->numOfParams)); if (!(colIdx == -1 || (colIdx >= 0 && colIdx < pBlock->numOfParams))) {
tscError("0x%"PRIx64" invalid colIdx:%d", pStmt->pSql->self, colIdx);
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "invalid param colIdx");
}
uint32_t totalDataSize = sizeof(SSubmitBlk) + (pCmd->batchSize + rowNum) * pBlock->rowSize; uint32_t totalDataSize = sizeof(SSubmitBlk) + (pCmd->batchSize + rowNum) * pBlock->rowSize;
if (totalDataSize > pBlock->nAllocSize) { if (totalDataSize > pBlock->nAllocSize) {
...@@ -1462,6 +1465,11 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -1462,6 +1465,11 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED); STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
} }
if (sql == NULL) {
tscError("sql is NULL");
STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "sql is NULL"));
}
if (pStmt->last != STMT_INIT) { if (pStmt->last != STMT_INIT) {
tscError("prepare status error, last:%d", pStmt->last); tscError("prepare status error, last:%d", pStmt->last);
STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "prepare status error")); STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "prepare status error"));
...@@ -1736,7 +1744,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in ...@@ -1736,7 +1744,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in
STMT_RET(TSDB_CODE_TSC_DISCONNECTED); STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
} }
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX) { if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX || colIdx < 0) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self); tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "invalid bind param")); STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "invalid bind param"));
} }
......
...@@ -111,6 +111,7 @@ static int32_t validateDNodeConfig(SMiscInfo* pOptions); ...@@ -111,6 +111,7 @@ static int32_t validateDNodeConfig(SMiscInfo* pOptions);
static int32_t validateLocalConfig(SMiscInfo* pOptions); static int32_t validateLocalConfig(SMiscInfo* pOptions);
static int32_t validateColumnName(char* name); static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType); static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType);
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
...@@ -506,7 +507,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -506,7 +507,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
break; break;
} }
case TSDB_SQL_CREATE_DNODE: { case TSDB_SQL_CREATE_DNODE: {
const char* msg = "invalid host name (ip address)"; const char* msg = "invalid host name (ip address)";
...@@ -802,7 +803,13 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -802,7 +803,13 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
break; break;
} }
case TSDB_SQL_COMPACT_VNODE:{
const char* msg = "invalid compact";
if (setCompactVnodeInfo(pSql, pInfo) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
break;
}
default: default:
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression"); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
} }
...@@ -2963,7 +2970,12 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -2963,7 +2970,12 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type;
return TSDB_CODE_SUCCESS;
}
bool validateIpAddress(const char* ip, size_t size) { bool validateIpAddress(const char* ip, size_t size) {
char tmp[128] = {0}; // buffer to build null-terminated string char tmp[128] = {0}; // buffer to build null-terminated string
assert(size < 128); assert(size < 128);
...@@ -4881,7 +4893,10 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t ...@@ -4881,7 +4893,10 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t
win->skey = val; win->skey = val;
} else if (optr == TK_EQ) { } else if (optr == TK_EQ) {
win->ekey = win->skey = val; win->ekey = win->skey = val;
} else if (optr == TK_NE) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -47,6 +47,31 @@ static int32_t getWaitingTimeInterval(int32_t count) { ...@@ -47,6 +47,31 @@ static int32_t getWaitingTimeInterval(int32_t count) {
return initial * ((2u)<<(count - 2)); return initial * ((2u)<<(count - 2));
} }
static int32_t vgIdCompare(const void *lhs, const void *rhs) {
int32_t left = *(int32_t *)lhs;
int32_t right = *(int32_t *)rhs;
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
static int32_t removeDupVgid(int32_t *src, int32_t sz) {
if (src == NULL || sz <= 0) {
return 0;
}
qsort(src, sz, sizeof(src[0]), vgIdCompare);
int32_t ret = 1;
for (int i = 1; i < sz; i++) {
if (src[i] != src[i - 1]) {
src[ret++] = src[i];
}
}
return ret;
}
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
...@@ -1515,6 +1540,60 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1515,6 +1540,60 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pInfo->list == NULL || taosArrayGetSize(pInfo->list) <= 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
SArray *pList = pInfo->list;
int32_t size = (int32_t)taosArrayGetSize(pList);
int32_t *result = malloc(sizeof(int32_t) * size);
if (result == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pSub = taosArrayGet(pList, i);
tVariant* pVar = &pSub->pNode->value;
if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) {
result[i] = (int32_t)(pVar->i64);
} else {
free(result);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
int count = removeDupVgid(result, size);
pCmd->payloadLen = sizeof(SCompactMsg) + count * sizeof(int32_t);
pCmd->msgType = TSDB_MSG_TYPE_CM_COMPACT_VNODE;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
free(result);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SCompactMsg *pCompactMsg = (SCompactMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (tNameIsEmpty(&pTableMetaInfo->name)) {
pthread_mutex_lock(&pObj->mutex);
tstrncpy(pCompactMsg->db, pObj->db, sizeof(pCompactMsg->db));
pthread_mutex_unlock(&pObj->mutex);
} else {
tNameGetFullDbName(&pTableMetaInfo->name, pCompactMsg->db);
}
pCompactMsg->numOfVgroup = htons(count);
for (int32_t i = 0; i < count; i++) {
pCompactMsg->vgid[i] = htons(result[i]);
}
free(result);
return TSDB_CODE_SUCCESS;
}
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -2268,6 +2347,10 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) { ...@@ -2268,6 +2347,10 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
UNUSED(pSql); UNUSED(pSql);
return 0; return 0;
} }
int tscProcessCompactRsp(SSqlObj *pSql) {
UNUSED(pSql);
return TSDB_CODE_SUCCESS;
}
int tscProcessShowCreateRsp(SSqlObj *pSql) { int tscProcessShowCreateRsp(SSqlObj *pSql) {
return tscLocalResultCommonBuilder(pSql, 1); return tscLocalResultCommonBuilder(pSql, 1);
...@@ -2654,6 +2737,7 @@ void tscInitMsgsFp() { ...@@ -2654,6 +2737,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg; tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
...@@ -2694,6 +2778,7 @@ void tscInitMsgsFp() { ...@@ -2694,6 +2778,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp; tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
......
...@@ -3152,6 +3152,13 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3152,6 +3152,13 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
// it is the failure retry insert // it is the failure retry insert
if (pSql->pSubs != NULL) { if (pSql->pSubs != NULL) {
int32_t blockNum = (int32_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks);
if (pSql->subState.numOfSub != blockNum) {
tscError("0x%"PRIx64" sub num:%d is not same with data block num:%d", pSql->self, pSql->subState.numOfSub, blockNum);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return pRes->code;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter)); SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
......
...@@ -51,6 +51,7 @@ enum { ...@@ -51,6 +51,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" )
TSDB_DEFINE_SQL_TYPE(TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica") TSDB_DEFINE_SQL_TYPE(TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" )
...@@ -63,6 +64,7 @@ enum { ...@@ -63,6 +64,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_COMPACT_VNODE, "compact-vnode" )
// SQL below is for read operation // SQL below is for read operation
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_READ, "read" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_READ, "read" )
......
...@@ -47,7 +47,8 @@ int32_t dnodeInitServer() { ...@@ -47,7 +47,8 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;
......
...@@ -61,6 +61,7 @@ int32_t dnodeInitShell() { ...@@ -61,6 +61,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE]= dnodeDispatchToMWriteQueue;
// the following message shall be treated as mnode query // the following message shall be treated as mnode query
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue;
......
...@@ -31,6 +31,7 @@ static void * dnodeProcessMgmtQueue(void *param); ...@@ -31,6 +31,7 @@ static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
...@@ -40,7 +41,8 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); ...@@ -40,7 +41,8 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitVMgmt() { int32_t dnodeInitVMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE]= dnodeProcessCompactVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
...@@ -187,6 +189,12 @@ static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -187,6 +189,12 @@ static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
return vnodeSync(pSyncVnode->vgId); return vnodeSync(pSyncVnode->vgId);
} }
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompactVnode = rpcMsg->pCont;
pCompactVnode->vgId = htonl(pCompactVnode->vgId);
return vnodeCompact(pCompactVnode->vgId);
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont; SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId); pDrop->vgId = htonl(pDrop->vgId);
......
...@@ -61,9 +61,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) ...@@ -61,9 +61,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" )
...@@ -84,6 +86,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" ) ...@@ -84,6 +86,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
...@@ -393,7 +396,7 @@ typedef struct { ...@@ -393,7 +396,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg; } SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct SColIndex { typedef struct SColIndex {
int16_t colId; // column id int16_t colId; // column id
...@@ -539,7 +542,7 @@ typedef struct { ...@@ -539,7 +542,7 @@ typedef struct {
uint8_t status; uint8_t status;
uint8_t role; uint8_t role;
uint8_t replica; uint8_t replica;
uint8_t reserved; uint8_t compact;
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
...@@ -781,6 +784,12 @@ typedef struct { ...@@ -781,6 +784,12 @@ typedef struct {
char payload[]; char payload[];
} SShowMsg; } SShowMsg;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t numOfVgroup;
int32_t vgid[];
} SCompactMsg;
typedef struct SShowRsp { typedef struct SShowRsp {
uint64_t qhandle; uint64_t qhandle;
STableMetaMsg tableMeta; STableMetaMsg tableMeta;
......
...@@ -94,116 +94,117 @@ ...@@ -94,116 +94,117 @@
#define TK_PASS 76 #define TK_PASS 76
#define TK_PRIVILEGE 77 #define TK_PRIVILEGE 77
#define TK_LOCAL 78 #define TK_LOCAL 78
#define TK_IF 79 #define TK_COMPACT 79
#define TK_EXISTS 80 #define TK_LP 80
#define TK_PPS 81 #define TK_RP 81
#define TK_TSERIES 82 #define TK_IF 82
#define TK_DBS 83 #define TK_EXISTS 83
#define TK_STORAGE 84 #define TK_PPS 84
#define TK_QTIME 85 #define TK_TSERIES 85
#define TK_CONNS 86 #define TK_DBS 86
#define TK_STATE 87 #define TK_STORAGE 87
#define TK_COMMA 88 #define TK_QTIME 88
#define TK_KEEP 89 #define TK_CONNS 89
#define TK_CACHE 90 #define TK_STATE 90
#define TK_REPLICA 91 #define TK_COMMA 91
#define TK_QUORUM 92 #define TK_KEEP 92
#define TK_DAYS 93 #define TK_CACHE 93
#define TK_MINROWS 94 #define TK_REPLICA 94
#define TK_MAXROWS 95 #define TK_QUORUM 95
#define TK_BLOCKS 96 #define TK_DAYS 96
#define TK_CTIME 97 #define TK_MINROWS 97
#define TK_WAL 98 #define TK_MAXROWS 98
#define TK_FSYNC 99 #define TK_BLOCKS 99
#define TK_COMP 100 #define TK_CTIME 100
#define TK_PRECISION 101 #define TK_WAL 101
#define TK_UPDATE 102 #define TK_FSYNC 102
#define TK_CACHELAST 103 #define TK_COMP 103
#define TK_PARTITIONS 104 #define TK_PRECISION 104
#define TK_LP 105 #define TK_UPDATE 105
#define TK_RP 106 #define TK_CACHELAST 106
#define TK_UNSIGNED 107 #define TK_PARTITIONS 107
#define TK_TAGS 108 #define TK_UNSIGNED 108
#define TK_USING 109 #define TK_TAGS 109
#define TK_AS 110 #define TK_USING 110
#define TK_NULL 111 #define TK_AS 111
#define TK_NOW 112 #define TK_NULL 112
#define TK_SELECT 113 #define TK_NOW 113
#define TK_UNION 114 #define TK_SELECT 114
#define TK_ALL 115 #define TK_UNION 115
#define TK_DISTINCT 116 #define TK_ALL 116
#define TK_FROM 117 #define TK_DISTINCT 117
#define TK_VARIABLE 118 #define TK_FROM 118
#define TK_INTERVAL 119 #define TK_VARIABLE 119
#define TK_SESSION 120 #define TK_INTERVAL 120
#define TK_STATE_WINDOW 121 #define TK_SESSION 121
#define TK_FILL 122 #define TK_STATE_WINDOW 122
#define TK_SLIDING 123 #define TK_FILL 123
#define TK_ORDER 124 #define TK_SLIDING 124
#define TK_BY 125 #define TK_ORDER 125
#define TK_ASC 126 #define TK_BY 126
#define TK_DESC 127 #define TK_ASC 127
#define TK_GROUP 128 #define TK_DESC 128
#define TK_HAVING 129 #define TK_GROUP 129
#define TK_LIMIT 130 #define TK_HAVING 130
#define TK_OFFSET 131 #define TK_LIMIT 131
#define TK_SLIMIT 132 #define TK_OFFSET 132
#define TK_SOFFSET 133 #define TK_SLIMIT 133
#define TK_WHERE 134 #define TK_SOFFSET 134
#define TK_RESET 135 #define TK_WHERE 135
#define TK_QUERY 136 #define TK_RESET 136
#define TK_SYNCDB 137 #define TK_QUERY 137
#define TK_ADD 138 #define TK_SYNCDB 138
#define TK_COLUMN 139 #define TK_ADD 139
#define TK_MODIFY 140 #define TK_COLUMN 140
#define TK_TAG 141 #define TK_MODIFY 141
#define TK_CHANGE 142 #define TK_TAG 142
#define TK_SET 143 #define TK_CHANGE 143
#define TK_KILL 144 #define TK_SET 144
#define TK_CONNECTION 145 #define TK_KILL 145
#define TK_STREAM 146 #define TK_CONNECTION 146
#define TK_COLON 147 #define TK_STREAM 147
#define TK_ABORT 148 #define TK_COLON 148
#define TK_AFTER 149 #define TK_ABORT 149
#define TK_ATTACH 150 #define TK_AFTER 150
#define TK_BEFORE 151 #define TK_ATTACH 151
#define TK_BEGIN 152 #define TK_BEFORE 152
#define TK_CASCADE 153 #define TK_BEGIN 153
#define TK_CLUSTER 154 #define TK_CASCADE 154
#define TK_CONFLICT 155 #define TK_CLUSTER 155
#define TK_COPY 156 #define TK_CONFLICT 156
#define TK_DEFERRED 157 #define TK_COPY 157
#define TK_DELIMITERS 158 #define TK_DEFERRED 158
#define TK_DETACH 159 #define TK_DELIMITERS 159
#define TK_EACH 160 #define TK_DETACH 160
#define TK_END 161 #define TK_EACH 161
#define TK_EXPLAIN 162 #define TK_END 162
#define TK_FAIL 163 #define TK_EXPLAIN 163
#define TK_FOR 164 #define TK_FAIL 164
#define TK_IGNORE 165 #define TK_FOR 165
#define TK_IMMEDIATE 166 #define TK_IGNORE 166
#define TK_INITIALLY 167 #define TK_IMMEDIATE 167
#define TK_INSTEAD 168 #define TK_INITIALLY 168
#define TK_MATCH 169 #define TK_INSTEAD 169
#define TK_KEY 170 #define TK_MATCH 170
#define TK_OF 171 #define TK_KEY 171
#define TK_RAISE 172 #define TK_OF 172
#define TK_REPLACE 173 #define TK_RAISE 173
#define TK_RESTRICT 174 #define TK_REPLACE 174
#define TK_ROW 175 #define TK_RESTRICT 175
#define TK_STATEMENT 176 #define TK_ROW 176
#define TK_TRIGGER 177 #define TK_STATEMENT 177
#define TK_VIEW 178 #define TK_TRIGGER 178
#define TK_SEMI 179 #define TK_VIEW 179
#define TK_NONE 180 #define TK_SEMI 180
#define TK_PREV 181 #define TK_NONE 181
#define TK_LINEAR 182 #define TK_PREV 182
#define TK_IMPORT 183 #define TK_LINEAR 183
#define TK_TBNAME 184 #define TK_IMPORT 184
#define TK_JOIN 185 #define TK_TBNAME 185
#define TK_INSERT 186 #define TK_JOIN 186
#define TK_INTO 187 #define TK_INSERT 187
#define TK_VALUES 188 #define TK_INTO 188
#define TK_VALUES 189
#define TK_SPACE 300 #define TK_SPACE 300
......
...@@ -62,6 +62,7 @@ int32_t vnodeOpen(int32_t vgId); ...@@ -62,6 +62,7 @@ int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId); int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
int32_t vnodeCompact(int32_t vgId);
// vnodeMgmt // vnodeMgmt
int32_t vnodeInitMgmt(); int32_t vnodeInitMgmt();
......
...@@ -144,6 +144,7 @@ typedef struct SVgObj { ...@@ -144,6 +144,7 @@ typedef struct SVgObj {
int8_t reserved0[4]; int8_t reserved0[4];
SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int32_t vgCfgVersion; int32_t vgCfgVersion;
int8_t compact;
int8_t reserved1[8]; int8_t reserved1[8];
int8_t updateEnd[4]; int8_t updateEnd[4];
int32_t refCount; int32_t refCount;
......
...@@ -51,6 +51,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); ...@@ -51,6 +51,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg);
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup); void mnodeSendSyncVgroupMsg(SVgObj *pVgroup);
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromIp(char *ep); SRpcEpSet mnodeGetEpSetFromIp(char *ep);
......
...@@ -53,6 +53,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -53,6 +53,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC #ifndef _TOPIC
...@@ -200,10 +201,12 @@ int32_t mnodeInitDbs() { ...@@ -200,10 +201,12 @@ int32_t mnodeInitDbs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_COMPACT_VNODE, mnodeProcessCompactMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
mDebug("table:dbs table is created"); mDebug("table:dbs table is created");
return tpInit(); return tpInit();
} }
...@@ -1267,7 +1270,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { ...@@ -1267,7 +1270,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) { static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
void *pIter = NULL; void *pIter = NULL;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
while (1) { while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup); pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break; if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) { if (pVgroup->pDb == pDb) {
...@@ -1281,6 +1284,43 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) { ...@@ -1281,6 +1284,43 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
int32_t count = ntohs(pCompactMsg->numOfVgroup);
int32_t *buf = malloc(sizeof(int32_t) * count);
if (buf == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < count; i++) {
buf[i] = ntohs(pCompactMsg->vgid[i]);
}
// copy from mnodeSyncDb, so ugly
for (int32_t i = 0; i < count; i++) {
SVgObj *pVgroup = NULL;
void *pIter = NULL;
bool valid = false;
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb && pVgroup->vgId == buf[i]) {
mnodeSendCompactVgroupMsg(pVgroup);
mnodeDecVgroupRef(pVgroup);
valid = true;
break;
}
mnodeDecVgroupRef(pVgroup);
}
if (valid == false) {
mLError("db:%s, cannot find valid vgId: %d", pDb->name, buf[i]);
}
}
free(buf);
mLInfo("db:%s, trigger compact", pDb->name);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont; SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont;
mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists); mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists);
...@@ -1303,6 +1343,20 @@ static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) { ...@@ -1303,6 +1343,20 @@ static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
return mnodeSyncDb(pMsg->pDb, pMsg); return mnodeSyncDb(pMsg->pDb, pMsg);
} }
static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg) {
SCompactMsg *pCompact = pMsg->rpcMsg.pCont;
mDebug("db:%s, compact is received from thandle:%p", pCompact->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCompact->db);
if (pMsg->pDb == NULL) return TSDB_CODE_MND_DB_NOT_SELECTED;
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping, ignore compact request", pCompact->db, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
return mnodeCompact(pMsg->pDb, pCompact);
}
void mnodeDropAllDbs(SAcctObj *pAcct) { void mnodeDropAllDbs(SAcctObj *pAcct) {
int32_t numOfDbs = 0; int32_t numOfDbs = 0;
......
...@@ -1202,4 +1202,4 @@ int32_t mnodeCompactWal() { ...@@ -1202,4 +1202,4 @@ int32_t mnodeCompactWal() {
sdbInfo("vgId:1, compact mnode wal success"); sdbInfo("vgId:1, compact mnode wal success");
return 0; return 0;
} }
\ No newline at end of file
...@@ -60,6 +60,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -60,6 +60,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessCompactVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ; static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ;
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
...@@ -236,6 +237,7 @@ int32_t mnodeInitVgroups() { ...@@ -236,6 +237,7 @@ int32_t mnodeInitVgroups() {
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP, mnodeProcessCompactVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp);
mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg); mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg);
...@@ -350,6 +352,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -350,6 +352,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes); pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
mnodeSendAlterVgroupMsg(pVgroup,NULL); mnodeSendAlterVgroupMsg(pVgroup,NULL);
} }
pVgroup->compact = pVload->compact;
} }
static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
...@@ -717,6 +720,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -717,6 +720,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
cols++; cols++;
} }
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "compacting");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols); pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
...@@ -820,7 +830,11 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v ...@@ -820,7 +830,11 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]);
cols++; cols++;
} }
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pVgroup->compact;
cols++;
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
numOfRows++; numOfRows++;
} }
...@@ -979,6 +993,7 @@ static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) { ...@@ -979,6 +993,7 @@ static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) {
return pSyncVnode; return pSyncVnode;
} }
static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId); SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -991,6 +1006,18 @@ static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { ...@@ -991,6 +1006,18 @@ static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
dnodeSendMsgToDnode(epSet, &rpcMsg); dnodeSendMsgToDnode(epSet, &rpcMsg);
} }
static void mnodeSendCompactVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SCompactVnodeMsg *pCompactVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
SRpcMsg rpcMsg = {
.ahandle = NULL,
.pCont = pCompactVnode,
.contLen = pCompactVnode ? sizeof(SCompactVnodeMsg) : 0,
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_COMPACT_VNODE
};
dnodeSendMsgToDnode(epSet, &rpcMsg);
}
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) { void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
...@@ -1004,6 +1031,17 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) { ...@@ -1004,6 +1031,17 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
} }
} }
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send compact all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
//if (pVgroup->vnodeGid[i].role != TAOS_SYNC_ROLE_SLAVE) continue; //TODO(yihaoDeng): compact slave or not ?
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send compact vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp);
mnodeSendCompactVnodeMsg(pVgroup, &epSet);
}
}
static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) { static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) {
SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -1050,6 +1088,9 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -1050,6 +1088,9 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
dnodeSendRpcMWriteRsp(mnodeMsg, code); dnodeSendRpcMWriteRsp(mnodeMsg, code);
} }
} }
static void mnodeProcessCompactVnodeRsp(SRpcMsg *rpcMsg) {
mDebug("compact vnode rsp received");
}
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return; if (rpcMsg->ahandle == NULL) return;
...@@ -1346,4 +1387,4 @@ int32_t mnodeCompactVgroups() { ...@@ -1346,4 +1387,4 @@ int32_t mnodeCompactVgroups() {
mInfo("end to compact vgroups table..."); mInfo("end to compact vgroups table...");
return 0; return 0;
} }
\ No newline at end of file
...@@ -312,6 +312,8 @@ void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd); ...@@ -312,6 +312,8 @@ void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd);
void setKillSql(SSqlInfo *pInfo, int32_t type, SStrToken *ip); void setKillSql(SSqlInfo *pInfo, int32_t type, SStrToken *ip);
void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege); void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege);
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam);
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo); void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo);
void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo); void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo);
......
...@@ -174,6 +174,10 @@ cmd ::= ALTER TOPIC ids(X) alter_topic_optr(Y). { SStrToken t = {0}; setCreateD ...@@ -174,6 +174,10 @@ cmd ::= ALTER TOPIC ids(X) alter_topic_optr(Y). { SStrToken t = {0}; setCreateD
cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);} cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);}
cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);} cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);}
////////////////////////////// COMPACT STATEMENT //////////////////////////////////////////////
cmd ::= COMPACT VNODES IN LP exprlist(Y) RP. { setCompactVnodeSql(pInfo, TSDB_SQL_COMPACT_VNODE, Y);}
// An IDENTIFIER can be a generic identifier, or one of several keywords. // An IDENTIFIER can be a generic identifier, or one of several keywords.
// Any non-standard keyword can also be an identifier. // Any non-standard keyword can also be an identifier.
// And "ids" is an identifer-or-string. // And "ids" is an identifer-or-string.
......
...@@ -1067,6 +1067,10 @@ void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken ...@@ -1067,6 +1067,10 @@ void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken
pInfo->pMiscInfo->user.passwd = *pPwd; pInfo->pMiscInfo->user.passwd = *pPwd;
} }
} }
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam) {
pInfo->type = type;
pInfo->list = pParam;
}
void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd) { void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd) {
pInfo->type = TSDB_SQL_CREATE_USER; pInfo->type = TSDB_SQL_CREATE_USER;
......
此差异已折叠。
此差异已折叠。
...@@ -219,6 +219,7 @@ static SKeyword keywordTable[] = { ...@@ -219,6 +219,7 @@ static SKeyword keywordTable[] = {
{"PARTITIONS", TK_PARTITIONS}, {"PARTITIONS", TK_PARTITIONS},
{"TOPIC", TK_TOPIC}, {"TOPIC", TK_TOPIC},
{"TOPICS", TK_TOPICS}, {"TOPICS", TK_TOPICS},
{"COMPACT", TK_COMPACT},
{"MODIFY", TK_MODIFY} {"MODIFY", TK_MODIFY}
}; };
......
...@@ -114,6 +114,7 @@ int32_t vnodeSync(int32_t vgId) { ...@@ -114,6 +114,7 @@ int32_t vnodeSync(int32_t vgId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeDrop(int32_t vgId) { int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
...@@ -133,6 +134,19 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -133,6 +134,19 @@ int32_t vnodeDrop(int32_t vgId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeCompact(int32_t vgId) {
void *pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, compact vnode msg is received", vgId);
//not care success or not
tsdbCompact(((SVnodeObj*)pVnode)->tsdb);
vnodeRelease(pVnode);
} else {
vInfo("vgId:%d, vnode not exist, can't compact it", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) { static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg = pVnode->tsdbCfg; STsdbCfg tsdbCfg = pVnode->tsdbCfg;
......
...@@ -148,6 +148,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { ...@@ -148,6 +148,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
pLoad->status = pVnode->status; pLoad->status = pVnode->status;
pLoad->role = pVnode->role; pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica; pLoad->replica = pVnode->syncCfg.replica;
pLoad->compact = (pVnode->tsdb != NULL) && tsdbInCompact(pVnode->tsdb) ? 1 : 0;
} }
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
......
...@@ -91,8 +91,8 @@ class TDTestCase: ...@@ -91,8 +91,8 @@ class TDTestCase:
tdSql.query("select * from db.st where name = 1231231") tdSql.query("select * from db.st where name = 1231231")
tdSql.checkRows(0) tdSql.checkRows(0)
# <> for timestamp type # <> for timestamp type not supported on primary timestamp
tdSql.query("select * from db.st where ts <> '2020-05-13 10:00:00.002'") tdSql.error("select * from db.st where ts <> '2020-05-13 10:00:00.002'")
# tdSql.checkRows(4) # tdSql.checkRows(4)
# <> for numeric type # <> for numeric type
......
...@@ -58,8 +58,8 @@ class TDTestCase: ...@@ -58,8 +58,8 @@ class TDTestCase:
ts_len4 = len(tdSql.cursor.fetchall()) ts_len4 = len(tdSql.cursor.fetchall())
tdSql.execute("select * from t2ts1 where ts = now") tdSql.execute("select * from t2ts1 where ts = now")
ts_len5 = len(tdSql.cursor.fetchall()) ts_len5 = len(tdSql.cursor.fetchall())
tdSql.execute("select * from t2ts1 where ts <> now") # tdSql.execute("select * from t2ts1 where ts <> now")
ts_len6 = len(tdSql.cursor.fetchall()) ts_len6 = 3
tdSql.execute("select * from t2ts1 where ts between 0 and now") tdSql.execute("select * from t2ts1 where ts between 0 and now")
ts_len7 = len(tdSql.cursor.fetchall()) ts_len7 = len(tdSql.cursor.fetchall())
tdSql.execute("select * from t2ts1 where ts between now and now+100d") tdSql.execute("select * from t2ts1 where ts between now and now+100d")
......
...@@ -24,6 +24,8 @@ $tsu = $tsu + $ts0 ...@@ -24,6 +24,8 @@ $tsu = $tsu + $ts0
print ==================>issue #3481, normal column not allowed, print ==================>issue #3481, normal column not allowed,
sql_error select ts,c1,min(c2) from ts_stb0 sql_error select ts,c1,min(c2) from ts_stb0
print ==================>issue #4681, not equal operator on primary timestamp not allowed
sql_error select * from ts_stb0 where ts <> $ts0
##### select from supertable ##### select from supertable
$tb = $tbPrefix . 0 $tb = $tbPrefix . 0
...@@ -51,4 +53,4 @@ sql select first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb wher ...@@ -51,4 +53,4 @@ sql select first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb wher
if $data13 != 598.000000000 then if $data13 != 598.000000000 then
print expect 598.000000000, actual $data03 print expect 598.000000000, actual $data03
return -1 return -1
endi endi
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册