/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include #include #include #include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" #include "os.h" #include "executorimpl.h" #include "executor.h" #include "stub.h" #include "taos.h" #include "tdatablock.h" #include "tdef.h" #include "trpc.h" #include "tvariant.h" #include "tcompare.h" namespace { typedef struct { int32_t startVal; int32_t count; int32_t pageRows; int16_t type; } _info; int16_t VARCOUNT = 16; float rand_f2() { unsigned r = taosRand(); r &= 0x007fffff; r |= 0x40800000; return *(float*)&r - 6.0; } static const int32_t TEST_NUMBER = 1; #define bigendian() ((*(char *)&TEST_NUMBER) == 0) SSDataBlock* getSingleColDummyBlock(void* param) { _info* pInfo = (_info*) param; if (--pInfo->count < 0) { return NULL; } SSDataBlock* pBlock = static_cast(taosMemoryCalloc(1, sizeof(SSDataBlock))); pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); SColumnInfoData colInfo = {0}; colInfo.info.type = pInfo->type; if (pInfo->type == TSDB_DATA_TYPE_NCHAR){ colInfo.info.bytes = TSDB_NCHAR_SIZE * VARCOUNT + VARSTR_HEADER_SIZE; colInfo.varmeta.offset = static_cast(taosMemoryCalloc(pInfo->pageRows, sizeof(int32_t))); } else if(pInfo->type == TSDB_DATA_TYPE_BINARY) { colInfo.info.bytes = VARCOUNT + VARSTR_HEADER_SIZE; colInfo.varmeta.offset = static_cast(taosMemoryCalloc(pInfo->pageRows, sizeof(int32_t))); } else{ colInfo.info.bytes = tDataTypes[pInfo->type].bytes; colInfo.pData = static_cast(taosMemoryCalloc(pInfo->pageRows, colInfo.info.bytes)); colInfo.nullbitmap = static_cast(taosMemoryCalloc(1, (pInfo->pageRows + 7) / 8)); } colInfo.info.colId = 1; taosArrayPush(pBlock->pDataBlock, &colInfo); for (int32_t i = 0; i < pInfo->pageRows; ++i) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); if (pInfo->type == TSDB_DATA_TYPE_NCHAR){ int32_t size = taosRand() % VARCOUNT; char str[128] = {0}; char strOri[128] = {0}; taosRandStr(strOri, size); int32_t len = 0; bool ret = taosMbsToUcs4(strOri, size, (TdUcs4*)varDataVal(str), size * TSDB_NCHAR_SIZE, &len); if (!ret){ printf("error\n"); return NULL; } varDataSetLen(str, len); colDataAppend(pColInfo, i, reinterpret_cast(str), false); pBlock->info.hasVarCol = true; printf("nchar: %s\n",strOri); } else if(pInfo->type == TSDB_DATA_TYPE_BINARY){ int32_t size = taosRand() % VARCOUNT; char str[64] = {0}; taosRandStr(varDataVal(str), size); varDataSetLen(str, size); colDataAppend(pColInfo, i, reinterpret_cast(str), false); pBlock->info.hasVarCol = true; printf("binary: %s\n", varDataVal(str)); } else if(pInfo->type == TSDB_DATA_TYPE_DOUBLE || pInfo->type == TSDB_DATA_TYPE_FLOAT) { double v = rand_f2(); colDataAppend(pColInfo, i, reinterpret_cast(&v), false); printf("float: %f\n", v); } else{ int64_t v = ++pInfo->startVal; char *result = static_cast(taosMemoryCalloc(tDataTypes[pInfo->type].bytes, 1)); if (!bigendian()){ memcpy(result, &v, tDataTypes[pInfo->type].bytes); }else{ memcpy(result, (char*)(&v) + sizeof(int64_t) - tDataTypes[pInfo->type].bytes, tDataTypes[pInfo->type].bytes); } colDataAppend(pColInfo, i, result, false); printf("int: %ld\n", v); taosMemoryFree(result); } } pBlock->info.rows = pInfo->pageRows; pBlock->info.numOfCols = 1; return pBlock; } int32_t docomp(const void* p1, const void* p2, void* param) { int32_t pLeftIdx = *(int32_t *)p1; int32_t pRightIdx = *(int32_t *)p2; SMsortComparParam *pParam = (SMsortComparParam *)param; SGenericSource** px = reinterpret_cast(pParam->pSources); SArray *pInfo = pParam->orderInfo; SGenericSource* pLeftSource = px[pLeftIdx]; SGenericSource* pRightSource = px[pRightIdx]; // this input is exhausted, set the special value to denote this if (pLeftSource->src.rowIndex == -1) { return 1; } if (pRightSource->src.rowIndex == -1) { return -1; } SSDataBlock* pLeftBlock = pLeftSource->src.pBlock; SSDataBlock* pRightBlock = pRightSource->src.pBlock; for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i); SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); bool leftNull = false; if (pLeftColInfoData->hasNull) { leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); } SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); } if (leftNull && rightNull) { continue; // continue to next slot } if (rightNull) { return pParam->nullFirst? 1:-1; } if (leftNull) { return pParam->nullFirst? -1:1; } void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex); void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex); __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order); int ret = fn(left1, right1); if (ret == 0) { continue; } else { return ret; } } return 0; } } // namespace #if 1 TEST(testCase, inMem_sort_Test) { SBlockOrderInfo oi = {0}; oi.order = TSDB_ORDER_ASC; oi.slotId = 0; SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc"); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); _info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info)); pInfo->startVal = 0; pInfo->pageRows = 100; pInfo->count = 6; pInfo->type = TSDB_DATA_TYPE_USMALLINT; SGenericSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SGenericSource))); ps->param = pInfo; tsortAddSource(phandle, ps); int32_t code = tsortOpen(phandle); int32_t row = 1; taosMemoryFreeClear(ps); while(1) { STupleHandle* pTupleHandle = tsortNextTuple(phandle); if (pTupleHandle == NULL) { break; } void* v = tsortGetValue(pTupleHandle, 0); printf("%d: %d\n", row, *(uint16_t*) v); ASSERT_EQ(row++, *(uint16_t*) v); } taosArrayDestroy(orderInfo); tsortDestroySortHandle(phandle); taosMemoryFree(pInfo); } TEST(testCase, external_mem_sort_Test) { _info* pInfo = (_info*) taosMemoryCalloc(8, sizeof(_info)); pInfo[0].startVal = 0; pInfo[0].pageRows = 10; pInfo[0].count = 6; pInfo[0].type = TSDB_DATA_TYPE_BOOL; pInfo[1].startVal = 0; pInfo[1].pageRows = 10; pInfo[1].count = 6; pInfo[1].type = TSDB_DATA_TYPE_TINYINT; pInfo[2].startVal = 0; pInfo[2].pageRows = 100; pInfo[2].count = 6; pInfo[2].type = TSDB_DATA_TYPE_USMALLINT; pInfo[3].startVal = 0; pInfo[3].pageRows = 100; pInfo[3].count = 6; pInfo[3].type = TSDB_DATA_TYPE_INT; pInfo[4].startVal = 0; pInfo[4].pageRows = 100; pInfo[4].count = 6; pInfo[4].type = TSDB_DATA_TYPE_UBIGINT; pInfo[5].startVal = 0; pInfo[5].pageRows = 100; pInfo[5].count = 6; pInfo[5].type = TSDB_DATA_TYPE_DOUBLE; pInfo[6].startVal = 0; pInfo[6].pageRows = 50; pInfo[6].count = 6; pInfo[6].type = TSDB_DATA_TYPE_NCHAR; pInfo[7].startVal = 0; pInfo[7].pageRows = 100; pInfo[7].count = 6; pInfo[7].type = TSDB_DATA_TYPE_BINARY; for (int i = 0; i < 8; i++){ SBlockOrderInfo oi = {0}; oi.order = TSDB_ORDER_ASC; oi.slotId = 0; SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc"); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); SGenericSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SGenericSource))); ps->param = &pInfo[i]; tsortAddSource(phandle, ps); int32_t code = tsortOpen(phandle); int32_t row = 1; taosMemoryFreeClear(ps); printf("--------start with %s-----------\n", tDataTypes[pInfo[i].type].name); while(1) { STupleHandle* pTupleHandle = tsortNextTuple(phandle); if (pTupleHandle == NULL) { break; } void* v = tsortGetValue(pTupleHandle, 0); if(pInfo[i].type == TSDB_DATA_TYPE_NCHAR){ char buf[128] = {0}; int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(v), varDataLen(v), buf); printf("%d: %s\n", row++, buf); }else if(pInfo[i].type == TSDB_DATA_TYPE_BINARY){ char buf[128] = {0}; memcpy(buf, varDataVal(v), varDataLen(v)); printf("%d: %s\n", row++, buf); }else if(pInfo[i].type == TSDB_DATA_TYPE_DOUBLE) { printf("double: %lf\n", *(double*)v); }else if (pInfo[i].type == TSDB_DATA_TYPE_FLOAT) { printf("float: %f\n", *(float*)v); }else{ int64_t result = 0; if (!bigendian()){ memcpy(&result, v, tDataTypes[pInfo[i].type].bytes); }else{ memcpy((char*)(&result) + sizeof(int64_t) - tDataTypes[pInfo[i].type].bytes, v, tDataTypes[pInfo[i].type].bytes); } printf("%d: %ld\n", row++, result); } } taosArrayDestroy(orderInfo); tsortDestroySortHandle(phandle); } taosMemoryFree(pInfo); } TEST(testCase, ordered_merge_sort_Test) { SBlockOrderInfo oi = {0}; oi.order = TSDB_ORDER_ASC; oi.slotId = 0; SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); taosArrayPush(orderInfo, &oi); SSDataBlock* pBlock = static_cast(taosMemoryCalloc(1, sizeof(SSDataBlock))); pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pBlock->info.numOfCols = 1; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info.type = TSDB_DATA_TYPE_INT; colInfo.info.bytes = sizeof(int32_t); colInfo.info.colId = 1; taosArrayPush(pBlock->pDataBlock, &colInfo); } SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc"); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); tsortSetComparFp(phandle, docomp); for(int32_t i = 0; i < 10; ++i) { SGenericSource* p = static_cast(taosMemoryCalloc(1, sizeof(SGenericSource))); _info* c = static_cast<_info*>(taosMemoryCalloc(1, sizeof(_info))); c->count = 1; c->pageRows = 1000; c->startVal = i*1000; p->param = c; tsortAddSource(phandle, p); } int32_t code = tsortOpen(phandle); int32_t row = 1; while(1) { STupleHandle* pTupleHandle = tsortNextTuple(phandle); if (pTupleHandle == NULL) { break; } void* v = tsortGetValue(pTupleHandle, 0); printf("%d: %d\n", row, *(int32_t*) v); ASSERT_EQ(row++, *(int32_t*) v); } tsortDestroySortHandle(phandle); taosMemoryFree(pBlock); } #endif #pragma GCC diagnostic pop