提交 c8d49b60 编写于 作者: wmmhello's avatar wmmhello

fix:roll back htol length for block raw data

上级 8a21d7f3
...@@ -115,7 +115,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS ...@@ -115,7 +115,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields); int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
......
...@@ -1771,10 +1771,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1771,10 +1771,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
char* pStart = p; char* pStart = p;
char* pStart1 = p1; char* pStart1 = p1;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
// int32_t colLen = htonl(colLength[i]); int32_t colLen = htonl(colLength[i]);
// int32_t colLen1 = htonl(colLength1[i]); int32_t colLen1 = htonl(colLength1[i]);
int32_t colLen = colLength[i];
int32_t colLen1 = colLength1[i];
if(ASSERT(colLen < dataLen)){ if(ASSERT(colLen < dataLen)){
tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
...@@ -1833,8 +1831,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1833,8 +1831,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
} }
colLen1 = len; colLen1 = len;
totalLen += colLen1; totalLen += colLen1;
// colLength1[i] = htonl(len); colLength1[i] = htonl(len);
colLength1[i] = len;
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
len = numOfRows * sizeof(int32_t); len = numOfRows * sizeof(int32_t);
memcpy(pStart1, pStart, len); memcpy(pStart1, pStart, len);
...@@ -1922,7 +1919,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -1922,7 +1919,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
char* pStart = p; char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
// colLength[i] = htonl(colLength[i]); colLength[i] = htonl(colLength[i]);
if (colLength[i] >= dataLen) { if (colLength[i] >= dataLen) {
tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen); tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
......
...@@ -1307,7 +1307,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch ...@@ -1307,7 +1307,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields); code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
goto end; goto end;
...@@ -1387,7 +1387,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1387,7 +1387,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0); code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
goto end; goto end;
...@@ -1515,7 +1515,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1515,7 +1515,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
fields[i].bytes = pSW->pSchema[i].bytes; fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
} }
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols); code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true);
taosMemoryFree(fields); taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
...@@ -1686,7 +1686,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1686,7 +1686,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
fields[i].bytes = pSW->pSchema[i].bytes; fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
} }
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols); code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true);
taosMemoryFree(fields); taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
......
...@@ -2482,7 +2482,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { ...@@ -2482,7 +2482,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
} }
data += colSizes[col]; data += colSizes[col];
// colSizes[col] = htonl(colSizes[col]); colSizes[col] = htonl(colSizes[col]);
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]);
} }
...@@ -2547,7 +2547,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { ...@@ -2547,7 +2547,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += sizeof(int32_t) * numOfCols; pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
// colLen[i] = htonl(colLen[i]); colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] >= 0); ASSERT(colLen[i] >= 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
......
...@@ -707,7 +707,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -707,7 +707,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} else { } else {
/*A(pHandle->fetchMeta);*/ /*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/ /*A(IS_META_MSG(pHead->msgType));*/
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRspLen = pHead->bodyLen;
......
...@@ -454,12 +454,13 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { ...@@ -454,12 +454,13 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) { while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true; if (filterOutUids == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
if (ret == NULL) { if (ret == NULL) {
return true; return true;
} }
pReader->nextBlk++;
} }
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
......
...@@ -608,7 +608,7 @@ static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* f ...@@ -608,7 +608,7 @@ static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* f
} }
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields,
int numFields) { int numFields, bool needChangeLength) {
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true); sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true);
...@@ -682,8 +682,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate ...@@ -682,8 +682,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
pStart += colLength[c]; if(needChangeLength) {
// pStart += htonl(colLength[c]); pStart += htonl(colLength[c]);
}else{
pStart += colLength[c];
}
} }
end: end:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册