未验证 提交 4cfcca49 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #11817 from taosdata/feature/3.0_liaohj

enh(query): Add the group id in the serialized block format to support the multi-table(super table) interval query.
......@@ -14,24 +14,9 @@ if(${BUILD_PTHREAD})
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# iconv
if(${BUILD_WITH_ICONV})
cat("${TD_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# msvc regex
if(${BUILD_MSVCREGEX})
cat("${TD_SUPPORT_DIR}/msvcregex_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# wcwidth
if(${BUILD_WCWIDTH})
cat("${TD_SUPPORT_DIR}/wcwidth_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# wingetopt
if(${BUILD_WINGETOPT})
cat("${TD_SUPPORT_DIR}/wingetopt_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# gnu regex
if(${BUILD_GNUREGEX})
cat("${TD_SUPPORT_DIR}/gnuregex_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# googletest
......@@ -114,27 +99,8 @@ if(${BUILD_TEST})
target_include_directories(
gtest
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src>
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_linux>
)
if(${TD_WINDOWS})
target_include_directories(
gtest
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_win>
)
endif(${TD_WINDOWS})
if(${TD_LINUX})
target_include_directories(
gtest
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_linux>
)
endif(${TD_LINUX})
if(${TD_DARWIN})
target_include_directories(
gtest
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_darwin>
)
endif(${TD_DARWIN})
endif(${BUILD_TEST})
# cJson
......@@ -216,53 +182,6 @@ if(${BUILD_WITH_NURAFT})
add_subdirectory(nuraft)
endif(${BUILD_WITH_NURAFT})
# pthread
if(${BUILD_PTHREAD})
set(CMAKE_BUILD_TYPE release)
add_definitions(-DPTW32_STATIC_LIB)
add_subdirectory(pthread)
set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread)
add_library(pthread STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET pthread PROPERTY IMPORTED_LOCATION ${LIBRARY_OUTPUT_PATH}/pthread.lib)
endif()
# iconv
if(${BUILD_WITH_ICONV})
add_subdirectory(iconv)
endif(${BUILD_WITH_ICONV})
# wingetopt
if(${BUILD_WINGETOPT})
add_subdirectory(wingetopt)
endif(${BUILD_WINGETOPT})
# msvcregex
if(${BUILD_MSVCREGEX})
add_library(msvcregex STATIC "")
target_sources(msvcregex
PRIVATE "msvcregex/regex.c"
)
target_include_directories(msvcregex
PRIVATE "msvcregex"
)
target_link_libraries(msvcregex
INTERFACE Shell32
)
SET_TARGET_PROPERTIES(msvcregex PROPERTIES OUTPUT_NAME msvcregex)
endif(${BUILD_MSVCREGEX})
# msvcregex
if(${BUILD_WCWIDTH})
add_library(wcwidth STATIC "")
target_sources(wcwidth
PRIVATE "wcwidth/wcwidth.c"
)
target_include_directories(wcwidth
PRIVATE "wcwidth"
)
SET_TARGET_PROPERTIES(wcwidth PROPERTIES OUTPUT_NAME wcwidth)
endif(${BUILD_WCWIDTH})
# CRAFT
if(${BUILD_WITH_CRAFT})
add_library(craft STATIC IMPORTED GLOBAL)
......@@ -319,12 +238,8 @@ if(${BUILD_WITH_SQLITE})
target_link_libraries(sqlite
INTERFACE m
INTERFACE pthread
INTERFACE dl
)
if(NOT TD_WINDOWS)
target_link_libraries(sqlite
INTERFACE dl
)
endif(NOT TD_WINDOWS)
endif(${BUILD_WITH_SQLITE})
# pthread
......
add_executable(simulate_vnode "simulate_vnode.c")
target_link_libraries(simulate_vnode PUBLIC craft lz4 uv_a)
if(${BUILD_WINGETOPT})
target_link_libraries(simulate_vnode PUBLIC wingetopt)
target_include_directories(
simulate_vnode
PUBLIC "${TD_SOURCE_DIR}/contrib/wingetopt/src"
)
endif()
\ No newline at end of file
target_link_libraries(simulate_vnode PUBLIC craft lz4 uv_a)
\ No newline at end of file
......@@ -6,39 +6,43 @@
#define POINTER_SHIFT(ptr, s) ((void *)(((char *)ptr) + (s)))
#define POINTER_DISTANCE(pa, pb) ((char *)(pb) - (char *)(pa))
static inline void tPutA(void **buf, uint64_t val) {
memcpy(buf, &val, sizeof(val));
*buf = POINTER_SHIFT(buf, sizeof(val));
}
#define tPutA(buf, val) \
({ \
memcpy(buf, &val, sizeof(val)); \
POINTER_SHIFT(buf, sizeof(val)); \
})
static inline void tPutB(void **buf, uint64_t val) {
((uint8_t *)buf)[7] = ((val) >> 56) & 0xff;
((uint8_t *)buf)[6] = ((val) >> 48) & 0xff;
((uint8_t *)buf)[5] = ((val) >> 40) & 0xff;
((uint8_t *)buf)[4] = ((val) >> 32) & 0xff;
((uint8_t *)buf)[3] = ((val) >> 24) & 0xff;
((uint8_t *)buf)[2] = ((val) >> 16) & 0xff;
((uint8_t *)buf)[1] = ((val) >> 8) & 0xff;
((uint8_t *)buf)[0] = (val)&0xff;
*buf = POINTER_SHIFT(buf, sizeof(val));
}
#define tPutB(buf, val) \
({ \
((uint8_t *)buf)[7] = ((val) >> 56) & 0xff; \
((uint8_t *)buf)[6] = ((val) >> 48) & 0xff; \
((uint8_t *)buf)[5] = ((val) >> 40) & 0xff; \
((uint8_t *)buf)[4] = ((val) >> 32) & 0xff; \
((uint8_t *)buf)[3] = ((val) >> 24) & 0xff; \
((uint8_t *)buf)[2] = ((val) >> 16) & 0xff; \
((uint8_t *)buf)[1] = ((val) >> 8) & 0xff; \
((uint8_t *)buf)[0] = (val)&0xff; \
POINTER_SHIFT(buf, sizeof(val)); \
})
static inline void tPutC(void **buf, uint64_t val) {
if (buf) {
((uint64_t *)buf)[0] = (val);
POINTER_SHIFT(buf, sizeof(val));
}
*buf = NULL;
}
#define tPutC(buf, val) \
({ \
if (buf) { \
((uint64_t *)buf)[0] = (val); \
POINTER_SHIFT(buf, sizeof(val)); \
} \
NULL; \
})
static inline void tPutD(void **buf, uint64_t val) {
uint64_t tmp = val;
for (size_t i = 0; i < sizeof(val); i++) {
((uint8_t *)buf)[i] = tmp & 0xff;
tmp >>= 8;
}
*buf = POINTER_SHIFT(buf, sizeof(val));
}
#define tPutD(buf, val) \
({ \
uint64_t tmp = val; \
for (size_t i = 0; i < sizeof(val); i++) { \
((uint8_t *)buf)[i] = tmp & 0xff; \
tmp >>= 8; \
} \
POINTER_SHIFT(buf, sizeof(val)); \
})
static inline void tPutE(void **buf, uint64_t val) {
if (buf) {
......@@ -57,7 +61,7 @@ static void func(T t) {
switch (t) {
case A:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
tPutA(pBuf, val);
pBuf = tPutA(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
......@@ -65,7 +69,7 @@ static void func(T t) {
break;
case B:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
tPutB(pBuf, val);
pBuf = tPutB(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
......@@ -73,7 +77,7 @@ static void func(T t) {
break;
case C:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
tPutC(pBuf, val);
pBuf = tPutC(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
......@@ -81,7 +85,7 @@ static void func(T t) {
break;
case D:
for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) {
tPutD(pBuf, val);
pBuf = tPutD(pBuf, val);
if (POINTER_DISTANCE(buf, pBuf) == 1024) {
pBuf = buf;
}
......
......@@ -238,10 +238,16 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) {
int32_t* colSizes = (int32_t*)data;
int32_t* actualLen = (int32_t*) data;
data += sizeof(int32_t);
uint64_t* groupId = (uint64_t*) data;
data += sizeof(uint64_t);
int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t);
*dataLen = (numOfCols * sizeof(int32_t));
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
int32_t numOfRows = pBlock->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) {
......@@ -273,6 +279,9 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da
colSizes[col] = htonl(colSizes[col]);
}
*actualLen = *dataLen;
*groupId = pBlock->info.groupId;
}
#ifdef __cplusplus
......
......@@ -37,7 +37,7 @@ typedef struct SFuncExecEnv {
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock, int32_t slotId);
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef struct SScalarFuncExecFuncs {
......@@ -141,8 +141,7 @@ struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SSubsidiaryResInfo {
int16_t bufLen; // keep the tags data for top/bottom query result
int16_t numOfCols;
int16_t num;
struct SqlFunctionCtx **pCtx;
} SSubsidiaryResInfo;
......@@ -187,8 +186,8 @@ typedef struct SqlFunctionCtx {
uint8_t currentStage; // record current running step, default: 0
bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
/////////////////////////////////////////////////////////////////
bool stableQuery;
/////////////////////////////////////////////////////////////////
int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
......@@ -198,11 +197,15 @@ typedef struct SqlFunctionCtx {
int32_t offset;
SVariant tag;
struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaryRes;
SPoint1 start;
SPoint1 end;
SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp;
SSubsidiaryResInfo subsidiaries;
SPoint1 start;
SPoint1 end;
SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp;
SExprInfo *pExpr;
struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock;
int32_t curBufPage;
} SqlFunctionCtx;
enum {
......
......@@ -105,9 +105,9 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
epSet.epSet.eps[0].port = port;
}
char* key = getClusterKey(user, secretEncrypt, ip, port);
SAppInstInfo** pInst = NULL;
char* key = getClusterKey(user, secretEncrypt, ip, port);
SAppInstInfo** pInst = NULL;
taosThreadMutexLock(&appInfo.mutex);
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
......@@ -840,10 +840,21 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
return code;
}
int32_t* colLength = (int32_t*)pResultInfo->pData;
char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
char* p = (char*) pResultInfo->pData;
int32_t dataLen = *(int32_t*) p;
p += sizeof(int32_t);
uint64_t groupId = *(uint64_t*) p;
p += sizeof(uint64_t);
int32_t* colLength = (int32_t*)p;
p += sizeof(int32_t) * numOfCols;
char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) {
colLength[i] = htonl(colLength[i]);
ASSERT(colLength[i] < dataLen);
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->pCol[i].offset = (int32_t*)pStart;
......
......@@ -644,12 +644,12 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
/**
* @refitem blockDataToBuf for the meta size
*
* @param pBlock
* @return
*/
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t);
// | total rows/total length | block group id | each column length |
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
......@@ -1219,7 +1219,6 @@ void colDataDestroy(SColumnInfoData* pColData) {
taosMemoryFree(pColData->pData);
}
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
int32_t len = BitmapLen(total);
......
......@@ -3223,8 +3223,13 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
cur->win.skey, cur->win.ekey, pHandle->idStr);
// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign
// the table uid
pDataBlockInfo->uid = uid;
#if 0
// for multi-group data query processing test purpose
pDataBlockInfo->groupId = uid;
#endif
pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win;
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
......
......@@ -304,8 +304,8 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t
memcpy(row.buf, tbuf, len);
row.level = level;
row.len = len;
ctx->dataSize += len;
row.len = len;
ctx->dataSize += row.len;
if (NULL == taosArrayPush(ctx->rows, &row)) {
qError("taosArrayPush row to explain res rows failed");
......@@ -756,7 +756,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
}
int32_t colNum = 1;
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) {
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
......@@ -766,29 +766,38 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1;
rsp->numOfRows = htonl(rowNum);
*(int32_t *)rsp->data = htonl(pCtx->dataSize);
// payload length
*(int32_t *)rsp->data = sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t));
// group id
*(uint64_t*)(rsp->data + sizeof(int32_t)) = 0;
// column length
int32_t* colLength = (int32_t *)(rsp->data + sizeof(int32_t) + sizeof(uint64_t));
// varchar column offset segment
int32_t *offset = (int32_t *)((char *)colLength + sizeof(int32_t));
// varchar data real payload
char *data = (char *)(offset + rowNum);
int32_t tOffset = 0;
char* start = data;
for (int32_t i = 0; i < rowNum; ++i) {
SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i);
*offset = tOffset;
tOffset += row->len;
offset[i] = data - start;
memcpy(data, row->buf, row->len);
++offset;
varDataCopy(data, row->buf);
ASSERT(varDataTLen(row->buf) == row->len);
data += row->len;
}
*pRsp = rsp;
*colLength = htonl(data - start);
rsp->compLen = htonl(rspSize);
*pRsp = rsp;
return TSDB_CODE_SUCCESS;
}
int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) {
int32_t code = 0;
SNodeListNode *plans = NULL;
......@@ -895,9 +904,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
return TSDB_CODE_SUCCESS;
......@@ -967,13 +974,10 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
SExplainCtx *pCtx = NULL;
QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx));
QRY_ERR_JRET(qExplainGenerateRsp(pCtx, pRsp));
_return:
qExplainFreeCtx(pCtx);
QRY_RET(code);
}
......
......@@ -40,8 +40,6 @@
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
typedef struct SGroupResInfo {
int32_t totalGroup;
int32_t currentGroup;
......@@ -68,11 +66,16 @@ typedef struct SResultRowPosition {
int32_t offset;
} SResultRowPosition;
typedef struct SResKeyPos {
SResultRowPosition pos;
uint64_t groupId;
char key[];
} SResKeyPos;
typedef struct SResultRowInfo {
SResultRowPosition *pPosition;
int32_t size; // number of result set
int32_t capacity; // max capacity
// int32_t curPos; // current active result row index of pResult list
SResultRowPosition cur;
} SResultRowInfo;
......@@ -135,7 +138,7 @@ typedef struct {
int32_t colId;
} SStddevInterResult;
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
......
......@@ -347,10 +347,11 @@ typedef struct STableScanInfo {
} STableScanInfo;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
SColumnInfo *pCols;
SSDataBlock *pRes;
int32_t totalTables;
int32_t curPos;
void *pReader;
} STagScanInfo;
typedef struct SStreamBlockScanInfo {
......@@ -376,13 +377,11 @@ typedef struct SSysTableScanInfo {
SEpSet epSet;
tsem_t ready;
int32_t accountId;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void* pCur; // cursor for iterate the local table meta store.
SArray* scanCols; // SArray<int16_t> scan column id list
// int32_t type; // show type, TODO remove it
int32_t accountId;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void* pCur; // cursor for iterate the local table meta store.
SArray* scanCols; // SArray<int16_t> scan column id list
SName name;
SSDataBlock* pRes;
int32_t capacity;
......@@ -628,7 +627,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
SOperatorInfo* createTableScanOperatorInfo(void* pReaderHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition,
SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
......@@ -668,12 +667,12 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput);
#endif
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
......
......@@ -64,10 +64,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
}
// data format:
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// | | sizeof(int32_t) * numOfCols | actual size | | actual size | |
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// | | (4 bytes) |(8 bytes) | sizeof(int32_t) * numOfCols | actual size | | actual size | |
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
......
......@@ -186,12 +186,50 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
pGroupResInfo->index = 0;
}
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) {
static int32_t resultrowCompar1(const void* p1, const void* p2) {
SResKeyPos* pp1 = *(SResKeyPos**) p1;
SResKeyPos* pp2 = *(SResKeyPos**) p2;
if (pp1->groupId == pp2->groupId) {
int64_t pts1 = *(int64_t*) pp1->key;
int64_t pts2 = *(int64_t*) pp2->key;
if (pts1 == pts2) {
return 0;
} else {
return pts1 < pts2? -1:1;
}
} else {
return pp1->groupId < pp2->groupId? -1:1;
}
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult) {
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);
}
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pPosition, pResultInfo->size, sizeof(SResultRowPosition));
// extract the result rows information from the hash map
void* pData = NULL;
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
size_t keyLen = 0;
while((pData = taosHashIterate(pHashmap, pData)) != NULL) {
void* key = taosHashGetKey(pData, &keyLen);
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
p->groupId = *(uint64_t*) key;
p->pos = *(SResultRowPosition*) pData;
memcpy(p->key, key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
taosArrayPush(pGroupResInfo->pRows, &p);
}
if (sortGroupResult) {
qsort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, resultrowCompar1);
}
pGroupResInfo->index = 0;
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
}
......
......@@ -308,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
// }
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
......
......@@ -271,9 +271,7 @@ static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCt
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
*newgroup = false;
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
......@@ -284,18 +282,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
pTableScanInfo->numOfBlocks += 1;
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
// todo opt
// if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
// STableQueryInfo** pTableQueryInfo =
// (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid));
// if (pTableQueryInfo == NULL) {
// break;
// }
//
// doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order);
// }
// this function never returns error?
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
......@@ -308,6 +294,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
continue;
}
// reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream operator.
pBlock->info.blockId = 0;
return pBlock;
}
......@@ -405,11 +393,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
static int32_t cost = 0;
pOperator->cost.openCost = ++cost;
......@@ -824,12 +812,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
int32_t tableNameSlotId = 1;
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId);
char* name = NULL;
char* tb = NULL;
int32_t numOfRows = 0;
char n[TSDB_TABLE_NAME_LEN] = {0};
while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) {
STR_TO_VARSTR(n, name);
while ((tb = metaTbCursorNext(pInfo->pCur)) != NULL) {
STR_TO_VARSTR(n, tb);
colDataAppend(pTableNameCol, numOfRows, n, false);
numOfRows += 1;
if (numOfRows >= pInfo->capacity) {
......@@ -992,3 +980,167 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
return pOperator;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes;
*newgroup = false;
int32_t count = 0;
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
assert(pQueryAttr->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->pExpr[0];
int32_t rsize = pExprInfo->base.resSchema.bytes;
count = 0;
int16_t bytes = pExprInfo->base.resSchema.bytes;
int16_t type = pExprInfo->base.resSchema.type;
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
bytes = pQueryAttr->tagColList[i].bytes;
type = pQueryAttr->tagColList[i].type;
break;
}
}
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
int32_t i = pInfo->curPos++;
STableQueryInfo *item = taosArrayGetP(pa, i);
char *output = pColInfo->pData + count * rsize;
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output);
STableId* id = TSDB_TABLEID(item->pTable);
*(int16_t *)output = 0;
output += sizeof(int16_t);
*(int64_t *)output = id->uid; // memory align problem, todo serialize
output += sizeof(id->uid);
*(int32_t *)output = id->tid;
output += sizeof(id->tid);
*(int32_t *)output = pQueryAttr->vgId;
output += sizeof(pQueryAttr->vgId);
char* data = NULL;
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
}
doSetTagValueToResultBuf(output, data, type, bytes);
count += 1;
}
//qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
} else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
*(int64_t*)pColInfo->pData = pInfo->totalTables;
count = 1;
pOperator->status = OP_EXEC_DONE;
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
} else { // return only the tags|table name etc.
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
count = 0;
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
int32_t i = pInfo->curPos++;
STableQueryInfo* item = taosArrayGetP(pa, i);
char *data = NULL, *dst = NULL;
int16_t type = 0, bytes = 0;
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
// not assign value in case of user defined constant output column
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
continue;
}
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
type = pExprInfo[j].base.resSchema.type;
bytes = pExprInfo[j].base.resSchema.bytes;
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
}
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
doSetTagValueToResultBuf(dst, data, type, bytes);
}
count += 1;
}
if (pInfo->curPos >= pInfo->totalTables) {
pOperator->status = OP_EXEC_DONE;
}
//qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
}
if (pOperator->status == OP_EXEC_DONE) {
setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED);
}
pRes->info.rows = count;
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
#endif
return TSDB_CODE_SUCCESS;
}
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pReader = pReaderHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->getNextFn = doTagScan;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroyTagScanOperatorInfo;
return pOperator;
_error:
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -24,7 +24,7 @@ extern "C" {
#include "functionMgt.h"
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......@@ -43,17 +43,17 @@ int32_t maxFunction(SqlFunctionCtx *pCtx);
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t avgFunction(SqlFunctionCtx* pCtx);
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stddevFunction(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t percentileFunction(SqlFunctionCtx *pCtx);
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
......@@ -65,7 +65,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
#ifdef __cplusplus
}
......
......@@ -14,7 +14,7 @@
*/
#include "builtinsimpl.h"
#include <libs/nodes/querynodes.h>
#include "function.h"
#include "querynodes.h"
#include "taggfunction.h"
#include "tdatablock.h"
......@@ -44,8 +44,6 @@ typedef struct STopBotResItem {
} STopBotResItem;
typedef struct STopBotRes {
int32_t pageId;
// int32_t num;
STopBotResItem *pItems;
} STopBotRes;
......@@ -92,18 +90,6 @@ typedef struct SDiffInfo {
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
......@@ -139,7 +125,8 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
return true;
}
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
......@@ -406,7 +393,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
......@@ -416,7 +403,7 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
}
return functionFinalize(pCtx, pBlock, slotId);
return functionFinalize(pCtx, pBlock);
}
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
......@@ -521,6 +508,49 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t *d = (_t *)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int32_t numOfElems = 0;
......@@ -564,8 +594,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
*(int64_t*) buf = val;
for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
__ctx->tag.i = key;
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
......@@ -581,8 +611,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
*(uint64_t*) buf = val;
for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
__ctx->tag.i = key;
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
......@@ -797,7 +827,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
......@@ -810,7 +840,7 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg);
}
return functionFinalize(pCtx, pBlock, slotId);
return functionFinalize(pCtx, pBlock);
}
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......@@ -923,7 +953,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SVariant* pVal = &pCtx->param[1].param;
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
......@@ -936,7 +966,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t sl
}
tMemBucketDestroy(pMemBucket);
return functionFinalize(pCtx, pBlock, slotId);
return functionFinalize(pCtx, pBlock);
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......@@ -1353,15 +1383,16 @@ static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
return pRes;
}
static void doAddIntoResult(STopBotRes* pRes, int32_t maxSize, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo);
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
int32_t topFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
// buildTopBotStruct(pRes, pCtx);
// }
......@@ -1381,7 +1412,7 @@ int32_t topFunction(SqlFunctionCtx *pCtx) {
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pRes, pCtx->param[1].param.i, data, i, NULL, type, pInput->uid, pResInfo);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
}
return TSDB_CODE_SUCCESS;
......@@ -1414,9 +1445,11 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par
return (val1->v.d > val2->v.d) ? 1 : -1;
}
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
......@@ -1428,22 +1461,9 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
if (pRes->pageId == -1) {
SFilePage* pPage = getNewBufPage(NULL, 0, &pRes->pageId);
pPage->num = sizeof(SFilePage);
// keep the current row data
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
bool isNull = colDataIsNull_s(pCol, rowIndex);
colDataGetData(pCol, rowIndex);
}
}
// save the data of this tuple
saveTupleData(pCtx, rowIndex, pSrcBlock, pItem);
// allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo->numOfRes++;
......@@ -1452,22 +1472,100 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
// replace the old data and the coresponding tuple data
STopBotResItem* pItem = &pItems[0];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
// save the data of this tuple by over writing the old data
copyTupleData(pCtx, rowIndex, pSrcBlock, pItem);
taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false);
}
}
}
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
SFilePage* pPage = NULL;
int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);
if (pCtx->curBufPage == -1) {
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
pPage->num = sizeof(SFilePage);
} else {
pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
pPage->num = sizeof(SFilePage);
}
}
pItem->tuplePos.pageId = pCtx->curBufPage;
// keep the current row data, extract method
int32_t offset = 0;
bool* nullList = (bool*)((char*)pPage + pPage->num);
char* pStart = (char*)(nullList + sizeof(bool) * pSrcBlock->info.numOfCols);
for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
bool isNull = colDataIsNull_s(pCol, rowIndex);
if (isNull) {
nullList[i] = true;
continue;
}
char* p = colDataGetData(pCol, rowIndex);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
memcpy(pStart + offset, p, varDataTLen(p));
} else {
memcpy(pStart + offset, p, pCol->info.bytes);
}
offset += pCol->info.bytes;
}
pItem->tuplePos.offset = pPage->num;
pPage->num += completeRowSize;
setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage);
}
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);
bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));
int32_t offset = 0;
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
continue;
}
char* p = colDataGetData(pCol, rowIndex);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
memcpy(pStart + offset, p, varDataTLen(p));
} else {
memcpy(pStart + offset, p, pCol->info.bytes);
}
offset += pCol->info.bytes;
}
setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage);
}
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value and the corresponding row data
......@@ -1476,19 +1574,45 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
case TSDB_DATA_TYPE_INT: {
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STopBotResItem* pItem = &pRes->pItems[i];
colDataAppendInt32(pCol, currentRow++, (int32_t*)&pItem->v.i);
colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i);
int32_t pageId = pItem->tuplePos.pageId;
int32_t offset = pItem->tuplePos.offset;
if (pageId != -1) {
// todo
if (pItem->tuplePos.pageId != -1) {
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
bool* nullList = (bool*)((char*)pPage + offset);
char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
// todo set the offset value to optimize the performance.
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
SFunctParam *pFuncParam = &pc->pExpr->base.pParam[0];
int32_t srcSlotId = pFuncParam->pCol->slotId;
int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId;
int32_t ps = 0;
for(int32_t k = 0; k < srcSlotId; ++k) {
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
ps += pSrcCol->info.bytes;
}
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
if (nullList[srcSlotId]) {
colDataAppendNULL(pDstCol, currentRow);
} else {
colDataAppend(pDstCol, currentRow, (pStart + ps), false);
}
}
}
currentRow += 1;
}
break;
}
}
return pEntryInfo->numOfRes;
// return functionFinalize(pCtx, pBlock, slotId);
}
......@@ -3673,7 +3673,9 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot);
if (nodeType(pQuery->pRoot) == QUERY_NODE_SELECT_STMT) {
pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot);
}
}
if (NULL != pCxt->pDbs) {
......
......@@ -476,6 +476,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
return (void*)buf;
}
// todo remove it
// order array<type *>
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
......
......@@ -115,19 +115,19 @@ endi
if $data00 != 10 then
return -1
endi
if $data01 != 0 then
if $data10 != 10 then
return -1
endi
if $data10 != 10 then
if $data90 != 10 then
return -1
endi
if $data11 != 1 then
if $data01 != 7 then
return -1
endi
if $data90 != 10 then
if $data11 != 6 then
return -1
endi
if $data91 != 9 then
if $data91 != 3 then
return -1
endi
......@@ -143,16 +143,16 @@ if $row != 10 then
return -1
endi
if $data00 != @22-01-01 00:00:00.000@ then
if $data00 != @22-01-01 00:00:00.007@ then
return -1
endi
if $data01 != 0 then
if $data01 != 7 then
return -1
endi
if $data90 != @22-01-01 00:00:00.009@ then
if $data90 != @22-01-01 00:00:00.003@ then
return -1
endi
if $data91 != 9 then
if $data91 != 3 then
return -1
endi
......
......@@ -53,6 +53,7 @@ endi
sql select count(tbcol) from $mt
print select count(tbcol) from $mt ===> $data00
if $data00 != $totalNum then
print expect $totalNum , actual: $data00
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册