提交 5479692b 编写于 作者: S shenglian zhou

(query,insert,other):tlv start point

上级 181c71ee
......@@ -2781,17 +2781,14 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
}
int32_t subDataLen = 0;
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;
int32_t numOfTables = htonl(*(int32_t*)p);
p += sizeof(int32_t);
int32_t numOfTables = htonl(*(int32_t*)p);
p += sizeof(int32_t);
if (pSql->pSubscription != NULL) {
for (int i = 0; i < numOfTables; i++) {
int64_t uid = htobe64(*(int64_t*)p);
p += sizeof(int64_t);
......@@ -2800,22 +2797,16 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
p += sizeof(TSKEY);
tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
}
subDataLen = sizeof(int32_t) + numOfTables * sizeof(STableIdInfo);
} else {
p += numOfTables * sizeof(STableIdInfo);
}
pRes->row = 0;
tscDebug("0x%"PRIx64" numOfRows:%d, offset:%" PRId64 ", complete:%d, qId:0x%"PRIx64, pSql->self, pRes->numOfRows, pRes->offset,
pRes->completed, pRes->qId);
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
int32_t rowBytes = offset + pField->bytes;
int32_t origSize = rowBytes * pRes->numOfRows + subDataLen;
int32_t compSize = htonl(pRetrieve->compLen) + numOfCols * sizeof(int32_t) + subDataLen;
int32_t dataLen = (pRetrieve->compressed) ? compSize : origSize;
if (pRetrieve->extend == 1) {
STLV* tlv = (STLV*)(pRetrieve->data + dataLen);
STLV* tlv = (STLV*)(p);
while (tlv->type != TLV_TYPE_END_MARK) {
switch (ntohs(tlv->type)) {
case TLV_TYPE_META_VERSION:
......
......@@ -391,6 +391,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
int32_t contLenBeforeTLV = *contLen;
*contLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t)); //tlv meta version
*contLen += sizeof(STLV); // tlv end mark
// current solution only avoid crash, but cannot return error code to client
......@@ -424,10 +425,9 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
int32_t origSize = pQueryAttr->resultRowSize * s;
int32_t compSize = compLen + numOfCols * sizeof(int32_t);
int32_t dataLen = origSize;
if ((*pRsp)->compressed && compLen != 0) {
dataLen = compSize;
*contLen = *contLen - origSize + compSize;
contLenBeforeTLV = contLenBeforeTLV - origSize + compSize;
*pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen);
qDebug("QInfo:0x%"PRIx64" compress col data, uncompressed size:%d, compressed size:%d, ratio:%.2f",
pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize);
......@@ -448,7 +448,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
}
(*pRsp)->extend = 1;
STLV* tlv = (STLV*)((*pRsp)->data + dataLen);
STLV* tlv = (STLV*)((char*)(*pRsp) + contLenBeforeTLV);
tlv->type = htons(TLV_TYPE_META_VERSION);
tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t));
int32_t sVersion = htonl(pQueryAttr->tableGroupInfo.sVersion);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册