提交 17ead5ed 编写于 作者: H Haojun Liao

[td-11818] refactor.

上级 eb017b8d
...@@ -55,7 +55,7 @@ typedef struct SDiskbasedBufStatis { ...@@ -55,7 +55,7 @@ typedef struct SDiskbasedBufStatis {
* @param handle * @param handle
* @return * @return
*/ */
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
/** /**
* *
...@@ -108,13 +108,13 @@ size_t getTotalBufSize(const SDiskbasedBuf* pBuf); ...@@ -108,13 +108,13 @@ size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
* @param pBuf * @param pBuf
* @return * @return
*/ */
size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf); size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf);
/** /**
* destroy result buffer * destroy result buffer
* @param pBuf * @param pBuf
*/ */
void destroyResultBuf(SDiskbasedBuf* pBuf); void destroyDiskbasedBuf(SDiskbasedBuf* pBuf);
/** /**
* *
......
...@@ -627,7 +627,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -627,7 +627,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, void* param, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); // SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); // SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
...@@ -2215,7 +2215,7 @@ static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) { ...@@ -2215,7 +2215,7 @@ static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) {
destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput); destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput);
// destroyUdfInfo(pRuntimeEnv->pUdfInfo); // destroyUdfInfo(pRuntimeEnv->pUdfInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyDiskbasedBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv); doFreeQueryHandle(pRuntimeEnv);
destroyTsComp(pRuntimeEnv, pQueryAttr); destroyTsComp(pRuntimeEnv, pQueryAttr);
...@@ -4629,7 +4629,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4629,7 +4629,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize); getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
int32_t TENMB = 1024*1024*10; int32_t TENMB = 1024*1024*10;
int32_t code = createDiskbasedBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, tsTempDir); int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, tsTempDir);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -5600,6 +5600,10 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { ...@@ -5600,6 +5600,10 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
} }
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param;
taosArrayDestroy(pInfo->orderInfo);
destroySortHandle(pInfo->pSortHandle);
blockDataDestroy(pInfo->pDataBlock);
} }
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -5655,7 +5659,10 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB ...@@ -5655,7 +5659,10 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB
} }
SSDataBlock* loadNextDataBlock(void* param) { SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
bool newgroup = false;
return pOperator->exec(pOperator, &newgroup);
} }
static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
...@@ -5672,14 +5679,17 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { ...@@ -5672,14 +5679,17 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL); SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize, pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTIWAY_MERGE, pInfo->bufPageSize,
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)"); numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
tfree(p); tfree(p);
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) { for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
sortAddSource(pInfo->pSortHandle, pOperator->pDownstream[i]); SOperatorSource* ps = calloc(1, sizeof(SOperatorSource));
ps->param = pOperator->pDownstream[i];
sortAddSource(pInfo->pSortHandle, ps);
} }
int32_t code = sortOpen(pInfo->pSortHandle); int32_t code = sortOpen(pInfo->pSortHandle);
...@@ -5714,7 +5724,7 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { ...@@ -5714,7 +5724,7 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
return pOrderInfo; return pOrderInfo;
} }
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, void* param, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) {
SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo)); SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
......
...@@ -39,7 +39,7 @@ typedef struct SSortHandle { ...@@ -39,7 +39,7 @@ typedef struct SSortHandle {
bool nullFirst; bool nullFirst;
bool hasVarCol; bool hasVarCol;
SArray *pSources; SArray *pSources; // TODO refactor, remove it
SArray *pOrderedSource; SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp; _sort_fetch_block_fn_t fetchfp;
...@@ -119,6 +119,7 @@ void destroySortHandle(SSortHandle* pSortHandle) { ...@@ -119,6 +119,7 @@ void destroySortHandle(SSortHandle* pSortHandle) {
tMergeTreeDestroy(pSortHandle->pMergeTree); tMergeTreeDestroy(pSortHandle->pMergeTree);
} }
destroyDiskbasedBuf(pSortHandle->pBuf);
tfree(pSortHandle->idStr); tfree(pSortHandle->idStr);
tfree(pSortHandle); tfree(pSortHandle);
} }
...@@ -171,7 +172,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -171,7 +172,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t start = 0; int32_t start = 0;
if (pHandle->pBuf == NULL) { if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
setPrintStatis(pHandle->pBuf); setPrintStatis(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -235,7 +236,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -235,7 +236,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} else { } else {
// multi-pass internal merge sort is required // multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) { if (pHandle->pBuf == NULL) {
code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
setPrintStatis(pHandle->pBuf); setPrintStatis(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -614,7 +615,10 @@ int32_t sortOpen(SSortHandle* pHandle) { ...@@ -614,7 +615,10 @@ int32_t sortOpen(SSortHandle* pHandle) {
} }
int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
if (pHandle->pBuf != NULL) {
ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf)); ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
}
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle); code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -35,10 +35,17 @@ ...@@ -35,10 +35,17 @@
namespace { namespace {
enum {
data_rand = 0x1,
data_asc = 0x2,
data_desc = 0x3,
};
typedef struct SDummyInputInfo { typedef struct SDummyInputInfo {
int32_t max; int32_t max;
int32_t current; int32_t current;
int32_t startVal; int32_t startVal;
int32_t type;
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SDummyInputInfo; } SDummyInputInfo;
...@@ -83,10 +90,18 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { ...@@ -83,10 +90,18 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
char buf[128] = {0}; char buf[128] = {0};
char b1[128] = {0}; char b1[128] = {0};
int32_t v = 0;
for(int32_t i = 0; i < numOfRows; ++i) { for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
int32_t v = (--pInfo->startVal); if (pInfo->type == data_desc) {
v = (--pInfo->startVal);
} else if (pInfo->type == data_asc) {
v = ++pInfo->startVal;
} else if (pInfo->type == data_rand) {
v = random();
}
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false); colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
// sprintf(buf, "this is %d row", i); // sprintf(buf, "this is %d row", i);
...@@ -103,7 +118,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { ...@@ -103,7 +118,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
return pBlock; return pBlock;
} }
SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { SOperatorInfo* createDummyOperator(int32_t numOfBlocks, int32_t type) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(calloc(1, sizeof(SOperatorInfo))); SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(calloc(1, sizeof(SOperatorInfo)));
pOperator->name = "dummyInputOpertor4Test"; pOperator->name = "dummyInputOpertor4Test";
pOperator->exec = getDummyBlock; pOperator->exec = getDummyBlock;
...@@ -111,6 +126,7 @@ SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { ...@@ -111,6 +126,7 @@ SOperatorInfo* createDummyOperator(int32_t numOfBlocks) {
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
pInfo->max = numOfBlocks; pInfo->max = numOfBlocks;
pInfo->startVal = 1500000; pInfo->startVal = 1500000;
pInfo->type = type;
pOperator->info = pInfo; pOperator->info = pInfo;
return pOperator; return pOperator;
...@@ -121,6 +137,7 @@ int main(int argc, char** argv) { ...@@ -121,6 +137,7 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
#if 0
TEST(testCase, build_executor_tree_Test) { TEST(testCase, build_executor_tree_Test) {
const char* msg = "{\n" const char* msg = "{\n"
"\t\"Id\":\t{\n" "\t\"Id\":\t{\n"
...@@ -216,34 +233,34 @@ TEST(testCase, build_executor_tree_Test) { ...@@ -216,34 +233,34 @@ TEST(testCase, build_executor_tree_Test) {
// int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); // int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
} }
//TEST(testCase, inMem_sort_Test) { TEST(testCase, inMem_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC}; SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1; o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT; o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o); taosArrayPush(pOrderVal, &o);
//
// SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
// SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo))); SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
// taosArrayPush(pExprInfo, &exp); taosArrayPush(pExprInfo, &exp);
//
// SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo))); SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1); taosArrayPush(pExprInfo, &exp1);
//
// SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal); SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal, NULL);
//
// bool newgroup = false; bool newgroup = false;
// SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup); SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup);
//
// SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0)); SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1)); SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
// for(int32_t i = 0; i < pRes->info.rows; ++i) { for(int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i); char* p = colDataGet(pCol2, i);
// printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
// } }
//} }
typedef struct su { typedef struct su {
int32_t v; int32_t v;
...@@ -339,6 +356,7 @@ TEST(testCase, external_sort_Test) { ...@@ -339,6 +356,7 @@ TEST(testCase, external_sort_Test) {
taosArrayDestroy(pExprInfo); taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal); taosArrayDestroy(pOrderVal);
} }
#endif
TEST(testCase, sorted_merge_Test) { TEST(testCase, sorted_merge_Test) {
srand(time(NULL)); srand(time(NULL));
...@@ -359,7 +377,13 @@ TEST(testCase, sorted_merge_Test) { ...@@ -359,7 +377,13 @@ TEST(testCase, sorted_merge_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1); // taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(1500), pExprInfo, pOrderVal, NULL); int32_t numOfSources = 10;
SOperatorInfo** plist = (SOperatorInfo**) calloc(numOfSources, sizeof(void*));
for(int32_t i = 0; i < numOfSources; ++i) {
plist[i] = createDummyOperator(1, data_asc);
}
SOperatorInfo* pOperator = createSortedMergeOperatorInfo(plist, numOfSources, pExprInfo, pOrderVal, NULL);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
......
...@@ -219,50 +219,50 @@ int32_t docomp(const void* p1, const void* p2, void* param) { ...@@ -219,50 +219,50 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
// destroySortHandle(phandle); // destroySortHandle(phandle);
//} //}
TEST(testCase, ordered_merge_sort_Test) { //TEST(testCase, ordered_merge_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); // SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC}; // SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1; // o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT; // o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o); // taosArrayPush(pOrderVal, &o);
//
int32_t numOfRows = 1000; // int32_t numOfRows = 1000;
SBlockOrderInfo oi = {0}; // SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC; // oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0; // oi.colIndex = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); // SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi); // taosArrayPush(orderInfo, &oi);
//
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4}; // SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc"); // SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc");
setFetchRawDataFp(phandle, getSingleColDummyBlock); // setFetchRawDataFp(phandle, getSingleColDummyBlock);
setComparFn(phandle, docomp); // setComparFn(phandle, docomp);
//
for(int32_t i = 0; i < 10; ++i) { // for(int32_t i = 0; i < 10; ++i) {
SOperatorSource* p = static_cast<SOperatorSource*>(calloc(1, sizeof(SOperatorSource))); // SOperatorSource* p = static_cast<SOperatorSource*>(calloc(1, sizeof(SOperatorSource)));
_info* c = static_cast<_info*>(calloc(1, sizeof(_info))); // _info* c = static_cast<_info*>(calloc(1, sizeof(_info)));
c->count = 1; // c->count = 1;
c->pageRows = 1000; // c->pageRows = 1000;
c->startVal = 0; // c->startVal = 0;
//
p->param = c; // p->param = c;
sortAddSource(phandle, p); // sortAddSource(phandle, p);
} // }
//
int32_t code = sortOpen(phandle); // int32_t code = sortOpen(phandle);
int32_t row = 1; // int32_t row = 1;
//
while(1) { // while(1) {
STupleHandle* pTupleHandle = sortNextTuple(phandle); // STupleHandle* pTupleHandle = sortNextTuple(phandle);
if (pTupleHandle == NULL) { // if (pTupleHandle == NULL) {
break; // break;
} // }
//
void* v = sortGetValue(pTupleHandle, 0); // void* v = sortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v); // printf("%d: %d\n", row++, *(int32_t*) v);
//
} // }
destroySortHandle(phandle); // destroySortHandle(phandle);
} //}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket); resetSlotInfo(pBucket);
int32_t ret = createDiskbasedBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir);
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
...@@ -269,7 +269,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) { ...@@ -269,7 +269,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return; return;
} }
destroyResultBuf(pBucket->pBuffer); destroyDiskbasedBuf(pBucket->pBuffer);
tfree(pBucket->pSlots); tfree(pBucket->pSlots);
tfree(pBucket); tfree(pBucket);
} }
......
...@@ -53,7 +53,7 @@ typedef struct SDiskbasedBuf { ...@@ -53,7 +53,7 @@ typedef struct SDiskbasedBuf {
static void printStatisData(const SDiskbasedBuf* pBuf); static void printStatisData(const SDiskbasedBuf* pBuf);
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf)); *pBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pBuf; SDiskbasedBuf* pResBuf = *pBuf;
...@@ -473,7 +473,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { ...@@ -473,7 +473,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
pBuf->statis.releasePages += 1; pBuf->statis.releasePages += 1;
} }
size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); } size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); }
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
...@@ -488,7 +488,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) { ...@@ -488,7 +488,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) {
} }
} }
void destroyResultBuf(SDiskbasedBuf* pBuf) { void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
if (pBuf == NULL) { if (pBuf == NULL) {
return; return;
} }
......
...@@ -13,7 +13,7 @@ namespace { ...@@ -13,7 +13,7 @@ namespace {
// simple test // simple test
void simpleTest() { void simpleTest() {
SDiskbasedBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4096, 1, "/tmp/"); int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, 1, "/tmp/");
int32_t pageId = 0; int32_t pageId = 0;
int32_t groupId = 0; int32_t groupId = 0;
...@@ -25,7 +25,7 @@ void simpleTest() { ...@@ -25,7 +25,7 @@ void simpleTest() {
SIDList list = getDataBufPagesIdList(pResultBuf, groupId); SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1); ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1); ASSERT_EQ(getNumOfBufGroupId(pResultBuf), 1);
releaseBufPage(pResultBuf, pBufPage); releaseBufPage(pResultBuf, pBufPage);
...@@ -50,12 +50,12 @@ void simpleTest() { ...@@ -50,12 +50,12 @@ void simpleTest() {
SFilePage* t4 = getBufPage(pResultBuf, pageId); SFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5); ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf); destroyDiskbasedBuf(pResultBuf);
} }
void writeDownTest() { void writeDownTest() {
SDiskbasedBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
int32_t pageId = 0; int32_t pageId = 0;
int32_t writePageId = 0; int32_t writePageId = 0;
...@@ -97,12 +97,12 @@ void writeDownTest() { ...@@ -97,12 +97,12 @@ void writeDownTest() {
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 5); ASSERT_EQ(taosArrayGetSize(pa), 5);
destroyResultBuf(pResultBuf); destroyDiskbasedBuf(pResultBuf);
} }
void recyclePageTest() { void recyclePageTest() {
SDiskbasedBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
int32_t pageId = 0; int32_t pageId = 0;
int32_t writePageId = 0; int32_t writePageId = 0;
...@@ -150,7 +150,7 @@ void recyclePageTest() { ...@@ -150,7 +150,7 @@ void recyclePageTest() {
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6); ASSERT_EQ(taosArrayGetSize(pa), 6);
destroyResultBuf(pResultBuf); destroyDiskbasedBuf(pResultBuf);
} }
} // namespace } // namespace
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册