提交 33b455df 编写于 作者: G Ganlin Zhao

add top merge function

上级 546f8d89
...@@ -66,6 +66,7 @@ typedef struct STopBotResItem { ...@@ -66,6 +66,7 @@ typedef struct STopBotResItem {
} STopBotResItem; } STopBotResItem;
typedef struct STopBotRes { typedef struct STopBotRes {
int32_t maxSize;
STopBotResItem* pItems; STopBotResItem* pItems;
} STopBotRes; } STopBotRes;
...@@ -2659,6 +2660,16 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { ...@@ -2659,6 +2660,16 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return true; return true;
} }
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
pRes->maxSize= pCtx->param[1].param.i;
return true;
}
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
...@@ -2670,6 +2681,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { ...@@ -2670,6 +2681,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t topFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
...@@ -2693,29 +2706,22 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { ...@@ -2693,29 +2706,22 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void topBotTransfer(STopBotRes* pInput, STopBotRes* pOutput) { static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, int16_t type) {
for (int32_t i = 0; i < pInput->maxSize; i++) {
addResult(pCtx, &pInput->pItems[i], type, true);
}
} }
int32_t topBotFunctionMerge(SqlFunctionCtx *pCtx) { int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start); char* data = colDataGetData(pCol, start);
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
pInfo->hasResult = pInputInfo->hasResult; topTransferInfo(pCtx, pInputInfo, pCol->info.type);
if (pInputInfo->max > pInfo->max) {
pInfo->max = pInputInfo->max;
}
if (pInputInfo->min < pInfo->min) {
pInfo->min = pInputInfo->min;
}
SET_VAL(GET_RES_INFO(pCtx), 1, 1); SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2774,7 +2780,6 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par ...@@ -2774,7 +2780,6 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
SVariant val = {0}; SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
...@@ -2783,7 +2788,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -2783,7 +2788,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
assert(pItems != NULL); assert(pItems != NULL);
// not full yet // not full yet
if (pEntryInfo->numOfRes < maxSize) { if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = val; pItem->v = val;
pItem->uid = uid; pItem->uid = uid;
...@@ -2924,12 +2929,11 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, ...@@ -2924,12 +2929,11 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
bool isTopQuery) { bool isTopQuery) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
STopBotResItem* pItems = pRes->pItems; STopBotResItem* pItems = pRes->pItems;
assert(pItems != NULL); assert(pItems != NULL);
// not full yet // not full yet
if (pEntryInfo->numOfRes < maxSize) { if (pEntryInfo->numOfRes < pRes->maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = pSourceItem->v; pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid; pItem->uid = pSourceItem->uid;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册