提交 04f134a2 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

......@@ -8,135 +8,202 @@ def skipbuild=0
def win_stop=0
def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger()
def jobs = Jenkins.instance.getItemByFullName(currentJobName)
def builds = jobs.getBuilds()
def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger()
def jobs = Jenkins.instance.getItemByFullName(currentJobName)
def builds = jobs.getBuilds()
for (build in builds) {
if (!build.isBuilding()) {
continue;
}
for (build in builds) {
if (!build.isBuilding()) {
continue;
}
if (currentBuildNumber == build.getNumber().toInteger()) {
continue;
}
if (currentBuildNumber == build.getNumber().toInteger()) {
continue;
}
build.doKill() //doTerm(),doKill(),doTerm()
}
build.doKill() //doTerm(),doKill(),doTerm()
}
}
// abort previous build
abortPreviousBuilds()
def abort_previous(){
def buildNumber = env.BUILD_NUMBER as int
if (buildNumber > 1) milestone(buildNumber - 1)
milestone(buildNumber)
def buildNumber = env.BUILD_NUMBER as int
if (buildNumber > 1) milestone(buildNumber - 1)
milestone(buildNumber)
}
def pre_test(){
sh'hostname'
sh 'hostname'
sh '''
date
sudo rmtaos || echo "taosd has not installed"
date
sudo rmtaos || echo "taosd has not installed"
'''
sh '''
killall -9 taosd ||echo "no taosd running"
killall -9 gdb || echo "no gdb running"
killall -9 python3.8 || echo "no python program running"
cd ${WKC}
killall -9 taosd ||echo "no taosd running"
killall -9 gdb || echo "no gdb running"
killall -9 python3.8 || echo "no python program running"
cd ${WKC}
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WKC}
git checkout master
'''
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WKC}
git checkout master
'''
} else if(env.CHANGE_TARGET == '2.0') {
sh '''
cd ${WKC}
git checkout 2.0
'''
} else if(env.CHANGE_TARGET == '3.0') {
sh '''
cd ${WKC}
git checkout 3.0
[ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../..
'''
} else {
sh '''
cd ${WKC}
git checkout develop
'''
}
else if(env.CHANGE_TARGET == '2.0'){
sh '''
cd ${WKC}
git checkout 2.0
'''
}
else if(env.CHANGE_TARGET == '3.0'){
sh '''
}
sh '''
cd ${WKC}
git checkout 3.0
[ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../..
'''
}
else{
sh '''
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git submodule update --init --recursive
'''
sh '''
cd ${WKC}
git checkout develop
'''
}
}
sh'''
cd ${WKC}
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git submodule update --init --recursive
export TZ=Asia/Harbin
date
rm -rf debug
mkdir debug
cd debug
cmake .. > /dev/null
make -j4> /dev/null
'''
sh'''
cd ${WKC}
export TZ=Asia/Harbin
date
rm -rf debug
mkdir debug
cd debug
cmake .. > /dev/null
make -j4> /dev/null
sh '''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
'''
sh'''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
return 1
}
def pre_test_win(){
bat '''
hostname
date /t
time /t
taskkill /f /t /im python.exe
taskkill /f /t /im bash.exe
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug
exit 0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git reset --hard
git fetch || git fetch
git checkout -f
'''
script {
if (env.CHANGE_TARGET == 'master') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout master
'''
} else if(env.CHANGE_TARGET == '2.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout 2.0
'''
} else if(env.CHANGE_TARGET == '3.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout 3.0
'''
} else {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout develop
'''
}
}
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git branch
git pull || git pull
git fetch origin +refs/pull/%CHANGE_ID%/merge
git checkout -qf FETCH_HEAD
'''
}
def pre_test_build_win() {
bat '''
echo "building ..."
time /t
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
mkdir debug
cd debug
call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64
set CL=/MP8
cmake .. -G "NMake Makefiles JOM"
jom -j 4 || exit 8
time /t
'''
return 1
}
pipeline {
agent none
options { skipDefaultCheckout() }
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
}
stages {
stage('pre_build'){
agent{label " slave3_0 || slave15 || slave16 || slave17 "}
options { skipDefaultCheckout() }
when {
changeRequest()
}
steps {
script{
abort_previous()
abortPreviousBuilds()
}
timeout(time: 45, unit: 'MINUTES'){
pre_test()
sh'''
cd ${WKC}/debug
ctest -VV
'''
sh'''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
sh'''
cd ${WKC}/tests
./test-all.sh b1fq
'''
agent none
options { skipDefaultCheckout() }
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
}
stages {
stage('run test') {
parallel {
stage('windows test') {
agent {label " windows11 "}
steps {
pre_test_win()
pre_test_build_win()
}
}
stage('linux test') {
agent{label " slave3_0 || slave15 || slave16 || slave17 "}
options { skipDefaultCheckout() }
when {
changeRequest()
}
steps {
timeout(time: 45, unit: 'MINUTES'){
pre_test()
sh '''
cd ${WKC}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
sh '''
cd ${WKC}/tests
./test-all.sh b1fq
'''
}
}
}
}
}
}
}
post {
}
}
post {
success {
emailext (
subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' SUCCESS",
......
......@@ -2,6 +2,8 @@ cmake_minimum_required(VERSION 3.16)
set(CMAKE_VERBOSE_MAKEFILE OFF)
SET(BUILD_SHARED_LIBS "OFF")
#set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin)
......
......@@ -54,8 +54,13 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
......
......@@ -22,6 +22,7 @@
#include "tmsg.h"
#include "tcommon.h"
#include "function.h"
#include "tdatablock.h"
#ifdef __cplusplus
extern "C" {
......@@ -54,14 +55,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes; // <0 var length, others fixed length bytes
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
bool varLengthColumn;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
......@@ -72,9 +73,10 @@ typedef struct SUdfColumnData {
struct {
int32_t varOffsetsLen;
char *varOffsets;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
......@@ -131,10 +133,114 @@ typedef int32_t (*TUdfSetupFunc)();
typedef int32_t (*TUdfTeardownFunc)();
//TODO: add API to check function arguments type, number etc.
//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory.
// then UDF framework will free the memory
//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
#define UDF_MEMORY_EXP_GROWTH 1.5
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
if (newCapacity== 0 || newCapacity <= data->rowsAlloc) {
return TSDB_CODE_SUCCESS;
}
int allocCapacity = TMAX(data->rowsAlloc, 8);
while (allocCapacity < newCapacity) {
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
if (IS_VAR_DATA_TYPE(meta->type)) {
char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.varOffsets = (int32_t*)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
// for payload, add data in udfColDataAppend
} else {
char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.data = tmp;
data->fixLenCol.dataLen = allocCapacity* meta->bytes;
}
data->rowsAlloc = allocCapacity;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
if (isVarCol) {
data->varLenCol.varOffsets[currentRow] = -1;
} else {
colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
}
} else {
if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else {
int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
}
dataLen += CHAR_BYTES;
}
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen;
if (newSize <= 1) {
newSize = 8;
}
while (newSize < data->varLenCol.payloadLen + dataLen) {
newSize = newSize * UDF_MEMORY_EXP_GROWTH;
}
char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.payload = buf;
data->varLenCol.payloadAllocLen = newSize;
}
uint32_t len = data->varLenCol.payloadLen;
data->varLenCol.varOffsets[currentRow] = len;
memcpy(data->varLenCol.payload + len, pData, dataLen);
data->varLenCol.payloadLen += dataLen;
}
}
data->numOfRows = TMAX(currentRow + 1, data->numOfRows);
return 0;
}
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
......
......@@ -59,6 +59,8 @@ bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs
int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes);
TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4);
bool taosValidateEncodec(const char *encodec);
int32_t taosHexEncode(const char *src, char *dst, int32_t len);
int32_t taosHexDecode(const char *src, char *dst, int32_t len);
int32_t taosWcharWidth(TdWchar wchar);
int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size);
......
......@@ -491,7 +491,12 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
bool isNull = false;
if (pBlock->pBlockAgg == NULL) {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, NULL);
} else {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
}
char* p = colDataGetData(pColData, j);
colDataAppend(pDstCol, j - startIndex, p, isNull);
......
......@@ -3308,7 +3308,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win;
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
}
/*
......
......@@ -277,7 +277,7 @@ typedef struct SOperatorInfo {
uint8_t operatorType;
bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results
int32_t numOfExprs; // number of columns of the current operator results
char* name; // name, used to show the query execution plan
void* info; // extension attribution
SExprInfo* pExpr;
......@@ -415,8 +415,6 @@ typedef struct SOptrBasicInfo {
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
// SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
......@@ -577,13 +575,13 @@ typedef struct SSortedMergeOperatorInfo {
} SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock* pDataBlock;
SArray* pSortInfo;
SSortHandle* pSortHandle;
SArray* inputSlotMap; // for index map from table scan output
int32_t bufPageSize;
int32_t numOfRowsInRes;
// int32_t numOfRowsInRes;
// TODO extact struct
int64_t startTs; // sort start time
......@@ -645,10 +643,13 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity);
SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t* rowCellInfoOffset);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
......@@ -663,7 +664,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
......
......@@ -89,7 +89,7 @@ int32_t tsortClose(SSortHandle* pHandle);
*
* @return
*/
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp);
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param);
/**
*
......
......@@ -202,9 +202,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {{0}};
SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i);
if (!pDescNode->output) {
continue;
}
// if (!pDescNode->output) { // todo disable it temporarily
// continue;
// }
idata.info.type = pDescNode->dataType.type;
idata.info.bytes = pDescNode->dataType.bytes;
......@@ -651,7 +651,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
int32_t order) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
......@@ -713,7 +713,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
bool createDummyCol) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock;
......@@ -798,7 +798,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
}
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs; // this can be set during create the struct
pCtx[k].fpSet.process(&pCtx[k]);
......@@ -2815,7 +2815,6 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
// NOTE: sources columns are more than the destination SSDatablock columns.
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) {
size_t numOfSrcCols = taosArrayGetSize(pCols);
ASSERT(numOfSrcCols >= pBlock->info.numOfCols);
int32_t i = 0, j = 0;
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
......@@ -3287,7 +3286,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pBlock->info.numOfCols;
pOperator->numOfExprs = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
......@@ -3345,49 +3344,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
cleanupAggSup(&pInfo->aggSup);
}
// TODO merge aggregate super table
static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
bool isNull = tsortIsNullVal(pTupleHandle, i);
if (isNull) {
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
} else {
char* pData = tsortGetValue(pTupleHandle, i);
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
}
}
pBlock->info.rows += 1;
}
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
blockDataCleanup(pDataBlock);
blockDataEnsureCapacity(pDataBlock, capacity);
blockDataEnsureCapacity(pDataBlock, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
if (pDataBlock->info.rows >= capacity) {
return pDataBlock;
}
}
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
return pOperator->fpSet.getNextFn(pOperator);
}
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
size_t size = taosArrayGetSize(groupInfo);
if (size == 0) {
......@@ -3490,8 +3446,8 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
} else {
doFinalizeResultImpl(pCtx, numOfExpr);
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
// TODO check for available buffer;
......@@ -3541,13 +3497,13 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
// pOperator->pRuntimeEnv, true);
doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock);
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
}
doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfOutput);
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfExprs);
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
// TODO check for available buffer;
......@@ -3571,7 +3527,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
......@@ -3678,7 +3634,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = num;
pOperator->numOfExprs = num;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -3703,79 +3659,6 @@ _error:
return NULL;
}
static SSDataBlock* doSort(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT,
pInfo->bufPageSize, numOfBufPage, pInfo->pDataBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[0];
tsortAddSource(pInfo->pSortHandle, ps);
int32_t code = tsortOpen(pInfo->pSortHandle);
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
}
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo,
SArray* pIndexMap, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t rowSize = pResBlock->info.rowSize;
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->inputSlotMap = pIndexMap;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
int32_t getTableScanOrder(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
......@@ -3813,7 +3696,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
break;
}
// if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
// there is an scalar expression that needs to be calculated before apply the group aggregation.
......@@ -3827,7 +3710,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo);
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
......@@ -3848,7 +3731,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf,
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf,
&pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset);
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false);
......@@ -4092,17 +3975,17 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// todo dynamic set tags
// if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs);
if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfExprs);
return pRes;
}
}
......@@ -4127,14 +4010,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pProjectInfo->existDataBlock = pBlock;
break;
} else { // init output buffer for a new group data
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput);
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs);
}
}
// todo dynamic set tags
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
// the pDataBlock are always the same one, no need to call this again
......@@ -4143,7 +4026,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput,
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
pProjectInfo->pPseudoColInfo);
int32_t status = handleLimitOffset(pOperator, pBlock);
......@@ -4156,7 +4039,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pProjectInfo->curOutput += pInfo->pRes->info.rows;
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
......@@ -4289,7 +4172,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
}
if (pOperator->fpSet.closeFn != NULL) {
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput);
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfExprs);
}
if (pOperator->pDownstream != NULL) {
......@@ -4425,7 +4308,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
......@@ -4477,14 +4360,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->inputSlotMap);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosArrayDestroy(pExInfo->pSources);
......@@ -4538,7 +4413,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = num;
pOperator->numOfExprs = num;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
destroyProjectOperatorInfo, NULL, NULL, NULL);
......@@ -4621,7 +4496,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet =
......@@ -4979,7 +4854,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL;
if (pSortPhyNode->pExprs != NULL) {
pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
}
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, slotMap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
......@@ -5566,7 +5448,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
// only the timestamp match support for ordinary table
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
SExprInfo* pExprInfo = &pOperator->pExpr[i];
......@@ -5633,7 +5515,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......
......@@ -227,16 +227,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = j - num;
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
num = 1;
}
......@@ -244,15 +244,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (num > 0) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret =
setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = pBlock->info.rows - num;
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
}
}
......@@ -291,19 +291,19 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
doHashGroupbyAgg(pOperator, pBlock);
}
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
// } else {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo,
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo,
// pInfo->binfo.rowCellInfoOffset);
// }
......@@ -357,7 +357,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -392,7 +392,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t* rows = (int32_t*) pPage;
size_t numOfCols = pOperator->numOfOutput;
size_t numOfCols = pOperator->numOfExprs;
for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->pExpr[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
......@@ -565,7 +565,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
break;
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
doHashPartition(pOperator, pBlock);
}
......@@ -616,7 +616,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResultBlock;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->pExpr = pExprInfo;
pOperator->info = pInfo;
......
......@@ -386,7 +386,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
......@@ -646,7 +646,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet._openFn = operatorDummyOpenFn;
pOperator->fpSet.getNextFn = doStreamBlockScan;
pOperator->fpSet.closeFn = operatorDummyCloseFn;
......@@ -1020,7 +1020,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols);
pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
// todo log the filter info
doFilterResult(pInfo);
......@@ -1150,7 +1150,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
......@@ -1247,7 +1247,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
char *data = NULL, *dst = NULL;
int16_t type = 0, bytes = 0;
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
for(int32_t j = 0; j < pOperator->numOfExprs; ++j) {
// not assign value in case of user defined constant output column
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
continue;
......@@ -1308,7 +1308,7 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
......
#include "tdatablock.h"
#include "executorimpl.h"
static SSDataBlock* doSort(SOperatorInfo* pOperator);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
SArray* pIndexMap, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t rowSize = pResBlock->info.rowSize;
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
goto _error;
}
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
pInfo->pSortInfo = pSortInfo;
pInfo->inputSlotMap = pIndexMap;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
// TODO merge aggregate super table
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
bool isNull = tsortIsNullVal(pTupleHandle, i);
if (isNull) {
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
} else {
char* pData = tsortGetValue(pTupleHandle, i);
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
}
}
pBlock->info.rows += 1;
}
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
blockDataCleanup(pDataBlock);
blockDataEnsureCapacity(pDataBlock, capacity);
blockDataEnsureCapacity(pDataBlock, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
if (pDataBlock->info.rows >= capacity) {
return pDataBlock;
}
}
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
return pOperator->fpSet.getNextFn(pOperator);
}
// todo refactor: merged with fetch fp
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->pExpr != NULL) {
projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
}
}
SSDataBlock* doSort(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity);
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT,
pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[0];
tsortAddSource(pInfo->pSortHandle, ps);
int32_t code = tsortOpen(pInfo->pSortHandle);
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity);
}
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->inputSlotMap);
}
......@@ -325,7 +325,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
SqlFunctionCtx* pCtx = pInfo->pCtx;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
int32_t functionId = pCtx[k].functionId;
if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) {
pCtx[k].start.key = INT64_MIN;
......@@ -406,12 +406,12 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF
// start exactly from this point, no need to do interpolation
TSKEY key = ascQuery ? win->skey : win->ekey;
if (key == curTs) {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
return true;
}
if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
return true;
}
......@@ -427,7 +427,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
STimeWindow* win) {
int32_t order = TSDB_ORDER_ASC;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
TSKEY actualEndKey = tsCols[endRowIndex];
TSKEY key = order ? win->ekey : win->skey;
......@@ -572,7 +572,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
} else {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
}
// point interpolation does not require the end key time window interpolation.
......@@ -592,7 +592,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
} else {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_END_INTERP);
}
}
......@@ -612,7 +612,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
SArray* pUpdated = NULL;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
......@@ -683,7 +683,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
......@@ -773,7 +773,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break;
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
......@@ -798,7 +798,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
......@@ -813,7 +813,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
int64_t gid = pBlock->info.groupId;
bool masterScan = true;
int32_t numOfOutput = pOperator->numOfOutput;
int32_t numOfOutput = pOperator->numOfExprs;
int16_t bytes = pStateColInfoData->info.bytes;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
......@@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
pBInfo->rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
......@@ -1013,13 +1013,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
}
finalizeUpdatedResult(pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
......@@ -1082,7 +1082,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
......@@ -1141,7 +1141,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL,
......@@ -1169,7 +1169,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
bool masterScan = true;
int32_t numOfOutput = pOperator->numOfOutput;
int32_t numOfOutput = pOperator->numOfExprs;
int64_t gid = pBlock->info.groupId;
int64_t gap = pInfo->gap;
......@@ -1270,7 +1270,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
pBInfo->rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
......@@ -1309,7 +1309,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
break;
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true);
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
......@@ -1319,7 +1319,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pSliceInfo->binfo.resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs);
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
......@@ -1346,7 +1346,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -1388,7 +1388,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
......@@ -1440,7 +1440,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
......
......@@ -42,11 +42,7 @@ struct SSortHandle {
_sort_fetch_block_fn_t fetchfp;
_sort_merge_compar_fn_t comparFn;
void *pParam;
SMultiwayMergeTreeInfo *pMergeTree;
int32_t numOfCols;
int64_t startTs;
uint64_t sortElapsed;
uint64_t totalElapsed;
......@@ -61,6 +57,9 @@ struct SSortHandle {
bool inMemSort;
bool needAdjust;
STupleHandle tupleHandle;
void *param;
void (*beforeFp)(SSDataBlock* pBlock, void* param);
};
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
......@@ -533,6 +532,13 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
// perform the scalar function calculation before apply the sort
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
// todo relocate the columns
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
if (code != 0) {
return code;
......@@ -623,8 +629,10 @@ int32_t tsortClose(SSortHandle* pHandle) {
return TSDB_CODE_SUCCESS;
}
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
pHandle->fetchfp = fp;
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) {
pHandle->fetchfp = fetchFp;
pHandle->beforeFp = fp;
pHandle->param = param;
return TSDB_CODE_SUCCESS;
}
......
......@@ -210,7 +210,7 @@ TEST(testCase, inMem_sort_Test) {
taosArrayPush(orderInfo, &oi);
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL);
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
pInfo->startVal = 0;
......@@ -299,7 +299,7 @@ TEST(testCase, external_mem_sort_Test) {
taosArrayPush(orderInfo, &oi);
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL);
SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
ps->param = &pInfo[i];
......@@ -366,7 +366,7 @@ TEST(testCase, ordered_merge_sort_Test) {
}
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL);
tsortSetComparFp(phandle, docomp);
SSortSource* p[10] = {0};
......
......@@ -48,8 +48,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
udf1 PUBLIC os
)
udf1 PUBLIC os)
add_library(udf2 MODULE test/udf2.c)
target_include_directories(
......
......@@ -19,9 +19,6 @@
extern "C" {
#endif
//TODO replaces them with fnDebug
//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__)
#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");}
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,
......@@ -107,7 +104,7 @@ void* decodeUdfRequest(const void *buf, SUdfRequest* request);
int32_t encodeUdfResponse(void **buf, const SUdfResponse *response);
void* decodeUdfResponse(const void* buf, SUdfResponse *response);
void freeUdfColumnData(SUdfColumnData *data);
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
void freeUdfColumn(SUdfColumn* col);
void freeUdfDataDataBlock(SUdfDataBlock *block);
......
......@@ -481,8 +481,8 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
return (void*)buf;
}
void freeUdfColumnData(SUdfColumnData *data) {
if (data->varLengthColumn) {
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
if (IS_VAR_DATA_TYPE(meta->type)) {
taosMemoryFree(data->varLenCol.varOffsets);
data->varLenCol.varOffsets = NULL;
taosMemoryFree(data->varLenCol.payload);
......@@ -496,7 +496,7 @@ void freeUdfColumnData(SUdfColumnData *data) {
}
void freeUdfColumn(SUdfColumn* col) {
freeUdfColumnData(&col->colData);
freeUdfColumnData(&col->colData, &col->colMeta);
}
void freeUdfDataDataBlock(SUdfDataBlock *block) {
......@@ -528,8 +528,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colMeta.scale = col->info.scale;
udfCol->colMeta.precision = col->info.precision;
udfCol->colData.numOfRows = udfBlock->numOfRows;
udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
if (udfCol->colData.varLengthColumn) {
if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
......@@ -555,7 +554,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
block->info.numOfCols = 1;
block->info.rows = udfCol->colData.numOfRows;
block->info.hasVarCol = udfCol->colData.varLengthColumn;
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArraySetSize(block->pDataBlock, 1);
......
......@@ -75,8 +75,8 @@ typedef struct SUdf {
char path[PATH_MAX];
uv_lib_t lib;
TUdfScalarProcFunc scalarProcFunc;
TUdfFreeUdfColumnFunc freeUdfColumn;
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
......@@ -106,11 +106,6 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc));
char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *freeSuffix = "_free";
strncpy(freeFuncName, processFuncName, strlen(processFuncName));
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
} else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
......@@ -215,7 +210,7 @@ void udfdProcessRequest(uv_work_t *req) {
udf->scalarProcFunc(&input, &output);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
udf->freeUdfColumn(&output);
freeUdfColumn(&output);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
......
......@@ -18,52 +18,20 @@ int32_t udf1_destroy() {
}
int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
SUdfColumnData *srcData = &block->udfCols[0]->colData;
resultData->varLengthColumn = srcData->varLengthColumn;
if (resultData->varLengthColumn) {
resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen;
resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen);
memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen);
resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen;
resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen);
memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen);
} else {
resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen;
resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen);
memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen);
resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen;
resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen);
memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen);
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
*(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88;
}
}
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
return 0;
}
int32_t udf1_free(SUdfColumn *col) {
SUdfColumnData *data = &col->colData;
if (data->varLengthColumn) {
free(data->varLenCol.varOffsets);
data->varLenCol.varOffsets = NULL;
free(data->varLenCol.payload);
data->varLenCol.payload = NULL;
} else {
free(data->fixLenCol.nullBitmap);
data->fixLenCol.nullBitmap = NULL;
free(data->fixLenCol.data);
data->fixLenCol.data = NULL;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
SUdfColumnData *srcData = &block->udfCols[0]->colData;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int32_t luckyNum = 88;
udfColSetRow(resultCol, i, (char*)&luckyNum, false);
}
return 0;
}
\ No newline at end of file
......@@ -1971,7 +1971,18 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d);
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_NCHAR: {
//cJSON only support utf-8 encoding. Convert memory content to hex string.
char *buf = taosMemoryCalloc(varDataLen(pNode->datum.p) * 2 + 1, sizeof(char));
code = taosHexEncode(varDataVal(pNode->datum.p), buf, varDataLen(pNode->datum.p));
if(code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
return TSDB_CODE_TSC_INVALID_VALUE;
}
code = tjsonAddStringToObject(pJson, jkValueDatum, buf);
taosMemoryFree(buf);
break;
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
......@@ -2074,7 +2085,26 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
break;
}
varDataSetLen(pNode->datum.p, pNode->node.resType.bytes);
code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p));
if (TSDB_DATA_TYPE_NCHAR == pNode->node.resType.type) {
char *buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1);
if (NULL == buf) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
code = tjsonGetStringValue(pJson, jkValueDatum, buf);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
break;
}
code = taosHexDecode(buf, varDataVal(pNode->datum.p), pNode->node.resType.bytes - VARSTR_HEADER_SIZE);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
break;
}
taosMemoryFree(buf);
} else {
code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p));
}
break;
}
case TSDB_DATA_TYPE_JSON:
......
......@@ -363,10 +363,8 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d
CHECK_OUT_OF_MEM(func);
strcpy(func->functionName, "cast");
func->node.resType = dt;
if (TSDB_DATA_TYPE_BINARY == dt.type) {
func->node.resType.bytes += 2;
} else if (TSDB_DATA_TYPE_NCHAR == dt.type) {
func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + 2;
if (TSDB_DATA_TYPE_NCHAR == dt.type) {
func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE;
}
nodesListMakeAppend(&func->pParameterList, pExpr);
return (SNode*)func;
......
......@@ -1043,6 +1043,7 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) {
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyInsertParseContextForTable(pCxt);
taosHashCleanup(pCxt->pVgroupsHashObj);
taosHashCleanup(pCxt->pSubTableHashObj);
destroyBlockHashmap(pCxt->pTableBlockHashObj);
destroyBlockArrayList(pCxt->pVgDataBlocks);
......
......@@ -4062,7 +4062,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
STranslateContext cxt = {0};
int32_t code = initTranslateContext(pParseCxt, &cxt);
int32_t code = initTranslateContext(pParseCxt, &cxt);
if (TSDB_CODE_SUCCESS == code) {
code = fmFuncMgtInit();
}
......
......@@ -709,6 +709,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
if (IS_VAR_DATA_TYPE(outputType)) {
outputLen += VARSTR_HEADER_SIZE;
}
char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1);
char *output = outputBuf;
......@@ -790,29 +794,30 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t outputCharLen = (outputLen - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
int32_t len;
if (inputType == TSDB_DATA_TYPE_BOOL) {
char tmp[8] = {0};
int32_t len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" );
len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" );
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
return TSDB_CODE_FAILED;
}
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
int32_t len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen;
len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen;
bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
return TSDB_CODE_FAILED;
}
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
int32_t len = TMIN(outputLen, varDataLen(input) + VARSTR_HEADER_SIZE);
memcpy(output, input, len);
varDataSetLen(output, len - VARSTR_HEADER_SIZE);
len = TMIN(outputLen - VARSTR_HEADER_SIZE, varDataLen(input));
memcpy(output, input, len + VARSTR_HEADER_SIZE);
varDataSetLen(output, len);
} else {
char tmp[400] = {0};
NUM_TO_STRING(inputType, input, sizeof(tmp), tmp);
int32_t len = (int32_t)strlen(tmp);
len = (int32_t)strlen(tmp);
len = outputCharLen > len ? len : outputCharLen;
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
......@@ -820,6 +825,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
}
varDataSetLen(output, len);
}
//for constant conversion, need to set proper length of pOutput description
if (len < outputLen - VARSTR_HEADER_SIZE) {
pOutput->columnData->info.bytes = len;
}
break;
}
default: {
......
......@@ -435,7 +435,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
if (minNode) {
SDelayTask* minTask = container_of(minNode, SDelayTask, node);
if (minTask->execTime < task->execTime) {
timeoutMs = minTask->execTime <= now ? 0 : now - minTask->execTime;
timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now;
}
}
......
......@@ -44,7 +44,7 @@ int wordexp(char *words, wordexp_t *pwordexp, int flags) {
return -1;
}
printf("parse relative path:%s to abs path:%s\n", words, pwordexp->wordPos);
// printf("parse relative path:%s to abs path:%s\n", words, pwordexp->wordPos);
return 0;
}
......
......@@ -195,6 +195,36 @@ int32_t taosUcs4len(TdUcs4 *ucs4) {
return n;
}
//dst buffer size should be at least 2*len + 1
int32_t taosHexEncode(const char *src, char *dst, int32_t len) {
if (!dst) {
return -1;
}
for (int32_t i = 0; i < len; ++i) {
sprintf(dst + i * 2, "%02x", src[i] & 0xff);
}
return 0;
}
int32_t taosHexDecode(const char *src, char *dst, int32_t len) {
if (!dst) {
return -1;
}
uint16_t hn, ln, out;
for (int i = 0, j = 0; i < len * 2; i += 2, ++j ) {
hn = src[i] > '9' ? src[i] - 'A' + 10 : src[i] - '0';
ln = src[i + 1] > '9' ? src[i + 1] - 'A' + 10 : src[i + 1] - '0';
out = (hn << 4) | ln;
memcpy(dst + j, &out, 1);
}
return 0;
}
int32_t taosWcharWidth(TdWchar wchar) { return wcwidth(wchar); }
int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size) { return wcswidth(pWchar, size); }
......
......@@ -39,6 +39,7 @@
./test.sh -f tsim/query/explain.sim
./test.sh -f tsim/query/session.sim
./test.sh -f tsim/query/scalarNull.sim
./test.sh -f tsim/query/udf.sim
# ---- qnode
./test.sh -f tsim/qnode/basic1.sim
......
#!/bin/bash
set +e
#set -x
echo "Executing copy_udf.sh"
SCRIPT_DIR=`dirname $0`
cd $SCRIPT_DIR/../
IN_TDINTERNAL="community"
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
cd ../../..
else
cd ../../
fi
TAOS_DIR=`pwd`
UDF1_DIR=`find $TAOS_DIR -name "libudf1.so"|grep lib|head -n1`
UDF2_DIR=`find $TAOS_DIR -name "libudf2.so"|grep lib|head -n1`
echo $UDF1_DIR
echo $UDF2_DIR
UDF_TMP=/tmp/udf
mkdir $UDF_TMP
rm $UDF_TMP/libudf1.so
rm $UDF_TMP/libudf2.so
echo "Copy udf shared library files to $UDF_TMP"
cp $UDF1_DIR $UDF_TMP
cp $UDF2_DIR $UDF_TMP
......@@ -66,7 +66,7 @@ print ============= create database
# | REPLICA value [1 | 3]
# | WAL value [1 | 2]
sql create database db CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 1 WAL 2 VGROUPS 6 SINGLE_STABLE 1
sql create database db CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 3 WAL 2 VGROUPS 6 SINGLE_STABLE 1
sql show databases
print rows: $rows
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
......@@ -86,7 +86,7 @@ endi
if $data3_db != 0 then # ntables
return -1
endi
if $data4_db != 1 then # replica
if $data4_db != 3 then # replica
return -1
endi
if $data5_db != nostrict then # strict
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
print ======== step1 udf
system sh/copy_udf.sh
sql create database udf vgroups 3;
sql use udf;
sql show databases;
sql create table t (ts timestamp, f int);
sql insert into t values(now, 1)(now+1s, 2);
sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;
sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;
sql show functions;
if $rows != 2 then
return -1
endi
sql select udf1(f) from t;
if $rows != 2 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
sql select udf2(f) from t;
if $rows != 1 then
return -1
endi
if $data00 != 2.236067977 then
return -1
endi
#sql drop function udf1;
#sql drop function udf2;
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
......@@ -191,13 +191,13 @@ class TDTestCase:
def support_types(self):
type_error_sql_lists = [
"select log(ts ,2 ) from t1" ,
"select log(c7,2 ) from t1",
"select log(c8,2 ) from t1",
"select log(c9,2 ) from t1",
"select log(ts,2 ) from ct1" ,
"select log(c7,2 ) from ct1",
"select log(c8,2 ) from ct1",
"select log(c9,2 ) from ct1",
"select log(c7,c2 ) from t1",
"select log(c8,c1 ) from t1",
"select log(c9,c2 ) from t1",
"select log(ts,c7 ) from ct1" ,
"select log(c7,c9 ) from ct1",
"select log(c8,c2 ) from ct1",
"select log(c9,c1 ) from ct1",
"select log(ts,2 ) from ct3" ,
"select log(c7,2 ) from ct3",
"select log(c8,2 ) from ct3",
......
此差异已折叠。
......@@ -22,4 +22,5 @@ python3 ./test.py -f 2-query/abs.py
python3 ./test.py -f 2-query/ceil.py
python3 ./test.py -f 2-query/floor.py
python3 ./test.py -f 2-query/round.py
python3 ./test.py -f 2-query/log.py
\ No newline at end of file
python3 ./test.py -f 2-query/log.py
python3 ./test.py -f 2-query/pow.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册