提交 870c016d 编写于 作者: H Hongze Cheng

Merge branch 'refact/tsdb_optimize' of https://github.com/taosdata/TDengine...

Merge branch 'refact/tsdb_optimize' of https://github.com/taosdata/TDengine into refact/tsdb_new_format
...@@ -1672,7 +1672,12 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1672,7 +1672,12 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
break; break;
} }
} }
if (!needConvert) return TSDB_CODE_SUCCESS;
if (!needConvert) {
return TSDB_CODE_SUCCESS;
}
tscDebug("start to convert form json format string");
char* p = (char*)pResultInfo->pData; char* p = (char*)pResultInfo->pData;
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows); int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
......
...@@ -2120,6 +2120,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ ...@@ -2120,6 +2120,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
int32_t* rows = (int32_t*)data; int32_t* rows = (int32_t*)data;
*rows = pBlock->info.rows; *rows = pBlock->info.rows;
data += sizeof(int32_t); data += sizeof(int32_t);
ASSERT(*rows > 0);
int32_t* cols = (int32_t*)data; int32_t* cols = (int32_t*)data;
*cols = numOfCols; *cols = numOfCols;
...@@ -2183,6 +2184,8 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ ...@@ -2183,6 +2184,8 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
*actualLen = *dataLen; *actualLen = *dataLen;
*groupId = pBlock->info.groupId; *groupId = pBlock->info.groupId;
ASSERT(*dataLen > 0);
uDebug("build data block, actualLen:%d, rows:%d, cols:%d", *dataLen, *rows, *cols);
} }
const char* blockDecode(SSDataBlock* pBlock, const char* pData) { const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
......
...@@ -168,7 +168,9 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE ...@@ -168,7 +168,9 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
*pLen = pEntry->dataLen;
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
} }
......
...@@ -93,6 +93,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn ...@@ -93,6 +93,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed); blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data+8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data+8+4));
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;
...@@ -170,7 +172,13 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE ...@@ -170,7 +172,13 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
*pLen = pEntry->dataLen;
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data+8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data+8+4));
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
} }
...@@ -191,6 +199,9 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { ...@@ -191,6 +199,9 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->numOfCols = pEntry->numOfCols; pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed; pOutput->compressed = pEntry->compressed;
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data+8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data+8+4));
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen); atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册