diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9063d92ce684a1dcdaad20661614603ad2fc4661..b477a65e9c4ea476d9ac4db7e10cd0a6ff51ceda 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2704,14 +2704,29 @@ int tscProcessQueryRsp(SSqlObj *pSql) { pRes->data = NULL; tscResetForNextRetrieve(pRes); - tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId); + int32_t sVersion = -1; + int32_t tVersion = -1; + if (pQueryAttr->extend == 1) { + STLV* tlv = (STLV*)(pRes->pRsp + sizeof(SQueryTableRsp)); + while (tlv->type != TLV_TYPE_END_MARK) { + switch (ntohs(tlv->type)) { + case TLV_TYPE_META_VERSION: + sVersion = ntohl(*(int32_t*)tlv->value); + tVersion = ntohl(*(int32_t*)(tlv->value + sizeof(int32_t))); + break; + } + tlv = (STLV*) ((char*)tlv + sizeof(STLV) + ntohl(tlv->len)); + } + } STableMetaInfo* tableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - if (tableMetaInfo->pTableMeta->sversion < pQueryAttr->sVersion || - tableMetaInfo->pTableMeta->tversion < pQueryAttr->tVersion) { + if (tableMetaInfo->pTableMeta->sversion < sVersion || + tableMetaInfo->pTableMeta->tversion < tVersion) { return TSDB_CODE_TSC_INVALID_SCHEMA_VERSION; } + tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId); + return 0; } @@ -2802,16 +2817,16 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted); } - char* p = NULL; - if (pRetrieve->compressed) { - p = pRetrieve->data + ntohl(pRetrieve->compLen) + pQueryInfo->fieldsInfo.numOfOutput * sizeof(int32_t); - } else { - p = pRetrieve->data + ntohl(pRetrieve->compLen); - } - - int32_t numOfTables = htonl(*(int32_t*)p); - p += sizeof(int32_t); if (pSql->pSubscription != NULL) { + int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; + + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); + int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); + + char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; + + int32_t numOfTables = htonl(*(int32_t*)p); + p += sizeof(int32_t); for (int i = 0; i < numOfTables; i++) { int64_t uid = htobe64(*(int64_t*)p); p += sizeof(int64_t); @@ -2820,35 +2835,16 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { p += sizeof(TSKEY); tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key); } - } else { - p += numOfTables * sizeof(STableIdInfo); } pRes->row = 0; tscDebug("0x%"PRIx64" numOfRows:%d, offset:%" PRId64 ", complete:%d, qId:0x%"PRIx64, pSql->self, pRes->numOfRows, pRes->offset, - pRes->completed, pRes->qId); - - if (pRetrieve->extend == 1) { - STLV* tlv = (STLV*)(p); - while (tlv->type != TLV_TYPE_END_MARK) { - switch (ntohs(tlv->type)) { - case TLV_TYPE_META_VERSION: - pRes->sVersion = ntohl(*(int32_t*)tlv->value); - pRes->tVersion = ntohl(*(int32_t*)(tlv->value + sizeof(int32_t))); - break; - } - tlv = (STLV*) ((char*)tlv + sizeof(STLV) + ntohl(tlv->len)); - } - } - - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pTableMeta->sversion < pSql->res.sVersion || pTableMeta->tversion < pSql->res.tVersion) { - return TSDB_CODE_TSC_INVALID_SCHEMA_VERSION; - } + pRes->completed, pRes->qId); return 0; } + void tscTableMetaCallBack(void *param, TAOS_RES *res, int code); static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c4a6c564cf1ecd94cb6179698924ccb6a796b6c2..0dd4cb5c867f0326a17917ac4d1b50d3f9c20ce6 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -530,8 +530,6 @@ typedef struct { int8_t extend; int32_t code; union{uint64_t qhandle; uint64_t qId;}; // query handle - int32_t sVersion; - int32_t tVersion; } SQueryTableRsp; // todo: the show handle should be replaced with id diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index f93dbbef3294ca20b99408ad21524b54f6e90198..73d0b341d65dcaa31e62eb288b806bb1b39fbf8d 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -390,14 +390,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv); size_t size = pQueryAttr->resultRowSize * s; - int32_t contLenSubscriptions = (int32_t)(size + sizeof(SRetrieveTableRsp)); size += sizeof(int32_t); size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); *contLen = (int32_t)(size + sizeof(SRetrieveTableRsp)); - int32_t contLenBeforeTLV = *contLen; - *contLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t)); //tlv meta version - *contLen += sizeof(STLV); // tlv end mark + // current solution only avoid crash, but cannot return error code to client *pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen); if (*pRsp == NULL) { @@ -405,6 +402,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->numOfRows = htonl((int32_t)s); + if (pQInfo->code == TSDB_CODE_SUCCESS) { (*pRsp)->offset = htobe64(pQInfo->runtimeEnv.currentOffset); (*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime); @@ -422,25 +420,16 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co setQueryStatus(pRuntimeEnv, QUERY_OVER); } - RESET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)); - pQInfo->lastRetrieveTs = taosGetTimestampMs(); - - int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; - int32_t origSize = pQueryAttr->resultRowSize * s; - int32_t compSize = compLen + numOfCols * sizeof(int32_t); if ((*pRsp)->compressed && compLen != 0) { + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + int32_t origSize = pQueryAttr->resultRowSize * s; + int32_t compSize = compLen + numOfCols * sizeof(int32_t); *contLen = *contLen - origSize + compSize; - contLenSubscriptions = contLenSubscriptions - origSize + compSize; - contLenBeforeTLV = contLenBeforeTLV - origSize + compSize; *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); qDebug("QInfo:0x%"PRIx64" compress col data, uncompressed size:%d, compressed size:%d, ratio:%.2f", - pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize); - } - if ((*pRsp)->compressed) { - (*pRsp)->compLen = htonl(compLen); - } else { - (*pRsp)->compLen = htonl(origSize); + pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize); } + (*pRsp)->compLen = htonl(compLen); pQInfo->rspContext = NULL; pQInfo->dataReady = QUERY_RESULT_NOT_READY; @@ -455,19 +444,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co qDebug("QInfo:0x%"PRIx64" has more results to retrieve", pQInfo->qId); } - (*pRsp)->extend = 1; - *(int32_t*)((char*)(*pRsp) + contLenSubscriptions) = htonl(taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap)); - STLV* tlv = (STLV*)((char*)(*pRsp) + contLenBeforeTLV); - tlv->type = htons(TLV_TYPE_META_VERSION); - tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t)); - int32_t sVersion = htonl(pQueryAttr->tableGroupInfo.sVersion); - int32_t tVersion = htonl(pQueryAttr->tableGroupInfo.tVersion); - memcpy(tlv->value, &sVersion, sizeof(int32_t)); - memcpy(tlv->value + sizeof(int32_t), &tVersion, sizeof(int32_t)); - - STLV* tlvEnd = (STLV*)((char*)tlv + sizeof(STLV) + ntohl(tlv->len)); - tlvEnd->type = htons(TLV_TYPE_END_MARK); - tlvEnd->len = 0; // the memory should be freed if the code of pQInfo is not TSDB_CODE_SUCCESS if (pQInfo->code != TSDB_CODE_SUCCESS) { rpcFreeCont(*pRsp); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 0bb5da6443f6f709c6c5f8f6a2c8e3cc84956226..7fd74110cbb4aa101d179d16fc66769c133da246 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -243,14 +243,27 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { int32_t tagVersion = -1; code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId, &schemaVersion, &tagVersion); - SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); + size_t rspLen = sizeof(SQueryTableRsp); + rspLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t)); + rspLen += sizeof(STLV); + SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(rspLen); pRsp->code = code; pRsp->qId = 0; - pRsp->sVersion = schemaVersion; - pRsp->tVersion = tagVersion; + pRsp->extend = 1; + STLV* tlv = (STLV*)((char*)(pRsp) + sizeof(SQueryTableRsp)); + tlv->type = htons(TLV_TYPE_META_VERSION); + tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t)); + int32_t sVersion = htonl(schemaVersion); + int32_t tVersion = htonl(tagVersion); + memcpy(tlv->value, &sVersion, sizeof(int32_t)); + memcpy(tlv->value + sizeof(int32_t), &tVersion, sizeof(int32_t)); - pRet->len = sizeof(SQueryTableRsp); + STLV* tlvEnd = (STLV*)((char*)tlv + sizeof(STLV) + ntohl(tlv->len)); + tlvEnd->type = htons(TLV_TYPE_END_MARK); + tlvEnd->len = 0; + + pRet->len = rspLen; pRet->rsp = pRsp; int32_t vgId = pVnode->vgId;