提交 8d2aba8a 编写于 作者: S shenglian zhou 提交者: shenglian zhou

(query,insert,other,connector,tools):use tlv for sversion/tversion for message compatibility

上级 5a3d9aac
......@@ -2756,8 +2756,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision);
pRes->sVersion = htonl(pRetrieve->sVersion);
pRes->tVersion = htonl(pRetrieve->tVersion);
pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds);
pRes->completed = (pRetrieve->completed == 1);
......@@ -2808,6 +2806,18 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" numOfRows:%d, offset:%" PRId64 ", complete:%d, qId:0x%"PRIx64, pSql->self, pRes->numOfRows, pRes->offset,
pRes->completed, pRes->qId);
if (pRetrieve->extend == 1) {
STLV* tlv = (STLV*)(pRetrieve->data + ntohl(pRetrieve->compLen));
while (tlv->type != TLV_TYPE_END_MARK) {
switch (tlv->type) {
case TLV_TYPE_META_VERSION:
pRes->sVersion = ntohl(*(int32_t*)tlv->value);
pRes->tVersion = ntohl(*(int32_t*)(tlv->value + sizeof(int32_t)));
break;
}
tlv = (STLV*) ((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
}
}
return 0;
}
......
......@@ -980,7 +980,9 @@ typedef struct {
} STLV;
enum {
TLV_TYPE_END_MARK = -1,
TLV_TYPE_DUMMY = 1,
TLV_TYPE_META_VERSION = 2,
};
#pragma pack(pop)
......
......@@ -391,7 +391,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
*contLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t)); //tlv meta version
*contLen += sizeof(STLV); // tlv end mark
// current solution only avoid crash, but cannot return error code to client
*pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen);
if (*pRsp == NULL) {
......@@ -409,8 +410,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
}
(*pRsp)->precision = htons(pQueryAttr->precision);
(*pRsp)->sVersion = htonl(pQueryAttr->tableGroupInfo.sVersion);
(*pRsp)->tVersion = htonl(pQueryAttr->tableGroupInfo.tVersion);
(*pRsp)->compressed = (int8_t)((tsCompressColData != -1) && checkNeedToCompressQueryCol(pQInfo));
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
......@@ -446,6 +445,18 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
qDebug("QInfo:0x%"PRIx64" has more results to retrieve", pQInfo->qId);
}
(*pRsp)->extend = 1;
STLV* tlv = (STLV*)((*pRsp)->data + compLen);
tlv->type = htons(TLV_TYPE_META_VERSION);
tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t));
int32_t sVersion = htonl(pQueryAttr->tableGroupInfo.sVersion);
int32_t tVersion = htonl(pQueryAttr->tableGroupInfo.tVersion);
memcpy(tlv->value, &sVersion, sizeof(int32_t));
memcpy(tlv->value + sizeof(int32_t), &tVersion, sizeof(int32_t));
STLV* tlvEnd = (STLV*)((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
tlvEnd->type = htons(TLV_TYPE_END_MARK);
tlvEnd->len = 0;
// the memory should be freed if the code of pQInfo is not TSDB_CODE_SUCCESS
if (pQInfo->code != TSDB_CODE_SUCCESS) {
rpcFreeCont(*pRsp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册