diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 9b71e8c454948b9be257986e01ee146a09a10e55..66b81efc5b32b961de01fce1dbe5a5a6cee808ef 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -227,6 +227,9 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); +void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress); +const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData); + void blockDebugShowData(const SArray* dataBlocks); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, @@ -246,54 +249,6 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32 colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, - int8_t needCompress) { - int32_t* actualLen = (int32_t*)data; - data += sizeof(int32_t); - - uint64_t* groupId = (uint64_t*)data; - data += sizeof(uint64_t); - - int32_t* colSizes = (int32_t*)data; - data += numOfCols * sizeof(int32_t); - - *dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t)); - - int32_t numOfRows = pBlock->info.rows; - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col); - - // copy the null bitmap - if (IS_VAR_DATA_TYPE(pColRes->info.type)) { - size_t metaSize = numOfRows * sizeof(int32_t); - memcpy(data, pColRes->varmeta.offset, metaSize); - data += metaSize; - (*dataLen) += metaSize; - } else { - int32_t len = BitmapLen(numOfRows); - memcpy(data, pColRes->nullbitmap, len); - data += len; - (*dataLen) += len; - } - - if (needCompress) { - colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress); - data += colSizes[col]; - (*dataLen) += colSizes[col]; - } else { - colSizes[col] = colDataGetLength(pColRes, numOfRows); - (*dataLen) += colSizes[col]; - memmove(data, pColRes->pData, colSizes[col]); - data += colSizes[col]; - } - - colSizes[col] = htonl(colSizes[col]); - } - - *actualLen = *dataLen; - *groupId = pBlock->info.groupId; -} - #ifdef __cplusplus } #endif diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b5b6ea65e0bab73e3d7801fc81fbcd29c013cc25..914e5aefc2e16595e3c8831f4255bdb26c4738a9 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -567,6 +567,7 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -625,23 +626,23 @@ TEST(testCase, projection_query_tables) { printf("start to insert next table\n"); - for(int32_t i = 0; i < 1000000; i += 20) { - char sql[1024] = {0}; - sprintf(sql, - "insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", - i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7, - i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14, - i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); - TAOS_RES* p = taos_query(pConn, sql); - if (taos_errno(p) != 0) { - printf("failed to insert data, reason:%s\n", taos_errstr(p)); - } - - taos_free_result(p); - } +// for(int32_t i = 0; i < 1000000; i += 20) { +// char sql[1024] = {0}; +// sprintf(sql, +// "insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" +// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" +// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" +// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", +// i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7, +// i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14, +// i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); +// TAOS_RES* p = taos_query(pConn, sql); +// if (taos_errno(p) != 0) { +// printf("failed to insert data, reason:%s\n", taos_errstr(p)); +// } +// +// taos_free_result(p); +// } // pRes = taos_query(pConn, "select * from tu"); // if (taos_errno(pRes) != 0) { @@ -663,7 +664,7 @@ TEST(testCase, projection_query_tables) { // taos_free_result(pRes); taos_close(pConn); } - +#if 0 TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) { taos_close(pConn); } -#endif - TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -734,5 +733,6 @@ TEST(testCase, agg_query_tables) { taos_free_result(pRes); taos_close(pConn); } +#endif #pragma GCC diagnostic pop diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 1bd35bdefe58db66678420ec36edbc9ebf18f293..199fc35c72b08ed5322a15884629a013cc1103f8 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1764,3 +1764,98 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ret->length = htonl(ret->length); return ret; } + +void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { + int32_t* actualLen = (int32_t*)data; + data += sizeof(int32_t); + + uint64_t* groupId = (uint64_t*)data; + data += sizeof(uint64_t); + + int32_t* colSizes = (int32_t*)data; + data += numOfCols * sizeof(int32_t); + + *dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t)); + + int32_t numOfRows = pBlock->info.rows; + for (int32_t col = 0; col < numOfCols; ++col) { + SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col); + + // copy the null bitmap + if (IS_VAR_DATA_TYPE(pColRes->info.type)) { + size_t metaSize = numOfRows * sizeof(int32_t); + memcpy(data, pColRes->varmeta.offset, metaSize); + data += metaSize; + (*dataLen) += metaSize; + } else { + int32_t len = BitmapLen(numOfRows); + memcpy(data, pColRes->nullbitmap, len); + data += len; + (*dataLen) += len; + } + + if (needCompress) { + colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress); + data += colSizes[col]; + (*dataLen) += colSizes[col]; + } else { + colSizes[col] = colDataGetLength(pColRes, numOfRows); + (*dataLen) += colSizes[col]; + memmove(data, pColRes->pData, colSizes[col]); + data += colSizes[col]; + } + + colSizes[col] = htonl(colSizes[col]); + } + + *actualLen = *dataLen; + *groupId = pBlock->info.groupId; +} + +const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) { + blockDataEnsureCapacity(pBlock, numOfRows); + const char* pStart = pData; + + int32_t dataLen = *(int32_t*)pStart; + pStart += sizeof(int32_t); + + pBlock->info.groupId = *(uint64_t*)pStart; + pStart += sizeof(uint64_t); + + int32_t* colLen = (int32_t*)pStart; + pStart += sizeof(int32_t) * numOfCols; + + for (int32_t i = 0; i < numOfCols; ++i) { + colLen[i] = htonl(colLen[i]); + ASSERT(colLen[i] >= 0); + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + pColInfoData->varmeta.length = colLen[i]; + pColInfoData->varmeta.allocLen = colLen[i]; + + memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows); + pStart += sizeof(int32_t) * numOfRows; + + if (colLen[i] > 0) { + pColInfoData->pData = taosMemoryMalloc(colLen[i]); + } + } else { + memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows)); + pStart += BitmapLen(numOfRows); + } + + if (colLen[i] > 0) { + memcpy(pColInfoData->pData, pStart, colLen[i]); + } + + // TODO + // setting this flag to true temporarily so aggregate function on stable will + // examine NULL value for non-primary key column + pColInfoData->hasNull = true; + pStart += colLen[i]; + } + + ASSERT(pStart - pData == dataLen); + return pStart; +} \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 23493ae6474f05571200c3033863843435f9f25a..9128167065b1d62e282e280c1358c86a5ac202b3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -356,8 +356,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && pResult->offset != pResultRowInfo->cur.offset))) { SResultRowPosition pos = pResultRowInfo->cur; - SFilePage* - pPage = getBufPage(pResultBuf, pos.pageId); + SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); releaseBufPage(pResultBuf, pPage); } @@ -2523,46 +2522,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { if (pColList == NULL) { // data from other sources - blockDataEnsureCapacity(pRes, numOfRows); - - int32_t dataLen = *(int32_t*)pData; - pData += sizeof(int32_t); - - pRes->info.groupId = *(uint64_t*)pData; - pData += sizeof(uint64_t); - - int32_t* colLen = (int32_t*)pData; - - char* pStart = pData + sizeof(int32_t) * numOfOutput; - for (int32_t i = 0; i < numOfOutput; ++i) { - colLen[i] = htonl(colLen[i]); - ASSERT(colLen[i] >= 0); - - SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - pColInfoData->varmeta.length = colLen[i]; - pColInfoData->varmeta.allocLen = colLen[i]; - - memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows); - pStart += sizeof(int32_t) * numOfRows; - - if (colLen[i] > 0) { - pColInfoData->pData = taosMemoryMalloc(colLen[i]); - } - } else { - memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows)); - pStart += BitmapLen(numOfRows); - } - - if (colLen[i] > 0) { - memcpy(pColInfoData->pData, pStart, colLen[i]); - } - - // TODO setting this flag to true temporarily so aggregate function on stable will - // examine NULL value for non-primary key column - pColInfoData->hasNull = true; - pStart += colLen[i]; - } + blockCompressDecode(pRes, numOfOutput, numOfRows, pData); } else { // extract data according to pColList ASSERT(numOfOutput == taosArrayGetSize(pColList)); char* pStart = pData;