提交 938ce798 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/tdb

...@@ -36,7 +36,7 @@ typedef struct SFuncExecEnv { ...@@ -36,7 +36,7 @@ typedef struct SFuncExecEnv {
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
...@@ -154,7 +154,9 @@ typedef struct SResultDataInfo { ...@@ -154,7 +154,9 @@ typedef struct SResultDataInfo {
int32_t interBufSize; int32_t interBufSize;
} SResultDataInfo; } SResultDataInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
typedef struct SInputColumnInfoData { typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data int32_t totalRows; // total rows in current columnar data
...@@ -192,7 +194,8 @@ typedef struct SqlFunctionCtx { ...@@ -192,7 +194,8 @@ typedef struct SqlFunctionCtx {
int32_t numOfParams; int32_t numOfParams;
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag; SVariant tag;
struct SResultRowEntryInfo *resultInfo; struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaryRes; SSubsidiaryResInfo subsidiaryRes;
......
aux_source_directory(src EXECUTOR_SRC) aux_source_directory(src EXECUTOR_SRC)
#add_library(executor ${EXECUTOR_SRC}) #add_library(executor ${EXECUTOR_SRC})
#target_link_libraries(
# executor
# PRIVATE os util common function parser planner qcom tsdb
#)
add_library(executor STATIC ${EXECUTOR_SRC}) add_library(executor STATIC ${EXECUTOR_SRC})
#set_target_properties(executor PROPERTIES #set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a" # IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor" # INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
# ) # )
target_link_libraries(executor target_link_libraries(executor
PRIVATE os util common function parser planner qcom vnode scalar nodes PRIVATE os util common function parser planner qcom vnode scalar nodes
) )
......
...@@ -385,6 +385,12 @@ typedef struct SExchangeInfo { ...@@ -385,6 +385,12 @@ typedef struct SExchangeInfo {
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
} SExchangeInfo; } SExchangeInfo;
typedef struct SColMatchInfo {
int32_t colId;
int32_t targetSlotId;
bool output;
} SColMatchInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
void* dataReader; void* dataReader;
int32_t numOfBlocks; // extract basic running information. int32_t numOfBlocks; // extract basic running information.
...@@ -497,8 +503,9 @@ typedef struct SAggOperatorInfo { ...@@ -497,8 +503,9 @@ typedef struct SAggOperatorInfo {
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup;
SSDataBlock *existDataBlock; SSDataBlock *existDataBlock;
int32_t threshold; SArray *pPseudoColInfo;
SLimit limit; SLimit limit;
int64_t curOffset; int64_t curOffset;
int64_t curOutput; int64_t curOutput;
...@@ -623,13 +630,28 @@ typedef struct SDistinctOperatorInfo { ...@@ -623,13 +630,28 @@ typedef struct SDistinctOperatorInfo {
SHashObj* pSet; SHashObj* pSet;
SSDataBlock* pRes; SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold; int64_t threshold; // todo remove it
int64_t outputCapacity; int64_t outputCapacity;// todo remove it
int32_t totalBytes; int32_t totalBytes; // todo remove it
char* buf; char* buf;
SArray* pDistinctDataInfo; SArray* pDistinctDataInfo;
} SDistinctOperatorInfo; } SDistinctOperatorInfo;
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey);
void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type,
int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
uint64_t* total, SArray* pColList);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo); int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo);
...@@ -647,12 +669,12 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -647,12 +669,12 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
int32_t numOfOutput);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "function.h"
#include "tname.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
}
static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) {
pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pGroupColVals == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupColList, i);
pInfo->groupKeyLen += pCol->bytes;
struct SGroupKeys key = {0};
key.bytes = pCol->bytes;
key.type = pCol->type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, pCol->bytes);
if (key.pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pInfo->pGroupColVals, &key);
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize);
if (pInfo->keyBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex,
int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
if (pkey->isNull && isNull) {
continue;
}
if (isNull || pkey->isNull) {
return false;
}
char* val = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pkey->type)) {
int32_t len = varDataLen(val);
if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) {
continue;
} else {
return false;
}
} else {
if (memcmp(pkey->pData, val, pkey->bytes) != 0) {
return false;
}
}
}
return true;
}
static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
pkey->isNull = true;
} else {
char* val = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val));
} else {
memcpy(pkey->pData, val, pkey->bytes);
}
}
}
}
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
char* isNull = (char*)pKey;
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
if (pkey->isNull) {
isNull[i] = 1;
continue;
}
isNull[i] = 0;
if (IS_VAR_DATA_TYPE(pkey->type)) {
varDataCopy(pStart, pkey->pData);
pStart += varDataTLen(pkey->pData);
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
} else {
memcpy(pStart, pkey->pData, pkey->bytes);
pStart += pkey->bytes;
}
}
*length = (pStart - (char*)pKey);
return 0;
}
// assign the group keys or user input constant values if required
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
for (int32_t i = 0; i < numOfOutput; ++i) {
if (pCtx[i].functionId == -1) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
char* data = colDataGetData(pColInfoData, rowIndex);
// set result exists, todo refactor
memcpy(dest, data, pColInfoData->info.bytes);
pEntryInfo->hasResult = DATA_SET_FLAG;
pEntryInfo->numOfRes = 1;
}
}
}
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info;
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
// qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
// return;
// }
int32_t len = 0;
STimeWindow w = TSWINDOW_INITIALIZER;
int32_t num = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) {
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
pInfo->isInit = true;
num++;
continue;
}
bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols);
if (equal) {
num++;
continue;
}
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = j - num;
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
num = 1;
}
if (num > 0) {
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret =
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = pBlock->info.rows - num;
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
}
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SGroupbyOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->binfo.pRes;
}
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
doHashGroupbyAgg(pOperator, pBlock);
}
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
// } else {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo,
// pInfo->binfo.rowCellInfoOffset);
// }
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->binfo.pRes;
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pGroupCols = pGroupColList;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
\ No newline at end of file
此差异已折叠。
...@@ -26,31 +26,34 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); ...@@ -26,31 +26,34 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void functionFinalize(SqlFunctionCtx *pCtx); void functionFinalize(SqlFunctionCtx *pCtx);
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void countFunction(SqlFunctionCtx *pCtx); int32_t countFunction(SqlFunctionCtx *pCtx);
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void sumFunction(SqlFunctionCtx *pCtx); int32_t sumFunction(SqlFunctionCtx *pCtx);
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void minFunction(SqlFunctionCtx* pCtx); int32_t minFunction(SqlFunctionCtx* pCtx);
void maxFunction(SqlFunctionCtx *pCtx); int32_t maxFunction(SqlFunctionCtx *pCtx);
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFunction(SqlFunctionCtx* pCtx);
void stddevFinalize(SqlFunctionCtx* pCtx); void stddevFinalize(SqlFunctionCtx* pCtx);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void percentileFunction(SqlFunctionCtx *pCtx); int32_t percentileFunction(SqlFunctionCtx *pCtx);
void percentileFinalize(SqlFunctionCtx* pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void firstFunction(SqlFunctionCtx *pCtx); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
void lastFunction(SqlFunctionCtx *pCtx); int32_t diffFunction(SqlFunctionCtx *pCtx);
void valFunction(SqlFunctionCtx *pCtx); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -52,8 +52,6 @@ typedef struct SInterpInfoDetail { ...@@ -52,8 +52,6 @@ typedef struct SInterpInfoDetail {
int8_t primaryCol; int8_t primaryCol;
} SInterpInfoDetail; } SInterpInfoDetail;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
typedef struct STwaInfo { typedef struct STwaInfo {
int8_t hasResult; // flag to denote has value int8_t hasResult; // flag to denote has value
double dOutput; double dOutput;
......
...@@ -63,74 +63,74 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -63,74 +63,74 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
.name = "stddev", .name = "stddev",
.type = FUNCTION_TYPE_STDDEV, .type = FUNCTION_TYPE_STDDEV,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getStddevFuncEnv, .getEnvFunc = getStddevFuncEnv,
.initFunc = stddevFunctionSetup, .initFunc = stddevFunctionSetup,
.processFunc = stddevFunction, .processFunc = stddevFunction,
.finalizeFunc = stddevFinalize .finalizeFunc = stddevFinalize
}, },
{ {
.name = "percentile", .name = "percentile",
.type = FUNCTION_TYPE_PERCENTILE, .type = FUNCTION_TYPE_PERCENTILE,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getPercentileFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = percentileFunctionSetup,
.processFunc = maxFunction, .processFunc = percentileFunction,
.finalizeFunc = functionFinalize .finalizeFunc = percentileFinalize
}, },
{ {
.name = "apercentile", .name = "apercentile",
.type = FUNCTION_TYPE_APERCENTILE, .type = FUNCTION_TYPE_APERCENTILE,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
.name = "top", .name = "top",
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
.name = "bottom", .name = "bottom",
.type = FUNCTION_TYPE_BOTTOM, .type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
.name = "spread", .name = "spread",
.type = FUNCTION_TYPE_SPREAD, .type = FUNCTION_TYPE_SPREAD,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
.name = "last_row", .name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW, .type = FUNCTION_TYPE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
.name = "first", .name = "first",
...@@ -152,6 +152,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -152,6 +152,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = lastFunction, .processFunc = lastFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{
.name = "diff",
.type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup,
.processFunc = diffFunction,
.finalizeFunc = functionFinalize
},
{ {
.name = "abs", .name = "abs",
.type = FUNCTION_TYPE_ABS, .type = FUNCTION_TYPE_ABS,
...@@ -377,7 +387,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -377,7 +387,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_ROWTS, .type = FUNCTION_TYPE_ROWTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL, .getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = NULL, .sprocessFunc = NULL,
.finalizeFunc = NULL .finalizeFunc = NULL
...@@ -459,9 +469,11 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun ...@@ -459,9 +469,11 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
switch(pFunc->funcType) { switch(pFunc->funcType) {
case FUNCTION_TYPE_WDURATION: case FUNCTION_TYPE_WDURATION:
case FUNCTION_TYPE_COUNT: case FUNCTION_TYPE_COUNT: {
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
break; break;
}
case FUNCTION_TYPE_SUM: { case FUNCTION_TYPE_SUM: {
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
int32_t paraType = pParam->node.resType.type; int32_t paraType = pParam->node.resType.type;
...@@ -480,6 +492,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { ...@@ -480,6 +492,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType }; pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType };
break; break;
} }
case FUNCTION_TYPE_DIFF:
case FUNCTION_TYPE_FIRST: case FUNCTION_TYPE_FIRST:
case FUNCTION_TYPE_LAST: case FUNCTION_TYPE_LAST:
case FUNCTION_TYPE_MIN: case FUNCTION_TYPE_MIN:
...@@ -490,10 +504,11 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { ...@@ -490,10 +504,11 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
break; break;
} }
case FUNCTION_TYPE_QENDTS: case FUNCTION_TYPE_ROWTS:
case FUNCTION_TYPE_QSTARTTS: case FUNCTION_TYPE_QSTARTTS:
case FUNCTION_TYPE_WENDTS: case FUNCTION_TYPE_QENDTS:
case FUNCTION_TYPE_WSTARTTS: { case FUNCTION_TYPE_WSTARTTS:
case FUNCTION_TYPE_WENDTS:{
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
break; break;
} }
...@@ -508,6 +523,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { ...@@ -508,6 +523,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
break; break;
} }
case FUNCTION_TYPE_PERCENTILE:
case FUNCTION_TYPE_STDDEV: case FUNCTION_TYPE_STDDEV:
case FUNCTION_TYPE_SIN: case FUNCTION_TYPE_SIN:
case FUNCTION_TYPE_COS: case FUNCTION_TYPE_COS:
...@@ -531,36 +547,35 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { ...@@ -531,36 +547,35 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
case FUNCTION_TYPE_CONCAT: case FUNCTION_TYPE_CONCAT:
case FUNCTION_TYPE_CONCAT_WS: { case FUNCTION_TYPE_CONCAT_WS: {
int32_t paraType, paraBytes = 0; int32_t paraType, paraBytes = 0;
bool typeSet = false;
for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) { for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i); SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
paraBytes += pParam->node.resType.bytes; if (pParam->node.type == QUERY_NODE_COLUMN) {
paraType = pParam->node.resType.type; if (typeSet == false) {
paraType = pParam->node.resType.type;
typeSet = true;
} else {
//columns have to be the same type
if (paraType != pParam->node.resType.type) {
return TSDB_CODE_FAILED;
}
}
paraBytes += pParam->node.resType.bytes;
}
}
for (int32_t i = 0; i < pFunc->pParameterList->length; ++i) {
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
if (pParam->node.type == QUERY_NODE_VALUE) {
if (paraType == TSDB_DATA_TYPE_NCHAR) {
paraBytes += pParam->node.resType.bytes * TSDB_NCHAR_SIZE;
} else {
paraBytes += pParam->node.resType.bytes;
}
}
} }
pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType }; pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType };
break; break;
//int32_t paraTypeFirst, totalBytes = 0, sepBytes = 0;
//int32_t firstParamIndex = 0;
//if (pFunc->funcType == FUNCTION_TYPE_CONCAT_WS) {
// firstParamIndex = 1;
// SColumnNode* pSep = nodesListGetNode(pFunc->pParameterList, 0);
// sepBytes = pSep->node.resType.type;
//}
//for (int32_t i = firstParamIndex; i < pFunc->pParameterList->length; ++i) {
// SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
// int32_t paraType = pParam->node.resType.type;
// if (i == firstParamIndex) {
// paraTypeFirst = paraType;
// }
// if (paraType != paraTypeFirst) {
// return TSDB_CODE_FAILED;
// }
// //TODO: for constants also needs numOfRows
// totalBytes += pParam->node.resType.bytes;
//}
////TODO: need to get numOfRows to decide how much space separator needed. Currently set to 100.
//totalBytes += sepBytes * (pFunc->pParameterList->length - 2) * 100;
//pFunc->node.resType = (SDataType) { .bytes = totalBytes, .type = paraTypeFirst };
//break;
} }
case FUNCTION_TYPE_LOWER: case FUNCTION_TYPE_LOWER:
case FUNCTION_TYPE_UPPER: case FUNCTION_TYPE_UPPER:
...@@ -574,7 +589,6 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { ...@@ -574,7 +589,6 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
break; break;
} }
case FUNCTION_TYPE_ROWTS:
case FUNCTION_TYPE_TBNAME: { case FUNCTION_TYPE_TBNAME: {
// todo // todo
break; break;
......
...@@ -65,7 +65,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { ...@@ -65,7 +65,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
* count function does need the finalize, if data is missing, the default value, which is 0, is used * count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer * count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/ */
void countFunction(SqlFunctionCtx *pCtx) { int32_t countFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0; int32_t numOfElem = 0;
/* /*
...@@ -111,7 +111,7 @@ void countFunction(SqlFunctionCtx *pCtx) { ...@@ -111,7 +111,7 @@ void countFunction(SqlFunctionCtx *pCtx) {
} \ } \
} while (0) } while (0)
void sumFunction(SqlFunctionCtx *pCtx) { int32_t sumFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0; int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded // Only the pre-computing information loaded and actual data does not loaded
...@@ -432,12 +432,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { ...@@ -432,12 +432,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
return numOfElems; return numOfElems;
} }
void minFunction(SqlFunctionCtx *pCtx) { int32_t minFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 1); int32_t numOfElems = doMinMaxHelper(pCtx, 1);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
} }
void maxFunction(SqlFunctionCtx *pCtx) { int32_t maxFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 0); int32_t numOfElems = doMinMaxHelper(pCtx, 0);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
} }
...@@ -475,12 +475,11 @@ bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) ...@@ -475,12 +475,11 @@ bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo)
return true; return true;
} }
void stddevFunction(SqlFunctionCtx* pCtx) { int32_t stddevFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0; int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded // Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type; int32_t type = pInput->pData[0]->info.type;
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
...@@ -601,6 +600,7 @@ void stddevFinalize(SqlFunctionCtx* pCtx) { ...@@ -601,6 +600,7 @@ void stddevFinalize(SqlFunctionCtx* pCtx) {
} }
typedef struct SPercentileInfo { typedef struct SPercentileInfo {
double result;
tMemBucket *pMemBucket; tMemBucket *pMemBucket;
int32_t stage; int32_t stage;
double minval; double minval;
...@@ -627,19 +627,24 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI ...@@ -627,19 +627,24 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
return true; return true;
} }
void percentileFunction(SqlFunctionCtx *pCtx) { int32_t percentileFunction(SqlFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
#if 0 SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
SColumnInfoData *pCol = pInput->pData[0];
int32_t type = pCol->info.type;
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1; pInfo->stage += 1;
// all data are null, set it completed // all data are null, set it completed
if (pInfo->numOfElems == 0) { if (pInfo->numOfElems == 0) {
pResInfo->complete = true; pResInfo->complete = true;
return; return 0;
} else { } else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
} }
...@@ -647,19 +652,17 @@ void percentileFunction(SqlFunctionCtx *pCtx) { ...@@ -647,19 +652,17 @@ void percentileFunction(SqlFunctionCtx *pCtx) {
// the first stage, only acquire the min/max value // the first stage, only acquire the min/max value
if (pInfo->stage == 0) { if (pInfo->stage == 0) {
if (pCtx->preAggVals.isSet) { if (pCtx->input.colDataAggIsSet) {
double tmin = 0.0, tmax = 0.0; double tmin = 0.0, tmax = 0.0;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
tmin = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.min); tmin = (double)GET_INT64_VAL(&pAgg->min);
tmax = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.max); tmax = (double)GET_INT64_VAL(&pAgg->max);
} else if (IS_FLOAT_TYPE(pCtx->inputType)) { } else if (IS_FLOAT_TYPE(type)) {
tmin = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.min); tmin = GET_DOUBLE_VAL(&pAgg->min);
tmax = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.max); tmax = GET_DOUBLE_VAL(&pAgg->max);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
tmin = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.min); tmin = (double)GET_UINT64_VAL(&pAgg->min);
tmax = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.max); tmax = (double)GET_UINT64_VAL(&pAgg->max);
} else {
assert(true);
} }
if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) { if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
...@@ -670,17 +673,19 @@ void percentileFunction(SqlFunctionCtx *pCtx) { ...@@ -670,17 +673,19 @@ void percentileFunction(SqlFunctionCtx *pCtx) {
SET_DOUBLE_VAL(&pInfo->maxval, tmax); SET_DOUBLE_VAL(&pInfo->maxval, tmax);
} }
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull); pInfo->numOfElems += (pInput->numOfRows - pAgg->numOfNull);
} else { } else {
for (int32_t i = 0; i < pCtx->size; ++i) { // check the valid data one by one
char *data = GET_INPUT_DATA(pCtx, i); int32_t start = pInput->startRowIndex;
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
char *data = colDataGetData(pCol, i);
double v = 0; double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, data); GET_TYPED_DATA(v, double, pCtx->inputType, data);
if (v < GET_DOUBLE_VAL(&pInfo->minval)) { if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
SET_DOUBLE_VAL(&pInfo->minval, v); SET_DOUBLE_VAL(&pInfo->minval, v);
} }
...@@ -693,24 +698,40 @@ void percentileFunction(SqlFunctionCtx *pCtx) { ...@@ -693,24 +698,40 @@ void percentileFunction(SqlFunctionCtx *pCtx) {
} }
} }
return; return 0;
} }
// the second stage, calculate the true percentile value // the second stage, calculate the true percentile value
for (int32_t i = 0; i < pCtx->size; ++i) { int32_t start = pInput->startRowIndex;
char *data = GET_INPUT_DATA(pCtx, i); for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
char *data = colDataGetData(pCol, i);
notNullElems += 1; notNullElems += 1;
tMemBucketPut(pInfo->pMemBucket, data, 1); tMemBucketPut(pInfo->pMemBucket, data, 1);
} }
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pResInfo, notNullElems, 1);
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
#endif }
// TODO set the correct parameter.
void percentileFinalize(SqlFunctionCtx* pCtx) {
double v = 50;//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo);
tMemBucket * pMemBucket = ppInfo->pMemBucket;
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
}
tMemBucketDestroy(pMemBucket);
functionFinalize(pCtx);
} }
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
...@@ -721,9 +742,9 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { ...@@ -721,9 +742,9 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
// TODO fix this // TODO fix this
// This ordinary first function only handle the data block in ascending order // This ordinary first function only handle the data block in ascending order
void firstFunction(SqlFunctionCtx *pCtx) { int32_t firstFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
return; return 0;
} }
int32_t numOfElems = 0; int32_t numOfElems = 0;
...@@ -737,7 +758,7 @@ void firstFunction(SqlFunctionCtx *pCtx) { ...@@ -737,7 +758,7 @@ void firstFunction(SqlFunctionCtx *pCtx) {
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true); ASSERT(pInputCol->hasNull == true);
return; return 0;
} }
// Check for the first not null data // Check for the first not null data
...@@ -764,9 +785,9 @@ void firstFunction(SqlFunctionCtx *pCtx) { ...@@ -764,9 +785,9 @@ void firstFunction(SqlFunctionCtx *pCtx) {
SET_VAL(pResInfo, numOfElems, 1); SET_VAL(pResInfo, numOfElems, 1);
} }
void lastFunction(SqlFunctionCtx *pCtx) { int32_t lastFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order != TSDB_ORDER_DESC) { if (pCtx->order != TSDB_ORDER_DESC) {
return; return 0;
} }
int32_t numOfElems = 0; int32_t numOfElems = 0;
...@@ -775,13 +796,12 @@ void lastFunction(SqlFunctionCtx *pCtx) { ...@@ -775,13 +796,12 @@ void lastFunction(SqlFunctionCtx *pCtx) {
char* buf = GET_ROWCELL_INTERBUF(pResInfo); char* buf = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
// All null data column, return directly. // All null data column, return directly.
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
ASSERT(pInputCol->hasNull == true); ASSERT(pInputCol->hasNull == true);
return; return 0;
} }
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
...@@ -826,10 +846,242 @@ void lastFunction(SqlFunctionCtx *pCtx) { ...@@ -826,10 +846,242 @@ void lastFunction(SqlFunctionCtx *pCtx) {
SET_VAL(pResInfo, numOfElems, 1); SET_VAL(pResInfo, numOfElems, 1);
} }
void valFunction(SqlFunctionCtx *pCtx) { typedef struct SDiffInfo {
bool hasPrev;
bool includeNull;
bool ignoreNegative;
bool firstOutput;
union { int64_t i64; double d64;} prev;
} SDiffInfo;
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo);
return true;
}
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->hasPrev = false;
pDiffInfo->prev.i64 = 0;
pDiffInfo->ignoreNegative = false; // TODO set correct param
pDiffInfo->includeNull = false;
pDiffInfo->firstOutput = false;
return true;
}
int32_t diffFunction(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo); SDiffInfo *pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
bool isFirstBlock = (pDiffInfo->hasPrev == false);
int32_t numOfElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
int32_t startOffset = pCtx->offset;
switch (pInputCol->info.type) {
case TSDB_DATA_TYPE_INT: {
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
int32_t pos = startOffset + (isFirstBlock? (numOfElems-1):numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems += 1;
}
continue;
}
int32_t v = *(int32_t*) colDataGetData(pInputCol, i);
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
}
pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
int32_t v = 0;
if (pDiffInfo->hasPrev) {
v = *(int64_t*) colDataGetData(pInputCol, i);
int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (pDiffInfo->ignoreNegative) {
continue;
}
// *(pOutput++) = delta;
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
//
// pOutput += 1;
// pTimestamp += 1;
}
SColumnInfoData* pInputCol = pCtx->input.pData[0]; pDiffInfo->prev.i64 = v;
memcpy(buf, pInputCol->pData, pInputCol->info.bytes); pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
#if 0
case TSDB_DATA_TYPE_DOUBLE: {
double *pData = (double *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->d64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *pData = (float *)data;
float *pOutput = (float *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->d64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *pData = (int16_t *)data;
int16_t *pOutput = (int16_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->i64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t *pData = (int8_t *)data;
int8_t *pOutput = (int8_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) {
continue;
}
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
continue;
}
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pDiffInfo->i64Prev = pData[i];
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
#endif
default:
break;
// qError("error input type");
}
// initial value is not set yet
if (!pDiffInfo->hasPrev || numOfElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
*/
assert(pCtx->hasNull);
} else {
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
// }
// }
int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
return forwardStep;
}
} }
...@@ -116,6 +116,11 @@ bool fmIsWindowClauseFunc(int32_t funcId) { ...@@ -116,6 +116,11 @@ bool fmIsWindowClauseFunc(int32_t funcId) {
return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId); return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId);
} }
bool fmIsNonstandardSQLFunc(int32_t funcId) {
return isSpecificClassifyFunc(funcId, FUNC_MGT_NONSTANDARD_SQL_FUNC);
}
void fmFuncMgtDestroy() { void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable; void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
......
...@@ -1902,10 +1902,10 @@ static void copyTopBotRes(SqlFunctionCtx *pCtx, int32_t type) { ...@@ -1902,10 +1902,10 @@ static void copyTopBotRes(SqlFunctionCtx *pCtx, int32_t type) {
} }
// set the output timestamp of each record. // set the output timestamp of each record.
TSKEY *output = pCtx->ptsOutputBuf; // TSKEY *output = pCtx->pTsOutput;
for (int32_t i = 0; i < len; ++i, output += step) { // for (int32_t i = 0; i < len; ++i, output += step) {
*output = tvp[i]->timestamp; // *output = tvp[i]->timestamp;
} // }
// set the corresponding tag data for each record // set the corresponding tag data for each record
// todo check malloc failure // todo check malloc failure
...@@ -2687,7 +2687,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) { ...@@ -2687,7 +2687,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY *pTimestamp = pCtx->ptsOutputBuf; TSKEY *pTimestamp = NULL;//pCtx->pTsOutput;
TSKEY *tsList = GET_TS_LIST(pCtx); TSKEY *tsList = GET_TS_LIST(pCtx);
double *pOutput = (double *)pCtx->pOutput; double *pOutput = (double *)pCtx->pOutput;
...@@ -2867,7 +2867,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) { ...@@ -2867,7 +2867,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
} else { \ } else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \ *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \
*(type *)(&(ctx)->param[1].i) = *(type *)(d); \ *(type *)(&(ctx)->param[1].i) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ *(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \
} \ } \
} while (0); } while (0);
...@@ -2881,7 +2881,7 @@ static void diff_function(SqlFunctionCtx *pCtx) { ...@@ -2881,7 +2881,7 @@ static void diff_function(SqlFunctionCtx *pCtx) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY* pTimestamp = NULL;//pCtx->pTsOutput;
TSKEY* tsList = GET_TS_LIST(pCtx); TSKEY* tsList = GET_TS_LIST(pCtx);
switch (pCtx->inputType) { switch (pCtx->inputType) {
......
...@@ -12,7 +12,7 @@ typedef void (*_trim_fn)(char *, char*, int32_t, int32_t); ...@@ -12,7 +12,7 @@ typedef void (*_trim_fn)(char *, char*, int32_t, int32_t);
typedef int16_t (*_len_fn)(char *, int32_t); typedef int16_t (*_len_fn)(char *, int32_t);
/** Math functions **/ /** Math functions **/
double tlog(double v, double base) { static double tlog(double v, double base) {
return log(v) / log(base); return log(v) / log(base);
} }
...@@ -113,7 +113,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu ...@@ -113,7 +113,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
int32_t type = GET_PARAM_TYPE(pInput); int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -138,7 +138,7 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa ...@@ -138,7 +138,7 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) { static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) { if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -167,7 +167,7 @@ int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarP ...@@ -167,7 +167,7 @@ int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarP
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) { static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
int32_t type = GET_PARAM_TYPE(pInput); int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -215,11 +215,11 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p ...@@ -215,11 +215,11 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p
} }
/** String functions **/ /** String functions **/
int16_t tlength(char *input, int32_t type) { static int16_t tlength(char *input, int32_t type) {
return varDataLen(input); return varDataLen(input);
} }
int16_t tcharlength(char *input, int32_t type) { static int16_t tcharlength(char *input, int32_t type) {
if (type == TSDB_DATA_TYPE_VARCHAR) { if (type == TSDB_DATA_TYPE_VARCHAR) {
return varDataLen(input); return varDataLen(input);
} else { //NCHAR } else { //NCHAR
...@@ -227,7 +227,7 @@ int16_t tcharlength(char *input, int32_t type) { ...@@ -227,7 +227,7 @@ int16_t tcharlength(char *input, int32_t type) {
} }
} }
void tltrim(char *input, char *output, int32_t type, int32_t charLen) { static void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
int32_t numOfSpaces = 0; int32_t numOfSpaces = 0;
if (type == TSDB_DATA_TYPE_VARCHAR) { if (type == TSDB_DATA_TYPE_VARCHAR) {
for (int32_t i = 0; i < charLen; ++i) { for (int32_t i = 0; i < charLen; ++i) {
...@@ -257,7 +257,7 @@ void tltrim(char *input, char *output, int32_t type, int32_t charLen) { ...@@ -257,7 +257,7 @@ void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
varDataSetLen(output, resLen); varDataSetLen(output, resLen);
} }
void trtrim(char *input, char *output, int32_t type, int32_t charLen) { static void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
int32_t numOfSpaces = 0; int32_t numOfSpaces = 0;
if (type == TSDB_DATA_TYPE_VARCHAR) { if (type == TSDB_DATA_TYPE_VARCHAR) {
for (int32_t i = charLen - 1; i >= 0; --i) { for (int32_t i = charLen - 1; i >= 0; --i) {
...@@ -286,7 +286,7 @@ void trtrim(char *input, char *output, int32_t type, int32_t charLen) { ...@@ -286,7 +286,7 @@ void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
varDataSetLen(output, resLen); varDataSetLen(output, resLen);
} }
int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) { static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) {
int32_t type = GET_PARAM_TYPE(pInput); int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -312,12 +312,22 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -312,12 +312,22 @@ int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) { static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCol, int32_t type, int16_t *dataLen) {
pOutputData->pData = taosMemoryCalloc(len, sizeof(char)); if (hasNcharCol && type == TSDB_DATA_TYPE_VARCHAR) {
pOutputData->info.type = type; TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1);
pOutputData->info.bytes = len; bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL);
pOutputData->varmeta.length = len; if (!ret) {
pOutputData->varmeta.allocLen = len; taosMemoryFree(newBuf);
return TSDB_CODE_FAILED;
}
memcpy(varDataVal(output) + *dataLen, newBuf, varDataLen(input) * TSDB_NCHAR_SIZE);
*dataLen += varDataLen(input) * TSDB_NCHAR_SIZE;
taosMemoryFree(newBuf);
} else {
memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input));
*dataLen += varDataLen(input);
}
return TSDB_CODE_SUCCESS;
} }
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
...@@ -332,11 +342,15 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -332,11 +342,15 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t inputLen = 0; int32_t inputLen = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
bool hasNcharCol = false;
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || int32_t type = GET_PARAM_TYPE(&pInput[i]);
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) { if (!IS_VAR_DATA_TYPE(type)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (type == TSDB_DATA_TYPE_NCHAR) {
hasNcharCol = true;
}
if (pInput[i].numOfRows > numOfRows) { if (pInput[i].numOfRows > numOfRows) {
numOfRows = pInput[i].numOfRows; numOfRows = pInput[i].numOfRows;
} }
...@@ -344,8 +358,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -344,8 +358,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
pInputData[i] = pInput[i].columnData; pInputData[i] = pInput[i].columnData;
input[i] = pInputData[i]->pData; input[i] = pInputData[i]->pData;
int32_t factor = 1;
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
factor = TSDB_NCHAR_SIZE;
}
if (pInput[i].numOfRows == 1) { if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * numOfRows;
} else { } else {
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
} }
...@@ -371,8 +389,10 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -371,8 +389,10 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int16_t dataLen = 0; int16_t dataLen = 0;
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
dataLen += varDataLen(input[i]); if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (pInput[i].numOfRows != 1) { if (pInput[i].numOfRows != 1) {
input[i] += varDataTLen(input[i]); input[i] += varDataTLen(input[i]);
} }
...@@ -390,6 +410,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -390,6 +410,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -402,27 +423,34 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -402,27 +423,34 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int32_t inputLen = 0; int32_t inputLen = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
bool hasNcharCol = false;
for (int32_t i = 1; i < inputNum; ++i) { for (int32_t i = 1; i < inputNum; ++i) {
if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) || int32_t type = GET_PARAM_TYPE(&pInput[i]);
GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[1])) { if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i]))) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (type == TSDB_DATA_TYPE_NCHAR) {
hasNcharCol = true;
}
if (pInput[i].numOfRows > numOfRows) { if (pInput[i].numOfRows > numOfRows) {
numOfRows = pInput[i].numOfRows; numOfRows = pInput[i].numOfRows;
} }
} }
for (int32_t i = 0; i < inputNum; ++i) { for (int32_t i = 0; i < inputNum; ++i) {
pInputData[i] = pInput[i].columnData; pInputData[i] = pInput[i].columnData;
input[i] = pInputData[i]->pData;
int32_t factor = 1;
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
factor = TSDB_NCHAR_SIZE;
}
if (i == 0) { if (i == 0) {
// calculate required separator space // calculate required separator space
int32_t factor = (GET_PARAM_TYPE(&pInput[1]) == TSDB_DATA_TYPE_NCHAR) ? TSDB_NCHAR_SIZE : 1;
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor; inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor;
} else if (pInput[i].numOfRows == 1) { } else if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows; inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * factor;
} else { } else {
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
} }
input[i] = pInputData[i]->pData;
} }
int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE; int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
...@@ -441,8 +469,11 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -441,8 +469,11 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
continue; continue;
} }
memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i])); int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
dataLen += varDataLen(input[i]); if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (pInput[i].numOfRows != 1) { if (pInput[i].numOfRows != 1) {
input[i] += varDataTLen(input[i]); input[i] += varDataTLen(input[i]);
} }
...@@ -450,8 +481,10 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -450,8 +481,10 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
if (i < inputNum - 1) { if (i < inputNum - 1) {
//insert the separator //insert the separator
char *sep = pInputData[0]->pData; char *sep = pInputData[0]->pData;
memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep)); int32_t ret = concatCopyHelper(sep, output, hasNcharCol, GET_PARAM_TYPE(&pInput[0]), &dataLen);
dataLen += varDataLen(sep); if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
} }
} }
varDataSetLen(output, dataLen); varDataSetLen(output, dataLen);
...@@ -467,7 +500,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -467,7 +500,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) { static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
int32_t type = GET_PARAM_TYPE(pInput); int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -512,7 +545,7 @@ int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam ...@@ -512,7 +545,7 @@ int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
} }
int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) { static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
int32_t type = GET_PARAM_TYPE(pInput); int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) { if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -576,7 +609,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -576,7 +609,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
char *input = pInputData->pData; char *input = pInputData->pData;
char *output = NULL; char *output = NULL;
int32_t outputLen = pInputData->varmeta.length; int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows;
char *outputBuf = taosMemoryCalloc(outputLen, 1); char *outputBuf = taosMemoryCalloc(outputLen, 1);
output = outputBuf; output = outputBuf;
...@@ -597,12 +630,12 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -597,12 +630,12 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
startPosBytes = MAX(startPosBytes, 0); startPosBytes = MAX(startPosBytes, 0);
} }
subLen = MIN(subLen, len - startPosBytes); int32_t resLen = MIN(subLen, len - startPosBytes);
if (subLen > 0) { if (resLen > 0) {
memcpy(varDataVal(output), varDataVal(input) + startPosBytes, subLen); memcpy(varDataVal(output), varDataVal(input) + startPosBytes, resLen);
} }
varDataSetLen(output, subLen); varDataSetLen(output, resLen);
colDataAppend(pOutputData, i , output, false); colDataAppend(pOutputData, i , output, false);
input += varDataTLen(input); input += varDataTLen(input);
output += varDataTLen(output); output += varDataTLen(output);
...@@ -687,6 +720,7 @@ int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam ...@@ -687,6 +720,7 @@ int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
return doLengthFunction(pInput, inputNum, pOutput, tcharlength); return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
} }
#if 0
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) { static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
switch(type) { switch(type) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
...@@ -751,6 +785,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf ...@@ -751,6 +785,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
default: assert(0); default: assert(0);
} }
} }
#endif
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(int64_t); pEnv->calcMemSize = sizeof(int64_t);
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$dbPrefix = db
$tbPrefix = ctb
$mtPrefix = stb
$tbNum = 10
$rowNum = 20
$totalNum = 200
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sleep 100
print =============== step2
$i = 1
$tb = $tbPrefix . $i
print ===> select diff(tbcol) from $tb
sql select diff(tbcol) from $tb
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $data11 != 1 then
return -1
endi
print =============== step3
$cc = 4 * 60000
$ms = 1601481600000 + $cc
print ===> select diff(tbcol) from $tb where ts > $ms
sql select diff(tbcol) from $tb where ts > $ms
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $data11 != 1 then
return -1
endi
$cc = 4 * 60000
$ms = 1601481600000 + $cc
print ===> select diff(tbcol) from $tb where ts <= $ms
sql select diff(tbcol) from $tb where ts <= $ms
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $data11 != 1 then
return -1
endi
print =============== step4
print ===> select diff(tbcol) as b from $tb
sql select diff(tbcol) as b from $tb
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $data11 != 1 then
return -1
endi
print =============== step5
print ===> select diff(tbcol) as b from $tb interval(1m)
sql select diff(tbcol) as b from $tb interval(1m) -x step5
return -1
step5:
print =============== step6
$cc = 4 * 60000
$ms = 1601481600000 + $cc
print ===> select diff(tbcol) as b from $tb where ts <= $ms interval(1m)
sql select diff(tbcol) as b from $tb where ts <= $ms interval(1m) -x step6
return -1
step6:
print =============== clear
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
...@@ -284,14 +284,15 @@ print ====> select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtyp ...@@ -284,14 +284,15 @@ print ====> select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtyp
# return -1 # return -1
#endi #endi
sql_error select * from dev_001 session(ts,1w) print ================> syntax error check not active ================> reactive
sql_error select count(*) from st session(ts,1w) #sql_error select * from dev_001 session(ts,1w)
sql_error select count(*) from dev_001 group by tagtype session(ts,1w) #sql_error select count(*) from st session(ts,1w)
sql_error select count(*) from dev_001 session(ts,1n) #sql_error select count(*) from dev_001 group by tagtype session(ts,1w)
sql_error select count(*) from dev_001 session(ts,1y) #sql_error select count(*) from dev_001 session(ts,1n)
sql_error select count(*) from dev_001 session(ts,0s) #sql_error select count(*) from dev_001 session(ts,1y)
sql_error select count(*) from dev_001 session(i,1y) #sql_error select count(*) from dev_001 session(ts,0s)
sql_error select count(*) from dev_001 session(ts,1d) where ts <'2020-05-20 0:0:0' #sql_error select count(*) from dev_001 session(i,1y)
#sql_error select count(*) from dev_001 session(ts,1d) where ts <'2020-05-20 0:0:0'
print ====> create database d1 precision 'us' print ====> create database d1 precision 'us'
sql create database d1 precision 'us' sql create database d1 precision 'us'
...@@ -299,17 +300,19 @@ sql use d1 ...@@ -299,17 +300,19 @@ sql use d1
sql create table dev_001 (ts timestamp ,i timestamp ,j int) sql create table dev_001 (ts timestamp ,i timestamp ,j int)
sql insert into dev_001 values(1623046993681000,now,1)(1623046993681001,now+1s,2)(1623046993681002,now+2s,3)(1623046993681004,now+5s,4) sql insert into dev_001 values(1623046993681000,now,1)(1623046993681001,now+1s,2)(1623046993681002,now+2s,3)(1623046993681004,now+5s,4)
print ====> select count(*) from dev_001 session(ts,1u) print ====> select count(*) from dev_001 session(ts,1u)
sql select count(*) from dev_001 session(ts,1u) sql select _wstartts, count(*) from dev_001 session(ts,1u)
if $rows != 2 then if $rows != 2 then
print expect 2, actual: $rows
return -1 return -1
endi endi
if $data01 != 3 then if $data01 != 3 then
return -1 return -1
endi endi
sql_error select count(*) from dev_001 session(i,1s)
sql create table secondts(ts timestamp,t2 timestamp,i int)
sql_error select count(*) from secondts session(t2,2s)
#sql_error select count(*) from dev_001 session(i,1s)
#sql create table secondts(ts timestamp,t2 timestamp,i int)
#sql_error select count(*) from secondts session(t2,2s)
if $loop_test == 0 then if $loop_test == 0 then
print =============== stop and restart taosd print =============== stop and restart taosd
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$dbPrefix = db
$tbPrefix = ctb
$mtPrefix = stb
$tbNum = 10
$rowNum = 20
$totalNum = 200
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sleep 100
print =============== step2
$i = 1
$tb = $tbPrefix . $i
sql select stddev(tbcol) from $tb
print ===> $data00
if $data00 != 5.766281297 then
return -1
endi
print =============== step3
$cc = 4 * 60000
$ms = 1601481600000 + $cc
print ===> select stddev(tbcol) from $tb where ts <= $ms
sql select stddev(tbcol) from $tb where ts <= $ms
print ====> $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1.414213562 then
return -1
endi
print =============== step4
sql select stddev(tbcol) as b from $tb
print ===> $data00
if $data00 != 5.766281297 then
return -1
endi
print =============== step5
sql select stddev(tbcol) as b from $tb interval(1m)
print ===> $data01
if $data01 != 0.000000000 then
return -1
endi
sql select stddev(tbcol) as b from $tb interval(1d)
print ===> $data01
if $data01 != 5.766281297 then
return -1
endi
print =============== step6
$cc = 4 * 60000
$ms = 1601481600000 + $cc
sql select stddev(tbcol) as b from $tb where ts <= $ms interval(1m)
print ===> $data01
if $data01 != 0.000000000 then
return -1
endi
if $rows != 5 then
return -1
endi
print =============== clear
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册