提交 afbb72da 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-14481-3.0

......@@ -124,7 +124,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL,
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
FUNCTION_TYPE_APERCENTILE_MERGE,
FUNCTION_TYPE_SPREAD_PARTIAL,
FUNCTION_TYPE_SPREAD_MERGE,
......@@ -134,6 +134,10 @@ typedef enum EFunctionType {
FUNCTION_TYPE_HYPERLOGLOG_MERGE,
FUNCTION_TYPE_ELAPSED_PARTIAL,
FUNCTION_TYPE_ELAPSED_MERGE,
FUNCTION_TYPE_TOP_PARTIAL,
FUNCTION_TYPE_TOP_MERGE,
FUNCTION_TYPE_BOTTOM_PARTIAL,
FUNCTION_TYPE_BOTTOM_MERGE,
// user defined funcion
FUNCTION_TYPE_UDF = 10000
......
......@@ -968,7 +968,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
pAction->rawWritten = 0;
pAction->errCode = 0;
mDebug("trans:%d, %s:%d null action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
mDebug("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
......@@ -1025,18 +1025,18 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
if (numOfExecuted == numOfActions) {
if (errCode == 0) {
pTrans->lastAction = 0;
pTrans->lastErrorNo = 0;
pTrans->lastMsgType = 0;
memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
pTrans->lastErrorNo = 0;
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0;
} else {
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF);
if (pErrAction != NULL) {
pTrans->lastMsgType = pErrAction->msgType;
pTrans->lastAction = pErrAction->id;
pTrans->lastErrorNo = pErrAction->errCode;
pTrans->lastMsgType = pErrAction->msgType;
pTrans->lastEpset = pErrAction->epSet;
pTrans->lastErrorNo = pErrAction->errCode;
}
mndTransResetActions(pMnode, pTrans, pArray);
terrno = errCode;
......@@ -1103,13 +1103,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
}
if (code == 0) {
pTrans->lastAction = 0;
pTrans->lastErrorNo = 0;
pTrans->lastAction = action;
pTrans->lastMsgType = 0;
pTrans->lastErrorNo = 0;
memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
} else {
pTrans->lastMsgType = pAction->msgType;
pTrans->lastAction = action;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastErrorNo = code;
pTrans->lastEpset = pAction->epSet;
}
......
......@@ -3157,6 +3157,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
}
// handle data in cache situation
// bool tsdbNextDataBlock(tsdbReaderT pHandle, uint64_t uid)
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
......
......@@ -1033,6 +1033,39 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
// sort key
EXPLAIN_ROW_NEW(level + 1, "Merge Key: ");
if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
// sort method
EXPLAIN_ROW_NEW(level + 1, "Sort Method: ");
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
if (pExecInfo->sortBuffer > 1024 * 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
} else if (pExecInfo->sortBuffer > 1024) {
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
} else {
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
}
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
}
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
......
......@@ -3195,7 +3195,6 @@ _error:
typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo;
SHashObj* groupIntervalHash;
bool hasGroupId;
uint64_t groupId;
SSDataBlock* prefetchedBlock;
......@@ -3204,39 +3203,24 @@ typedef struct SMergeIntervalAggOperatorInfo {
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
taosHashCleanup(miaInfo->groupIntervalHash);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
}
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
STimeWindow* newWin) {
static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
if (prevWin == NULL) {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
return 0;
}
if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) {
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
if (newWin == NULL) {
taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
} else {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
}
}
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return 0;
}
......@@ -3252,13 +3236,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
int32_t numOfOutput = pOperatorInfo->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
iaInfo->interval.precision, &iaInfo->win);
STimeWindow win;
win.skey = blockStartTs;
win.ekey = taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
//TODO: remove the hash table usage (groupid + winkey => result row position)
int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
......@@ -3266,72 +3251,39 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
ASSERT(forwardRows > 0);
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup,
pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// window start key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
STimeWindow nextWin = win;
TSKEY currTs = blockStartTs;
TSKEY currPos = startPos;
STimeWindow currWin = win;
while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order);
if (startPos < 0) {
++currPos;
if (currPos >= pBlock->info.rows) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset,
&iaInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
if (tsCols[currPos] == currTs) {
continue;
} else {
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
currTs = tsCols[currPos];
currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
// window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos,
forwardRows);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
if (iaInfo->timeWindowInterpo) {
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
}
outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
}
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
......@@ -3385,13 +3337,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
}
pRes->info.groupId = miaInfo->groupId;
} else {
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
if (p != NULL) {
size_t len = 0;
uint64_t* pKey = taosHashGetKey(p, &len);
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
}
}
if (pRes->info.rows == 0) {
......@@ -3421,7 +3366,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
iaInfo->execModel = pTaskInfo->execModel;
iaInfo->primaryTsIndex = primaryTsSlotId;
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
......
......@@ -99,11 +99,18 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t topFunctionMerge(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getTopBotInfoSize(int64_t numOfItems);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......
......@@ -326,7 +326,7 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
return TSDB_CODE_SUCCESS;
}
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -361,8 +361,62 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTop(pFunc, pErrBuf, len);
static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (isPartial) {
if (2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)pParamNode1;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
pValue->notReserved = true;
// set result type
pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (TSDB_DATA_TYPE_BINARY != para1Type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// Do nothing. We can only access output of partial functions as input,
// so original input type cannot be obtained, resType will be set same
// as original function input type after merge function created.
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateTopBotImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
......@@ -1483,23 +1537,71 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "top",
.type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTop,
.translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.initFunc = topBotFunctionSetup,
.processFunc = topFunction,
.finalizeFunc = topBotFinalize,
.combineFunc = topCombine,
.pPartialFunc = "_top_partial",
.pMergeFunc = "_top_merge"
},
{
.name = "_top_partial",
.type = FUNCTION_TYPE_TOP_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotPartial,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup,
.processFunc = topFunction,
.finalizeFunc = topBotPartialFinalize,
.combineFunc = topCombine,
},
{
.name = "_top_merge",
.type = FUNCTION_TYPE_TOP_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotMerge,
.getEnvFunc = getTopBotMergeFuncEnv,
.initFunc = functionSetup,
.processFunc = topFunctionMerge,
.finalizeFunc = topBotMergeFinalize,
.combineFunc = topCombine,
},
{
.name = "bottom",
.type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateBottom,
.translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.initFunc = topBotFunctionSetup,
.processFunc = bottomFunction,
.finalizeFunc = topBotFinalize,
.combineFunc = bottomCombine,
.pPartialFunc = "_bottom_partial",
.pMergeFunc = "_bottom_merge"
},
{
.name = "_bottom_partial",
.type = FUNCTION_TYPE_BOTTOM_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotPartial,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup,
.processFunc = bottomFunction,
.finalizeFunc = topBotPartialFinalize,
.combineFunc = bottomCombine,
},
{
.name = "_bottom_merge",
.type = FUNCTION_TYPE_BOTTOM_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
.translateFunc = translateTopBotMerge,
.getEnvFunc = getTopBotMergeFuncEnv,
.initFunc = functionSetup,
.processFunc = bottomFunctionMerge,
.finalizeFunc = topBotMergeFinalize,
.combineFunc = bottomCombine,
},
{
.name = "spread",
......
......@@ -66,6 +66,9 @@ typedef struct STopBotResItem {
} STopBotResItem;
typedef struct STopBotRes {
int32_t maxSize;
int16_t type; //store the original input type, used in merge function
int32_t numOfItems;
STopBotResItem* pItems;
} STopBotRes;
......@@ -2437,7 +2440,7 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 &&
if (pSResInfo->numOfRes != 0 &&
(pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) < *(TSKEY*)(pSBuf + bytes)) ) {
memcpy(pDBuf, pSBuf, bytes);
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes);
......@@ -2647,12 +2650,35 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
return numOfElems;
}
int32_t getTopBotInfoSize(int64_t numOfItems) {
return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem);
}
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
return true;
}
bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
//intermediate result is binary and length contains VAR header size
pEnv->calcMemSize = pFunc->node.resType.bytes;
return true;
}
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
pRes->maxSize = pCtx->param[1].param.i;
return true;
}
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -2664,6 +2690,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
int32_t topFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
......@@ -2671,7 +2699,8 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type;
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
pRes->type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
......@@ -2681,7 +2710,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
}
return TSDB_CODE_SUCCESS;
......@@ -2694,7 +2723,8 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type;
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
pRes->type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
......@@ -2704,8 +2734,53 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
}
return TSDB_CODE_SUCCESS;
}
static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) {
for (int32_t i = 0; i < pInput->numOfItems; i++) {
addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery);
}
}
int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->maxSize = pInputInfo->maxSize;
pInfo->type = pInputInfo->type;
topBotTransferInfo(pCtx, pInputInfo, true);
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->maxSize = pInputInfo->maxSize;
pInfo->type = pInputInfo->type;
topBotTransferInfo(pCtx, pInputInfo, false);
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
......@@ -2740,7 +2815,6 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
......@@ -2749,7 +2823,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
assert(pItems != NULL);
// not full yet
if (pEntryInfo->numOfRes < maxSize) {
if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = val;
pItem->uid = uid;
......@@ -2759,6 +2833,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo->numOfRes++;
// accumulate number of items for each vgroup, this info is needed for merge
pRes->numOfItems++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery);
} else { // replace the minimum value in the result
......@@ -2860,11 +2936,11 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
releaseBufPage(pCtx->pBuf, pPage);
}
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t type = pCtx->input.pData[0]->info.type;
int16_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
......@@ -2880,29 +2956,58 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
}
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
if (!isMerge) {
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
}
currentRow += 1;
}
return pEntryInfo->numOfRes;
}
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return topBotFinalizeImpl(pCtx, pBlock, false);
}
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return topBotFinalizeImpl(pCtx, pBlock, true);
}
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getTopBotInfoSize(pRes->maxSize);
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return 1;
}
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
bool isTopQuery) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
STopBotResItem* pItems = pRes->pItems;
assert(pItems != NULL);
// not full yet
if (pEntryInfo->numOfRes < maxSize) {
if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid;
pItem->tuplePos.pageId = -1;
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
pEntryInfo->numOfRes++;
// accumulate number of items for each vgroup, this info is needed for merge
pRes->numOfItems++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery);
} else { // replace the minimum value in the result
......@@ -2948,15 +3053,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t getSpreadInfoSize() {
return (int32_t)sizeof(SSpreadInfo);
}
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSpreadInfo);
return true;
}
int32_t getSpreadInfoSize() {
return (int32_t)sizeof(SSpreadInfo);
}
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
......@@ -3083,7 +3188,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = (int32_t)sizeof(SSpreadInfo);
int32_t resultBytes = getSpreadInfoSize();
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes);
......
......@@ -202,6 +202,27 @@ bool fmIsInvertible(int32_t funcId) {
return res;
}
//function has same input/output type
bool fmIsSameInOutType(int32_t funcId) {
bool res = false;
switch (funcMgtBuiltins[funcId].type) {
case FUNCTION_TYPE_MAX:
case FUNCTION_TYPE_MIN:
case FUNCTION_TYPE_TOP:
case FUNCTION_TYPE_BOTTOM:
case FUNCTION_TYPE_FIRST:
case FUNCTION_TYPE_LAST:
case FUNCTION_TYPE_SAMPLE:
case FUNCTION_TYPE_TAIL:
case FUNCTION_TYPE_UNIQUE:
res = true;
break;
default:
break;
}
return res;
}
static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[64] = {0};
if (NULL != gFunMgtService.pFuncNameHashTable) {
......@@ -276,6 +297,10 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
//overwrite function restype set by translate function
if (fmIsSameInOutType(pSrcFunc->funcId)) {
(*pMergeFunc)->node.resType = pSrcFunc->node.resType;
}
strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
return TSDB_CODE_SUCCESS;
}
......
......@@ -133,8 +133,8 @@ class TDDnode:
"qDebugFlag": "143",
"rpcDebugFlag": "143",
"tmrDebugFlag": "131",
"uDebugFlag": "143",
"sDebugFlag": "135",
"uDebugFlag": "131",
"sDebugFlag": "143",
"wDebugFlag": "143",
"qdebugFlag": "143",
"numOfLogLines": "100000000",
......
......@@ -24,18 +24,18 @@ class TDTestCase:
self.rowNum = 10
self.ts = 1537146000000
def run(self):
tdSql.prepare()
tdSql.prepare()
tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
tdSql.execute("create table test1 using test tags('beijing')")
for i in range(self.rowNum):
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
# bottom verifacation
# bottom verifacation
tdSql.error("select bottom(ts, 10) from test")
tdSql.error("select bottom(col1, 0) from test")
tdSql.error("select bottom(col1, 101) from test")
......@@ -48,8 +48,8 @@ class TDTestCase:
tdSql.error("select bottom(col5, 0) from test")
tdSql.error("select bottom(col5, 101) from test")
tdSql.error("select bottom(col6, 0) from test")
tdSql.error("select bottom(col6, 101) from test")
tdSql.error("select bottom(col7, 10) from test")
tdSql.error("select bottom(col6, 101) from test")
tdSql.error("select bottom(col7, 10) from test")
tdSql.error("select bottom(col8, 10) from test")
tdSql.error("select bottom(col9, 10) from test")
......@@ -90,7 +90,7 @@ class TDTestCase:
tdSql.checkRows(2)
tdSql.query("select ts,bottom(col1, 2),ts from test group by tbname")
tdSql.checkRows(2)
tdSql.query('select bottom(col2,1) from test interval(1y) order by col2')
tdSql.checkData(0,0,1)
......
......@@ -23,11 +23,11 @@ class TDTestCase:
self.rowNum = 10
self.ts = 1537146000000
def run(self):
tdSql.prepare()
tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
......@@ -35,9 +35,9 @@ class TDTestCase:
for i in range(self.rowNum):
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
# top verifacation
# top verifacation
tdSql.error("select top(ts, 10) from test")
tdSql.error("select top(col1, 0) from test")
tdSql.error("select top(col1, 101) from test")
......@@ -50,8 +50,8 @@ class TDTestCase:
tdSql.error("select top(col5, 0) from test")
tdSql.error("select top(col5, 101) from test")
tdSql.error("select top(col6, 0) from test")
tdSql.error("select top(col6, 101) from test")
tdSql.error("select top(col7, 10) from test")
tdSql.error("select top(col6, 101) from test")
tdSql.error("select top(col7, 10) from test")
tdSql.error("select top(col8, 10) from test")
tdSql.error("select top(col9, 10) from test")
tdSql.error("select top(col11, 0) from test")
......@@ -95,7 +95,7 @@ class TDTestCase:
tdSql.checkRows(2)
tdSql.query('select top(col2,1) from test interval(1y) order by col2')
tdSql.checkData(0,0,10)
tdSql.error("select * from test where bottom(col2,1)=1")
tdSql.error("select top(col14, 0) from test;")
def stop(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册