提交 b194ff18 编写于 作者: S shenglian zhou

(query):use tlv for client/server message compability

上级 ab8b8875
...@@ -964,9 +964,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -964,9 +964,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
pQueryMsg->schemaVersion = htons(pTableMeta->sversion);
pQueryMsg->tagVersion = htons(pTableMeta->tversion);
// set column list ids // set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
char *pMsg = (char *)(pQueryMsg->tableCols) + numOfCols * sizeof(SColumnInfo); char *pMsg = (char *)(pQueryMsg->tableCols) + numOfCols * sizeof(SColumnInfo);
...@@ -1152,21 +1150,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1152,21 +1150,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sqlLen; pMsg += sqlLen;
/*
//MSG EXTEND DEMO
pQueryMsg->extend = 1; pQueryMsg->extend = 1;
STLV *tlv = (STLV *)pMsg; STLV *tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_DUMMY); tlv->type = htons(TLV_TYPE_META_VERSION);
tlv->len = htonl(sizeof(int16_t)); tlv->len = htonl(sizeof(int16_t) * 2);
*(int16_t *)tlv->value = htons(12345); *(int16_t*)tlv->value = htons(pTableMeta->sversion);
*(int16_t*)(tlv->value+sizeof(int16_t)) = htons(pTableMeta->tversion);
pMsg += sizeof(*tlv) + ntohl(tlv->len); pMsg += sizeof(*tlv) + ntohl(tlv->len);
tlv = (STLV *)pMsg; tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_END_MARK);
tlv->len = 0; tlv->len = 0;
pMsg += sizeof(*tlv); pMsg += sizeof(*tlv);
*/
int32_t msgLen = (int32_t)(pMsg - pCmd->payload); int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
......
...@@ -523,8 +523,6 @@ typedef struct { ...@@ -523,8 +523,6 @@ typedef struct {
int32_t udfNum; // number of udf function int32_t udfNum; // number of udf function
int32_t udfContentOffset; int32_t udfContentOffset;
int32_t udfContentLen; int32_t udfContentLen;
int16_t schemaVersion;
int16_t tagVersion;
SColumnInfo tableCols[]; SColumnInfo tableCols[];
} SQueryTableMsg; } SQueryTableMsg;
......
...@@ -428,6 +428,8 @@ typedef struct SQueryParam { ...@@ -428,6 +428,8 @@ typedef struct SQueryParam {
int32_t tableScanOperator; int32_t tableScanOperator;
SArray *pOperator; SArray *pOperator;
SUdfInfo *pUdfInfo; SUdfInfo *pUdfInfo;
int16_t schemaVersion;
int16_t tagVersion;
} SQueryParam; } SQueryParam;
typedef struct SColumnDataParam{ typedef struct SColumnDataParam{
......
...@@ -8264,10 +8264,6 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -8264,10 +8264,6 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
goto _cleanup; goto _cleanup;
} }
/*
//MSG EXTEND DEMO
if (pQueryMsg->extend) { if (pQueryMsg->extend) {
pMsg += pQueryMsg->sqlstrLen; pMsg += pQueryMsg->sqlstrLen;
...@@ -8276,19 +8272,24 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -8276,19 +8272,24 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
tlv = (STLV *)pMsg; tlv = (STLV *)pMsg;
tlv->type = ntohs(tlv->type); tlv->type = ntohs(tlv->type);
tlv->len = ntohl(tlv->len); tlv->len = ntohl(tlv->len);
if (tlv->len > 0) { if (tlv->type == TLV_TYPE_END_MARK) {
*(int16_t *)tlv->value = ntohs(*(int16_t *)tlv->value); break;
qDebug("Got TLV,type:%d,len:%d,value:%d", tlv->type, tlv->len, *(int16_t*)tlv->value); }
pMsg += sizeof(*tlv) + tlv->len; switch(tlv->type) {
continue; case TLV_TYPE_META_VERSION: {
assert(tlv->len == 2*sizeof(int16_t));
param->schemaVersion = ntohs(*(int16_t*)tlv->value);
param->tagVersion = ntohs(*(int16_t*)(tlv->value + sizeof(int16_t)));
pMsg += sizeof(*tlv) + tlv->len;
break;
}
default: {
pMsg += sizeof(*tlv) + tlv->len;
break;
}
} }
break;
} }
} }
*/
qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64,
......
...@@ -162,8 +162,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -162,8 +162,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert(0); assert(0);
} }
int16_t queryTagVersion = ntohs(pQueryMsg->tagVersion); int16_t queryTagVersion = param.tagVersion;
int16_t querySchemaVersion = ntohs(pQueryMsg->schemaVersion); int16_t querySchemaVersion = param.schemaVersion;
if (queryTagVersion < tableGroupInfo.tVersion || querySchemaVersion < tableGroupInfo.sVersion) { if (queryTagVersion < tableGroupInfo.tVersion || querySchemaVersion < tableGroupInfo.sVersion) {
code = TSDB_CODE_QRY_INVALID_SCHEMA_VERSION; code = TSDB_CODE_QRY_INVALID_SCHEMA_VERSION;
goto _over; goto _over;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册