提交 ea18b8a7 编写于 作者: H Haojun Liao

feature(query): bottom function is available.

上级 a030a9f3
...@@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx); ...@@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......
...@@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { ...@@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
} }
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -569,7 +570,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -569,7 +570,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "top", .name = "top",
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
.translateFunc = translateTop, .translateFunc = translateTop,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
...@@ -579,12 +580,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -579,12 +580,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "bottom", .name = "bottom",
.type = FUNCTION_TYPE_BOTTOM, .type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
.translateFunc = translateBottom, .translateFunc = translateBottom,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = maxFunction, .processFunc = bottomFunction,
.finalizeFunc = functionFinalize .finalizeFunc = topBotFinalize
}, },
{ {
.name = "spread", .name = "spread",
......
...@@ -1104,22 +1104,32 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { ...@@ -1104,22 +1104,32 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value // todo assign the tag value
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
if (type)
colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); colDataAppend(pCol, currentRow, (const char*)&pRes->v, false);
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
int32_t pageId = pRes->tuplePos.pageId; return pEntryInfo->numOfRes;
int32_t offset = pRes->tuplePos.offset; }
if (pRes->tuplePos.pageId != -1) {
void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex) {
int32_t pageId = pTuplePos->pageId;
int32_t offset = pTuplePos->offset;
if (pTuplePos->pageId != -1) {
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
bool* nullList = (bool*)((char*)pPage + offset); bool* nullList = (bool*)((char*)pPage + offset);
...@@ -1141,14 +1151,12 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -1141,14 +1151,12 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
if (nullList[srcSlotId]) { if (nullList[srcSlotId]) {
colDataAppendNULL(pDstCol, currentRow); colDataAppendNULL(pDstCol, rowIndex);
} else { } else {
colDataAppend(pDstCol, currentRow, (pStart + ps), false); colDataAppend(pDstCol, rowIndex, (pStart + ps), false);
} }
} }
} }
return pEntryInfo->numOfRes;
} }
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
...@@ -1880,32 +1888,49 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { ...@@ -1880,32 +1888,49 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
} }
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo); uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t topFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
// buildTopBotStruct(pRes, pCtx);
// }
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type; int32_t type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows; for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElems++; numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true);
}
return TSDB_CODE_SUCCESS;
}
int32_t bottomFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo); doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1939,7 +1964,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par ...@@ -1939,7 +1964,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
} }
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo) { uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i; int32_t maxSize = pCtx->param[1].param.i;
...@@ -1961,10 +1986,17 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -1961,10 +1986,17 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// allocate the buffer and keep the data of this row into the new allocated buffer // allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo->numOfRes++; pEntryInfo->numOfRes++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
false); !isTopQuery);
} else { // replace the minimum value in the result } else { // replace the minimum value in the result
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || if ((isTopQuery && (
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) { (IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)))
|| (!isTopQuery && (
(IS_SIGNED_NUMERIC_TYPE(type) && val.i < pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u < pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && val.d < pItems[0].v.d))
)) {
// replace the old data and the coresponding tuple data // replace the old data and the coresponding tuple data
STopBotResItem* pItem = &pItems[0]; STopBotResItem* pItem = &pItems[0];
pItem->v = val; pItem->v = val;
...@@ -1973,7 +2005,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -1973,7 +2005,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple by over writing the old data // save the data of this tuple by over writing the old data
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
topBotResComparFn, NULL, false); topBotResComparFn, NULL, !isTopQuery);
} }
} }
} }
...@@ -2005,6 +2037,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -2005,6 +2037,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
bool isNull = colDataIsNull_s(pCol, rowIndex); bool isNull = colDataIsNull_s(pCol, rowIndex);
if (isNull) { if (isNull) {
nullList[i] = true; nullList[i] = true;
offset += pCol->info.bytes;
continue; continue;
} }
...@@ -2057,54 +2090,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2057,54 +2090,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true; pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type; int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value and the corresponding row data // todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
switch (type) { for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
case TSDB_DATA_TYPE_INT: { STopBotResItem* pItem = &pRes->pItems[i];
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { if (type == TSDB_DATA_TYPE_FLOAT) {
STopBotResItem* pItem = &pRes->pItems[i]; float v = pItem->v.d;
colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i); colDataAppend(pCol, currentRow, (const char*)&v, false);
} else {
int32_t pageId = pItem->tuplePos.pageId; colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
int32_t offset = pItem->tuplePos.offset;
if (pItem->tuplePos.pageId != -1) {
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
bool* nullList = (bool*)((char*)pPage + offset);
char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
// todo set the offset value to optimize the performance.
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
int32_t srcSlotId = pFuncParam->pCol->slotId;
int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId;
int32_t ps = 0;
for (int32_t k = 0; k < srcSlotId; ++k) {
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
ps += pSrcCol->info.bytes;
}
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
if (nullList[srcSlotId]) {
colDataAppendNULL(pDstCol, currentRow);
} else {
colDataAppend(pDstCol, currentRow, (pStart + ps), false);
}
}
}
currentRow += 1;
}
break;
} }
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
currentRow += 1;
} }
return pEntryInfo->numOfRes; return pEntryInfo->numOfRes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册