未验证 提交 0378928f 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #17095 from taosdata/feat/TS-1754-V26

Feat/TS-1754-V26 Support aggAlways config option to control count if zero row
...@@ -1372,6 +1372,10 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, b ...@@ -1372,6 +1372,10 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, b
// set the correct result // set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0; pRes->numOfRows = (p != NULL)? p->info.rows: 0;
bool completed = pRes->numOfRows == 0;
if (p && pRes->numOfRows == 0 && tsAggAlways) {
pRes->numOfRows = 1;
}
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
...@@ -1380,7 +1384,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, b ...@@ -1380,7 +1384,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, b
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows); tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
pRes->row = 0; pRes->row = 0;
pRes->completed = (pRes->numOfRows == 0); pRes->completed = completed;
} }
/* /*
......
...@@ -219,6 +219,7 @@ extern int32_t cqDebugFlag; ...@@ -219,6 +219,7 @@ extern int32_t cqDebugFlag;
extern int32_t debugFlag; extern int32_t debugFlag;
extern int8_t tsClientMerge; extern int8_t tsClientMerge;
extern int8_t tsAggAlways;
// probe alive connection // probe alive connection
extern int32_t tsProbeSeconds; extern int32_t tsProbeSeconds;
......
...@@ -273,6 +273,7 @@ int32_t cqDebugFlag = 131; ...@@ -273,6 +273,7 @@ int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int8_t tsClientMerge = 0; int8_t tsClientMerge = 0;
int8_t tsAggAlways = 0; // Agg function always return value even if zero row
// probe alive connection // probe alive connection
int32_t tsProbeSeconds = 5 * 60; // start probe link alive after tsProbeSeconds from starting query int32_t tsProbeSeconds = 5 * 60; // start probe link alive after tsProbeSeconds from starting query
...@@ -1690,6 +1691,16 @@ static void doInitGlobalConfig(void) { ...@@ -1690,6 +1691,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "aggAlways";
cfg.ptr = &tsAggAlways;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// default JSON string type option "binary"/"nchar" // default JSON string type option "binary"/"nchar"
cfg.option = "defaultJSONStrType"; cfg.option = "defaultJSONStrType";
cfg.ptr = tsDefaultJSONStrType; cfg.ptr = tsDefaultJSONStrType;
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "qUdf.h" #include "qUdf.h"
#include "tcompare.h" #include "tcompare.h"
#include "hashfunc.h" #include "hashfunc.h"
#include "tglobal.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput)) #define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes) #define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
......
...@@ -6625,7 +6625,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -6625,7 +6625,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
} }
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; return pInfo->pRes;
} }
static SSDataBlock* doLimit(void* param, bool* newgroup) { static SSDataBlock* doLimit(void* param, bool* newgroup) {
......
...@@ -338,6 +338,45 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -338,6 +338,45 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
return code; return code;
} }
bool qFitAlwaysValue(SQInfo * pQInfo) {
// must agg query
if (!pQInfo->query.simpleAgg)
return false;
// must not include ts column
SSDataBlock* pBlock = pQInfo->runtimeEnv.outputBuf;
if (pBlock == NULL || pBlock->pDataBlock == NULL)
return false;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
if (pColInfoData == NULL)
return false;
// must not first column is timestamp
if (pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)
return false;
// fit ok
return true;
}
bool doFillResultWithNull(SQInfo* pQInfo, char * data, size_t numOfRows) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
int32_t numOfCols = pQueryAttr->numOfOutput;
for (int32_t i = 0; i < numOfCols; ++i) {
int16_t functionId = pQueryAttr->pExpr1[i].base.functionId;
// col
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
if (functionId != TSDB_FUNC_COUNT)
setNull(data, pCol->info.type, pCol->info.bytes);
data += pCol->info.bytes * numOfRows;
}
return true;
}
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) { int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
int32_t compLen = 0; int32_t compLen = 0;
...@@ -350,6 +389,14 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -350,6 +389,14 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv); int32_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
bool fillNull = false;
if (s == 0 && tsAggAlways) {
if (qFitAlwaysValue(pQInfo)) {
s = 1;
fillNull = true;
}
}
size_t size = pQueryAttr->resultRowSize * s; size_t size = pQueryAttr->resultRowSize * s;
size += sizeof(int32_t); size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); size += sizeof(STableIdInfo) * taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap);
...@@ -378,6 +425,9 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -378,6 +425,9 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen);
} else { } else {
if (fillNull) {
doFillResultWithNull(pQInfo, (*pRsp)->data, 1);
}
setQueryStatus(pRuntimeEnv, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 136 #define TSDB_CFG_MAX_NUM 137
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册