提交 459b0ef4 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 cde4c9a7
......@@ -56,8 +56,9 @@ typedef struct SColumnDataAgg {
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
int32_t tupleSize;
int32_t numOfCols;
int64_t uid;
union {int64_t uid; int64_t blockId;};
} SDataBlockInfo;
typedef struct SConstantItem {
......@@ -108,7 +109,7 @@ static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlo
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
tlen += taosEncodeFixedI16(buf, pColData->info.type);
tlen += taosEncodeFixedI16(buf, pColData->info.bytes);
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
int32_t colSz = rows * pColData->info.bytes;
tlen += taosEncodeBinary(buf, pColData->pData, colSz);
......@@ -127,7 +128,7 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock)
SColumnInfoData data = {0};
buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI16(buf, &data.info.bytes);
buf = taosDecodeFixedI32(buf, &data.info.bytes);
int32_t colSz = pBlock->info.rows * data.info.bytes;
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
taosArrayPush(pBlock->pDataBlock, &data);
......@@ -212,10 +213,23 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
// the following structure shared by parser and executor
typedef struct SColumn {
uint64_t uid;
char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
SColumnInfo info;
union {
uint64_t uid;
int64_t dataBlockId;
char name[TSDB_COL_NAME_LEN];
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
union {
int16_t colId;
int16_t slotId;
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
// SColumnInfo info;
} SColumn;
typedef struct SLimit {
......@@ -233,21 +247,24 @@ typedef struct SGroupbyExpr {
bool groupbyTag; // group by tag or column
} SGroupbyExpr;
// the structure for sql function in select clause
typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema;
typedef struct SFunctParam {
int32_t type;
SColumn *pCol;
SVariant param;
} SFunctParam;
int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
} SSqlExpr;
// the structure for sql function in select clause
typedef struct SExprBasicInfo {
SSchema resSchema; // TODO refactor
int32_t interBytes; // inter result buffer size, TODO remove it
int16_t numOfParams; // argument value of each function
SFunctParam *pParam;
// SVariant param[3]; // parameters are not more than 3
} SExprBasicInfo;
typedef struct SExprInfo {
struct SSqlExpr base;
struct tExprNode* pExpr;
struct SExprBasicInfo base;
struct tExprNode *pExpr;
} SExprInfo;
typedef struct SStateWindow {
......@@ -94,8 +94,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t colDataGetNumOfCols(const SSDataBlock* pBlock);
size_t colDataGetNumOfRows(const SSDataBlock* pBlock);
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
......@@ -424,9 +424,10 @@ typedef struct {
int16_t slotId;
int16_t type;
int16_t bytes;
SColumnFilterList flist;
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SColumnInfo;
typedef struct {
......@@ -456,57 +457,6 @@ typedef struct {
int64_t offset;
} SInterval;
typedef struct {
SMsgHead head;
char version[TSDB_VERSION_LEN];
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool interpQuery; // interp query or not
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo; // if the time window start/end required interpolation
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
bool tsCompQuery; // is tscomp query
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag
STimeWindow window;
int32_t numOfTables;
int16_t order;
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval;
// SSessionWindow sw; // session window
int16_t tagCondLen; // tag length in current query
int16_t colCondLen; // column length in current query
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query.
int64_t limit;
int64_t offset;
int32_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers
int16_t fillType; // interpolate type
int64_t fillVal; // default value array list
int32_t secondStageOutput;
STsBufInfo tsBuf; // tsBuf info
int32_t numOfTags; // number of tags columns involved
int32_t sqlstrLen; // sql query string
int32_t prevResultLen; // previous result length
int32_t numOfOperator;
int32_t tableScanOperator; // table scan operator. -1 means no scan operator
int32_t udfNum; // number of udf function
int32_t udfContentOffset;
int32_t udfContentLen;
SColumnInfo tableCols[];
} SQueryTableReq;
typedef struct {
int32_t code;
} SQueryTableRsp;
......@@ -20,9 +20,30 @@
extern "C" {
#include "tbuffer.h"
#include "tcommon.h"
#include "tvariant.h"
#include "tbuffer.h"
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
typedef struct SFunctionNode SFunctionNode;
typedef struct SFuncExecEnv {
int32_t calcMemSize;
} SFuncExecEnv;
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef struct SFuncExecFuncs {
FExecGetEnv getEnv;
FExecInit init;
FExecProcess process;
FExecFinalize finalize;
} SFuncExecFuncs;
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
......@@ -118,57 +139,58 @@ typedef struct SExtTagsInfo {
} SExtTagsInfo;
typedef struct SResultDataInfo {
int16_t precision;
int16_t scale;
int16_t type;
int16_t bytes;
int32_t intermediateBytes;
int32_t interBufSize;
} SResultDataInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
typedef struct SFunctionFpSet {
bool (*init)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(struct SqlFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result.
void (*finalize)(struct SqlFunctionCtx *pCtx);
void (*combine)(struct SqlFunctionCtx *pCtx);
} SFunctionFpSet;
extern SFunctionFpSet fpSet[1];
typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index
int32_t numOfRows; // the number of rows needs to be handled
int32_t numOfInputCols; // PTS is not included
bool colDataAggIsSet;// if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column
SColumnInfoData **pData;
SColumnDataAgg **pColumnDataAgg;
} SInputColumnInfoData;
// sql function runtime context
typedef struct SqlFunctionCtx {
int32_t startRow;
int32_t size; // number of rows
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // asc|desc
int32_t startRow; // start row index
int32_t size; // handled processed row number
SColumnInfoData* pInput;
uint32_t order; // asc|desc
int16_t inputType;
int16_t inputBytes;
SResultDataInfo resDataInfo;
bool hasNull; // null value exist in current block
bool requireNull; // require null in some function
bool stableQuery;
int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data
uint8_t currentStage; // record current running step, default: 0
int64_t startTs; // timestamp range of current query when function is executed on a specific data block
int32_t numOfParams;
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SVariant tag;
bool isAggSet;
SColumnDataAgg agg;
SColumnDataAgg agg;
int16_t inputType; // TODO remove it
int16_t inputBytes; // TODO remove it
bool hasNull; // null value exist in current block, TODO remove it
bool requireNull; // require null in some function, TODO remove it
int32_t columnIndex; // TODO remove it
uint8_t currentStage; // record current running step, default: 0
bool isAggSet;
bool stableQuery;
int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data
int64_t startTs; // timestamp range of current query when function is executed on a specific data block
int32_t numOfParams;
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SVariant tag;
struct SResultRowEntryInfo *resultInfo;
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
int32_t columnIndex;
SFunctionFpSet* fpSet;
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
SFuncExecFuncs fpSet;
} SqlFunctionCtx;
enum {
......@@ -194,9 +216,10 @@ typedef struct tExprNode {
struct SVariant *pVal; // value node
struct {// function node
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
int32_t functionId;
int32_t num;
SFunctionNode *pFunctNode;
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
// calculation instead.
// E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes.
......@@ -207,7 +230,6 @@ typedef struct tExprNode {
} tExprNode;
//TODO create?
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
......@@ -294,7 +316,7 @@ tExprNode* exprdup(tExprNode* pTree);
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
......@@ -102,22 +102,6 @@ struct SqlFunctionCtx;
struct SResultRowEntryInfo;
struct STimeWindow;
typedef struct SFuncExecEnv {
int32_t calcMemSize;
} SFuncExecEnv;
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef struct SFuncExecFuncs {
FExecGetEnv getEnv;
FExecInit init;
FExecProcess process;
FExecFinalize finalize;
} SFuncExecFuncs;
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef struct SScalarFuncExecFuncs {
......@@ -435,11 +435,11 @@ static int32_t hbCreateThread() {
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
// if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// return -1;
// }
// pthread_attr_destroy(&thAttr);
return 0;
......@@ -53,7 +53,7 @@ TEST(testCase, driverInit_Test) {
// taos_init();
#if 1
#if 0
TEST(testCase, connect_Test) {
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
......@@ -571,15 +571,15 @@ TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "use abc1");
#if 0
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
......@@ -592,7 +592,7 @@ TEST(testCase, projection_query_tables) {
for(int32_t i = 0; i < 10000000; ++i) {
for(int32_t i = 0; i < 10000; ++i) {
char sql[512] = {0};
sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
TAOS_RES* p = taos_query(pConn, sql);
......@@ -602,8 +602,9 @@ TEST(testCase, projection_query_tables) {
pRes = taos_query(pConn, "select * from tu");
pRes = taos_query(pConn, "select count(ts) from tu");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
......@@ -623,8 +624,8 @@ TEST(testCase, projection_query_tables) {
#if 0
#if 0
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -239,7 +239,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
return numOfRow1 + numOfRow2;
size_t colDataGetNumOfCols(const SSDataBlock* pBlock) {
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0;
......@@ -247,7 +247,7 @@ size_t colDataGetNumOfCols(const SSDataBlock* pBlock) {
return pBlock->info.numOfCols;
size_t colDataGetNumOfRows(const SSDataBlock* pBlock) {
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) {
return pBlock->info.rows;
......@@ -160,8 +160,8 @@ TEST(testCase, Datablock_test) {
printf("binary column length:%d\n", *(int32_t*) p1->pData);
ASSERT_EQ(colDataGetNumOfCols(b), 2);
ASSERT_EQ(colDataGetNumOfRows(b), 40);
ASSERT_EQ(blockDataGetNumOfCols(b), 2);
ASSERT_EQ(blockDataGetNumOfRows(b), 40);
char* pData = colDataGetData(p1, 3);
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
......@@ -13,18 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <tdatablock.h>
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "texception.h"
#include "tsdb.h"
#include "tsdbDef.h"
#include "tsdbFS.h"
#include "tsdbLog.h"
#include "tsdbReadImpl.h"
#include "ttime.h"
#include "texception.h"
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "tskiplist.h"
#include "ttime.h"
#include "taosdef.h"
#include "tlosertree.h"
......@@ -1472,6 +1473,8 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
return numOfRows + num;
// TODO fix bug for reverse copy data
// TODO handle the null data
// Note: row1 always has high priority
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
......@@ -1515,7 +1518,6 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
int32_t i = 0, j = 0, k = 0;
while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
......@@ -1586,7 +1588,6 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal);
if (colId == pColInfo->info.colId) {
if (tdValTypeIsNorm(sVal.valType)) {
switch (pColInfo->info.type) {
......@@ -1594,7 +1595,6 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
memcpy(pData, sVal.val, varDataTLen(sVal.val));
......@@ -1625,11 +1625,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
memcpy(pData, sVal.val, pColInfo->info.bytes);
} else if (forceSetNull) {
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
colDataAppend(pColInfo, numOfRows, NULL, true);
......@@ -1640,11 +1636,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else {
if(forceSetNull) {
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
colDataAppend(pColInfo, numOfRows, NULL, true);
......@@ -1653,18 +1645,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
if(forceSetNull) {
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
colDataAppend(pColInfo, numOfRows, NULL, true);
......@@ -111,7 +111,6 @@ void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultR
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
......@@ -362,8 +362,8 @@ typedef struct STaskParam {
char* tbnameCond;
char* prevResult;
SArray* pTableIdList;
SSqlExpr** pExpr;
SSqlExpr** pSecExpr;
SExprBasicInfo** pExpr;
SExprBasicInfo** pSecExpr;
SExprInfo* pExprs;
SExprInfo* pSecExprs;
......@@ -448,7 +448,6 @@ typedef struct SOptrBasicInfo {
int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx;
SSDataBlock* pRes;
uint32_t resRowSize;
int32_t capacity;
} SOptrBasicInfo;
......@@ -617,8 +616,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
......@@ -650,8 +649,6 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
......@@ -676,16 +673,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg,
SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo* pUdfInfo);
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
SGroupbyExpr* createGroupbyExprFromMsg(SQueryTableReq* pQueryMsg, SColIndex* pColIndex, int32_t* code);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger);
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifdef __cplusplus
extern "C" {
#include "function.h"
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void functionFinalizer(SqlFunctionCtx *pCtx);
bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void countFunction(SqlFunctionCtx *pCtx);
bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void sumFunction(SqlFunctionCtx *pCtx);
#ifdef __cplusplus
......@@ -83,9 +83,7 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
......@@ -14,7 +14,9 @@
#include "builtins.h"
#include "builtinsimpl.h"
#include "taoserror.h"
#include "tdatablock.h"
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc);
......@@ -23,29 +25,29 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "count",
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.processFunc = NULL,
.finalizeFunc = NULL
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getCountFuncEnv,
.initFunc = functionSetup,
.processFunc = countFunction,
.finalizeFunc = functionFinalizer
.name = "sum",
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.processFunc = NULL,
.finalizeFunc = NULL
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getSumFuncEnv,
.initFunc = functionSetup,
.processFunc = sumFunction,
.finalizeFunc = functionFinalizer
.name = "concat",
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
......@@ -54,5 +56,11 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
const int funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition));
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
switch(pFunc->funcType) {
case FUNCTION_TYPE_COUNT: pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};break;
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "builtinsimpl.h"
#include "taggfunction.h"
#include "tdatablock.h"
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
break; \
} \
(_info)->numOfRes = (res); \
(_info)->hasResult = DATA_SET_FLAG; \
} while (0)
typedef struct SSumRes {
// int8_t hasResult;
union {
int64_t isum;
uint64_t usum;
double dsum;
} SSumRes;
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (pResultInfo->initialized) {
return false;
if (pCtx->pOutput != NULL) {
memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes);
initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize);
return true;
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
void functionFinalizer(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) {
// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(int64_t);
return true;
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
void countFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0;
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true;
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true;
* 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false;
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) {
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
ASSERT(numOfElem >= 0);
} else {
if (pInputCol->hasNull) {
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
numOfElem += 1;
} else {
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
numOfElem = pInput->numOfRows;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
*((int64_t *)buf) += numOfElem;
SET_VAL(pResInfo, numOfElem, 1);
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t *d = (_t *)(_col->pData); \
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
}; \
(_res) += (d)[i]; \
(numOfElem)++; \
} \
} while (0)
static void do_sum(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type;
if (pInput->colDataAggIsSet) {
numOfElem = pInput->numOfRows - pAgg->numOfNull;
ASSERT(numOfElem >= 0);
SSumRes* pSumInfo = (SSumRes*) pCtx->pOutput;
pSumInfo->isum += pAgg->sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pSumInfo->usum += pAgg->sum;
} else if (IS_FLOAT_TYPE(type)) {
pSumInfo->dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
} else { // computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
SSumRes* pSum = (SSumRes*) pCtx->pOutput;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int8_t, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int16_t, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int32_t, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int64_t, numOfElem);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) {
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint8_t, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) {
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint16_t, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) {
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint32_t, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) {
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint64_t, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
LIST_ADD_N(pSum->dsum, pCol, start, numOfRows, double, numOfElem);
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
LIST_ADD_N(pSum->dsum, pCol, start, numOfRows, float, numOfElem);
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSumRes);
return true;
void sumFunction(SqlFunctionCtx *pCtx) {
// keep the result data in output buffer, not in the intermediate buffer
// SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
// if (pResInfo->hasResult == DATA_SET_FLAG) {
// set the flag for super table query
// SSumRes *pSum = (SSumRes *)pCtx->pOutput;
// pSum->hasResult = DATA_SET_FLAG;
// }
......@@ -26,19 +26,27 @@ typedef struct SFuncMgtService {
} SFuncMgtService;
static SFuncMgtService gFunMgtService;
static pthread_once_t functionHashTableInit = PTHREAD_ONCE_INIT;
static int32_t initFunctionCode = 0;
// todo refactor
int32_t fmFuncMgtInit() {
static void doInitFunctionHashTable() {
gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == gFunMgtService.pFuncNameHashTable) {
initFunctionCode = TSDB_CODE_FAILED;
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name, strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) {
initFunctionCode = TSDB_CODE_FAILED;
int32_t fmFuncMgtInit() {
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
return initFunctionCode;
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
......@@ -543,7 +543,7 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co
pFillCol[i].col.offset = offset;
pFillCol[i].col.colId = pExprInfo->base.resSchema.colId;
pFillCol[i].tagIndex = -2;
pFillCol[i].flag = pExprInfo->base.pColumns->flag; // always be the normal column for table query
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
pFillCol[i].fillVal.i = fillVal[i];
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册