提交 1ae36227 编写于 作者: H Haojun Liao

[td-11818] refactor.

上级 04bbc0bf
......@@ -128,6 +128,8 @@ int32_t getPageId(const SPageInfo* pPgInfo);
*/
int32_t getBufPageSize(const SDiskbasedBuf* pBuf);
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf);
/**
*
* @param pBuf
......
......@@ -25,7 +25,7 @@
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedfile.h"
#include "tpagedbuf.h"
#include "tref.h"
struct tmq_list_t {
......
......@@ -100,9 +100,9 @@ typedef struct STableQueryInfo {
TSKEY lastKey;
int32_t groupIndex; // group id in table list
SVariant tag;
STimeWindow win; // todo remove it later
STSCursor cur;
void* pTable; // for retrieve the page id list
// STimeWindow win; // todo remove it later
// STSCursor cur;
// void* pTable; // for retrieve the page id list
SResultRowInfo resInfo;
} STableQueryInfo;
......@@ -454,8 +454,7 @@ typedef struct SAggOperatorInfo {
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool*
pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
STableQueryInfo* current;
} SAggOperatorInfo;
......@@ -586,7 +585,8 @@ typedef struct SOrderOperatorInfo {
SDiskbasedBuf *pSortInternalBuf;
SMultiwayMergeTreeInfo *pMergeTree;
SArray *pSources; // SArray<SExternalMemSource*>
int32_t capacity;
int32_t bufPageSize;
int32_t numOfRowsInRes;
SMsortComparParam cmpParam;
} SOrderOperatorInfo;
......@@ -596,6 +596,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
......@@ -609,8 +610,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
......
......@@ -2780,7 +2780,7 @@ void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo
int8_t *p = calloc(numOfRows, sizeof(int8_t));
bool all = true;
#if 0
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
......@@ -2808,6 +2808,7 @@ void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo
} else {
all = doFilterDataBlock(pFilterInfo, numOfFilterCols, numOfRows, p);
}
#endif
if (!all) {
doCompactSDataBlock(pBlock, numOfRows, p);
......@@ -2846,7 +2847,7 @@ void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock,
}
// save the cursor status
pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
// pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
// all = filterExecute(pRuntimeEnv->pQueryAttr->pFilters, numOfRows, &p, pBlock->pBlockAgg, pRuntimeEnv->pQueryAttr->numOfCols);
}
......@@ -3286,11 +3287,11 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo)
return;
}
TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
// TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
// pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
SWITCH_ORDER(pTableQueryInfo->cur.order);
pTableQueryInfo->cur.vgroupIndex = -1;
// SWITCH_ORDER(pTableQueryInfo->cur.order);
// pTableQueryInfo->cur.vgroupIndex = -1;
// set the index to be the end slot of result rows array
SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
......@@ -3600,11 +3601,11 @@ static bool hasMainOutput(STaskAttr *pQueryAttr) {
STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) {
STableQueryInfo *pTableQueryInfo = buf;
pTableQueryInfo->win = win;
// pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
pTableQueryInfo->pTable = pTable;
pTableQueryInfo->cur.vgroupIndex = -1;
// pTableQueryInfo->pTable = pTable;
// pTableQueryInfo->cur.vgroupIndex = -1;
// set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) || groupbyColumn) {
......@@ -3622,12 +3623,9 @@ STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) {
STableQueryInfo* pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = win;
// pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
pTableQueryInfo->pTable = NULL;
pTableQueryInfo->cur.vgroupIndex = -1;
// set more initial size of interval/groupby query
int32_t initialSize = 16;
int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT);
......@@ -3773,7 +3771,7 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
assert(pRuntimeEnv->pTsBuf != NULL);
#if 0
// both the master and supplement scan needs to set the correct ts comp start position
if (pTableQueryInfo->cur.vgroupIndex == -1) {
taosVariantAssign(&pTableQueryInfo->tag, pTag);
......@@ -3807,7 +3805,7 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S
//qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
}
#endif
return 0;
}
......@@ -3897,7 +3895,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) {
return;
}
pTableQueryInfo->win.skey = key;
// pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
/**
......@@ -3920,7 +3918,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) {
// pResultRowInfo->prevSKey = w.skey;
// }
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
// pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
}
/**
......@@ -4645,7 +4643,8 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
}
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
if (order == TSDB_ORDER_ASC) {
#if 0
if (order == TSDB_ORDER_ASC) {
assert(
(pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
......@@ -4656,6 +4655,8 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
(pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
(pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
}
#endif
}
//STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
......@@ -5800,6 +5801,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
pInfo->numOfCompleted += 1;
pSource->rowIndex = -1;
pSource->pageIndex = -1;
pSource->pBlock = blockDataDestroy(pSource->pBlock);
} else {
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
......@@ -5872,8 +5874,16 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) {
pInfo->sourceId += 1;
pInfo->cmpParam.numOfSources += 1;
if (pInfo->cmpParam.numOfSources > getNumOfInMemBufPages(pInfo->pSortInternalBuf)) {
// TODO sort memory not enough, return with error code.
}
ASSERT(pInfo->cmpParam.numOfSources == taosArrayGetSize(pInfo->pSources));
return blockDataEnsureCapacity(pSource->pBlock, pInfo->capacity);
int32_t rowSize = blockDataGetRowSize(pSource->pBlock);
int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize;
return blockDataEnsureCapacity(pSource->pBlock, numOfRows);
}
void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) {
......@@ -5952,7 +5962,7 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI
longjmp(pTaskInfo->env, code);
}
if (pInfo->pDataBlock->info.rows >= pInfo->capacity) {
if (pInfo->pDataBlock->info.rows >= pInfo->numOfRowsInRes) {
return pInfo->pDataBlock;
}
}
......@@ -6014,7 +6024,9 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
addToDiskbasedBuf(pInfo, pTaskInfo->env);
}
blockDataEnsureCapacity(pInfo->pDataBlock, pInfo->capacity);
int32_t rowSize = blockDataGetRowSize(pInfo->pDataBlock);
int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize;
blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows);
int32_t code = sortComparInit(&pInfo->cmpParam, pInfo);
if (code != TSDB_CODE_SUCCESS) {
......@@ -6064,8 +6076,9 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
}
pInfo->sortBufSize = 1024 * 1024 * 50; // 1MB
pInfo->capacity = 64*1024;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity);
pInfo->bufPageSize = 64 * 1024;
pInfo->numOfRowsInRes = 4096;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes);
pInfo->pSources = taosArrayInit(4, POINTER_BYTES);
pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal);
......@@ -6077,7 +6090,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
}
}
int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->capacity, pInfo->capacity*1000, 1, "/tmp/");
int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->bufPageSize*1000, 1, "/tmp/");
if (pInfo->pSources == NULL || code != 0 || pInfo->cmpParam.orderInfo == NULL || pInfo->pDataBlock == NULL) {
tfree(pOperator);
destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo));
......@@ -6142,7 +6155,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
return (pInfo->pRes->info.rows != 0)? pInfo->pRes:NULL;
}
static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -6177,7 +6190,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
break;
}
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// if (downstream->operatorType == OP_DataBlocksOptScan) {
// STableScanInfo* pScanInfo = downstream->info;
......@@ -6237,7 +6250,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// todo dynamic set tags
if (pTableQueryInfo != NULL) {
setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
}
// the pDataBlock are always the same one, no need to call this again
......@@ -6287,7 +6300,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// todo dynamic set tags
if (pTableQueryInfo != NULL) {
setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
}
// the pDataBlock are always the same one, no need to call this again
......@@ -6421,7 +6434,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
break;
}
setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
......@@ -6481,7 +6494,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
break;
}
setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
......@@ -6547,7 +6560,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
......@@ -6602,7 +6615,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
......@@ -6850,7 +6863,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order);
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
}
......@@ -7018,31 +7031,41 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
tfree(pOperator);
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1;//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
size_t numOfOutput = taosArrayGetSize(pExprInfo);
static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows) {
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
pInfo->pResultRowHashTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pInfo->pResultRowListSet = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->keyBuf = malloc(1024 + sizeof(int64_t) + POINTER_BYTES); // TODO:
pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo));
pInfo->keyBuf = malloc(1024 + sizeof(int64_t) + POINTER_BYTES); // TODO:
pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo));
pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
}
static SExprInfo* exprArrayDup(SArray* pExprInfo) {
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
assignExprInfo(&p[i], pExpr);
}
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1;
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
initAggInfo(pInfo, pExprInfo, numOfRows);
pInfo->seed = rand();
setDefaultOutputBuf_rv(pInfo, pInfo->seed, MAIN_SCAN, pTaskInfo);
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
assignExprInfo(&p[i], pExpr);
}
SExprInfo* p = exprArrayDup(pExprInfo);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate";
......@@ -7052,7 +7075,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->info = pInfo;
pOperator->pExpr = p;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = NULL;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doAggregate;
......@@ -7136,26 +7158,26 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
size_t tableGroup = 1; // GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
int32_t numOfRows = 1;
size_t numOfOutput = taosArrayGetSize(pExprInfo);
initAggInfo(pInfo, pExprInfo, numOfRows);
SExprInfo* p = exprArrayDup(pExprInfo);
// initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableAggregate";
// pOperator->operatorType = OP_MultiTableAggregate;
pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->pExpr = p;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableAggregate;
pOperator->exec = doMultiTableAggregate;
pOperator->cleanupFn = destroyAggOperatorInfo;
appendDownstream(pOperator, downstream);
......
......@@ -33,193 +33,311 @@
#include "stub.h"
#include "executor.h"
/**
{
"Id": {
"QueryId": 1.3108161807422521e+19,
"TemplateId": 0,
"SubplanId": 0
},
"Node": {
"Name": "TableScan",
"Targets": [{
"Base": {
"Schema": {
"Type": 9,
"ColId": 5000,
"Bytes": 8
},
"Columns": [{
"TableId": 1,
"Flag": 0,
"Info": {
"ColId": 1,
"Type": 9,
"Bytes": 8
}
}],
"InterBytes": 0
},
"Expr": {
"Type": 4,
"Column": {
"Type": 9,
"ColId": 1,
"Bytes": 8
}
}
}, {
"Base": {
"Schema": {
"Type": 4,
"ColId": 5001,
"Bytes": 4
},
"Columns": [{
"TableId": 1,
"Flag": 0,
"Info": {
"ColId": 2,
"Type": 4,
"Bytes": 4
}
}],
"InterBytes": 0
},
"Expr": {
"Type": 4,
"Column": {
"Type": 4,
"ColId": 2,
"Bytes": 4
}
}
}],
"InputSchema": [{
"Type": 9,
"ColId": 5000,
"Bytes": 8
}, {
"Type": 4,
"ColId": 5001,
"Bytes": 4
}],
"TableScan": {
"TableId": 1,
"TableType": 2,
"Flag": 0,
"Window": {
"StartKey": -9.2233720368547758e+18,
"EndKey": 9.2233720368547758e+18
}
}
},
"DataSink": {
"Name": "Dispatch",
"Dispatch": {
}
}
namespace {
typedef struct SDummyInputInfo {
int32_t max;
int32_t current;
int32_t startVal;
SSDataBlock* pBlock;
} SDummyInputInfo;
SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(param);
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->max) {
return NULL;
}
int32_t numOfRows = 1000;
if (pInfo->pBlock == NULL) {
pInfo->pBlock = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
pInfo->pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = static_cast<char*>(calloc(numOfRows, sizeof(int32_t)));
colInfo.nullbitmap = static_cast<char*>(calloc(1, (numOfRows + 7) / 8));
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo);
// SColumnInfoData colInfo1 = {0};
// colInfo1.info.type = TSDB_DATA_TYPE_BINARY;
// colInfo1.info.bytes = 40;
// colInfo1.info.colId = 2;
//
// colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t);
// colInfo1.varmeta.length = 0;
// colInfo1.varmeta.offset = static_cast<int32_t*>(calloc(1, numOfRows * sizeof(int32_t)));
//
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
} else {
blockDataClearup(pInfo->pBlock, true);
}
SSDataBlock* pBlock = pInfo->pBlock;
char buf[128] = {0};
char b1[128] = {0};
for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
int32_t v = rand();//(++pInfo->startVal);
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
}
pBlock->info.rows = numOfRows;
pBlock->info.numOfCols = 1;
pInfo->current += 1;
return pBlock;
}
*/
SOperatorInfo* createDummyOperator(int32_t numOfBlocks) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(calloc(1, sizeof(SOperatorInfo)));
pOperator->name = "dummyInputOpertor4Test";
pOperator->exec = getDummyBlock;
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
pInfo->max = numOfBlocks;
pInfo->startVal = 5000000;
pOperator->info = pInfo;
return pOperator;
}
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, build_executor_tree_Test) {
const char* msg = "{\n"
"\t\"Id\":\t{\n"
"\t\t\"QueryId\":\t1.3108161807422521e+19,\n"
"\t\t\"TemplateId\":\t0,\n"
"\t\t\"SubplanId\":\t0\n"
"\t},\n"
"\t\"Node\":\t{\n"
"\t\t\"Name\":\t\"TableScan\",\n"
"\t\t\"Targets\":\t[{\n"
"\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t}\n"
"\t\t\t\t}\n"
"\t\t\t}, {\n"
"\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t}\n"
"\t\t\t\t}\n"
"\t\t\t}],\n"
"\t\t\"InputSchema\":\t[{\n"
"\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t}, {\n"
"\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t}],\n"
"\t\t\"TableScan\":\t{\n"
"\t\t\t\"TableId\":\t1,\n"
"\t\t\t\"TableType\":\t2,\n"
"\t\t\t\"Flag\":\t0,\n"
"\t\t\t\"Window\":\t{\n"
"\t\t\t\t\"StartKey\":\t-9.2233720368547758e+18,\n"
"\t\t\t\t\"EndKey\":\t9.2233720368547758e+18\n"
"\t\t\t}\n"
"\t\t}\n"
"\t},\n"
"\t\"DataSink\":\t{\n"
"\t\t\"Name\":\t\"Dispatch\",\n"
"\t\t\"Dispatch\":\t{\n"
"\t\t}\n"
"\t}\n"
"}";
"\t\"Id\":\t{\n"
"\t\t\"QueryId\":\t1.3108161807422521e+19,\n"
"\t\t\"TemplateId\":\t0,\n"
"\t\t\"SubplanId\":\t0\n"
"\t},\n"
"\t\"Node\":\t{\n"
"\t\t\"Name\":\t\"TableScan\",\n"
"\t\t\"Targets\":\t[{\n"
"\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t}\n"
"\t\t\t\t}\n"
"\t\t\t}, {\n"
"\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t}\n"
"\t\t\t\t}\n"
"\t\t\t}],\n"
"\t\t\"InputSchema\":\t[{\n"
"\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t}, {\n"
"\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t}],\n"
"\t\t\"TableScan\":\t{\n"
"\t\t\t\"TableId\":\t1,\n"
"\t\t\t\"TableType\":\t2,\n"
"\t\t\t\"Flag\":\t0,\n"
"\t\t\t\"Window\":\t{\n"
"\t\t\t\t\"StartKey\":\t-9.2233720368547758e+18,\n"
"\t\t\t\t\"EndKey\":\t9.2233720368547758e+18\n"
"\t\t\t}\n"
"\t\t}\n"
"\t},\n"
"\t\"DataSink\":\t{\n"
"\t\t\"Name\":\t\"Dispatch\",\n"
"\t\t\"Dispatch\":\t{\n"
"\t\t}\n"
"\t}\n"
"}";
SExecTaskInfo* pTaskInfo = nullptr;
DataSinkHandle sinkHandle = nullptr;
int32_t code = qCreateExecTask((SReadHandle*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
SReadHandle handle = {.reader = reinterpret_cast<void*>(0x1), .meta = reinterpret_cast<void*>(0x1)};
// int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
}
//TEST(testCase, inMem_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
// SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
// SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
// taosArrayPush(pExprInfo, &exp);
//
// SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
//
// SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal);
//
// bool newgroup = false;
// SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup);
//
// SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
// for(int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i);
// printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
// }
//}
typedef struct su {
int32_t v;
char *c;
} su;
int32_t cmp(const void* p1, const void* p2) {
su* v1 = (su*) p1;
su* v2 = (su*) p2;
int32_t x1 = *(int32_t*) v1->c;
int32_t x2 = *(int32_t*) v2->c;
if (x1 == x2) {
return 0;
} else {
return x1 < x2? -1:1;
}
}
TEST(testCase, external_sort_Test) {
#if 0
su* v = static_cast<su*>(calloc(1000000, sizeof(su)));
for(int32_t i = 0; i < 1000000; ++i) {
v[i].v = rand();
v[i].c = static_cast<char*>(malloc(4));
*(int32_t*) v[i].c = i;
}
qsort(v, 1000000, sizeof(su), cmp);
// for(int32_t i = 0; i < 1000; ++i) {
// printf("%d ", v[i]);
// }
// printf("\n");
return;
#endif
srand(time(NULL));
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
o.order = TSDB_ORDER_ASC;
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
taosArrayPush(pExprInfo, &exp);
SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(50000), pExprInfo, pOrderVal);
bool newgroup = false;
SSDataBlock* pRes = NULL;
int32_t total = 1;
int64_t s1 = taosGetTimestampUs();
int32_t t = 1;
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->exec(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
printf("---------------elapsed:%ld\n", e - s);
}
if (pRes == NULL) {
break;
}
// SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
// for (int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i);
// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
// }
}
printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf);
int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1);
pOperator->cleanupFn(pOperator->info, 2);
tfree(exp);
tfree(exp1);
taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal);
}
#pragma GCC diagnostic pop
......@@ -557,6 +557,10 @@ int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
return pBuf->pageSize;
}
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
return pBuf->inMemPages;
}
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
return pBuf->fileSize == 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册