提交 84818987 编写于 作者: H Haojun Liao

[td-13039] fix bug in show tables;

上级 cf1e4507
...@@ -32,6 +32,9 @@ typedef struct SReadHandle { ...@@ -32,6 +32,9 @@ typedef struct SReadHandle {
void* meta; void* meta;
} SReadHandle; } SReadHandle;
#define STREAM_DATA_TYPE_SUBMITBLK 0x1u
#define STREAM_DATA_TYPE_SSDATABLK 0x2u
/** /**
* Create the exec task for streaming mode * Create the exec task for streaming mode
* @param pMsg * @param pMsg
...@@ -44,9 +47,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); ...@@ -44,9 +47,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
* Set the input data block for the stream scan. * Set the input data block for the stream scan.
* @param tinfo * @param tinfo
* @param input * @param input
* @param type
* @return * @return
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "planner.h" #include "planner.h"
#include "tq.h" #include "tq.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -31,18 +31,23 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) ...@@ -31,18 +31,23 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
return doSetStreamBlock(pOperator->pDownstream[0], input, id); // TODO handle the join case
return doSetStreamBlock(pOperator->pDownstream[0], input, type, id);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { if (type == STREAM_DATA_TYPE_SUBMITBLK) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
return TSDB_CODE_QRY_APP_ERROR; qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
} else { // TODO
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -53,7 +58,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { ...@@ -53,7 +58,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#include "function.h" #include "function.h"
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void functionFinalizer(SqlFunctionCtx *pCtx); void functionFinalize(SqlFunctionCtx *pCtx);
bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void countFunction(SqlFunctionCtx *pCtx); void countFunction(SqlFunctionCtx *pCtx);
...@@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); ...@@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void minFunction(SqlFunctionCtx* pCtx); void minFunction(SqlFunctionCtx* pCtx);
void maxFunction(SqlFunctionCtx *pCtx); void maxFunction(SqlFunctionCtx *pCtx);
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void stddevFunction(SqlFunctionCtx* pCtx);
void stddevFinalize(SqlFunctionCtx* pCtx);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -29,7 +29,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -29,7 +29,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getCountFuncEnv, .getEnvFunc = getCountFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = countFunction, .processFunc = countFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "sum", .name = "sum",
...@@ -39,7 +39,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -39,7 +39,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getSumFuncEnv, .getEnvFunc = getSumFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = sumFunction, .processFunc = sumFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "min", .name = "min",
...@@ -49,7 +49,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -49,7 +49,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = minFunctionSetup, .initFunc = minFunctionSetup,
.processFunc = minFunction, .processFunc = minFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "max", .name = "max",
...@@ -59,17 +59,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -59,17 +59,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "stddev", .name = "stddev",
.type = FUNCTION_TYPE_STDDEV, .type = FUNCTION_TYPE_STDDEV,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getStddevFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "percentile", .name = "percentile",
...@@ -79,7 +79,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -79,7 +79,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "apercentile", .name = "apercentile",
...@@ -89,7 +89,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -89,7 +89,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "top", .name = "top",
...@@ -99,7 +99,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -99,7 +99,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "bottom", .name = "bottom",
...@@ -109,7 +109,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -109,7 +109,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "spread", .name = "spread",
...@@ -119,7 +119,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -119,7 +119,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "last_row", .name = "last_row",
...@@ -129,7 +129,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -129,7 +129,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "concat", .name = "concat",
......
...@@ -50,7 +50,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { ...@@ -50,7 +50,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); } static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
void functionFinalizer(SqlFunctionCtx *pCtx) { void functionFinalize(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
doFinalizer(pResInfo); doFinalizer(pResInfo);
} }
...@@ -441,4 +441,71 @@ void minFunction(SqlFunctionCtx *pCtx) { ...@@ -441,4 +441,71 @@ void minFunction(SqlFunctionCtx *pCtx) {
void maxFunction(SqlFunctionCtx *pCtx) { void maxFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 0); int32_t numOfElems = doMinMaxHelper(pCtx, 0);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
} }
\ No newline at end of file
typedef struct STopBotRes {
int32_t num;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
int32_t bytes = pColNode->node.resType.bytes;
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
return true;
}
typedef struct SStddevRes {
int64_t count;
union {double quadraticDSum; int64_t quadraticISum;};
union {double dsum; int64_t isum;};
} SStddevRes;
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SStddevRes);
return true;
}
void stddevFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type;
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// } else { // computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
switch(type) {
case TSDB_DATA_TYPE_INT: {
int32_t* plist = (int32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
}
break;
}
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
}
void stddevFinalize(SqlFunctionCtx* pCtx) {
functionFinalize(pCtx);
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
double res = pStddevRes->quadraticISum/pStddevRes->count - (pStddevRes->isum / pStddevRes->count) * (pStddevRes->isum / pStddevRes->count);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册