提交 767d1cb2 编写于 作者: S shenglian zhou

(query):check sversion/tversion when query rsp instead of fetch rsp

上级 9d41abcd
......@@ -2704,14 +2704,29 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
pRes->data = NULL;
tscResetForNextRetrieve(pRes);
tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId);
int32_t sVersion = -1;
int32_t tVersion = -1;
if (pQueryAttr->extend == 1) {
STLV* tlv = (STLV*)(pRes->pRsp + sizeof(SQueryTableRsp));
while (tlv->type != TLV_TYPE_END_MARK) {
switch (ntohs(tlv->type)) {
case TLV_TYPE_META_VERSION:
sVersion = ntohl(*(int32_t*)tlv->value);
tVersion = ntohl(*(int32_t*)(tlv->value + sizeof(int32_t)));
break;
}
tlv = (STLV*) ((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
}
}
STableMetaInfo* tableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
if (tableMetaInfo->pTableMeta->sversion < pQueryAttr->sVersion ||
tableMetaInfo->pTableMeta->tversion < pQueryAttr->tVersion) {
if (tableMetaInfo->pTableMeta->sversion < sVersion ||
tableMetaInfo->pTableMeta->tversion < tVersion) {
return TSDB_CODE_TSC_INVALID_SCHEMA_VERSION;
}
tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId);
return 0;
}
......@@ -2802,16 +2817,16 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
}
char* p = NULL;
if (pRetrieve->compressed) {
p = pRetrieve->data + ntohl(pRetrieve->compLen) + pQueryInfo->fieldsInfo.numOfOutput * sizeof(int32_t);
} else {
p = pRetrieve->data + ntohl(pRetrieve->compLen);
}
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;
int32_t numOfTables = htonl(*(int32_t*)p);
p += sizeof(int32_t);
if (pSql->pSubscription != NULL) {
for (int i = 0; i < numOfTables; i++) {
int64_t uid = htobe64(*(int64_t*)p);
p += sizeof(int64_t);
......@@ -2820,35 +2835,16 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
p += sizeof(TSKEY);
tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
}
} else {
p += numOfTables * sizeof(STableIdInfo);
}
pRes->row = 0;
tscDebug("0x%"PRIx64" numOfRows:%d, offset:%" PRId64 ", complete:%d, qId:0x%"PRIx64, pSql->self, pRes->numOfRows, pRes->offset,
pRes->completed, pRes->qId);
if (pRetrieve->extend == 1) {
STLV* tlv = (STLV*)(p);
while (tlv->type != TLV_TYPE_END_MARK) {
switch (ntohs(tlv->type)) {
case TLV_TYPE_META_VERSION:
pRes->sVersion = ntohl(*(int32_t*)tlv->value);
pRes->tVersion = ntohl(*(int32_t*)(tlv->value + sizeof(int32_t)));
break;
}
tlv = (STLV*) ((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
}
}
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMeta->sversion < pSql->res.sVersion || pTableMeta->tversion < pSql->res.tVersion) {
return TSDB_CODE_TSC_INVALID_SCHEMA_VERSION;
}
return 0;
}
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
......
......@@ -530,8 +530,6 @@ typedef struct {
int8_t extend;
int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle
int32_t sVersion;
int32_t tVersion;
} SQueryTableRsp;
// todo: the show handle should be replaced with id
......
......@@ -390,14 +390,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
size_t size = pQueryAttr->resultRowSize * s;
int32_t contLenSubscriptions = (int32_t)(size + sizeof(SRetrieveTableRsp));
size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
int32_t contLenBeforeTLV = *contLen;
*contLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t)); //tlv meta version
*contLen += sizeof(STLV); // tlv end mark
// current solution only avoid crash, but cannot return error code to client
*pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen);
if (*pRsp == NULL) {
......@@ -405,6 +402,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
}
(*pRsp)->numOfRows = htonl((int32_t)s);
if (pQInfo->code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQInfo->runtimeEnv.currentOffset);
(*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime);
......@@ -422,25 +420,16 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
setQueryStatus(pRuntimeEnv, QUERY_OVER);
}
RESET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv));
pQInfo->lastRetrieveTs = taosGetTimestampMs();
if ((*pRsp)->compressed && compLen != 0) {
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
int32_t origSize = pQueryAttr->resultRowSize * s;
int32_t compSize = compLen + numOfCols * sizeof(int32_t);
if ((*pRsp)->compressed && compLen != 0) {
*contLen = *contLen - origSize + compSize;
contLenSubscriptions = contLenSubscriptions - origSize + compSize;
contLenBeforeTLV = contLenBeforeTLV - origSize + compSize;
*pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen);
qDebug("QInfo:0x%"PRIx64" compress col data, uncompressed size:%d, compressed size:%d, ratio:%.2f",
pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize);
}
if ((*pRsp)->compressed) {
(*pRsp)->compLen = htonl(compLen);
} else {
(*pRsp)->compLen = htonl(origSize);
}
pQInfo->rspContext = NULL;
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
......@@ -455,19 +444,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
qDebug("QInfo:0x%"PRIx64" has more results to retrieve", pQInfo->qId);
}
(*pRsp)->extend = 1;
*(int32_t*)((char*)(*pRsp) + contLenSubscriptions) = htonl(taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap));
STLV* tlv = (STLV*)((char*)(*pRsp) + contLenBeforeTLV);
tlv->type = htons(TLV_TYPE_META_VERSION);
tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t));
int32_t sVersion = htonl(pQueryAttr->tableGroupInfo.sVersion);
int32_t tVersion = htonl(pQueryAttr->tableGroupInfo.tVersion);
memcpy(tlv->value, &sVersion, sizeof(int32_t));
memcpy(tlv->value + sizeof(int32_t), &tVersion, sizeof(int32_t));
STLV* tlvEnd = (STLV*)((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
tlvEnd->type = htons(TLV_TYPE_END_MARK);
tlvEnd->len = 0;
// the memory should be freed if the code of pQInfo is not TSDB_CODE_SUCCESS
if (pQInfo->code != TSDB_CODE_SUCCESS) {
rpcFreeCont(*pRsp);
......
......@@ -243,14 +243,27 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
int32_t tagVersion = -1;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId, &schemaVersion, &tagVersion);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
size_t rspLen = sizeof(SQueryTableRsp);
rspLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t));
rspLen += sizeof(STLV);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(rspLen);
pRsp->code = code;
pRsp->qId = 0;
pRsp->sVersion = schemaVersion;
pRsp->tVersion = tagVersion;
pRsp->extend = 1;
STLV* tlv = (STLV*)((char*)(pRsp) + sizeof(SQueryTableRsp));
tlv->type = htons(TLV_TYPE_META_VERSION);
tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t));
int32_t sVersion = htonl(schemaVersion);
int32_t tVersion = htonl(tagVersion);
memcpy(tlv->value, &sVersion, sizeof(int32_t));
memcpy(tlv->value + sizeof(int32_t), &tVersion, sizeof(int32_t));
pRet->len = sizeof(SQueryTableRsp);
STLV* tlvEnd = (STLV*)((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
tlvEnd->type = htons(TLV_TYPE_END_MARK);
tlvEnd->len = 0;
pRet->len = rspLen;
pRet->rsp = pRsp;
int32_t vgId = pVnode->vgId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册