提交 25d66d6d 编写于 作者: S slzhou

fix: compact vnode range - nano precision or not

上级 595fdbed
......@@ -4239,6 +4239,44 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return TSDB_CODE_SUCCESS;
}
int32_t compactVnodeGetTimestamp(tSqlExpr* pExpr, int64_t* tsKey) {
int64_t val = 0;
bool parsed = false;
if (pExpr->value.nType == TSDB_DATA_TYPE_BINARY) {
pExpr->value.nLen = stringProcess(pExpr->value.pz, pExpr->value.nLen);
char* seg = strnchr(pExpr->value.pz, '-', pExpr->value.nLen, false);
if (seg != NULL) {
if (taosParseTime(pExpr->value.pz, &val, pExpr->value.nLen, TSDB_TIME_PRECISION_NANO, tsDaylight) == TSDB_CODE_SUCCESS) {
pExpr->flags |= (1 << EXPR_FLAG_NS_TIMESTAMP);
parsed = true;
} else {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
} else {
SStrToken token = {.z = pExpr->value.pz, .n = pExpr->value.nLen, .type = TK_ID};
int32_t len = tGetToken(pExpr->value.pz, &token.type);
if ((token.type != TK_INTEGER && token.type != TK_FLOAT) || len != pExpr->value.nLen) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
}
if (!parsed) {
if (pExpr->value.nType == -1) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
tVariantDump(&pExpr->value, (char*)&val, TSDB_DATA_TYPE_BIGINT, true);
}
*tsKey = val;
return TSDB_CODE_SUCCESS;
}
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg1 = "start timestamp error";
const char* msg2 = "end timestamp error";
......@@ -4247,11 +4285,12 @@ static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type;
// save the compact range to range of query info
// save the compact range to range of query info and
// the flags of pInfo->pCompactRange->start/end indicate whether the ts is nano precsion or not
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pInfo->pCompactRange->start) {
if (getTimeRange(&pQueryInfo->range, pInfo->pCompactRange->start, TK_GE, TSDB_TIME_PRECISION_NANO) != TSDB_CODE_SUCCESS) {
if (compactVnodeGetTimestamp(pInfo->pCompactRange->start, &pQueryInfo->range.skey) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
} else {
......@@ -4259,7 +4298,7 @@ static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
if (pInfo->pCompactRange->end) {
if (getTimeRange(&pQueryInfo->range, pInfo->pCompactRange->end, TK_LE, TSDB_TIME_PRECISION_NANO) != TSDB_CODE_SUCCESS) {
if (compactVnodeGetTimestamp(pInfo->pCompactRange->end, &pQueryInfo->range.ekey) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
} else {
......
......@@ -2024,7 +2024,7 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int count = removeDupVgid(result, size);
int32_t payloadLen = sizeof(SCompactMsg) + count * sizeof(int32_t) + // compact msg(include vgroup list)
sizeof(STLV) + sizeof(int64_t) * 2 + // skey, ekey
sizeof(STLV) + sizeof(int64_t) * 2 + 2 + // skey, ekey, nano skey, nano ekey
sizeof(STLV); //end mark
pCmd->payloadLen = payloadLen ;
pCmd->msgType = TSDB_MSG_TYPE_CM_COMPACT_VNODE;
......@@ -2053,17 +2053,33 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCompactMsg->vgid[i] = htons(result[i]);
}
free(result);
char* p = (char*)pCompactMsg + sizeof(SCompactMsg) + count * sizeof(int32_t);
STLV *tlv = (STLV *)(p);
tlv->type = htons(TLV_TYPE_COMPACT_VNODES_TIME_RANGE);
tlv->len = htonl(sizeof(int64_t) * 2);
tlv->len = htonl(sizeof(int64_t) * 2 + 2);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
*(int64_t*)tlv->value = htobe64(pQueryInfo->range.skey);
*(int64_t*)(tlv->value+sizeof(int64_t)) = htobe64(pQueryInfo->range.ekey);
p = tlv->value;
*(int64_t*)p = htobe64(pQueryInfo->range.skey);
p += sizeof(int64_t);
*(int64_t*)(p) = htobe64(pQueryInfo->range.ekey);
p += sizeof(int64_t);
if (pInfo->pCompactRange->start) {
int8_t isNano = pInfo->pCompactRange->start->flags &= (1 << EXPR_FLAG_NS_TIMESTAMP);
*(int8_t*)(p) = isNano; // skey is nano precision?
} else {
*(int8_t*)(p) = 0;
}
p += sizeof(int8_t);
p += sizeof(*tlv) + sizeof(int64_t) * 2;
if (pInfo->pCompactRange->end) {
int8_t isNano = (pInfo->pCompactRange->end->flags &= (1 << EXPR_FLAG_NS_TIMESTAMP));
*(int8_t*)(p) = isNano; // ekey is nano precison
} else {
*(int8_t*)(p) = 0;
}
p += sizeof(int8_t);
tlv = (STLV *)p;
tlv->type = htons(TLV_TYPE_END_MARK);
......
......@@ -1298,6 +1298,8 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
int64_t skey = INT64_MIN;
int64_t ekey = INT64_MAX;
int8_t nanoSkey = 1;
int8_t nanoEkey = 1;
int32_t count = ntohs(pCompactMsg->numOfVgroup);
int32_t *buf = malloc(sizeof(int32_t) * count);
......@@ -1322,10 +1324,16 @@ static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
}
switch (tlv->type) {
case TLV_TYPE_COMPACT_VNODES_TIME_RANGE: {
assert(tlv->len == 2 * sizeof(int64_t));
skey = htobe64(*(int64_t*)tlv->value);
ekey = htobe64(*(int64_t*)(tlv->value + sizeof(int64_t)));
p += sizeof(*tlv) + tlv->len;
assert(tlv->len == 2 * sizeof(int64_t) + 2);
p = tlv->value;
skey = htobe64(*(int64_t*)(p));
p += sizeof(int64_t);
ekey = htobe64(*(int64_t*)(p));
p += sizeof(int64_t);
nanoSkey = *(int8_t*)p;
p += sizeof(int8_t);
nanoEkey = *(int8_t*)p;
p += sizeof(int8_t);
break;
}
default: {
......@@ -1345,10 +1353,10 @@ static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb && pVgroup->vgId == buf[i]) {
if (skey != INT64_MIN) {
if (nanoSkey) {
skey = convertTimePrecision(skey, TSDB_TIME_PRECISION_NANO, pVgroup->pDb->cfg.precision);
}
if (ekey != INT64_MAX) {
if (nanoEkey) {
ekey = convertTimePrecision(ekey, TSDB_TIME_PRECISION_NANO, pVgroup->pDb->cfg.precision);
}
mInfo("vgId: %d send compact msg. start %"PRId64 " end %"PRId64, pVgroup->vgId, skey, ekey);
......
......@@ -97,6 +97,20 @@ class TDTestCase:
run_time = time.time()-start_time
printf(f"it takes ${run_time} seconds")
tdSql.query("show vgroups")
index = tdSql.getData(0,0)
tdSql.checkData(0, 6, 0)
tdSql.execute(f"compact vnodes in({index}) start with 1000 end with now+5b")
start_time = time.time()
while True:
tdSql.query("show vgroups")
if tdSql.getData(0, 6) == 0:
break
else:
time.sleep(0.1)
run_time = time.time()-start_time
printf(f"it takes ${run_time} seconds")
tdSql.query("show vgroups")
index = tdSql.getData(0,0)
tdSql.checkData(0, 6, 0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册