提交 8be9f845 编写于 作者: S slzhou

enhance: add time range to compact vnodes

上级 11824a52
......@@ -149,6 +149,8 @@ static bool isLogicalOperator(tSqlExpr* pExpr);
static bool isComparisonOperator(tSqlExpr* pExpr);
int validateTableName(char *tblName, int len, SStrToken* psTblToken, bool *dbIncluded);
static int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t timePrecision);
static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0;
}
......@@ -4238,9 +4240,30 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return TSDB_CODE_SUCCESS;
}
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg1 = "start timestamp error";
const char* msg2 = "end timestamp error";
SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type;
// save the compact range to range of query info
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pInfo->pCompactRange->start) {
if (getTimeRange(&pQueryInfo->range, pInfo->pCompactRange->start, TK_GE, TSDB_TIME_PRECISION_NANO) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
} else {
pQueryInfo->range.skey = INT64_MIN;
}
if (pInfo->pCompactRange->end) {
if (getTimeRange(&pQueryInfo->range, pInfo->pCompactRange->end, TK_LE, TSDB_TIME_PRECISION_NANO) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} else {
pQueryInfo->range.ekey = INT64_MAX;
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -4416,10 +4439,6 @@ bool groupbyTbname(SQueryInfo* pQueryInfo) {
return false;
}
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
int32_t startIdx = 0;
int32_t aggUdf = 0;
......
......@@ -1995,6 +1995,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pInfo->list == NULL || taosArrayGetSize(pInfo->list) <= 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -2041,6 +2042,10 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tNameGetFullDbName(&pTableMetaInfo->name, pCompactMsg->db);
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pCompactMsg->skey = htobe64(pQueryInfo->range.skey);
pCompactMsg->ekey = htobe64(pQueryInfo->range.ekey);
pCompactMsg->numOfVgroup = htons(count);
for (int32_t i = 0; i < count; i++) {
pCompactMsg->vgid[i] = htons(result[i]);
......
......@@ -426,7 +426,13 @@ typedef struct {
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
} SDropVnodeMsg, SSyncVnodeMsg;
typedef struct {
int32_t vgId;
int64_t skey;
int64_t ekey;
} SCompactVnodeMsg;
typedef struct {
int32_t contLen;
......@@ -887,6 +893,8 @@ typedef struct {
typedef struct {
int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int64_t skey;
int64_t ekey;
int32_t numOfVgroup;
int32_t vgid[];
} SCompactMsg;
......
......@@ -107,122 +107,123 @@
#define TK_COMPACT 89
#define TK_LP 90
#define TK_RP 91
#define TK_IF 92
#define TK_EXISTS 93
#define TK_AS 94
#define TK_OUTPUTTYPE 95
#define TK_AGGREGATE 96
#define TK_BUFSIZE 97
#define TK_PPS 98
#define TK_TSERIES 99
#define TK_DBS 100
#define TK_STORAGE 101
#define TK_QTIME 102
#define TK_CONNS 103
#define TK_STATE 104
#define TK_COMMA 105
#define TK_KEEP 106
#define TK_CACHE 107
#define TK_REPLICA 108
#define TK_QUORUM 109
#define TK_DAYS 110
#define TK_MINROWS 111
#define TK_MAXROWS 112
#define TK_BLOCKS 113
#define TK_CTIME 114
#define TK_WAL 115
#define TK_FSYNC 116
#define TK_COMP 117
#define TK_PRECISION 118
#define TK_UPDATE 119
#define TK_CACHELAST 120
#define TK_PARTITIONS 121
#define TK_UNSIGNED 122
#define TK_USING 123
#define TK_TO 124
#define TK_SPLIT 125
#define TK_NULL 126
#define TK_NOW 127
#define TK_VARIABLE 128
#define TK_SELECT 129
#define TK_UNION 130
#define TK_ALL 131
#define TK_DISTINCT 132
#define TK_FROM 133
#define TK_RANGE 134
#define TK_INTERVAL 135
#define TK_EVERY 136
#define TK_SESSION 137
#define TK_STATE_WINDOW 138
#define TK_FILL 139
#define TK_SLIDING 140
#define TK_ORDER 141
#define TK_BY 142
#define TK_ASC 143
#define TK_GROUP 144
#define TK_HAVING 145
#define TK_LIMIT 146
#define TK_OFFSET 147
#define TK_SLIMIT 148
#define TK_SOFFSET 149
#define TK_WHERE 150
#define TK_TODAY 151
#define TK_RESET 152
#define TK_QUERY 153
#define TK_SYNCDB 154
#define TK_ADD 155
#define TK_COLUMN 156
#define TK_MODIFY 157
#define TK_TAG 158
#define TK_CHANGE 159
#define TK_SET 160
#define TK_KILL 161
#define TK_CONNECTION 162
#define TK_STREAM 163
#define TK_COLON 164
#define TK_DELETE 165
#define TK_ABORT 166
#define TK_AFTER 167
#define TK_ATTACH 168
#define TK_BEFORE 169
#define TK_BEGIN 170
#define TK_CASCADE 171
#define TK_CONFLICT 172
#define TK_COPY 173
#define TK_DEFERRED 174
#define TK_DELIMITERS 175
#define TK_DETACH 176
#define TK_EACH 177
#define TK_END 178
#define TK_EXPLAIN 179
#define TK_FAIL 180
#define TK_FOR 181
#define TK_IGNORE 182
#define TK_IMMEDIATE 183
#define TK_INITIALLY 184
#define TK_INSTEAD 185
#define TK_KEY 186
#define TK_OF 187
#define TK_RAISE 188
#define TK_REPLACE 189
#define TK_RESTRICT 190
#define TK_ROW 191
#define TK_STATEMENT 192
#define TK_TRIGGER 193
#define TK_VIEW 194
#define TK_IPTOKEN 195
#define TK_SEMI 196
#define TK_NONE 197
#define TK_PREV 198
#define TK_LINEAR 199
#define TK_IMPORT 200
#define TK_TBNAME 201
#define TK_JOIN 202
#define TK_INSERT 203
#define TK_INTO 204
#define TK_VALUES 205
#define TK_FILE 206
#define TK_START 92
#define TK_WITH 93
#define TK_END 94
#define TK_IF 95
#define TK_EXISTS 96
#define TK_AS 97
#define TK_OUTPUTTYPE 98
#define TK_AGGREGATE 99
#define TK_BUFSIZE 100
#define TK_PPS 101
#define TK_TSERIES 102
#define TK_DBS 103
#define TK_STORAGE 104
#define TK_QTIME 105
#define TK_CONNS 106
#define TK_STATE 107
#define TK_COMMA 108
#define TK_KEEP 109
#define TK_CACHE 110
#define TK_REPLICA 111
#define TK_QUORUM 112
#define TK_DAYS 113
#define TK_MINROWS 114
#define TK_MAXROWS 115
#define TK_BLOCKS 116
#define TK_CTIME 117
#define TK_WAL 118
#define TK_FSYNC 119
#define TK_COMP 120
#define TK_PRECISION 121
#define TK_UPDATE 122
#define TK_CACHELAST 123
#define TK_PARTITIONS 124
#define TK_UNSIGNED 125
#define TK_USING 126
#define TK_TO 127
#define TK_SPLIT 128
#define TK_NULL 129
#define TK_NOW 130
#define TK_VARIABLE 131
#define TK_SELECT 132
#define TK_UNION 133
#define TK_ALL 134
#define TK_DISTINCT 135
#define TK_FROM 136
#define TK_RANGE 137
#define TK_INTERVAL 138
#define TK_EVERY 139
#define TK_SESSION 140
#define TK_STATE_WINDOW 141
#define TK_FILL 142
#define TK_SLIDING 143
#define TK_ORDER 144
#define TK_BY 145
#define TK_ASC 146
#define TK_GROUP 147
#define TK_HAVING 148
#define TK_LIMIT 149
#define TK_OFFSET 150
#define TK_SLIMIT 151
#define TK_SOFFSET 152
#define TK_WHERE 153
#define TK_TODAY 154
#define TK_RESET 155
#define TK_QUERY 156
#define TK_SYNCDB 157
#define TK_ADD 158
#define TK_COLUMN 159
#define TK_MODIFY 160
#define TK_TAG 161
#define TK_CHANGE 162
#define TK_SET 163
#define TK_KILL 164
#define TK_CONNECTION 165
#define TK_STREAM 166
#define TK_COLON 167
#define TK_DELETE 168
#define TK_ABORT 169
#define TK_AFTER 170
#define TK_ATTACH 171
#define TK_BEFORE 172
#define TK_BEGIN 173
#define TK_CASCADE 174
#define TK_CONFLICT 175
#define TK_COPY 176
#define TK_DEFERRED 177
#define TK_DELIMITERS 178
#define TK_DETACH 179
#define TK_EACH 180
#define TK_EXPLAIN 181
#define TK_FAIL 182
#define TK_FOR 183
#define TK_IGNORE 184
#define TK_IMMEDIATE 185
#define TK_INITIALLY 186
#define TK_INSTEAD 187
#define TK_KEY 188
#define TK_OF 189
#define TK_RAISE 190
#define TK_REPLACE 191
#define TK_RESTRICT 192
#define TK_ROW 193
#define TK_STATEMENT 194
#define TK_TRIGGER 195
#define TK_VIEW 196
#define TK_IPTOKEN 197
#define TK_SEMI 198
#define TK_NONE 199
#define TK_PREV 200
#define TK_LINEAR 201
#define TK_IMPORT 202
#define TK_TBNAME 203
#define TK_JOIN 204
#define TK_INSERT 205
#define TK_INTO 206
#define TK_VALUES 207
#define TK_FILE 208
#define TK_SPACE 300
#define TK_COMMENT 301
......
......@@ -51,7 +51,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg);
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup);
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup);
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup, int64_t skey, int64_t ekey);
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromIp(char *ep);
......
......@@ -1296,6 +1296,9 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
int64_t skey = htobe64(pCompactMsg->skey);
int64_t ekey = htobe64(pCompactMsg->ekey);
int32_t count = ntohs(pCompactMsg->numOfVgroup);
int32_t *buf = malloc(sizeof(int32_t) * count);
if (buf == NULL) {
......@@ -1314,7 +1317,9 @@ static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb && pVgroup->vgId == buf[i]) {
mnodeSendCompactVgroupMsg(pVgroup);
skey = convertTimePrecision(skey, TSDB_TIME_PRECISION_NANO, pVgroup->pDb->cfg.precision);
ekey = convertTimePrecision(skey, TSDB_TIME_PRECISION_NANO, pVgroup->pDb->cfg.precision);
mnodeSendCompactVgroupMsg(pVgroup, skey, ekey);
mnodeDecVgroupRef(pVgroup);
valid = true;
break;
......
......@@ -1114,6 +1114,18 @@ static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) {
return pSyncVnode;
}
static SCompactVnodeMsg *mnodeBuildCompactVnodeMsg(int32_t vgId, int64_t skey, int64_t ekey) {
SCompactVnodeMsg *pCompactVnode = rpcMallocCont(sizeof(SCompactVnodeMsg));
if (pCompactVnode == NULL) {
return NULL;
}
pCompactVnode->vgId = htonl(vgId);
pCompactVnode->skey = htobe64(skey);
pCompactVnode->ekey = htobe64(ekey);
return pCompactVnode;
}
static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
......@@ -1127,8 +1139,8 @@ static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
dnodeSendMsgToDnode(epSet, &rpcMsg);
}
static void mnodeSendCompactVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SCompactVnodeMsg *pCompactVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
static void mnodeSendCompactVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, int64_t skey, int64_t ekey) {
SCompactVnodeMsg *pCompactVnode = mnodeBuildCompactVnodeMsg(pVgroup->vgId, skey, ekey);
SRpcMsg rpcMsg = {
.ahandle = NULL,
.pCont = pCompactVnode,
......@@ -1152,14 +1164,14 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
}
}
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup) {
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup, int64_t skey, int64_t ekey) {
mDebug("vgId:%d, send compact all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
//if (pVgroup->vnodeGid[i].role != TAOS_SYNC_ROLE_SLAVE) continue; //TODO(yihaoDeng): compact slave or not ?
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send compact vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp);
mnodeSendCompactVnodeMsg(pVgroup, &epSet);
mnodeSendCompactVnodeMsg(pVgroup, &epSet, skey, ekey);
}
}
......
......@@ -272,6 +272,7 @@ typedef struct SSqlInfo {
SAlterTableInfo *pAlterInfo;
SMiscInfo *pMiscInfo;
SDelData *pDelData;
SRangeVal *pCompactRange; // for compact vnode time range
};
} SSqlInfo;
......@@ -363,7 +364,7 @@ void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd, SSt
void setKillSql(SSqlInfo *pInfo, int32_t type, SStrToken *ip);
void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege, SStrToken *pTags);
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam);
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam, SRangeVal* range);
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo);
void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo);
......
......@@ -189,7 +189,22 @@ cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(p
////////////////////////////// COMPACT STATEMENT //////////////////////////////////////////////
cmd ::= COMPACT VNODES IN LP exprlist(Y) RP. { setCompactVnodeSql(pInfo, TSDB_SQL_COMPACT_VNODE, Y);}
cmd ::= COMPACT VNODES IN LP exprlist(Y) RP compact_range_option(Z). { setCompactVnodeSql(pInfo, TSDB_SQL_COMPACT_VNODE, Y, &Z);}
%type range_start_opt {tSqlExpr*}
%destructor timestamp {tSqlExprDestroy($$);}
range_start_opt(A) ::= . { A = 0; }
range_start_opt(A) ::= START WITH timestamp(B). { A = B; }
%type range_end_opt {tSqlExpr*}
%destructor timestamp {tSqlExprDestroy($$);}
range_end_opt(A) ::= . { A = 0; }
range_end_opt(A) ::= END WITH timestamp(B). { A = B; }
%type compact_range_option {SRangeVal}
compact_range_option(N) ::= range_start_opt(S) range_end_opt(E). {N.start = S; N.end = E;}
// An IDENTIFIER can be a generic identifier, or one of several keywords.
// Any non-standard keyword can also be an identifier.
......
......@@ -1299,6 +1299,8 @@ void SqlInfoDestroy(SSqlInfo *pInfo) {
tfree(pInfo->pAlterInfo);
} else if (pInfo->type == TSDB_SQL_COMPACT_VNODE) {
tSqlExprListDestroy(pInfo->list);
tSqlExprDestroy(pInfo->pCompactRange->start);
tSqlExprDestroy(pInfo->pCompactRange->end);
} else {
if (pInfo->pMiscInfo != NULL) {
taosArrayDestroy(&pInfo->pMiscInfo->a);
......@@ -1468,9 +1470,10 @@ void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken
pInfo->pMiscInfo->user.passwd = *pPwd;
}
}
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam) {
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam, SRangeVal* pRange) {
pInfo->type = type;
pInfo->list = pParam;
pInfo->pCompactRange = pRange;
}
bool removeSingleQuota(SStrToken* pStr) {
......
此差异已折叠。
......@@ -232,7 +232,9 @@ static SKeyword keywordTable[] = {
{"SPLIT", TK_SPLIT},
{"ALIVE", TK_ALIVE},
{"CLUSTER", TK_CLUSTER},
{"DELETE", TK_DELETE}
{"DELETE", TK_DELETE},
{"START", TK_START},
{"WITH", TK_WITH}
};
static const char isIdChar[] = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册