diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 9cf68b87f9764b4670cf7091c2f7d83bbb347063..575a0e6274691d3b24429a891979507a36eed543 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -83,11 +83,6 @@ if(${BUILD_WITH_NURAFT}) cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_NURAFT}) -# iconv -if(${BUILD_WITH_ICONV}) - cat("${CMAKE_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_ICONV}) - # download dependencies configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt") execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . @@ -213,10 +208,9 @@ endif(${BUILD_WITH_TRAFT}) # LIBUV if(${BUILD_WITH_UV}) - if (${TD_WINDOWS}) - file(READ "libuv/include/uv.h" CONTENTS) - string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}") - file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}") + if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows") + MESSAGE("Windows need set no-sign-compare") + add_compile_options(-Wno-sign-compare) endif () add_subdirectory(libuv) endif(${BUILD_WITH_UV}) @@ -249,15 +243,7 @@ if(${BUILD_WITH_SQLITE}) endif(${BUILD_WITH_SQLITE}) # pthread -if(${BUILD_PTHREAD}) - add_definitions(-DPTW32_STATIC_LIB) - add_subdirectory(pthread) -endif(${BUILD_PTHREAD}) - -# iconv -if(${BUILD_WITH_ICONV}) - add_subdirectory(iconv) -endif(${BUILD_WITH_ICONV}) + # ================================================================================================ # Build test diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 67946c9218b861a6baa4a128a44803a71aad128d..f181c26e92aed6a14576a69ef97d7e885f5d4b6b 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -120,7 +120,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); size_t blockDataGetSize(const SSDataBlock* pBlock); -size_t blockDataGetRowSize(const SSDataBlock* pBlock); +size_t blockDataGetRowSize(SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); @@ -131,7 +131,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); -void blockDataClearup(SSDataBlock* pDataBlock); +void blockDataCleanup(SSDataBlock* pDataBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); void* blockDataDestroy(SSDataBlock* pBlock); diff --git a/include/common/ttypes.h b/include/common/ttypes.h index a936ea3b5410a4942bd8775a169e7841bc0083cd..d7fcc28410872a56fab3ec2b62b02a22347cc9f6 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -46,7 +46,7 @@ typedef struct { #define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v)) #define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len)) -#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) +#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR)) #define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0])) #define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v)) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b9d94c2254ac919f2ec5edc6354a917e021c8f21..4ad7e2dfc2c7b5ce8065534c315e800c0d5eec26 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,7 +36,7 @@ typedef struct SReadHandle { #define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 #define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 - + /** * Create the exec task for streaming mode * @param pMsg diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 24be465313206beec1d2935c5714dc114c38ecdc..de6e72336d586874f85af0d3cc0760f04f554712 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -141,18 +141,27 @@ typedef struct STscObj { SAppInstInfo* pAppInfo; } STscObj; +typedef struct SResultColumn { + union { + char* nullbitmap; // bitmap, one bit for each item in the list + int32_t* offset; + }; + char* pData; +} SResultColumn; + typedef struct SReqResultInfo { - const char* pRspMsg; - const char* pData; - TAOS_FIELD* fields; - uint32_t numOfCols; - int32_t* length; - TAOS_ROW row; - char** pCol; - uint32_t numOfRows; - uint64_t totalRows; - uint32_t current; - bool completed; + const char* pRspMsg; + const char* pData; + TAOS_FIELD* fields; + uint32_t numOfCols; + int32_t* length; + TAOS_ROW row; + SResultColumn* pCol; + uint32_t numOfRows; + uint64_t totalRows; + uint32_t current; + bool completed; + int32_t payloadLen; } SReqResultInfo; typedef struct SShowReqInfo { @@ -227,7 +236,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, void* doFetchRow(SRequestObj* pRequest); -void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a7f15cbe4546b07149c83b2334fadb4781a0ebf8..b6bb14e9a4a5c1a6298ba22751651f51f859b6f0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -12,7 +12,7 @@ static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -556,13 +556,16 @@ void* doFetchRow(SRequestObj* pRequest) { } SReqResultInfo* pResInfo = &pRequest->body.resInfo; - int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); - if (code != TSDB_CODE_SUCCESS) { - pRequest->code = code; + pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); + if (pRequest->code != TSDB_CODE_SUCCESS) { + return NULL; + } + + pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); + if (pRequest->code != TSDB_CODE_SUCCESS) { return NULL; } - setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); @@ -629,10 +632,23 @@ void* doFetchRow(SRequestObj* pRequest) { _return: for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { - pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; + SResultColumn* pCol = &pResultInfo->pCol[i]; + if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { - pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); - pResultInfo->row[i] = varDataVal(pResultInfo->row[i]); + if (pCol->offset[pResultInfo->current] != -1) { + char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData; + + pResultInfo->length[i] = varDataLen(pStart); + pResultInfo->row[i] = varDataVal(pStart); + } else { + pResultInfo->row[i] = NULL; + } + } else { + if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) { + pResultInfo->row[i] = pResultInfo->pCol[i].pData + pResultInfo->fields[i].bytes * pResultInfo->current; + } else { + pResultInfo->row[i] = NULL; + } } } @@ -640,30 +656,52 @@ _return: return pResultInfo->row; } -static void doPrepareResPtr(SReqResultInfo* pResInfo) { +static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { if (pResInfo->row == NULL) { - pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); - pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->pCol = calloc(pResInfo->numOfCols, sizeof(SResultColumn)); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); } + + if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } else { + return TSDB_CODE_SUCCESS; + } } -void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); if (numOfRows == 0) { - return; + return TSDB_CODE_SUCCESS; } - // todo check for the failure of malloc - doPrepareResPtr(pResultInfo); + int32_t code = doPrepareResPtr(pResultInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int32_t offset = 0; + int32_t* colLength = (int32_t*)pResultInfo->pData; + char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { + colLength[i] = htonl(colLength[i]); + + if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { + pResultInfo->pCol[i].offset = (int32_t*)pStart; + pStart += numOfRows * sizeof(int32_t); + } else { + pResultInfo->pCol[i].nullbitmap = pStart; + pStart += BitmapLen(pResultInfo->numOfRows); + } + + pResultInfo->pCol[i].pData = pStart; pResultInfo->length[i] = pResultInfo->fields[i].bytes; - pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows); - pResultInfo->pCol[i] = pResultInfo->row[i]; - offset += pResultInfo->fields[i].bytes; + pResultInfo->row[i] = pResultInfo->pCol[i].pData; + + pStart += colLength[i]; } + + return TSDB_CODE_SUCCESS; } char* getDbOfConnection(STscObj* pObj) { @@ -685,15 +723,17 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { taosThreadMutexUnlock(&pTscObj->mutex); } -void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { assert(pResultInfo != NULL && pRsp != NULL); - pResultInfo->pRspMsg = (const char*)pRsp; - pResultInfo->pData = (void*)pRsp->data; - pResultInfo->numOfRows = htonl(pRsp->numOfRows); - pResultInfo->current = 0; - pResultInfo->completed = (pRsp->completed == 1); + pResultInfo->pRspMsg = (const char*)pRsp; + pResultInfo->pData = (void*)pRsp->data; + pResultInfo->numOfRows = htonl(pRsp->numOfRows); + pResultInfo->current = 0; + pResultInfo->completed = (pRsp->completed == 1); + pResultInfo->payloadLen = htonl(pRsp->compLen); + // TODO handle the compressed case pResultInfo->totalRows += pResultInfo->numOfRows; - setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); + return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 17eaa70ae0aab7dfdcd087b781f1e2807bf524e2..ac092c8f101b808956f3908a8a839852630d3176 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -285,17 +285,17 @@ TEST(testCase, create_stable_Test) { ASSERT_EQ(numOfFields, 0); taos_free_result(pRes); - pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); - } - - pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - pRes = taos_query(pConn, "drop stable `123_$^)`"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); - } +// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); +// } +// +// pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// pRes = taos_query(pConn, "drop stable `123_$^)`"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); +// } taos_close(pConn); } @@ -335,7 +335,7 @@ TEST(testCase, create_ctable_Test) { } taos_free_result(pRes); - pRes = taos_query(pConn, "create table tu using sts tags('2021-10-10 1:1:1');"); + pRes = taos_query(pConn, "create table tu using st1 tags('2021-10-10 1:1:1');"); if (taos_errno(pRes) != 0) { printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); } @@ -485,7 +485,9 @@ TEST(testCase, show_table_Test) { taos_free_result(pRes); - pRes = taos_query(pConn, "show abc1.tables"); + taos_query(pConn, "use abc1"); + + pRes = taos_query(pConn, "show tables"); if (taos_errno(pRes) != 0) { printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -658,13 +660,7 @@ TEST(testCase, agg_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "select count(*) from t_x_19"); + pRes = taos_query(pConn, "select count(*), sum(k),min(k),max(k) from tu"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index aadb74ba861a6ee69bfd4378797bc72e110fd365..67e7333597a6f805d132a338c78b25ac28091a47 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -497,11 +497,11 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 /** * - * +------------------+---------------+--------------------+ - * |the number of rows| column length | column #1 | - * | (4 bytes) | (4 bytes) |--------------------+ - * | | | null bitmap| values| - * +------------------+---------------+--------------------+ + * +------------------+---------------------------------------------+ + * |the number of rows| column #1 | + * | (4 bytes) |------------+-----------------------+--------+ + * | | null bitmap| column length(4bytes) | values | + * +------------------+------------+-----------------------+--------+ * @param buf * @param pBlock * @return @@ -582,9 +582,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { return TSDB_CODE_SUCCESS; } -size_t blockDataGetRowSize(const SSDataBlock* pBlock) { +size_t blockDataGetRowSize(SSDataBlock* pBlock) { ASSERT(pBlock != NULL); - size_t rowSize = 0; + if (pBlock->info.rowSize == 0) { + size_t rowSize = 0; size_t numOfCols = pBlock->info.numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { @@ -592,7 +593,10 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) { rowSize += pColInfo->info.bytes; } - return rowSize; + pBlock->info.rowSize = rowSize; + } + + return pBlock->info.rowSize; } /** @@ -633,7 +637,7 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { rowSize += sizeof(int32_t); } else { - rowSize += 1 / 8.0; + rowSize += 1/8.0; // one bit for each record } } @@ -1139,7 +1143,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF return 0; } -void blockDataClearup(SSDataBlock* pDataBlock) { +void blockDataCleanup(SSDataBlock* pDataBlock) { pDataBlock->info.rows = 0; if (pDataBlock->info.hasVarCol) { diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 7ef4c5e5053ba930c4e2822f938ce61275228015..964c4ab424e4f6de0015413429e5b0c7d668e555 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -16,10 +16,13 @@ #define _DEFAULT_SOURCE #include "mndInfoSchema.h" +#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) + static const SInfosTableSchema dnodesSchema[] = {{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, - {.name = "max_vnodes",.bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "max_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, @@ -38,7 +41,7 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt {.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; -static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, +static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, @@ -58,21 +61,21 @@ static const SInfosTableSchema userDBSchema[] = {{.name = "name", .b // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update }; static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, }; static const SInfosTableSchema userIdxSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "index_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "index_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY}, }; -static const SInfosTableSchema userStbsSchema[] = {{.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "stable_name", .bytes = (TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, +static const SInfosTableSchema userStbsSchema[] = {{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "tags", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, @@ -80,25 +83,25 @@ static const SInfosTableSchema userStbsSchema[] = {{.name = "db_name", . {.name = "last_update", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_INT}, }; -static const SInfosTableSchema userStreamsSchema[] = {{.name = "stream_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, +static const SInfosTableSchema userStreamsSchema[] = {{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "dest_table", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_BINARY}, }; static const SInfosTableSchema userTblsSchema[] = { - {.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "stable_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "uid", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, - {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "table_comment", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "table_comment", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, }; static const SInfosTableSchema userTblDistSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_BINARY}, {.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, @@ -117,7 +120,7 @@ static const SInfosTableSchema userUsersSchema[] = {{.name = "name", . {.name = "account", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, }; static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8929709b1c47b7b811dd958c39940d8ce8a974c2..7b0de9d8bdcf70c94321c2af7cb614a9ee0937aa 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1534,20 +1534,19 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 cols = 0; SName name = {0}; + char stbName[TSDB_TABLE_NAME_LEN] = {0}; + mndExtractTableName(pStb->name, stbName); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, stbName); + cols++; + char db[TSDB_DB_NAME_LEN] = {0}; tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB); tNameGetDbName(&name, db); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_TO_VARSTR(pWrite, db); cols++; - char stbName[TSDB_TABLE_NAME_LEN] = {0}; - mndExtractTableName(pStb->name, stbName); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, stbName); - cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(int64_t *)pWrite = pStb->createdTime; cols++; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 4c13820bca4aa68b42f7c0063974644bd2b000d4..1c2d9424d3e174481a6a8e0a7a41f6939c30e952 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -65,50 +65,68 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc } static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { - int32_t colSize = pColRes->info.bytes * numOfRows; + int32_t colSize = colDataGetLength(pColRes, numOfRows); return (*(tDataTypes[pColRes->info.type].compFunc))( pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t *compLen) { +static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) { int32_t numOfCols = LIST_LENGTH(pSchema->pSlots); - int32_t *compSizes = (int32_t*)data; - if (compressed) { - data += numOfCols * sizeof(int32_t); - } + int32_t * colSizes = (int32_t*)data; + + data += numOfCols * sizeof(int32_t); + *dataLen = (numOfCols * sizeof(int32_t)); + int32_t numOfRows = pInput->pData->info.rows; for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col); + + // copy the null bitmap + if (IS_VAR_DATA_TYPE(pColRes->info.type)) { + size_t metaSize = numOfRows * sizeof(int32_t); + memcpy(data, pColRes->varmeta.offset, metaSize); + data += metaSize; + (*dataLen) += metaSize; + } else { + int32_t len = BitmapLen(numOfRows); + memcpy(data, pColRes->nullbitmap, len); + data += len; + (*dataLen) += len; + } + if (compressed) { - compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed); - data += compSizes[col]; - *compLen += compSizes[col]; - compSizes[col] = htonl(compSizes[col]); + colSizes[col] = compressColData(pColRes, numOfRows, data, compressed); + data += colSizes[col]; + (*dataLen) += colSizes[col]; } else { - for(int32_t i = 0; i < pInput->pData->info.rows; ++i) { - char* pData = colDataGetData(pColRes, i); - memmove(data, pData, pColRes->info.bytes); - data += pColRes->info.bytes; - } + colSizes[col] = colDataGetLength(pColRes, numOfRows); + (*dataLen) += colSizes[col]; + memmove(data, pColRes->pData, colSizes[col]); + data += colSizes[col]; } + + colSizes[col] = htonl(colSizes[col]); } } -// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... -// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... +// data format: +// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ +// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... +// | | sizeof(int32_t) * numOfCols | actual size | | actual size | | +// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ +// The length of bitmap is decided by number of rows of this data block, and the length of each column data is +// recorded in the first segment, next to the struct header static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema); - pEntry->numOfRows = pInput->pData->info.rows; - pEntry->dataLen = 0; + pEntry->numOfRows = pInput->pData->info.rows; + pEntry->dataLen = 0; pBuf->useSize = sizeof(SRetrieveTableRsp); copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); - if (0 == pEntry->compressed) { - pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows; - } - pBuf->useSize += pEntry->dataLen; - // todo completed + + pEntry->dataLen = pEntry->dataLen; + pBuf->useSize += pEntry->dataLen; } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { @@ -119,8 +137,11 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - // struct size + data payload + length for each column - pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows + pInput->pData->info.numOfCols * sizeof(int32_t); + // NOTE: there are four bytes of an integer more than the required buffer space. + // struct size + data payload + length for each column + bitmap length + pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + + ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows); + pBuf->pData = malloc(pBuf->allocSize); if (pBuf->pData == NULL) { qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); @@ -173,6 +194,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE *pLen = 0; return; } + SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 407df1e90b0803b10f4d949f24eb82c4c80c8aac..e927b577deb3d37cedd8e7e101e92bda5ba6ea6f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3914,7 +3914,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); - blockDataClearup(pBlock); + blockDataCleanup(pBlock); if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { return; } @@ -4737,7 +4737,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) } SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - blockDataClearup(pInfo->pRes); + blockDataCleanup(pInfo->pRes); while (tqNextDataBlock(pInfo->readerHandle)) { pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); @@ -4852,17 +4852,35 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { blockDataEnsureCapacity(pRes, numOfRows); - if (pColList == NULL) { + if (pColList == NULL) { // data from other sources + int32_t* colLen = (int32_t*)pData; + char* pStart = pData + sizeof(int32_t) * numOfOutput; + for (int32_t i = 0; i < numOfOutput; ++i) { + colLen[i] = htonl(colLen[i]); + ASSERT(colLen[i] > 0); + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + pColInfoData->varmeta.length = colLen[i]; + pColInfoData->varmeta.allocLen = colLen[i]; - for (int32_t j = 0; j < numOfRows; ++j) { - colDataAppend(pColInfoData, j, pData, false); - pData += pColInfoData->info.bytes; + memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t)*numOfRows); + pStart += sizeof(int32_t)*numOfRows; + + pColInfoData->pData = malloc(colLen[i]); + } else { + memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows)); + pStart += BitmapLen(numOfRows); } + + memcpy(pColInfoData->pData, pStart, colLen[i]); + pStart += colLen[i]; } - } else { // extract data acording to pColList + } else { // extract data according to pColList ASSERT(numOfOutput == taosArrayGetSize(pColList)); + + // data from mnode for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t j = 0; j < numOfOutput; ++j) { @@ -5458,9 +5476,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); } - blockDataClearup(pInfo->pRes); + blockDataCleanup(pInfo->pRes); - SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, 1); + int32_t tableNameSlotId = 1; + SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); char * name = NULL; int32_t numOfRows = 0; @@ -5475,7 +5494,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { } for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { - if (i == 1) { + if (i == tableNameSlotId) { continue; } @@ -5782,7 +5801,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan } static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) { - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); while(1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -5938,7 +5957,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { while(1) { - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { @@ -6354,8 +6373,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SSDataBlock* pRes = pInfo->pRes; - - blockDataClearup(pRes); + blockDataCleanup(pRes); if (pProjectInfo->existDataBlock) { // TODO refactor // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 08bab762be048b5a6db1a1205a00853e667e1eb8..d424599758baf87eefe85b6bcc28abb374247fbf 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -182,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { start = stop + 1; } - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock); int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); @@ -312,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa } static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { - blockDataClearup(pHandle->pDataBlock); + blockDataCleanup(pHandle->pDataBlock); while(1) { if (cmpParam->numOfSources == pHandle->numOfCompletedSources) { @@ -478,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); - blockDataClearup(pDataBlock); + blockDataCleanup(pDataBlock); } tMergeTreeDestroy(pHandle->pMergeTree); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 36ed879e4a5f9334cba5f5857ece44a80162c0ac..2f1caab30b75b27761f358365a2051bc2270d794 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -86,7 +86,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) { // // taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); } else { - blockDataClearup(pInfo->pBlock); + blockDataCleanup(pInfo->pBlock); } SSDataBlock* pBlock = pInfo->pBlock; @@ -151,7 +151,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) { taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); } else { - blockDataClearup(pInfo->pBlock); + blockDataCleanup(pInfo->pBlock); } SSDataBlock* pBlock = pInfo->pBlock; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 06425df8f157940dd556eed4a878d9475fa6375c..214204723fd5ec8c9ee3b09bf7c565e41c7627b1 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -23,7 +23,7 @@ extern "C" { #include "function.h" bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -void functionFinalizer(SqlFunctionCtx *pCtx); +void functionFinalize(SqlFunctionCtx *pCtx); bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void countFunction(SqlFunctionCtx *pCtx); @@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void minFunction(SqlFunctionCtx* pCtx); void maxFunction(SqlFunctionCtx *pCtx); +bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void stddevFunction(SqlFunctionCtx* pCtx); +void stddevFinalize(SqlFunctionCtx* pCtx); + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void firstFunction(SqlFunctionCtx *pCtx); void lastFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0f834d4420ac86a8a9f12a018462f1b173c9437a..e27d53c1eedebfa40e3c1f9bf6be111e0fe1fed0 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -29,7 +29,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getCountFuncEnv, .initFunc = functionSetup, .processFunc = countFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "sum", @@ -39,7 +39,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSumFuncEnv, .initFunc = functionSetup, .processFunc = sumFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "min", @@ -49,7 +49,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = minFunctionSetup, .processFunc = minFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "max", @@ -59,7 +59,77 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = maxFunctionSetup, .processFunc = maxFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize + }, + { + .name = "stddev", + .type = FUNCTION_TYPE_STDDEV, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getStddevFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "percentile", + .type = FUNCTION_TYPE_PERCENTILE, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "apercentile", + .type = FUNCTION_TYPE_APERCENTILE, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "top", + .type = FUNCTION_TYPE_TOP, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "bottom", + .type = FUNCTION_TYPE_BOTTOM, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "spread", + .type = FUNCTION_TYPE_SPREAD, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "last_row", + .type = FUNCTION_TYPE_LAST_ROW, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize }, { .name = "first", @@ -69,7 +139,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = firstFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, { .name = "last", @@ -79,18 +149,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = lastFunction, - .finalizeFunc = functionFinalizer + .finalizeFunc = functionFinalize }, -// { -// .name = "valueAssigner", -// .type = FUNCTION_TYPE_ASSIGNER, -// .classification = FUNC_MGT_AGG_FUNC, -// .checkFunc = stubCheckAndGetResultType, -// .getEnvFunc = getFirstLastFuncEnv, -// .initFunc = functionSetup, -// .processFunc = valFunction, -// .finalizeFunc = functionFinalizer -// }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index eed2a3c481c4fd4b34e658aa90256b91f2be3557..98170208178d58244a8d662690689b431d1be916 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -50,7 +50,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); } -void functionFinalizer(SqlFunctionCtx *pCtx) { +void functionFinalize(SqlFunctionCtx *pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); doFinalizer(pResInfo); } @@ -441,6 +441,74 @@ void maxFunction(SqlFunctionCtx *pCtx) { SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); } +typedef struct STopBotRes { + int32_t num; +} STopBotRes; + +bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0); + int32_t bytes = pColNode->node.resType.bytes; + SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); + return true; +} + +typedef struct SStddevRes { + int64_t count; + union {double quadraticDSum; int64_t quadraticISum;}; + union {double dsum; int64_t isum;}; +} SStddevRes; + +bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SStddevRes); + return true; +} + +void stddevFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0]; + int32_t type = pInput->pData[0]->info.type; + + SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + +// } else { // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + switch(type) { + case TSDB_DATA_TYPE_INT: { + int32_t* plist = (int32_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + pStddevRes->count += 1; + pStddevRes->isum += plist[i]; + pStddevRes->quadraticISum += plist[i] * plist[i]; + } + } + break; + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); +} + +void stddevFinalize(SqlFunctionCtx* pCtx) { + functionFinalize(pCtx); + + SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + double res = pStddevRes->quadraticISum/pStddevRes->count - (pStddevRes->isum / pStddevRes->count) * (pStddevRes->isum / pStddevRes->count); +} + + + + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); pEnv->calcMemSize = pNode->node.resType.bytes; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 2c9b2e848a5d391c410e00d875d59b3a91502e31..213a3956c17bc760719af8844c4103b8e63a41c5 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -465,7 +465,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int return func(&tmpVal, pSchema->bytes, param); } - return func(getNullValue(pSchema->type), 0, param); + return func(NULL, 0, param); } switch (pSchema->type) { @@ -638,9 +638,13 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } varDataSetLen(rowEnd, output); - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, true, pa->toffset, pa->colIdx); + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); } else { - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx); + if (value == NULL) { // it is a null data + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx); + } else { + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx); + } } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ce9f115e4a11169ffc3fb6ac903a23c583aea542..ae2dccfe397f1a5c2dc4e4ac3d2240ed1de45402 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -15,7 +15,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, if (!gQWDebug.statusEnable) { return TSDB_CODE_SUCCESS; } - + int32_t code = 0; if (oriStatus == newStatus) { @@ -23,7 +23,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, *ignore = true; return TSDB_CODE_SUCCESS; } - + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -76,7 +76,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; - + case JOB_TASK_STATUS_CANCELLING: if (newStatus != JOB_TASK_STATUS_CANCELLED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); @@ -313,7 +313,7 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - + *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { QW_TASK_DLOG_E("task ctx not exist, may be dropped"); @@ -498,9 +498,9 @@ _return: } int32_t qwDropTask(QW_FPARAMS_DEF) { - QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); + QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS())); - + return TSDB_CODE_SUCCESS; } @@ -527,7 +527,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { if (NULL == pRes) { QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds); - + dsEndPut(sinkHandle, useconds); if (TASK_TYPE_TEMP == ctx->taskType) { @@ -542,7 +542,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { } int32_t rows = pRes->info.rows; - + ASSERT(pRes->info.rows > 0); SInputData inputData = {.pData = pRes}; @@ -654,12 +654,12 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void } QW_TASK_DLOG_E("no data in sink and query end"); - + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED); QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + *rspMsg = rsp; *dataLen = 0; - return TSDB_CODE_SUCCESS; } @@ -721,7 +721,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu switch (phase) { case QW_PHASE_PRE_QUERY: { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); + QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); break; } @@ -762,7 +762,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - + dropConnection = ctx->dropConnection; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -789,12 +789,12 @@ _return: } if (dropConnection) { - qwBuildAndSendDropRsp(dropConnection, code); + qwBuildAndSendDropRsp(dropConnection, code); QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); } if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, code); + qwBuildAndSendCancelRsp(cancelConnection, code); QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); } @@ -840,7 +840,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - + dropConnection = ctx->dropConnection; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -870,17 +870,17 @@ _return: } if (readyConnection) { - qwBuildAndSendReadyRsp(readyConnection, code); + qwBuildAndSendReadyRsp(readyConnection, code); QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); } if (dropConnection) { - qwBuildAndSendDropRsp(dropConnection, code); + qwBuildAndSendDropRsp(dropConnection, code); QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); } if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, code); + qwBuildAndSendCancelRsp(cancelConnection, code); QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); } @@ -986,9 +986,9 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { code = ctx->rspCode; goto _return; } - + QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase)); - + QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); _return: @@ -1000,7 +1000,7 @@ _return: if (code) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } - + if (ctx) { QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); @@ -1047,7 +1047,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); atomic_store_8((int8_t*)&ctx->queryEnd, qComplete); - + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); @@ -1068,7 +1068,7 @@ _return: qwFreeFetchRsp(rsp); rsp = NULL; qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); - QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code)); } QW_LOCK(QW_WRITE, &ctx->lock); @@ -1082,7 +1082,7 @@ _return: } while (true); input.code = code; - QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); + QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); } @@ -1119,7 +1119,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_IS_QUERY_RUNNING(ctx)) { atomic_store_8((int8_t*)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t*)&ctx->queryInQueue)) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING); atomic_store_8((int8_t*)&ctx->queryInQueue, 1); @@ -1192,7 +1192,7 @@ _return: if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); } - + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index b29bad07f08982c7644287d4ef9fa98cf10245bc..1245121d94376b2b5d677a7e918d2e20746449cf 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -7,11 +7,11 @@ add_executable(pushServer "") target_sources(transUT PRIVATE - "transUT.cc" + "transUT.cpp" ) target_sources(transportTest PRIVATE - "transportTests.cc" + "transportTests.cpp" ) target_sources (client PRIVATE diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cpp similarity index 100% rename from source/libs/transport/test/transUT.cc rename to source/libs/transport/test/transUT.cpp diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cpp similarity index 100% rename from source/libs/transport/test/transportTests.cc rename to source/libs/transport/test/transportTests.cpp