提交 e97bfe57 编写于 作者: S slzhou

enhance: compact time range - msg compatibility

上级 1366aa7a
...@@ -2022,7 +2022,11 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2022,7 +2022,11 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
int count = removeDupVgid(result, size); int count = removeDupVgid(result, size);
pCmd->payloadLen = sizeof(SCompactMsg) + count * sizeof(int32_t);
int32_t payloadLen = sizeof(SCompactMsg) + count * sizeof(int32_t) + // compact msg(include vgroup list)
sizeof(STLV) + sizeof(int64_t) * 2 + // skey, ekey
sizeof(STLV); //end mark
pCmd->payloadLen = payloadLen ;
pCmd->msgType = TSDB_MSG_TYPE_CM_COMPACT_VNODE; pCmd->msgType = TSDB_MSG_TYPE_CM_COMPACT_VNODE;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
...@@ -2031,6 +2035,8 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2031,6 +2035,8 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SCompactMsg *pCompactMsg = (SCompactMsg *)pCmd->payload; SCompactMsg *pCompactMsg = (SCompactMsg *)pCmd->payload;
pCompactMsg->extend = 1;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
...@@ -2042,16 +2048,28 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2042,16 +2048,28 @@ int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tNameGetFullDbName(&pTableMetaInfo->name, pCompactMsg->db); tNameGetFullDbName(&pTableMetaInfo->name, pCompactMsg->db);
} }
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pCompactMsg->skey = htobe64(pQueryInfo->range.skey);
pCompactMsg->ekey = htobe64(pQueryInfo->range.ekey);
pCompactMsg->numOfVgroup = htons(count); pCompactMsg->numOfVgroup = htons(count);
for (int32_t i = 0; i < count; i++) { for (int32_t i = 0; i < count; i++) {
pCompactMsg->vgid[i] = htons(result[i]); pCompactMsg->vgid[i] = htons(result[i]);
} }
free(result); free(result);
char* p = (char*)pCompactMsg + sizeof(SCompactMsg) + count * sizeof(int32_t);
STLV *tlv = (STLV *)(p);
tlv->type = htons(TLV_TYPE_COMPACT_VNODES_TIME_RANGE);
tlv->len = htonl(sizeof(int64_t) * 2);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
*(int16_t*)tlv->value = htobe64(pQueryInfo->range.skey);
*(int16_t*)(tlv->value+sizeof(int64_t)) = htobe64(pQueryInfo->range.ekey);
p += sizeof(*tlv) + sizeof(int64_t) * 2;
tlv = (STLV *)p;
tlv->type = htons(TLV_TYPE_END_MARK);
tlv->len = 0;
p += sizeof(*tlv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -893,10 +893,10 @@ typedef struct { ...@@ -893,10 +893,10 @@ typedef struct {
typedef struct { typedef struct {
int8_t extend; int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int64_t skey;
int64_t ekey;
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t vgid[]; int32_t vgid[];
// tlv int64_t skey, ekey;
// tlv end
} SCompactMsg; } SCompactMsg;
typedef struct SShowRsp { typedef struct SShowRsp {
...@@ -1039,6 +1039,10 @@ enum { ...@@ -1039,6 +1039,10 @@ enum {
TLV_TYPE_META_VERSION = 1, TLV_TYPE_META_VERSION = 1,
}; };
enum ETlvTypeCompact {
TLV_TYPE_COMPACT_VNODES_TIME_RANGE = 1,
};
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -1296,18 +1296,46 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) { ...@@ -1296,18 +1296,46 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) { static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
int64_t skey = htobe64(pCompactMsg->skey); int64_t skey = INT64_MIN;
int64_t ekey = htobe64(pCompactMsg->ekey); int64_t ekey = INT64_MAX;
int32_t count = ntohs(pCompactMsg->numOfVgroup); int32_t count = ntohs(pCompactMsg->numOfVgroup);
int32_t *buf = malloc(sizeof(int32_t) * count); int32_t *buf = malloc(sizeof(int32_t) * count);
if (buf == NULL) { if (buf == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < count; i++) { for (int32_t i = 0; i < count; i++) {
buf[i] = ntohs(pCompactMsg->vgid[i]); buf[i] = ntohs(pCompactMsg->vgid[i]);
} }
if (pCompactMsg->extend) {
char* p = (char*)pCompactMsg + sizeof(SCompactMsg) + count * sizeof(int32_t);
STLV* tlv = NULL;
while (1) {
tlv = (STLV*)p;
tlv->type = ntohs(tlv->type);
tlv->len = ntohl(tlv->len);
if (tlv->type == TLV_TYPE_END_MARK) {
break;
}
switch (tlv->type) {
case TLV_TYPE_COMPACT_VNODES_TIME_RANGE: {
assert(tlv->len == 2 * sizeof(int64_t));
skey = htobe64(*(int64_t*)tlv->value);
ekey = htobe64(*(int64_t*)(tlv->value + sizeof(int64_t)));
p += sizeof(*tlv) + tlv->len;
break;
}
default: {
p += sizeof(*tlv) + tlv->len;
break;
}
}
}
}
// copy from mnodeSyncDb, so ugly // copy from mnodeSyncDb, so ugly
for (int32_t i = 0; i < count; i++) { for (int32_t i = 0; i < count; i++) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册