提交 8412703d 编写于 作者: H Haojun Liao

[td-225]

上级 4fd1d1f1
......@@ -331,6 +331,7 @@ enum OPERATOR_TYPE_E {
OP_Distinct = 20,
OP_Join = 21,
OP_StateWindow = 22,
OP_Order = 23,
};
typedef struct SOperatorInfo {
......@@ -506,7 +507,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t start;
char* prevData; // previous data
bool reptScan;
} SStateWindowOperatorInfo ;
} SStateWindowOperatorInfo;
typedef struct SDistinctOperatorInfo {
SHashObj *pSet;
......@@ -539,6 +540,13 @@ typedef struct SMultiwayMergeInfo {
SArray *udfInfo;
} SMultiwayMergeInfo;
// todo support the disk-based sort
typedef struct SOrderOperatorInfo {
int32_t colIndex;
int32_t order;
SSDataBlock *pDataBlock;
} SOrderOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
......
......@@ -16,7 +16,6 @@
#include "qFill.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "talgo.h"
#include "exception.h"
#include "hash.h"
......@@ -242,8 +241,7 @@ static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
int32_t groupIndex);
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId);
SArray* getOrderCheckColumns(SQueryAttr* pQuery);
......@@ -886,8 +884,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
break;
}
}
return;
}
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
......@@ -5218,6 +5214,35 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
return pOperator;
}
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
{
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData col = {0};
col.info.bytes = pExpr->base.resBytes;
col.info.colId = pExpr->base.resColId;
col.info.type = pExpr->base.resType;
taosArrayPush(pDataBlock->pDataBlock, &col);
}
pDataBlock->info.numOfCols = numOfOutput;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "Order";
pOperator->operatorType = OP_Order;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->exec = doSort;
pOperator->cleanup = NULL;
return pOperator;
}
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order;
}
......@@ -5642,7 +5667,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes;
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
......@@ -5775,6 +5799,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -5986,6 +6011,106 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
}
}
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
int32_t numOfCols = pSrc->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i);
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
char* tmp = realloc(pCol2->pData, newSize);
if (tmp != NULL) {
pCol2->pData = tmp;
int32_t offset = pCol2->info.bytes * pDest->info.rows;
memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
} else {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SOrderOperatorInfo* pInfo = pOperator->info;
// SSDataBlock* pRes = pInfo->pRes;
// pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
// pOperator->status = OP_EXEC_DONE;
break;
}
int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
// return code;
}
/*SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
// ensure the output buffer size
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0);
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
char* tmp = realloc(pResultColInfoData->pData, newSize * bytes);
if (tmp == NULL) {
return NULL;
} else {
pResultColInfoData->pData = tmp;
pInfo->outputCapacity = newSize;
}
}
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
char* val = ((char*)pColInfoData->pData) + bytes * i;
if (isNull(val, type)) {
continue;
}
size_t keyLen = 0;
if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) {
tstr* var = (tstr*)(val);
keyLen = varDataLen(var);
} else {
keyLen = bytes;
}
int dummy;
void* res = taosHashGet(pInfo->pSet, val, keyLen);
if (res == NULL) {
taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy));
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
memcpy(start, val, bytes);
pRes->info.rows += 1;
}
}
if (pRes->info.rows >= pInfo->threshold) {
break;
}*/
}
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr) {
for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
......@@ -6607,11 +6732,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
return NULL;
}
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
......
......@@ -286,10 +286,6 @@ int32_t taosArrayCompareString(const void* a, const void* b) {
return compareLenPrefixedStr(x, y);
}
//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) {
// const SArray* arr = (const SArray*) pRight;
// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1;
//}
static int32_t compareFindItemInSet(const void *pLeft, const void* pRight) {
return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册