提交 8345a542 编写于 作者: H Haojun Liao

[td-13039] add min/max/systable-scanner.

上级 921500db
......@@ -132,11 +132,11 @@ struct SqlFunctionCtx;
struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SExtTagsInfo {
int16_t tagsLen; // keep the tags data for top/bottom query result
int16_t numOfTagCols;
struct SqlFunctionCtx **pTagCtxList;
} SExtTagsInfo;
typedef struct SSubsidiaryResInfo {
int16_t bufLen; // keep the tags data for top/bottom query result
int16_t numOfCols;
struct SqlFunctionCtx **pCtx;
} SSubsidiaryResInfo;
typedef struct SResultDataInfo {
int16_t precision;
......@@ -187,7 +187,7 @@ typedef struct SqlFunctionCtx {
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SVariant tag;
struct SResultRowEntryInfo *resultInfo;
SExtTagsInfo tagInfo;
SSubsidiaryResInfo subsidiaryRes;
SPoint1 start;
SPoint1 end;
SFuncExecFuncs fpSet;
......
......@@ -441,17 +441,24 @@ typedef struct SStreamBlockScanInfo {
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
void *pTransporter;
SEpSet epSet;
int32_t type; // show type
tsem_t ready;
void *readHandle;
SSchema *pSchema;
SSDataBlock *pRes;
int64_t numOfBlocks; // extract basic running information.
int64_t totalRows;
int64_t elapsedTime;
int64_t totalBytes;
union {
void* pTransporter;
void* readHandle;
};
void *pCur; // cursor
SRetrieveTableReq* pReq;
SEpSet epSet;
int32_t type; // show type
tsem_t ready;
SSchema* pSchema;
SSDataBlock* pRes;
int32_t capacity;
int64_t numOfBlocks; // extract basic running information.
int64_t totalRows;
int64_t elapsedTime;
int64_t totalBytes;
} SSysTableScanInfo;
typedef struct SOptrBasicInfo {
......@@ -630,7 +637,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSystemScanOperatorInfo(void* pSystemTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
......
......@@ -212,6 +212,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
......@@ -1920,9 +1921,9 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx *pCtx, int32_t numOfOutput) {
}
}
if (p != NULL) {
p->tagInfo.pTagCtxList = pTagCtx;
p->tagInfo.numOfTagCols = num;
p->tagInfo.tagsLen = tagLen;
p->subsidiaryRes.pCtx = pTagCtx;
p->subsidiaryRes.numOfCols = num;
p->subsidiaryRes.bufLen = tagLen;
} else {
tfree(pTagCtx);
}
......@@ -2127,7 +2128,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
return pFuncCtx;
}
static void* destroySQLFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
if (pCtx == NULL) {
return NULL;
}
......@@ -2138,7 +2139,7 @@ static void* destroySQLFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
}
taosVariantDestroy(&pCtx[i].tag);
tfree(pCtx[i].tagInfo.pTagCtxList);
tfree(pCtx[i].subsidiaryRes.pCtx);
}
tfree(pCtx);
......@@ -2222,46 +2223,6 @@ static void destroyTsComp(STaskRuntimeEnv *pRuntimeEnv, STaskAttr *pQueryAttr) {
}
}
static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo;
//qDebug("QInfo:0x%"PRIx64" teardown runtime env", pQInfo->qId);
//destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput);
// destroyUdfInfo(pRuntimeEnv->pUdfInfo);
destroyDiskbasedBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv);
destroyTsComp(pRuntimeEnv, pQueryAttr);
pRuntimeEnv->pTsBuf = tsBufDestroy(pRuntimeEnv->pTsBuf);
tfree(pRuntimeEnv->keyBuf);
tfree(pRuntimeEnv->prevRow);
tfree(pRuntimeEnv->tagVal);
taosHashCleanup(pRuntimeEnv->pResultRowHashTable);
pRuntimeEnv->pResultRowHashTable = NULL;
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL;
taosHashCleanup(pRuntimeEnv->pResultRowListSet);
pRuntimeEnv->pResultRowListSet = NULL;
destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
taosArrayDestroy(pRuntimeEnv->pResultRowArrayList);
pRuntimeEnv->prevResult = NULL;
}
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
return pQInfo->rspContext != NULL;
}
bool isTaskKilled(SExecTaskInfo *pTaskInfo) {
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
......@@ -5475,38 +5436,67 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
SRetrieveTableReq* req = calloc(1, sizeof(SRetrieveTableReq));
if (req == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// retrieve local table list info from vnode
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
if (pInfo->pCur == NULL) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
}
req->type = pInfo->type;
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, 0);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
char * name = NULL;
int32_t numOfRows = 0;
while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) {
colDataAppend(pTableNameCol, numOfRows, name, false);
numOfRows += 1;
if (numOfRows >= pInfo->capacity) {
break;
}
}
pMsgSendInfo->param = NULL;
pMsgSendInfo->msgInfo.pData = req;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->fp = loadRemoteDataCallback;
pInfo->totalRows += numOfRows;
pInfo->pRes->info.rows = numOfRows;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
// pInfo->elapsedTime;
// pInfo->totalBytes;
return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes;
} else { // load the meta from mnode of the given epset
if (pInfo->pReq == NULL) {
pInfo->pReq = calloc(1, sizeof(SRetrieveTableReq));
if (pInfo->pReq == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pInfo->pReq->type = pInfo->type;
}
tsem_wait(&pInfo->ready);
// handle the response and return to the caller
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pMsgSendInfo->param = NULL;
pMsgSendInfo->msgInfo.pData = pInfo->pReq;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready);
// handle the response and return to the caller
}
return NULL;
}
SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5516,6 +5506,17 @@ SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SAr
return NULL;
}
// todo: create the schema of result data block
pInfo->capacity = 4096;
pInfo->type = tableType;
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
pInfo->readHandle = pSysTableReadHandle;
blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity);
} else {
tsem_init(&pInfo->ready, 0, 0);
pInfo->epSet = epset;
}
pInfo->readHandle = pSysTableReadHandle;
pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
......@@ -5524,6 +5525,7 @@ SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SAr
pOperator->info = pInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doSysTableScan;
pOperator->closeFn = destroySysTableScannerOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
......@@ -7165,7 +7167,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
assert(pInfo != NULL);
destroySQLFunctionCtx(pInfo->pCtx, numOfOutput);
destroySqlFunctionCtx(pInfo->pCtx, numOfOutput);
tfree(pInfo->rowCellInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo);
......@@ -7185,6 +7187,7 @@ static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
......@@ -7233,6 +7236,16 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) {
SSysTableScanInfo* pInfo = (SSysTableScanInfo*) param;
tsem_destroy(&pInfo->ready);
blockDataDestroy(pInfo->pRes);
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
metaCloseTbCursor(pInfo->pCur);
}
}
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......
......@@ -31,6 +31,12 @@ void countFunction(SqlFunctionCtx *pCtx);
bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void sumFunction(SqlFunctionCtx *pCtx);
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void minFunction(SqlFunctionCtx* pCtx);
void maxFunction(SqlFunctionCtx *pCtx);
#ifdef __cplusplus
}
#endif
......
......@@ -41,6 +41,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = sumFunction,
.finalizeFunc = functionFinalizer
},
{
.name = "min",
.type = FUNCTION_TYPE_MIN,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minFunctionSetup,
.processFunc = minFunction,
.finalizeFunc = functionFinalizer
},
{
.name = "max",
.type = FUNCTION_TYPE_MAX,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalizer
},
{
.name = "concat",
.type = FUNCTION_TYPE_CONCAT,
......
......@@ -14,6 +14,7 @@
*/
#include "builtinsimpl.h"
#include <querynodes.h>
#include "taggfunction.h"
#include "tdatablock.h"
......@@ -27,7 +28,6 @@
} while (0)
typedef struct SSumRes {
// int8_t hasResult;
union {
int64_t isum;
uint64_t usum;
......@@ -115,7 +115,7 @@ void countFunction(SqlFunctionCtx *pCtx) {
} \
} while (0)
static void do_sum(SqlFunctionCtx *pCtx) {
void sumFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
......@@ -179,14 +179,272 @@ bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return true;
}
void sumFunction(SqlFunctionCtx *pCtx) {
do_sum(pCtx);
// keep the result data in output buffer, not in the intermediate buffer
// SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
// if (pResInfo->hasResult == DATA_SET_FLAG) {
// set the flag for super table query
// SSumRes *pSum = (SSumRes *)pCtx->pOutput;
// pSum->hasResult = DATA_SET_FLAG;
// }
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
switch (pCtx->input.pData[0]->info.type) {
case TSDB_DATA_TYPE_INT:
*((int32_t *)buf) = INT32_MIN;
break;
case TSDB_DATA_TYPE_UINT:
*((uint32_t *)buf) = 0;
break;
case TSDB_DATA_TYPE_FLOAT:
*((float *)buf) = -FLT_MAX;
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(((double *)buf), -DBL_MAX);
break;
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)buf) = INT64_MIN;
break;
case TSDB_DATA_TYPE_UBIGINT:
*((uint64_t *)buf) = 0;
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)buf) = INT16_MIN;
break;
case TSDB_DATA_TYPE_USMALLINT:
*((uint16_t *)buf) = 0;
break;
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)buf) = INT8_MIN;
break;
case TSDB_DATA_TYPE_UTINYINT:
*((uint8_t *)buf) = 0;
break;
default:
assert(0);
}
return true;
}
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized
}
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
switch (pCtx->input.pData[0]->info.type) {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)buf) = INT8_MAX;
break;
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *) buf = UINT8_MAX;
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)buf) = INT16_MAX;
break;
case TSDB_DATA_TYPE_USMALLINT:
*((uint16_t *)buf) = UINT16_MAX;
break;
case TSDB_DATA_TYPE_INT:
*((int32_t *)buf) = INT32_MAX;
break;
case TSDB_DATA_TYPE_UINT:
*((uint32_t *)buf) = UINT32_MAX;
break;
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)buf) = INT64_MAX;
break;
case TSDB_DATA_TYPE_UBIGINT:
*((uint64_t *)buf) = UINT64_MAX;
break;
case TSDB_DATA_TYPE_FLOAT:
*((float *)buf) = FLT_MAX;
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(((double *)buf), DBL_MAX);
break;
default:
assert(0);
}
return true;
}
bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = sizeof(int64_t);
return true;
}
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t* d = (_t*)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int32_t numOfElems = 0;
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pCol->info.type;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
// data in current data block are qualified to the query
if (pInput->colDataAggIsSet) {
numOfElems = pInput->numOfRows - pAgg->numOfNull;
ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);
if (numOfElems == 0) {
return numOfElems;
}
void* tval = NULL;
int16_t index = 0;
if (isMinFunc) {
tval = &pInput->pColumnDataAgg[0]->min;
index = pInput->pColumnDataAgg[0]->minIndex;
} else {
tval = &pInput->pColumnDataAgg[0]->max;
index = pInput->pColumnDataAgg[0]->maxIndex;
}
TSKEY key = TSKEY_INITIAL_VAL;
if (pCtx->ptsList != NULL) {
// the index is the original position, not the relative position
key = pCtx->ptsList[index];
}
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t val = GET_INT64_VAL(tval);
#if defined(_DEBUG_VIEW)
qDebug("max value updated according to pre-cal:%d", *data);
#endif
if ((*(int64_t*)buf < val) ^ isMinFunc) {
*(int64_t*) buf = val;
for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
__ctx->tag.i = key;
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
}
__ctx->fpSet.process(__ctx);
}
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
uint64_t val = GET_UINT64_VAL(tval);
UPDATE_DATA(pCtx, *(uint64_t*)buf, val, numOfElems, isMinFunc, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
double val = GET_DOUBLE_VAL(tval);
UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
double val = GET_DOUBLE_VAL(tval);
UPDATE_DATA(pCtx, *(float*)buf, (float)val, numOfElems, isMinFunc, key);
}
return numOfElems;
}
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
int32_t *pData = (int32_t*)pCol->pData;
int64_t *val = (int64_t*) buf;
for (int32_t i = 0; i < pCtx->size; ++i) {
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i];
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i) : 0;
DO_UPDATE_SUBSID_RES(pCtx, ts);
}
numOfElems += 1;
}
#if defined(_DEBUG_VIEW)
qDebug("max value updated:%d", *retVal);
#endif
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) {
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
} else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) {
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) {
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) {
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
LOOPCHECK_N(*(double*) buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
LOOPCHECK_N(*(float*) buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
}
return numOfElems;
}
void minFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 1);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
}
void maxFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册