提交 5b01c2df 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11463-3.0

...@@ -83,11 +83,6 @@ if(${BUILD_WITH_NURAFT}) ...@@ -83,11 +83,6 @@ if(${BUILD_WITH_NURAFT})
cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT}) endif(${BUILD_WITH_NURAFT})
# iconv
if(${BUILD_WITH_ICONV})
cat("${CMAKE_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_ICONV})
# download dependencies # download dependencies
configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt") configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
...@@ -213,10 +208,9 @@ endif(${BUILD_WITH_TRAFT}) ...@@ -213,10 +208,9 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV # LIBUV
if(${BUILD_WITH_UV}) if(${BUILD_WITH_UV})
if (${TD_WINDOWS}) if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows")
file(READ "libuv/include/uv.h" CONTENTS) MESSAGE("Windows need set no-sign-compare")
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}") add_compile_options(-Wno-sign-compare)
file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
endif () endif ()
add_subdirectory(libuv) add_subdirectory(libuv)
endif(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV})
...@@ -249,15 +243,7 @@ if(${BUILD_WITH_SQLITE}) ...@@ -249,15 +243,7 @@ if(${BUILD_WITH_SQLITE})
endif(${BUILD_WITH_SQLITE}) endif(${BUILD_WITH_SQLITE})
# pthread # pthread
if(${BUILD_PTHREAD})
add_definitions(-DPTW32_STATIC_LIB)
add_subdirectory(pthread)
endif(${BUILD_PTHREAD})
# iconv
if(${BUILD_WITH_ICONV})
add_subdirectory(iconv)
endif(${BUILD_WITH_ICONV})
# ================================================================================================ # ================================================================================================
# Build test # Build test
......
...@@ -120,7 +120,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); ...@@ -120,7 +120,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
...@@ -131,7 +131,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF ...@@ -131,7 +131,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock); void blockDataCleanup(SSDataBlock* pDataBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);
......
...@@ -46,7 +46,7 @@ typedef struct { ...@@ -46,7 +46,7 @@ typedef struct {
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v)) #define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE)) #define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) #define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0])) #define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
#define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v)) #define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v))
......
...@@ -36,7 +36,7 @@ typedef struct SReadHandle { ...@@ -36,7 +36,7 @@ typedef struct SReadHandle {
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 #define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 #define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
/** /**
* Create the exec task for streaming mode * Create the exec task for streaming mode
* @param pMsg * @param pMsg
......
...@@ -141,18 +141,27 @@ typedef struct STscObj { ...@@ -141,18 +141,27 @@ typedef struct STscObj {
SAppInstInfo* pAppInfo; SAppInstInfo* pAppInfo;
} STscObj; } STscObj;
typedef struct SResultColumn {
union {
char* nullbitmap; // bitmap, one bit for each item in the list
int32_t* offset;
};
char* pData;
} SResultColumn;
typedef struct SReqResultInfo { typedef struct SReqResultInfo {
const char* pRspMsg; const char* pRspMsg;
const char* pData; const char* pData;
TAOS_FIELD* fields; TAOS_FIELD* fields;
uint32_t numOfCols; uint32_t numOfCols;
int32_t* length; int32_t* length;
TAOS_ROW row; TAOS_ROW row;
char** pCol; SResultColumn* pCol;
uint32_t numOfRows; uint32_t numOfRows;
uint64_t totalRows; uint64_t totalRows;
uint32_t current; uint32_t current;
bool completed; bool completed;
int32_t payloadLen;
} SReqResultInfo; } SReqResultInfo;
typedef struct SShowReqInfo { typedef struct SShowReqInfo {
...@@ -227,7 +236,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, ...@@ -227,7 +236,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
void* doFetchRow(SRequestObj* pRequest); void* doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -556,13 +556,16 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -556,13 +556,16 @@ void* doFetchRow(SRequestObj* pRequest) {
} }
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pRequest->code = code; return NULL;
}
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
if (pRequest->code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
...@@ -629,10 +632,23 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -629,10 +632,23 @@ void* doFetchRow(SRequestObj* pRequest) {
_return: _return:
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; SResultColumn* pCol = &pResultInfo->pCol[i];
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); if (pCol->offset[pResultInfo->current] != -1) {
pResultInfo->row[i] = varDataVal(pResultInfo->row[i]); char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
pResultInfo->length[i] = varDataLen(pStart);
pResultInfo->row[i] = varDataVal(pStart);
} else {
pResultInfo->row[i] = NULL;
}
} else {
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
pResultInfo->row[i] = pResultInfo->pCol[i].pData + pResultInfo->fields[i].bytes * pResultInfo->current;
} else {
pResultInfo->row[i] = NULL;
}
} }
} }
...@@ -640,30 +656,52 @@ _return: ...@@ -640,30 +656,52 @@ _return:
return pResultInfo->row; return pResultInfo->row;
} }
static void doPrepareResPtr(SReqResultInfo* pResInfo) { static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
if (pResInfo->row == NULL) { if (pResInfo->row == NULL) {
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->pCol = calloc(pResInfo->numOfCols, sizeof(SResultColumn));
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
} }
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
} }
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) { if (numOfRows == 0) {
return; return TSDB_CODE_SUCCESS;
} }
// todo check for the failure of malloc int32_t code = doPrepareResPtr(pResultInfo);
doPrepareResPtr(pResultInfo); if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t offset = 0; int32_t* colLength = (int32_t*)pResultInfo->pData;
char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
colLength[i] = htonl(colLength[i]);
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->pCol[i].offset = (int32_t*)pStart;
pStart += numOfRows * sizeof(int32_t);
} else {
pResultInfo->pCol[i].nullbitmap = pStart;
pStart += BitmapLen(pResultInfo->numOfRows);
}
pResultInfo->pCol[i].pData = pStart;
pResultInfo->length[i] = pResultInfo->fields[i].bytes; pResultInfo->length[i] = pResultInfo->fields[i].bytes;
pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows); pResultInfo->row[i] = pResultInfo->pCol[i].pData;
pResultInfo->pCol[i] = pResultInfo->row[i];
offset += pResultInfo->fields[i].bytes; pStart += colLength[i];
} }
return TSDB_CODE_SUCCESS;
} }
char* getDbOfConnection(STscObj* pObj) { char* getDbOfConnection(STscObj* pObj) {
...@@ -685,15 +723,17 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { ...@@ -685,15 +723,17 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
taosThreadMutexUnlock(&pTscObj->mutex); taosThreadMutexUnlock(&pTscObj->mutex);
} }
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows); pResultInfo->numOfRows = htonl(pRsp->numOfRows);
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen);
// TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows; pResultInfo->totalRows += pResultInfo->numOfRows;
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
} }
...@@ -285,17 +285,17 @@ TEST(testCase, create_stable_Test) { ...@@ -285,17 +285,17 @@ TEST(testCase, create_stable_Test) {
ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); // pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); // printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
} // }
//
pRes = taos_query(pConn, "use abc1"); // pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); // taos_free_result(pRes);
pRes = taos_query(pConn, "drop stable `123_$^)`"); // pRes = taos_query(pConn, "drop stable `123_$^)`");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); // printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes));
} // }
taos_close(pConn); taos_close(pConn);
} }
...@@ -335,7 +335,7 @@ TEST(testCase, create_ctable_Test) { ...@@ -335,7 +335,7 @@ TEST(testCase, create_ctable_Test) {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using sts tags('2021-10-10 1:1:1');"); pRes = taos_query(pConn, "create table tu using st1 tags('2021-10-10 1:1:1');");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
} }
...@@ -485,7 +485,9 @@ TEST(testCase, show_table_Test) { ...@@ -485,7 +485,9 @@ TEST(testCase, show_table_Test) {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "show abc1.tables"); taos_query(pConn, "use abc1");
pRes = taos_query(pConn, "show tables");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
...@@ -658,13 +660,7 @@ TEST(testCase, agg_query_tables) { ...@@ -658,13 +660,7 @@ TEST(testCase, agg_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); pRes = taos_query(pConn, "select count(*), sum(k),min(k),max(k) from tu");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*) from t_x_19");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -497,11 +497,11 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 ...@@ -497,11 +497,11 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
/** /**
* *
* +------------------+---------------+--------------------+ * +------------------+---------------------------------------------+
* |the number of rows| column length | column #1 | * |the number of rows| column #1 |
* | (4 bytes) | (4 bytes) |--------------------+ * | (4 bytes) |------------+-----------------------+--------+
* | | | null bitmap| values| * | | null bitmap| column length(4bytes) | values |
* +------------------+---------------+--------------------+ * +------------------+------------+-----------------------+--------+
* @param buf * @param buf
* @param pBlock * @param pBlock
* @return * @return
...@@ -582,9 +582,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -582,9 +582,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
size_t blockDataGetRowSize(const SSDataBlock* pBlock) { size_t blockDataGetRowSize(SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
size_t rowSize = 0; if (pBlock->info.rowSize == 0) {
size_t rowSize = 0;
size_t numOfCols = pBlock->info.numOfCols; size_t numOfCols = pBlock->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -592,7 +593,10 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) { ...@@ -592,7 +593,10 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) {
rowSize += pColInfo->info.bytes; rowSize += pColInfo->info.bytes;
} }
return rowSize; pBlock->info.rowSize = rowSize;
}
return pBlock->info.rowSize;
} }
/** /**
...@@ -633,7 +637,7 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ...@@ -633,7 +637,7 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
rowSize += sizeof(int32_t); rowSize += sizeof(int32_t);
} else { } else {
rowSize += 1 / 8.0; rowSize += 1/8.0; // one bit for each record
} }
} }
...@@ -1139,7 +1143,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF ...@@ -1139,7 +1143,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
return 0; return 0;
} }
void blockDataClearup(SSDataBlock* pDataBlock) { void blockDataCleanup(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0; pDataBlock->info.rows = 0;
if (pDataBlock->info.hasVarCol) { if (pDataBlock->info.hasVarCol) {
......
...@@ -16,10 +16,13 @@ ...@@ -16,10 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndInfoSchema.h" #include "mndInfoSchema.h"
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
static const SInfosTableSchema dnodesSchema[] = {{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, static const SInfosTableSchema dnodesSchema[] = {{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "max_vnodes",.bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "max_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
...@@ -38,7 +41,7 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt ...@@ -38,7 +41,7 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY}, {.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
}; };
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
...@@ -58,21 +61,21 @@ static const SInfosTableSchema userDBSchema[] = {{.name = "name", .b ...@@ -58,21 +61,21 @@ static const SInfosTableSchema userDBSchema[] = {{.name = "name", .b
// {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update
}; };
static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY}, {.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
}; };
static const SInfosTableSchema userIdxSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, static const SInfosTableSchema userIdxSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
{.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, {.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY}, {.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY},
}; };
static const SInfosTableSchema userStbsSchema[] = {{.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, static const SInfosTableSchema userStbsSchema[] = {{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "stable_name", .bytes = (TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "tags", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "tags", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
...@@ -80,25 +83,25 @@ static const SInfosTableSchema userStbsSchema[] = {{.name = "db_name", . ...@@ -80,25 +83,25 @@ static const SInfosTableSchema userStbsSchema[] = {{.name = "db_name", .
{.name = "last_update", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_update", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_INT}, {.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_INT},
}; };
static const SInfosTableSchema userStreamsSchema[] = {{.name = "stream_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, static const SInfosTableSchema userStreamsSchema[] = {{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_BINARY}, {.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_BINARY},
{.name = "dest_table", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, {.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_BINARY}, {.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_BINARY},
}; };
static const SInfosTableSchema userTblsSchema[] = { static const SInfosTableSchema userTblsSchema[] = {
{.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "stable_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, {.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "uid", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "uid", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "table_comment", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "table_comment", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
}; };
static const SInfosTableSchema userTblDistSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, static const SInfosTableSchema userTblDistSchema[] = {{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
{.name = "table_name", .bytes = 192, .type = TSDB_DATA_TYPE_BINARY}, {.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_BINARY}, {.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_BINARY},
{.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
...@@ -117,7 +120,7 @@ static const SInfosTableSchema userUsersSchema[] = {{.name = "name", . ...@@ -117,7 +120,7 @@ static const SInfosTableSchema userUsersSchema[] = {{.name = "name", .
{.name = "account", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "account", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
}; };
static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
......
...@@ -1534,20 +1534,19 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 ...@@ -1534,20 +1534,19 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
cols = 0; cols = 0;
SName name = {0}; SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN] = {0};
mndExtractTableName(pStb->name, stbName);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stbName);
cols++;
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB); tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB);
tNameGetDbName(&name, db); tNameGetDbName(&name, db);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, db); STR_TO_VARSTR(pWrite, db);
cols++; cols++;
char stbName[TSDB_TABLE_NAME_LEN] = {0};
mndExtractTableName(pStb->name, stbName);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stbName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStb->createdTime; *(int64_t *)pWrite = pStb->createdTime;
cols++; cols++;
......
...@@ -65,50 +65,68 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc ...@@ -65,50 +65,68 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc
} }
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = pColRes->info.bytes * numOfRows; int32_t colSize = colDataGetLength(pColRes, numOfRows);
return (*(tDataTypes[pColRes->info.type].compFunc))( return (*(tDataTypes[pColRes->info.type].compFunc))(
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
} }
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t *compLen) { static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) {
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots); int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
int32_t *compSizes = (int32_t*)data; int32_t * colSizes = (int32_t*)data;
if (compressed) {
data += numOfCols * sizeof(int32_t); data += numOfCols * sizeof(int32_t);
} *dataLen = (numOfCols * sizeof(int32_t));
int32_t numOfRows = pInput->pData->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) { for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col); SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
// copy the null bitmap
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
size_t metaSize = numOfRows * sizeof(int32_t);
memcpy(data, pColRes->varmeta.offset, metaSize);
data += metaSize;
(*dataLen) += metaSize;
} else {
int32_t len = BitmapLen(numOfRows);
memcpy(data, pColRes->nullbitmap, len);
data += len;
(*dataLen) += len;
}
if (compressed) { if (compressed) {
compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed); colSizes[col] = compressColData(pColRes, numOfRows, data, compressed);
data += compSizes[col]; data += colSizes[col];
*compLen += compSizes[col]; (*dataLen) += colSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else { } else {
for(int32_t i = 0; i < pInput->pData->info.rows; ++i) { colSizes[col] = colDataGetLength(pColRes, numOfRows);
char* pData = colDataGetData(pColRes, i); (*dataLen) += colSizes[col];
memmove(data, pData, pColRes->info.bytes); memmove(data, pColRes->pData, colSizes[col]);
data += pColRes->info.bytes; data += colSizes[col];
}
} }
colSizes[col] = htonl(colSizes[col]);
} }
} }
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... // data format:
// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... // +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// | | sizeof(int32_t) * numOfCols | actual size | | actual size | |
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema); pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
pEntry->numOfRows = pInput->pData->info.rows; pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0; pEntry->dataLen = 0;
pBuf->useSize = sizeof(SRetrieveTableRsp); pBuf->useSize = sizeof(SRetrieveTableRsp);
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows; pEntry->dataLen = pEntry->dataLen;
} pBuf->useSize += pEntry->dataLen;
pBuf->useSize += pEntry->dataLen;
// todo completed
} }
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
...@@ -119,8 +137,11 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, ...@@ -119,8 +137,11 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false; return false;
} }
// struct size + data payload + length for each column // NOTE: there are four bytes of an integer more than the required buffer space.
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows + pInput->pData->info.numOfCols * sizeof(int32_t); // struct size + data payload + length for each column + bitmap length
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) +
ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows);
pBuf->pData = malloc(pBuf->allocSize); pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) { if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
...@@ -173,6 +194,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE ...@@ -173,6 +194,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
*pLen = 0; *pLen = 0;
return; return;
} }
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
......
...@@ -3914,7 +3914,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI ...@@ -3914,7 +3914,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
blockDataClearup(pBlock); blockDataCleanup(pBlock);
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
return; return;
} }
...@@ -4737,7 +4737,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) ...@@ -4737,7 +4737,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup)
} }
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataClearup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->readerHandle)) { while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
...@@ -4852,17 +4852,35 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* ...@@ -4852,17 +4852,35 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo*
int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) {
blockDataEnsureCapacity(pRes, numOfRows); blockDataEnsureCapacity(pRes, numOfRows);
if (pColList == NULL) { if (pColList == NULL) { // data from other sources
int32_t* colLen = (int32_t*)pData;
char* pStart = pData + sizeof(int32_t) * numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] > 0);
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colLen[i];
pColInfoData->varmeta.allocLen = colLen[i];
for (int32_t j = 0; j < numOfRows; ++j) { memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t)*numOfRows);
colDataAppend(pColInfoData, j, pData, false); pStart += sizeof(int32_t)*numOfRows;
pData += pColInfoData->info.bytes;
pColInfoData->pData = malloc(colLen[i]);
} else {
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
pStart += BitmapLen(numOfRows);
} }
memcpy(pColInfoData->pData, pStart, colLen[i]);
pStart += colLen[i];
} }
} else { // extract data acording to pColList } else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList)); ASSERT(numOfOutput == taosArrayGetSize(pColList));
// data from mnode
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
for(int32_t j = 0; j < numOfOutput; ++j) { for(int32_t j = 0; j < numOfOutput; ++j) {
...@@ -5458,9 +5476,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -5458,9 +5476,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
} }
blockDataClearup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, 1); int32_t tableNameSlotId = 1;
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId);
char * name = NULL; char * name = NULL;
int32_t numOfRows = 0; int32_t numOfRows = 0;
...@@ -5475,7 +5494,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -5475,7 +5494,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
} }
for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
if (i == 1) { if (i == tableNameSlotId) {
continue; continue;
} }
...@@ -5782,7 +5801,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan ...@@ -5782,7 +5801,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan
} }
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) { static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) {
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
while(1) { while(1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle); STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
...@@ -5938,7 +5957,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { ...@@ -5938,7 +5957,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
while(1) { while(1) {
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
while (1) { while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle); STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
...@@ -6354,8 +6373,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6354,8 +6373,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
blockDataClearup(pRes);
if (pProjectInfo->existDataBlock) { // TODO refactor if (pProjectInfo->existDataBlock) { // TODO refactor
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
......
...@@ -182,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -182,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
start = stop + 1; start = stop + 1;
} }
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
...@@ -312,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa ...@@ -312,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
} }
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
blockDataClearup(pHandle->pDataBlock); blockDataCleanup(pHandle->pDataBlock);
while(1) { while(1) {
if (cmpParam->numOfSources == pHandle->numOfCompletedSources) { if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
...@@ -478,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -478,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
} }
tMergeTreeDestroy(pHandle->pMergeTree); tMergeTreeDestroy(pHandle->pMergeTree);
......
...@@ -86,7 +86,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -86,7 +86,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
// //
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); // taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
} else { } else {
blockDataClearup(pInfo->pBlock); blockDataCleanup(pInfo->pBlock);
} }
SSDataBlock* pBlock = pInfo->pBlock; SSDataBlock* pBlock = pInfo->pBlock;
...@@ -151,7 +151,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -151,7 +151,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
} else { } else {
blockDataClearup(pInfo->pBlock); blockDataCleanup(pInfo->pBlock);
} }
SSDataBlock* pBlock = pInfo->pBlock; SSDataBlock* pBlock = pInfo->pBlock;
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#include "function.h" #include "function.h"
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void functionFinalizer(SqlFunctionCtx *pCtx); void functionFinalize(SqlFunctionCtx *pCtx);
bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void countFunction(SqlFunctionCtx *pCtx); void countFunction(SqlFunctionCtx *pCtx);
...@@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); ...@@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void minFunction(SqlFunctionCtx* pCtx); void minFunction(SqlFunctionCtx* pCtx);
void maxFunction(SqlFunctionCtx *pCtx); void maxFunction(SqlFunctionCtx *pCtx);
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void stddevFunction(SqlFunctionCtx* pCtx);
void stddevFinalize(SqlFunctionCtx* pCtx);
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void firstFunction(SqlFunctionCtx *pCtx); void firstFunction(SqlFunctionCtx *pCtx);
void lastFunction(SqlFunctionCtx *pCtx); void lastFunction(SqlFunctionCtx *pCtx);
......
...@@ -29,7 +29,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -29,7 +29,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getCountFuncEnv, .getEnvFunc = getCountFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = countFunction, .processFunc = countFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "sum", .name = "sum",
...@@ -39,7 +39,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -39,7 +39,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getSumFuncEnv, .getEnvFunc = getSumFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = sumFunction, .processFunc = sumFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "min", .name = "min",
...@@ -49,7 +49,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -49,7 +49,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = minFunctionSetup, .initFunc = minFunctionSetup,
.processFunc = minFunction, .processFunc = minFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "max", .name = "max",
...@@ -59,7 +59,77 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -59,7 +59,77 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = maxFunctionSetup,
.processFunc = maxFunction, .processFunc = maxFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
},
{
.name = "stddev",
.type = FUNCTION_TYPE_STDDEV,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getStddevFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
{
.name = "percentile",
.type = FUNCTION_TYPE_PERCENTILE,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
{
.name = "apercentile",
.type = FUNCTION_TYPE_APERCENTILE,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
{
.name = "top",
.type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
{
.name = "bottom",
.type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
{
.name = "spread",
.type = FUNCTION_TYPE_SPREAD,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
{
.name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
}, },
{ {
.name = "first", .name = "first",
...@@ -69,7 +139,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -69,7 +139,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = firstFunction, .processFunc = firstFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
{ {
.name = "last", .name = "last",
...@@ -79,18 +149,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -79,18 +149,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = lastFunction, .processFunc = lastFunction,
.finalizeFunc = functionFinalizer .finalizeFunc = functionFinalize
}, },
// {
// .name = "valueAssigner",
// .type = FUNCTION_TYPE_ASSIGNER,
// .classification = FUNC_MGT_AGG_FUNC,
// .checkFunc = stubCheckAndGetResultType,
// .getEnvFunc = getFirstLastFuncEnv,
// .initFunc = functionSetup,
// .processFunc = valFunction,
// .finalizeFunc = functionFinalizer
// },
{ {
.name = "concat", .name = "concat",
.type = FUNCTION_TYPE_CONCAT, .type = FUNCTION_TYPE_CONCAT,
......
...@@ -50,7 +50,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { ...@@ -50,7 +50,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); } static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
void functionFinalizer(SqlFunctionCtx *pCtx) { void functionFinalize(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
doFinalizer(pResInfo); doFinalizer(pResInfo);
} }
...@@ -441,6 +441,74 @@ void maxFunction(SqlFunctionCtx *pCtx) { ...@@ -441,6 +441,74 @@ void maxFunction(SqlFunctionCtx *pCtx) {
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
} }
typedef struct STopBotRes {
int32_t num;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
int32_t bytes = pColNode->node.resType.bytes;
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
return true;
}
typedef struct SStddevRes {
int64_t count;
union {double quadraticDSum; int64_t quadraticISum;};
union {double dsum; int64_t isum;};
} SStddevRes;
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SStddevRes);
return true;
}
void stddevFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type;
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// } else { // computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
switch(type) {
case TSDB_DATA_TYPE_INT: {
int32_t* plist = (int32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
}
break;
}
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
}
void stddevFinalize(SqlFunctionCtx* pCtx) {
functionFinalize(pCtx);
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
double res = pStddevRes->quadraticISum/pStddevRes->count - (pStddevRes->isum / pStddevRes->count) * (pStddevRes->isum / pStddevRes->count);
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes; pEnv->calcMemSize = pNode->node.resType.bytes;
......
...@@ -465,7 +465,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int ...@@ -465,7 +465,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return func(&tmpVal, pSchema->bytes, param); return func(&tmpVal, pSchema->bytes, param);
} }
return func(getNullValue(pSchema->type), 0, param); return func(NULL, 0, param);
} }
switch (pSchema->type) { switch (pSchema->type) {
...@@ -638,9 +638,13 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p ...@@ -638,9 +638,13 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
varDataSetLen(rowEnd, output); varDataSetLen(rowEnd, output);
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, true, pa->toffset, pa->colIdx); tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
} else { } else {
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx); if (value == NULL) { // it is a null data
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
} else {
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -15,7 +15,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, ...@@ -15,7 +15,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = 0; int32_t code = 0;
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
...@@ -23,7 +23,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, ...@@ -23,7 +23,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
*ignore = true; *ignore = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -76,7 +76,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, ...@@ -76,7 +76,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
break; break;
case JOB_TASK_STATUS_CANCELLING: case JOB_TASK_STATUS_CANCELLING:
if (newStatus != JOB_TASK_STATUS_CANCELLED) { if (newStatus != JOB_TASK_STATUS_CANCELLED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
...@@ -313,7 +313,7 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { ...@@ -313,7 +313,7 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) { if (NULL == (*ctx)) {
QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_TASK_DLOG_E("task ctx not exist, may be dropped");
...@@ -498,9 +498,9 @@ _return: ...@@ -498,9 +498,9 @@ _return:
} }
int32_t qwDropTask(QW_FPARAMS_DEF) { int32_t qwDropTask(QW_FPARAMS_DEF) {
QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS())); QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -527,7 +527,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -527,7 +527,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
if (NULL == pRes) { if (NULL == pRes) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds); QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds); dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == ctx->taskType) { if (TASK_TYPE_TEMP == ctx->taskType) {
...@@ -542,7 +542,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -542,7 +542,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
} }
int32_t rows = pRes->info.rows; int32_t rows = pRes->info.rows;
ASSERT(pRes->info.rows > 0); ASSERT(pRes->info.rows > 0);
SInputData inputData = {.pData = pRes}; SInputData inputData = {.pData = pRes};
...@@ -654,12 +654,12 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -654,12 +654,12 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
} }
QW_TASK_DLOG_E("no data in sink and query end"); QW_TASK_DLOG_E("no data in sink and query end");
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp; *rspMsg = rsp;
*dataLen = 0; *dataLen = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -721,7 +721,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -721,7 +721,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
switch (phase) { switch (phase) {
case QW_PHASE_PRE_QUERY: { case QW_PHASE_PRE_QUERY: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
break; break;
} }
...@@ -762,7 +762,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -762,7 +762,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = ctx->dropConnection; dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
...@@ -789,12 +789,12 @@ _return: ...@@ -789,12 +789,12 @@ _return:
} }
if (dropConnection) { if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, code); qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
} }
if (cancelConnection) { if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code); qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
} }
...@@ -840,7 +840,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -840,7 +840,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
} }
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = ctx->dropConnection; dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
...@@ -870,17 +870,17 @@ _return: ...@@ -870,17 +870,17 @@ _return:
} }
if (readyConnection) { if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code); qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
} }
if (dropConnection) { if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, code); qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
} }
if (cancelConnection) { if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code); qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
} }
...@@ -986,9 +986,9 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -986,9 +986,9 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
code = ctx->rspCode; code = ctx->rspCode;
goto _return; goto _return;
} }
QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase)); QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
_return: _return:
...@@ -1000,7 +1000,7 @@ _return: ...@@ -1000,7 +1000,7 @@ _return:
if (code) { if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
} }
if (ctx) { if (ctx) {
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
...@@ -1047,7 +1047,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1047,7 +1047,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
atomic_store_8((int8_t*)&ctx->queryEnd, qComplete); atomic_store_8((int8_t*)&ctx->queryEnd, qComplete);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
...@@ -1068,7 +1068,7 @@ _return: ...@@ -1068,7 +1068,7 @@ _return:
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code));
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
...@@ -1082,7 +1082,7 @@ _return: ...@@ -1082,7 +1082,7 @@ _return:
} while (true); } while (true);
input.code = code; input.code = code;
QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL));
} }
...@@ -1119,7 +1119,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1119,7 +1119,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_IS_QUERY_RUNNING(ctx)) { if (QW_IS_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t*)&ctx->queryContinue, 1); atomic_store_8((int8_t*)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t*)&ctx->queryInQueue)) { } else if (0 == atomic_load_8((int8_t*)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
atomic_store_8((int8_t*)&ctx->queryInQueue, 1); atomic_store_8((int8_t*)&ctx->queryInQueue, 1);
...@@ -1192,7 +1192,7 @@ _return: ...@@ -1192,7 +1192,7 @@ _return:
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
} }
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
} }
......
...@@ -7,11 +7,11 @@ add_executable(pushServer "") ...@@ -7,11 +7,11 @@ add_executable(pushServer "")
target_sources(transUT target_sources(transUT
PRIVATE PRIVATE
"transUT.cc" "transUT.cpp"
) )
target_sources(transportTest target_sources(transportTest
PRIVATE PRIVATE
"transportTests.cc" "transportTests.cpp"
) )
target_sources (client target_sources (client
PRIVATE PRIVATE
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册