未验证 提交 25e9d94f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19244 from taosdata/feature/stream_main

enh: put func state into tdb
...@@ -138,7 +138,7 @@ typedef struct SqlFunctionCtx { ...@@ -138,7 +138,7 @@ typedef struct SqlFunctionCtx {
char *pOutput; // final result output buffer, point to sdata->data char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams; int32_t numOfParams;
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param // input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param; SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum // corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput; SColumnInfoData *pTsOutput;
int32_t offset; int32_t offset;
...@@ -152,6 +152,7 @@ typedef struct SqlFunctionCtx { ...@@ -152,6 +152,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
SSerializeDataHandle saveHandle; SSerializeDataHandle saveHandle;
int32_t exprIdx;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
...@@ -182,9 +183,9 @@ struct SScalarParam { ...@@ -182,9 +183,9 @@ struct SScalarParam {
int32_t numOfQualified; // number of qualified elements in the final results int32_t numOfQualified; // number of qualified elements in the final results
}; };
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry); bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry);
typedef struct SPoint { typedef struct SPoint {
int64_t key; int64_t key;
......
...@@ -693,7 +693,7 @@ void cleanupExprSupp(SExprSupp* pSup); ...@@ -693,7 +693,7 @@ void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey); const char* pkey, void* pState);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
......
...@@ -605,9 +605,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB ...@@ -605,9 +605,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
} }
} }
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; }
return (0 != pTaskInfo->code) ? true : false;
}
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
...@@ -1350,7 +1348,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -1350,7 +1348,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
} }
} }
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) { static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
if (!tsCountAlwaysReturnValue) { if (!tsCountAlwaysReturnValue) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1363,12 +1361,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc ...@@ -1363,12 +1361,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
((STableScanInfo *)downstream->info)->hasGroupByTag == true)) { ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
bool hasCountFunc = false; bool hasCountFunc = false;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
...@@ -1417,7 +1415,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc ...@@ -1417,7 +1415,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) { static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
if (!blockAllocated) { if (!blockAllocated) {
return; return;
} }
...@@ -1443,8 +1441,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -1443,8 +1441,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
bool hasValidBlock = false; bool hasValidBlock = false;
bool blockAllocated = false; bool blockAllocated = false;
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
...@@ -1487,7 +1485,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -1487,7 +1485,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
destroyDataBlockForEmptyInput(blockAllocated, &pBlock); destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
} }
// the downstream operator may return with error code, so let's check the code before generating results. // the downstream operator may return with error code, so let's check the code before generating results.
...@@ -1642,7 +1639,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { ...@@ -1642,7 +1639,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
} }
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey) { const char* pkey, void* pState) {
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -1654,7 +1651,13 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo ...@@ -1654,7 +1651,13 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; if (pState) {
pSup->pCtx[i].saveHandle.pBuf = NULL;
pSup->pCtx[i].saveHandle.pState = pState;
pSup->pCtx[i].exprIdx = i;
} else {
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1739,7 +1742,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -1739,7 +1742,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1766,7 +1770,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -1766,7 +1770,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
optrDefaultBufFn, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info; STableScanInfo* pTableScanInfo = downstream->info;
......
...@@ -458,7 +458,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -458,7 +458,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -102,7 +102,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -102,7 +102,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
} }
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -317,7 +318,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -317,7 +318,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if (pProjectInfo->mergeDataBlocks) { if (pProjectInfo->mergeDataBlocks) {
if (pRes->info.rows > 0) { if (pRes->info.rows > 0) {
pFinalRes->info.id.groupId = 0; //clear groupId pFinalRes->info.id.groupId = 0; // clear groupId
pFinalRes->info.version = pRes->info.version; pFinalRes->info.version = pRes->info.version;
// continue merge data, ignore the group id // continue merge data, ignore the group id
...@@ -402,7 +403,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -402,7 +403,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
blockDataEnsureCapacity(pResBlock, numOfRows); blockDataEnsureCapacity(pResBlock, numOfRows);
int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -32,7 +32,7 @@ typedef struct SSumRes { ...@@ -32,7 +32,7 @@ typedef struct SSumRes {
int16_t type; int16_t type;
int64_t prevTs; int64_t prevTs;
bool isPrevTsSet; bool isPrevTsSet;
bool overflow; // if overflow is true, dsum to be used for any type; bool overflow; // if overflow is true, dsum to be used for any type;
} SSumRes; } SSumRes;
typedef struct SMinmaxResInfo { typedef struct SMinmaxResInfo {
...@@ -46,7 +46,7 @@ typedef struct SMinmaxResInfo { ...@@ -46,7 +46,7 @@ typedef struct SMinmaxResInfo {
} SMinmaxResInfo; } SMinmaxResInfo;
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc); int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc);
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey); STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
......
...@@ -41,15 +41,15 @@ ...@@ -41,15 +41,15 @@
#define HLL_BUCKET_MASK (HLL_BUCKETS - 1) #define HLL_BUCKET_MASK (HLL_BUCKETS - 1)
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2) #define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
//typedef struct SMinmaxResInfo { // typedef struct SMinmaxResInfo {
// bool assign; // assign the first value or not // bool assign; // assign the first value or not
// int64_t v; // int64_t v;
// STuplePos tuplePos; // STuplePos tuplePos;
// //
// STuplePos nullTuplePos; // STuplePos nullTuplePos;
// bool nullTupleSaved; // bool nullTupleSaved;
// int16_t type; // int16_t type;
//} SMinmaxResInfo; // } SMinmaxResInfo;
typedef struct STopBotResItem { typedef struct STopBotResItem {
SVariant v; SVariant v;
...@@ -818,28 +818,31 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple ...@@ -818,28 +818,31 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
return; return;
} }
if (pCtx->saveHandle.pBuf != NULL) { if ((pCtx->saveHandle.pBuf != NULL && pTuplePos->pageId != -1) ||
if (pTuplePos->pageId != -1) { (pCtx->saveHandle.pState && pTuplePos->streamTupleKey.ts > 0)) {
int32_t numOfCols = pCtx->subsidiaries.num; int32_t numOfCols = pCtx->subsidiaries.num;
const char* p = loadTupleData(pCtx, pTuplePos); const char* p = loadTupleData(pCtx, pTuplePos);
bool* nullList = (bool*)p; bool* nullList = (bool*)p;
char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
// todo set the offset value to optimize the performance. // todo set the offset value to optimize the performance.
for (int32_t j = 0; j < numOfCols; ++j) { for (int32_t j = 0; j < numOfCols; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
if (nullList[j]) { if (nullList[j]) {
colDataAppendNULL(pDstCol, rowIndex); colDataAppendNULL(pDstCol, rowIndex);
} else { } else {
colDataAppend(pDstCol, rowIndex, pStart, false); colDataAppend(pDstCol, rowIndex, pStart, false);
}
pStart += pDstCol->info.bytes;
} }
pStart += pDstCol->info.bytes;
}
if (pCtx->saveHandle.pState) {
tdbFree((void*)p);
} }
} }
} }
...@@ -2018,7 +2021,7 @@ static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowInde ...@@ -2018,7 +2021,7 @@ static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowInde
} }
if (!pInfo->hasResult) { if (!pInfo->hasResult) {
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL); pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock);
} else { } else {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
} }
...@@ -2071,8 +2074,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2071,8 +2074,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first
// we will use this opt implementation in an new version that is only available in scan subplan // function. we will use this opt implementation in an new version that is only available in scan subplan
#if 0 #if 0
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
// filter according to current result firstly // filter according to current result firstly
...@@ -2179,7 +2182,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2179,7 +2182,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first
// function.
#if 0 #if 0
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
...@@ -2236,9 +2240,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2236,9 +2240,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t round = pInput->numOfRows >> 2; int32_t round = pInput->numOfRows >> 2;
int32_t reminder = pInput->numOfRows & 0x03; int32_t reminder = pInput->numOfRows & 0x03;
for (int32_t i = pInput->startRowIndex, tick = 0; tick < round; i += 4, tick += 1) { for (int32_t i = pInput->startRowIndex, tick = 0; tick < round; i += 4, tick += 1) {
int64_t cts = pts[i]; int64_t cts = pts[i];
int32_t chosen = i; int32_t chosen = i;
if (cts < pts[i + 1]) { if (cts < pts[i + 1]) {
cts = pts[i + 1]; cts = pts[i + 1];
...@@ -2262,18 +2266,18 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2262,18 +2266,18 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
} }
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data); doSaveCurrentVal(pCtx, i, pts[i], type, data);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
} }
}
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
} }
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
numOfElems++; numOfElems++;
...@@ -2439,7 +2443,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -2439,7 +2443,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type; int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes; pInfo->bytes = bytes;
...@@ -2777,7 +2781,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { ...@@ -2777,7 +2781,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2805,7 +2809,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { ...@@ -2805,7 +2809,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
...@@ -2863,7 +2867,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -2863,7 +2867,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple // save the data of this tuple
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL); pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock);
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
...@@ -2937,8 +2941,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid ...@@ -2937,8 +2941,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return buf; return buf;
} }
static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key) {
const STupleKey* pKey) {
STuplePos p = {0}; STuplePos p = {0};
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = NULL; SFilePage* pPage = NULL;
...@@ -2964,20 +2967,31 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf ...@@ -2964,20 +2967,31 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} else { } else {
// other tuple save policy // other tuple save policy
if (streamStateFuncPut(pHandle->pState, pKey, pBuf, length) < 0) { if (streamStateFuncPut(pHandle->pState, &key, pBuf, length) < 0) {
ASSERT(0); ASSERT(0);
} }
p.streamTupleKey = *pKey; p.streamTupleKey = key;
} }
return p; return p;
} }
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey) { STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) {
prepareBuf(pCtx); prepareBuf(pCtx);
STupleKey key;
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
int64_t skey = *(int64_t*)colDataGetData(pColInfo, rowIndex);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = skey;
key.exprIdx = pCtx->exprIdx;
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pKey); return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key);
} }
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
...@@ -3623,7 +3637,7 @@ bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultIn ...@@ -3623,7 +3637,7 @@ bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultIn
pInfo->totalCount = 0; pInfo->totalCount = 0;
pInfo->normalized = 0; pInfo->normalized = 0;
char *binTypeStr = strndup(varDataVal(pCtx->param[1].param.pz), varDataLen(pCtx->param[1].param.pz)); char* binTypeStr = strndup(varDataVal(pCtx->param[1].param.pz), varDataLen(pCtx->param[1].param.pz));
int8_t binType = getHistogramBinType(binTypeStr); int8_t binType = getHistogramBinType(binTypeStr);
taosMemoryFree(binTypeStr); taosMemoryFree(binTypeStr);
...@@ -3947,7 +3961,6 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) { ...@@ -3947,7 +3961,6 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) { for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data); SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data);
...@@ -4423,7 +4436,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da ...@@ -4423,7 +4436,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (pInfo->numSampled < pInfo->samples) { if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, pInfo->numSampled); sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
pInfo->numSampled++; pInfo->numSampled++;
} else { } else {
...@@ -4454,7 +4467,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -4454,7 +4467,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
} }
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pInfo->nullTupleSaved = true; pInfo->nullTupleSaved = true;
} }
...@@ -4758,7 +4771,7 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, ...@@ -4758,7 +4771,7 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx,
pItem->count += 1; pItem->count += 1;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, NULL); pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock);
} }
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
...@@ -4798,7 +4811,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { ...@@ -4798,7 +4811,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pInfo->nullTupleSaved = true; pInfo->nullTupleSaved = true;
} }
...@@ -5299,7 +5312,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -5299,7 +5312,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t len = sprintf(st + VARSTR_HEADER_SIZE, int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
"Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]", "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize/1024.0, compRatio, '%'); pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
varDataSetLen(st, len); varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false); colDataAppend(pColInfo, row++, st, false);
......
...@@ -64,7 +64,7 @@ ...@@ -64,7 +64,7 @@
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) { static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256; const int32_t bitWidth = 256;
*width = (bitWidth>>3u) / bytes; *width = (bitWidth >> 3u) / bytes;
*remainder = numOfRows % (*width); *remainder = numOfRows % (*width);
*rounds = numOfRows / (*width); *rounds = numOfRows / (*width);
} }
...@@ -92,8 +92,7 @@ static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder ...@@ -92,8 +92,7 @@ static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder
(_v) = (_sec)[j]; \ (_v) = (_sec)[j]; \
} \ } \
} }
static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal) { static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal) {
int8_t v = 0; int8_t v = 0;
const int8_t* p = pData; const int8_t* p = pData;
...@@ -116,7 +115,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu ...@@ -116,7 +115,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu
const int8_t* q = (const int8_t*)&initVal; const int8_t* q = (const int8_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v) EXTRACT_MAX_VAL(q, p, width, remain, v)
} else { // unsigned value } else { // unsigned value
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p); next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epu8(initVal, next); initVal = _mm256_max_epu8(initVal, next);
...@@ -126,7 +125,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu ...@@ -126,7 +125,7 @@ static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFu
const uint8_t* q = (const uint8_t*)&initVal; const uint8_t* q = (const uint8_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v) EXTRACT_MAX_VAL(q, p, width, remain, v)
} }
} else { // min function } else { // min function
if (signVal) { if (signVal) {
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
...@@ -241,7 +240,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is ...@@ -241,7 +240,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is
// let compare the final results // let compare the final results
const int32_t* q = (const int32_t*)&initVal; const int32_t* q = (const int32_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v) EXTRACT_MAX_VAL(q, p, width, remain, v)
} else { // unsigned value } else { // unsigned value
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p); next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epi32(initVal, next); initVal = _mm256_max_epi32(initVal, next);
...@@ -281,7 +280,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is ...@@ -281,7 +280,7 @@ static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool is
} }
static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMinFunc) { static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMinFunc) {
float v = 0; float v = 0;
const float* p = pData; const float* p = pData;
int32_t width, remain, rounds; int32_t width, remain, rounds;
...@@ -358,7 +357,7 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is ...@@ -358,7 +357,7 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is
static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows) { static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows) {
int32_t i = start; int32_t i = start;
while (i < (start + numOfRows) && (colDataIsNull_f(pCol->nullbitmap, i) == true)) { while (i < (start + numOfRows) && (colDataIsNull_f(pCol->nullbitmap, i) == true)) {
i += 1; i += 1;
} }
...@@ -495,7 +494,8 @@ static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, S ...@@ -495,7 +494,8 @@ static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, S
} }
} }
static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc) { static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf,
bool isMinFunc) {
float* pData = (float*)pCol->pData; float* pData = (float*)pCol->pData;
float* val = (float*)&pBuf->v; float* val = (float*)&pBuf->v;
...@@ -525,7 +525,8 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo ...@@ -525,7 +525,8 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
pBuf->assign = true; pBuf->assign = true;
} }
static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc) { static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf,
bool isMinFunc) {
double* pData = (double*)pCol->pData; double* pData = (double*)pCol->pData;
double* val = (double*)&pBuf->v; double* val = (double*)&pBuf->v;
...@@ -740,7 +741,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -740,7 +741,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} else { } else {
...@@ -754,7 +755,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -754,7 +755,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -768,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -768,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -782,7 +783,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -782,7 +783,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -798,7 +799,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -798,7 +799,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) { if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
...@@ -819,7 +820,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -819,7 +820,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes); memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
numOfElems = 1; numOfElems = 1;
...@@ -883,9 +884,9 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -883,9 +884,9 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
_over: _over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pBuf->nullTupleSaved = true; pBuf->nullTupleSaved = true;
} }
return numOfElems; return numOfElems;
} }
\ No newline at end of file
...@@ -107,7 +107,7 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, ...@@ -107,7 +107,7 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
} }
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
szPage = szPage < 0 ? 4096 : szPage; szPage = szPage < 0 ? (16 * 1024) : szPage;
pages = pages < 0 ? 256 : pages; pages = pages < 0 ? 256 : pages;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) { if (pState == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册