未验证 提交 c28af700 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13256 from taosdata/feature/3_liaohj

enh(query): enable twa function in select clause.
...@@ -227,6 +227,9 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); ...@@ -227,6 +227,9 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks); void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
...@@ -246,54 +249,6 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32 ...@@ -246,54 +249,6 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
} }
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) {
int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t);
uint64_t* groupId = (uint64_t*)data;
data += sizeof(uint64_t);
int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t);
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
int32_t numOfRows = pBlock->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
// copy the null bitmap
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
size_t metaSize = numOfRows * sizeof(int32_t);
memcpy(data, pColRes->varmeta.offset, metaSize);
data += metaSize;
(*dataLen) += metaSize;
} else {
int32_t len = BitmapLen(numOfRows);
memcpy(data, pColRes->nullbitmap, len);
data += len;
(*dataLen) += len;
}
if (needCompress) {
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
data += colSizes[col];
(*dataLen) += colSizes[col];
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
(*dataLen) += colSizes[col];
memmove(data, pColRes->pData, colSizes[col]);
data += colSizes[col];
}
colSizes[col] = htonl(colSizes[col]);
}
*actualLen = *dataLen;
*groupId = pBlock->info.groupId;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -45,6 +45,8 @@ extern bool tsEnableSlaveQuery; ...@@ -45,6 +45,8 @@ extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth; extern bool tsPrintAuth;
extern int64_t tsTickPerMin[3]; extern int64_t tsTickPerMin[3];
extern int32_t tsCountAlwaysReturnValue;
// multi-process // multi-process
extern int32_t tsMultiProcess; extern int32_t tsMultiProcess;
extern int32_t tsMnodeShmSize; extern int32_t tsMnodeShmSize;
...@@ -102,7 +104,6 @@ extern int32_t tsMaxStreamComputDelay; ...@@ -102,7 +104,6 @@ extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay; extern int32_t tsStreamCompStartDelay;
extern int32_t tsRetryStreamCompDelay; extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow; extern int64_t tsMaxRetentWindow;
// build info // build info
......
...@@ -571,13 +571,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR ...@@ -571,13 +571,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp); void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef struct {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
int16_t flag; // denote if it is a tag or a normal column
char name[TSDB_DB_FNAME_LEN];
} SColIndex;
typedef struct { typedef struct {
int16_t lowerRelOptr; int16_t lowerRelOptr;
int16_t upperRelOptr; int16_t upperRelOptr;
......
...@@ -156,18 +156,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo); ...@@ -156,18 +156,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
*/ */
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
/**
* Create the table group according to the group by tags info
* @param pTableIdList
* @param skey
* @param groupInfo
* @param groupByIndex
* @param numOfIndex
* @return
*/
// int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex*
// groupByIndex, int32_t numOfIndex);
/** /**
* Update the table id list of a given query. * Update the table id list of a given query.
* @param uid child table uid * @param uid child table uid
......
...@@ -61,56 +61,9 @@ typedef struct SFileBlockInfo { ...@@ -61,56 +61,9 @@ typedef struct SFileBlockInfo {
#define TSDB_BLOCK_DIST_STEP_ROWS 8 #define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results #define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define FUNCTION_TYPE_SCALAR 1
#define FUNCTION_TYPE_AGG 2
#define TOP_BOTTOM_QUERY_LIMIT 100 #define TOP_BOTTOM_QUERY_LIMIT 100
#define FUNCTIONS_NAME_MAX_LENGTH 16 #define FUNCTIONS_NAME_MAX_LENGTH 16
#define FUNCTION_INVALID_ID -1
#define FUNCTION_COUNT 0
#define FUNCTION_SUM 1
#define FUNCTION_AVG 2
#define FUNCTION_MIN 3
#define FUNCTION_MAX 4
#define FUNCTION_STDDEV 5
#define FUNCTION_PERCT 6
#define FUNCTION_APERCT 7
#define FUNCTION_FIRST 8
#define FUNCTION_LAST 9
#define FUNCTION_LAST_ROW 10
#define FUNCTION_TOP 11
#define FUNCTION_BOTTOM 12
#define FUNCTION_SPREAD 13
#define FUNCTION_TWA 14
#define FUNCTION_LEASTSQR 15
#define FUNCTION_TS 16
#define FUNCTION_TS_DUMMY 17
#define FUNCTION_TAG_DUMMY 18
#define FUNCTION_TS_COMP 19
#define FUNCTION_TAG 20
#define FUNCTION_PRJ 21
#define FUNCTION_TAGPRJ 22
#define FUNCTION_ARITHM 23
#define FUNCTION_DIFF 24
#define FUNCTION_FIRST_DST 25
#define FUNCTION_LAST_DST 26
#define FUNCTION_STDDEV_DST 27
#define FUNCTION_INTERP 28
#define FUNCTION_RATE 29
#define FUNCTION_IRATE 30
#define FUNCTION_TID_TAG 31
#define FUNCTION_DERIVATIVE 32
#define FUNCTION_BLKINFO 33
#define FUNCTION_COV 38
typedef struct SResultRowEntryInfo { typedef struct SResultRowEntryInfo {
bool initialized:1; // output buffer has been initialized bool initialized:1; // output buffer has been initialized
bool complete:1; // query has completed bool complete:1; // query has completed
...@@ -180,10 +133,9 @@ typedef struct SqlFunctionCtx { ...@@ -180,10 +133,9 @@ typedef struct SqlFunctionCtx {
char *pOutput; // final result output buffer, point to sdata->data char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams; int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list, todo remove it
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset; int32_t offset;
SVariant tag;
struct SResultRowEntryInfo *resultInfo; struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries; SSubsidiaryResInfo subsidiaries;
SPoint1 start; SPoint1 start;
...@@ -210,9 +162,6 @@ enum { ...@@ -210,9 +162,6 @@ enum {
typedef struct tExprNode { typedef struct tExprNode {
int32_t nodeType; int32_t nodeType;
union { union {
SSchema *pSchema;// column node
struct SVariant *pVal; // value node
struct {// function node struct {// function node
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
int32_t functionId; int32_t functionId;
...@@ -255,47 +204,23 @@ struct SScalarParam { ...@@ -255,47 +204,23 @@ struct SScalarParam {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable); bool isSuperTable);
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num); void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock); int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry); bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// fill api
struct SFillInfo;
struct SFillColInfo;
typedef struct SPoint { typedef struct SPoint {
int64_t key; int64_t key;
void * val; void * val;
} SPoint; } SPoint;
//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val);
//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
//
//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
// SInterval* pInterval, int32_t fillType,
// struct SFillColInfo* pCol, const char* id);
//
//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
//int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api // udf api
struct SUdfInfo; struct SUdfInfo;
void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
/** /**
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
* @return error code * @return error code
......
...@@ -229,7 +229,7 @@ int32_t tdListAppend(SList *list, void *data); ...@@ -229,7 +229,7 @@ int32_t tdListAppend(SList *list, void *data);
SListNode *tdListPopHead(SList *list); SListNode *tdListPopHead(SList *list);
SListNode *tdListPopTail(SList *list); SListNode *tdListPopTail(SList *list);
SListNode *tdListGetHead(SList *list); SListNode *tdListGetHead(SList *list);
SListNode *tsListGetTail(SList *list); SListNode *tdListGetTail(SList *list);
SListNode *tdListPopNode(SList *list, SListNode *node); SListNode *tdListPopNode(SList *list, SListNode *node);
void tdListMove(SList *src, SList *dst); void tdListMove(SList *src, SList *dst);
void tdListDiscard(SList *list); void tdListDiscard(SList *list);
......
...@@ -567,6 +567,7 @@ TEST(testCase, insert_test) { ...@@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...@@ -625,23 +626,23 @@ TEST(testCase, projection_query_tables) { ...@@ -625,23 +626,23 @@ TEST(testCase, projection_query_tables) {
printf("start to insert next table\n"); printf("start to insert next table\n");
for(int32_t i = 0; i < 1000000; i += 20) { // for(int32_t i = 0; i < 1000000; i += 20) {
char sql[1024] = {0}; // char sql[1024] = {0};
sprintf(sql, // sprintf(sql,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" // "insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" // "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" // "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", // "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7, // i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14, // i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); // i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql); // TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) { // if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p)); // printf("failed to insert data, reason:%s\n", taos_errstr(p));
} // }
//
taos_free_result(p); // taos_free_result(p);
} // }
// pRes = taos_query(pConn, "select * from tu"); // pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
...@@ -663,7 +664,7 @@ TEST(testCase, projection_query_tables) { ...@@ -663,7 +664,7 @@ TEST(testCase, projection_query_tables) {
// taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#if 0
TEST(testCase, projection_query_stables) { TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
...@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) { ...@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, agg_query_tables) { TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
...@@ -734,5 +733,6 @@ TEST(testCase, agg_query_tables) { ...@@ -734,5 +733,6 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -355,14 +355,19 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) ...@@ -355,14 +355,19 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
return -1; return -1;
} }
int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex; int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
return 0; return 0;
} }
pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0); TSKEY skey = *(TSKEY*)colDataGetData(pColInfoData, 0);
pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); TSKEY ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
pDataBlock->info.window.skey = TMIN(skey, ekey);
pDataBlock->info.window.ekey = TMAX(skey, ekey);
return 0; return 0;
} }
...@@ -1273,25 +1278,39 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { ...@@ -1273,25 +1278,39 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
memmove(nullBitmap, nullBitmap + n / 8, newLen); memmove(nullBitmap, nullBitmap + n / 8, newLen);
} else { } else {
int32_t tail = n % 8; int32_t tail = n % 8;
int32_t i = 0; int32_t i = 0;
uint8_t* p = (uint8_t*)nullBitmap; uint8_t* p = (uint8_t*)nullBitmap;
while (i < len) {
uint8_t v = p[i];
p[i] = 0; if (n < 8) {
p[i] = (v << tail); while (i < len) {
uint8_t v = p[i]; // source bitmap value
p[i] = (v << tail);
if (i < len - 1) {
uint8_t next = p[i + 1];
p[i] |= (next >> (8 - tail));
}
if (i < len - 1) { i += 1;
uint8_t next = p[i + 1];
p[i] |= (next >> (8 - tail));
} }
} else if (n > 8) {
int32_t gap = len - newLen;
while(i < newLen) {
uint8_t v = p[i + gap];
p[i] = (v << tail);
if (i < newLen - 1) {
uint8_t next = p[i + gap + 1];
p[i] |= (next >> (8 - tail));
}
i += 1; i += 1;
}
} }
} }
} }
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) { static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
...@@ -1803,3 +1822,99 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1803,3 +1822,99 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
return ret; return ret;
} }
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) {
int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t);
uint64_t* groupId = (uint64_t*)data;
data += sizeof(uint64_t);
int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t);
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
int32_t numOfRows = pBlock->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
// copy the null bitmap
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
size_t metaSize = numOfRows * sizeof(int32_t);
memcpy(data, pColRes->varmeta.offset, metaSize);
data += metaSize;
(*dataLen) += metaSize;
} else {
int32_t len = BitmapLen(numOfRows);
memcpy(data, pColRes->nullbitmap, len);
data += len;
(*dataLen) += len;
}
if (needCompress) {
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
data += colSizes[col];
(*dataLen) += colSizes[col];
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
(*dataLen) += colSizes[col];
memmove(data, pColRes->pData, colSizes[col]);
data += colSizes[col];
}
colSizes[col] = htonl(colSizes[col]);
}
*actualLen = *dataLen;
*groupId = pBlock->info.groupId;
}
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
blockDataEnsureCapacity(pBlock, numOfRows);
const char* pStart = pData;
int32_t dataLen = *(int32_t*)pStart;
pStart += sizeof(int32_t);
pBlock->info.groupId = *(uint64_t*)pStart;
pStart += sizeof(uint64_t);
int32_t* colLen = (int32_t*)pStart;
pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] >= 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colLen[i];
pColInfoData->varmeta.allocLen = colLen[i];
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
pStart += sizeof(int32_t) * numOfRows;
if (colLen[i] > 0) {
taosMemoryFreeClear(pColInfoData->pData);
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
}
} else {
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
pStart += BitmapLen(numOfRows);
}
if (colLen[i] > 0) {
memcpy(pColInfoData->pData, pStart, colLen[i]);
}
// TODO
// setting this flag to true temporarily so aggregate function on stable will
// examine NULL value for non-primary key column
pColInfoData->hasNull = true;
pStart += colLen[i];
}
ASSERT(pStart - pData == dataLen);
return pStart;
}
\ No newline at end of file
...@@ -109,8 +109,11 @@ int32_t tsCompressColData = -1; ...@@ -109,8 +109,11 @@ int32_t tsCompressColData = -1;
*/ */
int32_t tsCompatibleModel = 1; int32_t tsCompatibleModel = 1;
// count/hyperloglog function always return values in case of all NULL data or Empty data set.
int32_t tsCountAlwaysReturnValue = 1;
// 10 ms for sliding time, the value will changed in case of time precision changed // 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10; int32_t tsMinSlidingTime = 10;
// the maxinum number of distict query result // the maxinum number of distict query result
int32_t tsMaxNumOfDistinctResults = 1000 * 10000; int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
...@@ -130,7 +133,6 @@ int32_t tsRetryStreamCompDelay = 10 * 1000; ...@@ -130,7 +133,6 @@ int32_t tsRetryStreamCompDelay = 10 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default. // The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f; float tsStreamComputDelayRatio = 0.1f;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node. // the maximum allowed query buffer size during query processing for each data node.
...@@ -374,6 +376,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -374,6 +376,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 10, 1000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 10, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
...@@ -567,6 +570,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -567,6 +570,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32; tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32; tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32; tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32; tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32; tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
......
...@@ -75,15 +75,15 @@ typedef struct SResultRowInfo { ...@@ -75,15 +75,15 @@ typedef struct SResultRowInfo {
int32_t size; // number of result set int32_t size; // number of result set
int32_t capacity; // max capacity int32_t capacity; // max capacity
SResultRowPosition cur; SResultRowPosition cur;
SList* openWindow;
} SResultRowInfo; } SResultRowInfo;
struct SqlFunctionCtx; struct SqlFunctionCtx;
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void initResultRow(SResultRow *pResultRow); void initResultRow(SResultRow *pResultRow);
...@@ -92,15 +92,6 @@ bool isResultRowClosed(SResultRow* pResultRow); ...@@ -92,15 +92,6 @@ bool isResultRowClosed(SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset); struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset);
static FORCE_INLINE SResultRow *getResultRow(SDiskbasedBuf* pBuf, SResultRowInfo *pResultRowInfo, int32_t slot) {
ASSERT(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
SResultRowPosition* pos = &pResultRowInfo->pPosition[slot];
SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
return pRow;
}
static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId); SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
......
...@@ -454,13 +454,14 @@ typedef struct SIntervalAggOperatorInfo { ...@@ -454,13 +454,14 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindow win; // query time range STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock char** pRow; // previous row/tuple of already processed datablock
SArray* pInterpCols; // interpolation columns
STableQueryInfo* pCurrent; // current tableQueryInfo struct STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info
bool invertible; bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo { typedef struct SStreamFinalIntervalOperatorInfo {
...@@ -832,7 +833,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary ...@@ -832,7 +833,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t order); int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
int32_t* pIndex); int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
......
...@@ -101,20 +101,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow ...@@ -101,20 +101,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
} }
int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {
int32_t i = 0;
// while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
// ++i;
// }
return i;
}
void closeAllResultRows(SResultRowInfo *pResultRowInfo) { void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); // do nothing
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
}
} }
bool isResultRowClosed(SResultRow* pRow) { bool isResultRowClosed(SResultRow* pRow) {
...@@ -258,32 +246,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { ...@@ -258,32 +246,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return (int32_t) taosArrayGetSize(pGroupResInfo->pRows); return (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
} }
static int64_t getNumOfResultWindowRes(STaskRuntimeEnv* pRuntimeEnv, SResultRowPosition *pos, int32_t* rowCellInfoOffset) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
ASSERT(0);
for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) {
int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
continue;
}
// SResultRowEntryInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset);
// assert(pResultInfo != NULL);
//
// if (pResultInfo->numOfRes > 0) {
// return pResultInfo->numOfRes;
// }
}
return 0;
}
static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t left = *(int32_t *)pLeft; int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight; int32_t right = *(int32_t *)pRight;
...@@ -381,7 +343,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe ...@@ -381,7 +343,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
} }
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset); int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset);
if (num <= 0) { if (num <= 0) {
continue; continue;
} }
......
...@@ -239,36 +239,6 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { ...@@ -239,36 +239,6 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
return true; return true;
} }
static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) {
int64_t newCapacity = 0;
// more than the capacity, reallocate the resources
if (pResultRowInfo->size < pResultRowInfo->capacity) {
return;
}
if (pResultRowInfo->capacity > 10000) {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25);
} else {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
}
if (newCapacity <= pResultRowInfo->capacity) {
newCapacity += 4;
}
char* p = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
if (p == NULL) {
longjmp(env, TSDB_CODE_OUT_OF_MEMORY);
}
pResultRowInfo->pPosition = (SResultRowPosition*)p;
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc);
pResultRowInfo->capacity = (int32_t)newCapacity;
}
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData, static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t uid) { int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false; bool existed = false;
...@@ -306,7 +276,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR ...@@ -306,7 +276,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
return p1 != NULL; return p1 != NULL;
} }
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
SFilePage* pData = NULL; SFilePage* pData = NULL;
// in the first scan, new space needed for results // in the first scan, new space needed for results
...@@ -375,6 +345,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -375,6 +345,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
// pResultRowInfo object. // pResultRowInfo object.
if (p1 != NULL) { if (p1 != NULL) {
// todo
pResult = getResultRowByPos(pResultBuf, p1); pResult = getResultRowByPos(pResultBuf, p1);
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
} }
...@@ -383,34 +355,28 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -383,34 +355,28 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// 1. close current opened time window // 1. close current opened time window
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
pResult->offset != pResultRowInfo->cur.offset))) { pResult->offset != pResultRowInfo->cur.offset))) {
// todo extract function
SResultRowPosition pos = pResultRowInfo->cur; SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset);
closeResultRow(pRow);
releaseBufPage(pResultBuf, pPage); releaseBufPage(pResultBuf, pPage);
} }
// allocate a new buffer page // allocate a new buffer page
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
if (pResult == NULL) { if (pResult == NULL) {
ASSERT(pSup->resultRowSize > 0); ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize); pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
initResultRow(pResult); initResultRow(pResult);
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition));
sizeof(SResultRowPosition));
} }
// 2. set the new time window to be the new active time window // 2. set the new time window to be the new active time window
pResultRowInfo->pPosition[pResultRowInfo->size++] =
(SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
// too many time window in query // too many time window in query
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) { if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
...@@ -585,11 +551,13 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow ...@@ -585,11 +551,13 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
} }
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
// keep it temporarily // keep it temporarily
// todo no need this??
bool hasAgg = pCtx[k].input.colDataAggIsSet; bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex; int32_t startOffset = pCtx[k].input.startRowIndex;
...@@ -609,7 +577,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -609,7 +577,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SColumnInfoData idata = {0}; SColumnInfoData idata = {0};
idata.info.type = TSDB_DATA_TYPE_BIGINT; idata.info.type = TSDB_DATA_TYPE_BIGINT;
...@@ -620,22 +589,23 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -620,22 +589,23 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pCtx[k].sfp.process(&tw, 1, &out); pCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1; pEntryInfo->numOfRes = 1;
continue; } else {
} int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS; if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { code = pCtx[k].fpSet.process(&pCtx[k]);
code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code; taskInfo->code = code;
longjmp(taskInfo->env, code); longjmp(taskInfo->env, code);
}
} }
}
// restore it // restore it
pCtx[k].input.colDataAggIsSet = hasAgg; pCtx[k].input.colDataAggIsSet = hasAgg;
pCtx[k].input.startRowIndex = startOffset; pCtx[k].input.startRowIndex = startOffset;
pCtx[k].input.numOfRows = numOfRows; pCtx[k].input.numOfRows = numOfRows;
}
} }
} }
...@@ -774,12 +744,14 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct ...@@ -774,12 +744,14 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) { if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check // todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) { if (pCtx[k].fpSet.process == NULL) {
int32_t code = pCtx[k].fpSet.process(&pCtx[k]); continue;
if (code != TSDB_CODE_SUCCESS) { }
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
return code; int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
} if (code != TSDB_CODE_SUCCESS) {
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
return code;
} }
} }
} }
...@@ -1218,7 +1190,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -1218,7 +1190,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
taosVariantDestroy(&pCtx[i].param[j].param); taosVariantDestroy(&pCtx[i].param[j].param);
} }
taosVariantDestroy(&pCtx[i].tag);
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
taosMemoryFree(pCtx[i].input.pData); taosMemoryFree(pCtx[i].input.pData);
taosMemoryFree(pCtx[i].input.pColumnDataAgg); taosMemoryFree(pCtx[i].input.pColumnDataAgg);
...@@ -1248,9 +1219,9 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q ...@@ -1248,9 +1219,9 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q
static bool isCachedLastQuery(STaskAttr* pQueryAttr) { static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) { // if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) {
continue; // continue;
} // }
return false; return false;
} }
...@@ -1300,7 +1271,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { ...@@ -1300,7 +1271,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]); int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);
#if 0
if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG || if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
functionId == FUNCTION_TAG_DUMMY) { functionId == FUNCTION_TAG_DUMMY) {
continue; continue;
...@@ -1311,6 +1282,8 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { ...@@ -1311,6 +1282,8 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
} else { } else {
hasOtherFunc = true; hasOtherFunc = true;
} }
#endif
} }
if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) { if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
...@@ -1786,41 +1759,13 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf ...@@ -1786,41 +1759,13 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf
// set the correct pointer after the memory buffer reallocated. // set the correct pointer after the memory buffer reallocated.
int32_t functionId = pBInfo->pCtx[i].functionId; int32_t functionId = pBInfo->pCtx[i].functionId;
#if 0
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF || if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF ||
functionId == FUNCTION_DERIVATIVE) { functionId == FUNCTION_DERIVATIVE) {
// if (i > 0) pBInfo->pCtx[i].pTsOutput = pBInfo->pCtx[i - 1].pOutput; // if (i > 0) pBInfo->pCtx[i].pTsOutput = pBInfo->pCtx[i - 1].pOutput;
} }
} #endif
}
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
bool needCopyTs = false;
int32_t tsNum = 0;
char* src = NULL;
for (int32_t i = 0; i < numOfOutput; i++) {
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) {
needCopyTs = true;
if (i > 0 && pCtx[i - 1].functionId == FUNCTION_TS_DUMMY) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data
src = pColRes->pData;
}
} else if (functionId == FUNCTION_TS_DUMMY) {
tsNum++;
}
}
if (!needCopyTs) return;
if (tsNum < 2) return;
if (src == NULL) return;
for (int32_t i = 0; i < numOfOutput; i++) {
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_TS_DUMMY) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i);
memcpy(pColRes->pData, src, pColRes->info.bytes * pRes->info.rows);
}
} }
} }
...@@ -2577,47 +2522,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2577,47 +2522,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList) { SArray* pColList) {
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
blockDataEnsureCapacity(pRes, numOfRows); blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
int32_t dataLen = *(int32_t*)pData;
pData += sizeof(int32_t);
pRes->info.groupId = *(uint64_t*)pData;
pData += sizeof(uint64_t);
int32_t* colLen = (int32_t*)pData;
char* pStart = pData + sizeof(int32_t) * numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) {
colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] >= 0);
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colLen[i];
pColInfoData->varmeta.allocLen = colLen[i];
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
pStart += sizeof(int32_t) * numOfRows;
if (colLen[i] > 0) {
taosMemoryFreeClear(pColInfoData->pData);
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
}
} else {
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
pStart += BitmapLen(numOfRows);
}
if (colLen[i] > 0) {
memcpy(pColInfoData->pData, pStart, colLen[i]);
}
// TODO setting this flag to true temporarily so aggregate function on stable will
// examine NULL value for non-primary key column
pColInfoData->hasNull = true;
pStart += colLen[i];
}
} else { // extract data according to pColList } else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList)); ASSERT(numOfOutput == taosArrayGetSize(pColList));
char* pStart = pData; char* pStart = pData;
...@@ -3587,7 +3492,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { ...@@ -3587,7 +3492,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
offset += sizeof(int32_t); offset += sizeof(int32_t);
uint64_t tableGroupId = *(uint64_t*)(result + offset); uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
if (!resultRow) { if (!resultRow) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
...@@ -3610,10 +3515,6 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { ...@@ -3610,10 +3515,6 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
offset += valueLen; offset += valueLen;
initResultRow(resultRow); initResultRow(resultRow);
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
// pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size;
// pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
// (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
} }
...@@ -3905,18 +3806,6 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { ...@@ -3905,18 +3806,6 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
} }
} }
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionId == FUNCTION_STDDEV || functionId == FUNCTION_PERCT) {
return 2;
}
}
return 1;
}
static void destroyOperatorInfo(SOperatorInfo* pOperator) { static void destroyOperatorInfo(SOperatorInfo* pOperator) {
if (pOperator == NULL) { if (pOperator == NULL) {
return; return;
......
...@@ -110,9 +110,11 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo ...@@ -110,9 +110,11 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
return true; return true;
} }
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL; SColumnDataAgg* pColAgg = NULL;
size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
for (int32_t i = 0; i < numOfGroupCols; ++i) { for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupCols, i); SColumn* pCol = taosArrayGet(pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
...@@ -208,7 +210,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -208,7 +210,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical. // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) { if (!pInfo->isInit) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
pInfo->isInit = true; pInfo->isInit = true;
num++; num++;
continue; continue;
...@@ -223,7 +225,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -223,7 +225,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// The first row of a new block does not belongs to the previous existed group // The first row of a new block does not belongs to the previous existed group
if (j == 0) { if (j == 0) {
num++; num++;
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
continue; continue;
} }
...@@ -238,7 +240,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -238,7 +240,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
num = 1; num = 1;
} }
...@@ -409,7 +411,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -409,7 +411,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
SDataGroupInfo* pGInfo = NULL; SDataGroupInfo* pGInfo = NULL;
......
...@@ -531,7 +531,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -531,7 +531,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
......
...@@ -140,6 +140,10 @@ bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) ...@@ -140,6 +140,10 @@ bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo)
int32_t uniqueFunction(SqlFunctionCtx *pCtx); int32_t uniqueFunction(SqlFunctionCtx *pCtx);
//int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); //int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t twaFunction(SqlFunctionCtx *pCtx);
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......
...@@ -52,13 +52,6 @@ typedef struct SInterpInfoDetail { ...@@ -52,13 +52,6 @@ typedef struct SInterpInfoDetail {
int8_t primaryCol; int8_t primaryCol;
} SInterpInfoDetail; } SInterpInfoDetail;
typedef struct STwaInfo {
int8_t hasResult; // flag to denote has value
double dOutput;
SPoint1 p;
STimeWindow win;
} STwaInfo;
bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval); bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
/** /**
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "querynodes.h" #include "querynodes.h"
#include "scalar.h" #include "scalar.h"
#include "taoserror.h" #include "taoserror.h"
#include "tdatablock.h"
static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) { static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) {
va_list vArgList; va_list vArgList;
...@@ -362,7 +361,7 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len ...@@ -362,7 +361,7 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
pValue->notReserved = true; pValue->notReserved = true;
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) { if (!IS_INTEGER_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
...@@ -456,7 +455,7 @@ static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { ...@@ -456,7 +455,7 @@ static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
"The input parameter of HYPERLOGLOG function can only be column"); "The input parameter of HYPERLOGLOG function can only be column");
} }
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_UBIGINT].bytes, .type = TSDB_DATA_TYPE_UBIGINT}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1312,6 +1311,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1312,6 +1311,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = firstLastFinalize, .finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine, .combineFunc = lastCombine,
}, },
{
.name = "twa",
.type = FUNCTION_TYPE_TWA,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateInNumOutDou,
.getEnvFunc = getTwaFuncEnv,
.initFunc = twaFunctionSetup,
.processFunc = twaFunction,
.finalizeFunc = twaFinalize
},
{ {
.name = "histogram", .name = "histogram",
.type = FUNCTION_TYPE_HISTOGRAM, .type = FUNCTION_TYPE_HISTOGRAM,
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "builtinsimpl.h" #include "builtinsimpl.h"
#include "tglobal.h"
#include "cJSON.h" #include "cJSON.h"
#include "function.h" #include "function.h"
#include "querynodes.h" #include "querynodes.h"
...@@ -300,7 +301,7 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -300,7 +301,7 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
//pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
char* in = GET_ROWCELL_INTERBUF(pResInfo); char* in = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
...@@ -357,7 +358,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { ...@@ -357,7 +358,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true; return true;
} }
static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) { static FORCE_INLINE int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0; int32_t numOfElem = 0;
/* /*
...@@ -392,11 +393,12 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) { ...@@ -392,11 +393,12 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
* count function does not use the pCtx->interResBuf to keep the intermediate buffer * count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/ */
int32_t countFunction(SqlFunctionCtx* pCtx) { int32_t countFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumofElem(pCtx); int32_t numOfElem = getNumOfElems(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
int32_t type = pInput->pData[0]->info.type;
char* buf = GET_ROWCELL_INTERBUF(pResInfo); char* buf = GET_ROWCELL_INTERBUF(pResInfo);
if (IS_NULL_TYPE(type)) { if (IS_NULL_TYPE(type)) {
...@@ -407,12 +409,17 @@ int32_t countFunction(SqlFunctionCtx* pCtx) { ...@@ -407,12 +409,17 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
*((int64_t*)buf) += numOfElem; *((int64_t*)buf) += numOfElem;
} }
SET_VAL(pResInfo, numOfElem, 1); if (tsCountAlwaysReturnValue) {
pResInfo->numOfRes = 1;
} else {
SET_VAL(pResInfo, 1, 1);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t countInvertFunction(SqlFunctionCtx* pCtx) { int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumofElem(pCtx); int32_t numOfElem = getNumOfElems(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo); char* buf = GET_ROWCELL_INTERBUF(pResInfo);
...@@ -829,8 +836,10 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { ...@@ -829,8 +836,10 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (IS_INTEGER_TYPE(type)) { if (IS_INTEGER_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count); pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
} else { } else {
...@@ -1832,7 +1841,7 @@ bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultI ...@@ -1832,7 +1841,7 @@ bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultI
} }
int32_t percentileFunction(SqlFunctionCtx* pCtx) { int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t notNullElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
...@@ -1910,11 +1919,11 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { ...@@ -1910,11 +1919,11 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
} }
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
notNullElems += 1; numOfElems += 1;
tMemBucketPut(pInfo->pMemBucket, data, 1); tMemBucketPut(pInfo->pMemBucket, data, 1);
} }
SET_VAL(pResInfo, notNullElems, 1); SET_VAL(pResInfo, numOfElems, 1);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1988,7 +1997,7 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult ...@@ -1988,7 +1997,7 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult
} }
int32_t apercentileFunction(SqlFunctionCtx* pCtx) { int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
int32_t notNullElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
...@@ -2005,7 +2014,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { ...@@ -2005,7 +2014,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
if (colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
notNullElems += 1; numOfElems += 1;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
double v = 0; // value double v = 0; // value
...@@ -2018,7 +2027,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { ...@@ -2018,7 +2027,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
if (colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
notNullElems += 1; numOfElems += 1;
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
double v = 0; double v = 0;
...@@ -2027,7 +2036,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { ...@@ -2027,7 +2036,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
} }
} }
SET_VAL(pResInfo, notNullElems, 1); SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3240,13 +3249,13 @@ static uint64_t hllCountCnt(uint8_t *buckets) { ...@@ -3240,13 +3249,13 @@ static uint64_t hllCountCnt(uint8_t *buckets) {
z += buckethisto[j]; z += buckethisto[j];
z *= 0.5; z *= 0.5;
} }
z += m * hllSigma(buckethisto[0]/(double)m); z += m * hllSigma(buckethisto[0]/(double)m);
double E = (double)llroundl(HLL_ALPHA_INF*m*m/z); double E = (double)llroundl(HLL_ALPHA_INF*m*m/z);
return (uint64_t) E; return (uint64_t) E;
} }
int32_t hllFunction(SqlFunctionCtx *pCtx) { int32_t hllFunction(SqlFunctionCtx *pCtx) {
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
...@@ -3279,7 +3288,6 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) { ...@@ -3279,7 +3288,6 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) {
if (count > oldcount) { if (count > oldcount) {
pInfo->buckets[index] = count; pInfo->buckets[index] = count;
} }
} }
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
...@@ -3287,9 +3295,13 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) { ...@@ -3287,9 +3295,13 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) {
} }
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SResultRowEntryInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->result = hllCountCnt(pInfo->buckets); SHLLInfo* pHllInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pHllInfo->result = hllCountCnt(pHllInfo->buckets);
if (tsCountAlwaysReturnValue && pHllInfo->result == 0) {
pInfo->numOfRes = 1;
}
return functionFinalize(pCtx, pBlock); return functionFinalize(pCtx, pBlock);
} }
...@@ -3695,7 +3707,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -3695,7 +3707,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
TSKEY* tsList = (int64_t*)pInput->pPTS->pData; TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset; int32_t startOffset = pCtx->offset;
...@@ -3718,24 +3729,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -3718,24 +3729,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
return pInfo->numSampled; return pInfo->numSampled;
} }
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
// SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
// SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
//
// //int32_t currentRow = pBlock->info.rows;
// pResInfo->numOfRes = pInfo->numSampled;
//
// for (int32_t i = 0; i < pInfo->numSampled; ++i) {
// colDataAppend(pCol, i, pInfo->data + i * pInfo->colBytes, false);
// //TODO: handle ts output
// }
//
// return pResInfo->numOfRes;
//}
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
...@@ -3812,7 +3805,6 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) { ...@@ -3812,7 +3805,6 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) {
TSKEY* tsList = (int64_t*)pInput->pPTS->pData; TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset; int32_t startOffset = pCtx->offset;
...@@ -3879,7 +3871,6 @@ bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { ...@@ -3879,7 +3871,6 @@ bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pInfo->numOfPoints = 0; pInfo->numOfPoints = 0;
pInfo->colType = pCtx->resDataInfo.type; pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes; pInfo->colBytes = pCtx->resDataInfo.bytes;
pInfo->hasNull = false;
if (pInfo->pHash != NULL) { if (pInfo->pHash != NULL) {
taosHashClear(pInfo->pHash); taosHashClear(pInfo->pHash);
} else { } else {
...@@ -3917,8 +3908,6 @@ static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) { ...@@ -3917,8 +3908,6 @@ static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
} else if (pHashItem->timestamp > ts) { } else if (pHashItem->timestamp > ts) {
pHashItem->timestamp = ts; pHashItem->timestamp = ts;
} }
return;
} }
int32_t uniqueFunction(SqlFunctionCtx* pCtx) { int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
...@@ -3960,7 +3949,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) { ...@@ -3960,7 +3949,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
...@@ -3973,3 +3962,260 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3973,3 +3962,260 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
typedef struct STwaInfo {
double dOutput;
SPoint1 p;
STimeWindow win;
} STwaInfo;
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STwaInfo);
return true;
}
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->p.key = INT64_MIN;
pInfo->win = TSWINDOW_INITIALIZER;
return true;
}
static double twa_get_area(SPoint1 s, SPoint1 e) {
if ((s.val >= 0 && e.val >= 0)|| (s.val <=0 && e.val <= 0)) {
return (s.val + e.val) * (e.key - s.key) / 2;
}
double x = (s.key * e.val - e.key * s.val)/(e.val - s.val);
double val = (s.val * (x - s.key) + e.val * (e.key - x)) / 2;
return val;
}
#define INIT_INTP_POINT(_p, _k, _v) \
do { \
(_p).key = (_k); \
(_p).val = (_v); \
} while (0)
int32_t twaFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SPoint1 *last = &pInfo->p;
int32_t numOfElems = 0;
int32_t i = pInput->startRowIndex;
if (pCtx->start.key != INT64_MIN) {
ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) ||
(pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC));
ASSERT(last->key == INT64_MIN);
last->key = tsList[i];
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
pInfo->dOutput += twa_get_area(pCtx->start, *last);
pInfo->win.skey = pCtx->start.key;
numOfElems++;
i += 1;
} else if (pInfo->p.key == INT64_MIN) {
last->key = tsList[i];
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
pInfo->win.skey = last->key;
numOfElems++;
i += 1;
}
SPoint1 st = {0};
// calculate the value of
switch(pInputCol->info.type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t *val = (int8_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *val = (int16_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t *val = (int32_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *val = (int64_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *val = (float*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *val = (double*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t *val = (uint8_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t *val = (uint16_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t *val = (uint32_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t *val = (uint64_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
INIT_INTP_POINT(st, tsList[i], val[i]);
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
default: ASSERT(0);
}
// the last interpolated time window value
if (pCtx->end.key != INT64_MIN) {
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
pInfo->p = pCtx->end;
}
pInfo->win.ekey = pInfo->p.key;
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
/*
* To copy the input to interResBuf to avoid the input buffer space be over writen
* by next input data. The TWA function only applies to each table, so no merge procedure
* is required, we simply copy to the resut ot interResBuffer.
*/
//void twa_function_copy(SQLFunctionCtx *pCtx) {
// assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
// SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
//
// memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes);
// pResInfo->hasResult = ((STwaInfo *)pCtx->pInput)->hasResult;
//}
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pResInfo->numOfRes == 0) {
pResInfo->isNullRes = 1;
} else {
// assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult);
if (pInfo->win.ekey == pInfo->win.skey) {
pInfo->dOutput = pInfo->p.val;
} else {
pInfo->dOutput = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey);
}
pResInfo->numOfRes = 1;
}
return functionFinalize(pCtx, pBlock);
}
...@@ -236,7 +236,7 @@ bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) { ...@@ -236,7 +236,7 @@ bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) {
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) { bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) {
return pEntry->initialized; return pEntry->initialized;
} }
#if 0
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable/*, SUdfInfo* pUdfInfo*/) { bool isSuperTable/*, SUdfInfo* pUdfInfo*/) {
if (!isValidDataType(dataType)) { if (!isValidDataType(dataType)) {
...@@ -470,6 +470,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -470,6 +470,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (pResultInfo->initialized) { if (pResultInfo->initialized) {
......
...@@ -36,12 +36,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) { ...@@ -36,12 +36,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode->nodeType == TEXPR_BINARYEXPR_NODE || pNode->nodeType == TEXPR_UNARYEXPR_NODE) { if (pNode->nodeType == TEXPR_BINARYEXPR_NODE || pNode->nodeType == TEXPR_UNARYEXPR_NODE) {
doExprTreeDestroy(&pNode, fp); doExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TEXPR_VALUE_NODE) {
taosVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TEXPR_COL_NODE) {
taosMemoryFreeClear(pNode->pSchema);
} }
taosMemoryFree(pNode); taosMemoryFree(pNode);
} }
...@@ -49,15 +44,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) { ...@@ -49,15 +44,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) { if (*pExpr == NULL) {
return; return;
} }
int32_t type = (*pExpr)->nodeType;
if (type == TEXPR_VALUE_NODE) {
taosVariantDestroy((*pExpr)->pVal);
taosMemoryFree((*pExpr)->pVal);
} else if (type == TEXPR_COL_NODE) {
taosMemoryFree((*pExpr)->pSchema);
}
taosMemoryFree(*pExpr); taosMemoryFree(*pExpr);
*pExpr = NULL; *pExpr = NULL;
} }
......
...@@ -95,7 +95,7 @@ SListNode *tdListPopTail(SList *list) { ...@@ -95,7 +95,7 @@ SListNode *tdListPopTail(SList *list) {
SListNode *tdListGetHead(SList *list) { return TD_DLIST_HEAD(list); } SListNode *tdListGetHead(SList *list) { return TD_DLIST_HEAD(list); }
SListNode *tsListGetTail(SList *list) { return TD_DLIST_TAIL(list); } SListNode *tdListGetTail(SList *list) { return TD_DLIST_TAIL(list); }
SListNode *tdListPopNode(SList *list, SListNode *node) { SListNode *tdListPopNode(SList *list, SListNode *node) {
TD_DLIST_POP(list, node); TD_DLIST_POP(list, node);
......
...@@ -63,7 +63,8 @@ if $data02 != 234 then ...@@ -63,7 +63,8 @@ if $data02 != 234 then
return -1 return -1
endi endi
if $data03 != 234 then if $data03 != 234 then
print expect 234, actual $data03
return -1 return -1
endi endi
......
...@@ -288,15 +288,18 @@ class TDTestCase: ...@@ -288,15 +288,18 @@ class TDTestCase:
else: else:
tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}")
offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0 offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0
pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] # print(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}")
pre_mavg = pre_mavg = np.convolve(pre_result, np.ones(k), "valid")[offset_val:]/k if not tdSql.queryResult:
tdSql.query(self.mavg_query_form( pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr,
table_expr=table_expr, condition=condition pre_mavg = pre_mavg = np.convolve(pre_result, np.ones(k), "valid")[offset_val:]/k
)) tdSql.query(self.mavg_query_form(
for i in range(tdSql.queryRows): sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr,
print(f"case in {line}: ", end='') table_expr=table_expr, condition=condition
tdSql.checkData(i, 0, pre_mavg[i]) ))
for i in range(tdSql.queryRows):
print(f"case in {line}: ", end='')
tdSql.checkData(i, 0, pre_mavg[i])
pass pass
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册