提交 15248304 编写于 作者: W wpan

Merge branch 'develop' into feature/TD-5331

......@@ -83,6 +83,8 @@ IF (TD_ARM_64)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm64 is defined")
SET(COMMON_FLAGS "-Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF ()
IF (TD_ARM_32)
......@@ -91,6 +93,8 @@ IF (TD_ARM_32)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm32 is defined")
SET(COMMON_FLAGS "-Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ")
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF ()
IF (TD_MIPS_64)
......@@ -143,6 +147,7 @@ IF (TD_LINUX)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF ()
IF (TD_DARWIN_64)
......@@ -164,6 +169,7 @@ IF (TD_DARWIN_64)
SET(RELEASE_FLAGS "-Og")
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF ()
IF (TD_WINDOWS)
......@@ -194,6 +200,7 @@ IF (TD_WINDOWS)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/regex)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/wepoll/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MsvcLibX/include)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF ()
IF (TD_WINDOWS_64)
......
......@@ -182,7 +182,13 @@ function install_jemalloc() {
${csudo} /usr/bin/install -c -d /usr/local/share/man/man3
${csudo} /usr/bin/install -c -m 644 ${jemalloc_dir}/share/man/man3/jemalloc.3 /usr/local/share/man/man3
fi
${csudo} ldconfig
if [ -d /etc/ld.so.conf.d ]; then
${csudo} echo "/usr/local/lib" > /etc/ld.so.conf.d/jemalloc.conf
${csudo} ldconfig
else
echo "/etc/ld.so.conf.d not found!"
fi
fi
}
......
......@@ -41,10 +41,10 @@ fi
if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
#strip ${build_dir}/bin/taosd
#strip ${build_dir}/bin/taosd
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh"
else
else
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo \
${script_dir}/remove_client.sh ${script_dir}/set_core.sh ${script_dir}/get_client.sh ${script_dir}/taosd-dump-cfg.gdb"
fi
......@@ -139,7 +139,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp -r ${examples_dir}/C# ${install_dir}/examples
fi
# Copy driver
mkdir -p ${install_dir}/driver
mkdir -p ${install_dir}/driver
cp ${lib_files} ${install_dir}/driver
# Copy connector
......@@ -168,7 +168,7 @@ fi
# exit 1
cd ${release_dir}
cd ${release_dir}
# install_dir has been distinguishes cluster from edege, so comments this code
pkg_name=${install_dir}-${osType}-${cpuType}
......@@ -195,6 +195,15 @@ if [ "$pagMode" == "lite" ]; then
pkg_name=${pkg_name}-Lite
fi
if [ "$verType" == "beta" ]; then
pkg_name=${pkg_name}-${verType}
elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name}
else
echo "unknow verType, nor stable or beta"
exit 1
fi
if [ "$osType" != "Darwin" ]; then
tar -zcv -f "$(basename ${pkg_name}).tar.gz" $(basename ${install_dir}) --remove-files || :
else
......
......@@ -41,10 +41,10 @@ fi
if [ "$osType" != "Darwin" ]; then
# if [ "$pagMode" == "lite" ]; then
# strip ${build_dir}/bin/powerd
# strip ${build_dir}/bin/powerd
# strip ${build_dir}/bin/power
# bin_files="${build_dir}/bin/power ${script_dir}/remove_client_power.sh"
# else
# else
# bin_files="${build_dir}/bin/power ${build_dir}/bin/powerdemo ${script_dir}/remove_client_power.sh ${script_dir}/set_core.sh"
# fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
......@@ -67,6 +67,39 @@ mkdir -p ${install_dir}
mkdir -p ${install_dir}/inc && cp ${header_files} ${install_dir}/inc
mkdir -p ${install_dir}/cfg && cp ${cfg_dir}/taos.cfg ${install_dir}/cfg/taos.cfg
if [ -f ${build_dir}/bin/jemalloc-config ]; then
mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
cp ${build_dir}/bin/jemalloc-config ${install_dir}/jemalloc/bin
if [ -f ${build_dir}/bin/jemalloc.sh ]; then
cp ${build_dir}/bin/jemalloc.sh ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/bin/jeprof ]; then
cp ${build_dir}/bin/jeprof ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/include/jemalloc/jemalloc.h ]; then
cp ${build_dir}/include/jemalloc/jemalloc.h ${install_dir}/jemalloc/include/jemalloc
fi
if [ -f ${build_dir}/lib/libjemalloc.so.2 ]; then
cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib
ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so
fi
if [ -f ${build_dir}/lib/libjemalloc.a ]; then
cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then
cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then
cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig
fi
if [ -f ${build_dir}/share/doc/jemalloc/jemalloc.html ]; then
cp ${build_dir}/share/doc/jemalloc/jemalloc.html ${install_dir}/jemalloc/share/doc/jemalloc
fi
if [ -f ${build_dir}/share/man/man3/jemalloc.3 ]; then
cp ${build_dir}/share/man/man3/jemalloc.3 ${install_dir}/jemalloc/share/man/man3
fi
fi
sed -i '/dataDir/ {s/taos/power/g}' ${install_dir}/cfg/taos.cfg
sed -i '/logDir/ {s/taos/power/g}' ${install_dir}/cfg/taos.cfg
sed -i "s/TDengine/PowerDB/g" ${install_dir}/cfg/taos.cfg
......@@ -77,11 +110,11 @@ if [ "$osType" != "Darwin" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin
else
else
cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
......@@ -158,15 +191,15 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp -r ${examples_dir}/JDBC ${install_dir}/examples
cp -r ${examples_dir}/matlab ${install_dir}/examples
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/matlab/TDengineDemo.m
cp -r ${examples_dir}/python ${install_dir}/examples
cp -r ${examples_dir}/python ${install_dir}/examples
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/python/read_example.py
cp -r ${examples_dir}/R ${install_dir}/examples
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
cp -r ${examples_dir}/go ${install_dir}/examples
cp -r ${examples_dir}/go ${install_dir}/examples
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/taosdemo.go
fi
# Copy driver
mkdir -p ${install_dir}/driver
mkdir -p ${install_dir}/driver
cp ${lib_files} ${install_dir}/driver
# Copy connector
......@@ -188,11 +221,11 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
echo "WARNING: go connector not found, please check if want to use it!"
fi
cp -r ${connector_dir}/python ${install_dir}/connector
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/subscription.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/connection.py
fi
# Copy release note
......@@ -200,7 +233,7 @@ fi
# exit 1
cd ${release_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${osType}-${cpuType}
......@@ -217,8 +250,8 @@ fi
if [ "$verType" == "beta" ]; then
pkg_name=${pkg_name}-${verType}
elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name}
elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name}
else
echo "unknow verType, nor stable or beta"
exit 1
......
......@@ -21,7 +21,7 @@ IF (TD_LINUX)
IF (TD_LINUX_64)
TARGET_LINK_LIBRARIES(taos lua)
ENDIF ()
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
#set version of .so
......@@ -37,13 +37,13 @@ ELSEIF (TD_DARWIN)
# set the static lib name
ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m)
TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1)
# generate dynamic library (*.dylib)
ADD_LIBRARY(taos SHARED ${SRC})
TARGET_LINK_LIBRARIES(taos common query trpc tutil pthread m)
TARGET_LINK_LIBRARIES(taos common query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
#set version of .dylib
......@@ -68,19 +68,19 @@ ELSEIF (TD_WINDOWS)
IF (NOT TD_GODLL)
SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def)
ENDIF ()
TARGET_LINK_LIBRARIES(taos trpc tutil query)
TARGET_LINK_LIBRARIES(taos trpc tutil query lua)
ELSEIF (TD_DARWIN)
SET(CMAKE_MACOSX_RPATH 1)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m)
TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
# generate dynamic library (*.dylib)
ADD_LIBRARY(taos SHARED ${SRC})
TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m)
TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
......
......@@ -94,11 +94,21 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
typedef struct SBlockKeyTuple {
TSKEY skey;
void* payloadAddr;
} SBlockKeyTuple;
typedef struct SBlockKeyInfo {
int32_t maxBytesAlloc;
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
......@@ -108,6 +118,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
......@@ -262,6 +273,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo);
void tscResetForNextRetrieve(SSqlRes* pRes);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
......@@ -308,10 +320,9 @@ bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
......
......@@ -88,13 +88,43 @@ typedef struct SBoundColumn {
int32_t offset; // all column offset value
} SBoundColumn;
typedef struct {
uint16_t schemaColIdx;
uint16_t boundIdx;
uint16_t finalIdx;
} SBoundIdxInfo;
typedef enum _COL_ORDER_STATUS {
ORDER_STATUS_UNKNOWN = 0,
ORDER_STATUS_ORDERED = 1,
ORDER_STATUS_DISORDERED = 2,
} EOrderStatus;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
int32_t *boundedColumns;
SBoundColumn *cols;
int16_t numOfCols;
int16_t numOfBound;
int32_t * boundedColumns; // bounded column idx according to schema
SBoundColumn * cols;
SBoundIdxInfo *colIdxInfo;
int8_t orderStatus; // bounded columns:
} SParsedDataColInfo;
#define IS_DATA_COL_ORDERED(s) ((s) == (int8_t)ORDER_STATUS_ORDERED)
typedef struct {
SSchema * pSchema;
int16_t sversion;
int32_t flen;
uint16_t nCols;
void * buf;
void * pDataBlock;
SSubmitBlk *pSubmitBlk;
} SMemRowBuilder;
typedef struct {
TDRowLenT allNullLen;
} SMemRowHelper;
typedef struct STableDataBlocks {
SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client
......@@ -110,12 +140,13 @@ typedef struct STableDataBlocks {
char *pData;
bool cloned;
SParsedDataColInfo boundColumnInfo;
SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo *params;
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo * params;
SMemRowHelper rowHelper;
} STableDataBlocks;
typedef struct {
......@@ -147,6 +178,7 @@ typedef struct {
SInsertStatementParam insertParam;
char reserve1[3]; // fix bus error on arm32
int32_t count; // todo remove it
bool subCmd;
char reserve2[3]; // fix bus error on arm32
int16_t numOfCols;
......@@ -243,6 +275,7 @@ typedef struct SSqlObj {
void * pStream;
void * pSubscription;
char * sqlstr;
void * pBuf; // table meta buffer
char parseRetry;
char retry;
char maxRetry;
......@@ -382,10 +415,15 @@ extern int tscRefId;
extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SSqlCmd* pCmd);
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen);
int32_t getExtendedRowSize(STableComInfo *tinfo);
#ifdef __cplusplus
}
#endif
......
......@@ -602,6 +602,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} else {
......@@ -610,6 +619,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pCtx[j]);
}
......@@ -626,7 +644,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
}
for(int32_t j = 0; j < numOfExpr; ++j) {
aAggs[pCtx[j].functionId].init(&pCtx[j]);
if (pCtx[j].functionId < 0) {
continue;
}
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
}
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -638,6 +660,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
......@@ -651,6 +682,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
......@@ -871,7 +911,6 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
{
if (pAggInfo->hasDataBlockForNewGroup) {
pAggInfo->binfo.pRes->info.rows = 0;
pAggInfo->hasPrev = false; // now we start from a new group data set.
// not belongs to the same group, return the result of current group;
......@@ -880,7 +919,13 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
{ // reset output buffer
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
aAggs[pAggInfo->binfo.pCtx[j].functionId].init(&pAggInfo->binfo.pCtx[j]);
SQLFunctionCtx* pCtx = &pAggInfo->binfo.pCtx[j];
if (pCtx->functionId < 0) {
clearOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity);
continue;
}
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
}
}
......@@ -933,6 +978,14 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
}
......
此差异已折叠。
......@@ -293,6 +293,7 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
return taosStringBuilderGetResult(&sb, NULL);
}
#if 0
static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
int32_t offset = 0;
......@@ -320,8 +321,129 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
return TSDB_CODE_SUCCESS;
}
#endif
/**
* input:
* - schema:
* - payload:
* - spd:
* output:
* - pBlock with data block replaced by K-V format
*/
static int refactorPayload(STableDataBlocks* pBlock, int32_t rowNum) {
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
SSchema* schema = (SSchema*)pBlock->pTableMeta->schema;
SMemRowHelper* pHelper = &pBlock->rowHelper;
STableMeta* pTableMeta = pBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
int code = TSDB_CODE_SUCCESS;
int32_t extendedRowSize = getExtendedRowSize(&tinfo);
TDRowTLenT destPayloadSize = sizeof(SSubmitBlk);
ASSERT(pHelper->allNullLen >= 8);
TDRowTLenT destAllocSize = sizeof(SSubmitBlk) + rowNum * extendedRowSize;
SSubmitBlk* pDestBlock = tcalloc(destAllocSize, 1);
if (pDestBlock == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(pDestBlock, pBlock->pData, sizeof(SSubmitBlk));
char* destPayload = (char*)pDestBlock + sizeof(SSubmitBlk);
char* srcPayload = (char*)pBlock->pData + sizeof(SSubmitBlk);
for (int n = 0; n < rowNum; ++n) {
payloadSetNCols(destPayload, spd->numOfBound);
TDRowTLenT dataRowLen = pHelper->allNullLen;
TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE + sizeof(SColIdx) * spd->numOfBound;
TDRowTLenT payloadValOffset = payloadValuesOffset(destPayload); // rely on payloadNCols
TDRowLenT colValOffset = 0;
char* kvPrimaryKeyStart = destPayload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple
char* kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey
for (int32_t i = 0; i < spd->numOfBound; ++i) {
int32_t colIndex = spd->boundedColumns[i];
ASSERT(spd->cols[colIndex].hasVal);
char* start = srcPayload + spd->cols[colIndex].offset;
SSchema* pSchema = &schema[colIndex]; // get colId here
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
// the primary key locates in 1st column
if (!IS_DATA_COL_ORDERED(spd->orderStatus)) {
ASSERT(spd->colIdxInfo != NULL);
if (!isPrimaryKey) {
kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN);
} else {
ASSERT(spd->colIdxInfo[i].finalIdx == 0);
}
}
if (isPrimaryKey) {
payloadColSetId(kvPrimaryKeyStart, pSchema->colId);
payloadColSetType(kvPrimaryKeyStart, pSchema->type);
payloadColSetOffset(kvPrimaryKeyStart, colValOffset);
memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]);
colValOffset += TYPE_BYTES[pSchema->type];
kvRowLen += TYPE_BYTES[pSchema->type];
} else {
payloadColSetId(kvStart, pSchema->colId);
payloadColSetType(kvStart, pSchema->type);
payloadColSetOffset(kvStart, colValOffset);
if (IS_VAR_DATA_TYPE(pSchema->type)) {
varDataCopy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start);
colValOffset += varDataTLen(start);
kvRowLen += varDataTLen(start);
if (pSchema->type == TSDB_DATA_TYPE_BINARY) {
dataRowLen += (varDataLen(start) - CHAR_BYTES);
} else if (pSchema->type == TSDB_DATA_TYPE_NCHAR) {
dataRowLen += (varDataLen(start) - TSDB_NCHAR_SIZE);
} else {
ASSERT(0);
}
} else {
memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]);
colValOffset += TYPE_BYTES[pSchema->type];
kvRowLen += TYPE_BYTES[pSchema->type];
}
if (IS_DATA_COL_ORDERED(spd->orderStatus)) {
kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column
}
}
} // end of column
if (kvRowLen < dataRowLen) {
payloadSetType(destPayload, SMEM_ROW_KV);
} else {
payloadSetType(destPayload, SMEM_ROW_DATA);
}
ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW);
TDRowTLenT len = payloadValOffset + colValOffset;
payloadSetTLen(destPayload, len);
// next loop
srcPayload += pBlock->rowSize;
destPayload += len;
destPayloadSize += len;
} // end of row
ASSERT(destPayloadSize <= destAllocSize);
tfree(pBlock->pData);
pBlock->pData = (char*)pDestBlock;
pBlock->nAllocSize = destAllocSize;
pBlock->size = destPayloadSize;
return code;
}
#if 0
int32_t fillTablesColumnsNull(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
......@@ -344,16 +466,97 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
return TSDB_CODE_SUCCESS;
}
#endif
/**
* check and sort
*/
static int initPayloadEnv(STableDataBlocks* pBlock, int32_t rowNum) {
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
if (spd->orderStatus != ORDER_STATUS_UNKNOWN) {
return TSDB_CODE_SUCCESS;
}
bool isOrdered = true;
int32_t lastColIdx = -1;
for (int32_t i = 0; i < spd->numOfBound; ++i) {
ASSERT(spd->cols[i].hasVal);
int32_t colIdx = spd->boundedColumns[i];
if (isOrdered) {
if (lastColIdx > colIdx) {
isOrdered = false;
break;
} else {
lastColIdx = colIdx;
}
}
}
spd->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
if (isOrdered) {
spd->colIdxInfo = NULL;
} else {
spd->colIdxInfo = calloc(spd->numOfBound, sizeof(SBoundIdxInfo));
if (spd->colIdxInfo == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SBoundIdxInfo* pColIdx = spd->colIdxInfo;
for (uint16_t i = 0; i < spd->numOfBound; ++i) {
pColIdx[i].schemaColIdx = (uint16_t)spd->boundedColumns[i];
pColIdx[i].boundIdx = i;
}
qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (uint16_t i = 0; i < spd->numOfBound; ++i) {
pColIdx[i].finalIdx = i;
}
qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
}
return TSDB_CODE_SUCCESS;
}
/**
* Refactor the raw payload structure to K-V format as the in tsParseOneRow()
*/
int32_t fillTablesPayload(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
int code = TSDB_CODE_SUCCESS;
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
while (pOneTableBlock) {
SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) {
initSMemRowHelper(&pOneTableBlock->rowHelper, tscGetTableSchema(pOneTableBlock->pTableMeta),
tscGetNumOfColumns(pOneTableBlock->pTableMeta), 0);
if ((code = initPayloadEnv(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) {
return code;
}
if ((code = refactorPayload(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) {
return code;
};
}
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
if (p == NULL) {
break;
}
pOneTableBlock = *p;
}
return code;
}
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static FORCE_INLINE int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data + param->offset, param->type, param->bytes);
return TSDB_CODE_SUCCESS;
}
if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data + param->offset, param->type, param->bytes);
return TSDB_CODE_SUCCESS;
}
#if 0
if (0) {
......@@ -1202,9 +1405,12 @@ static int insertStmtExecute(STscStmt* stmt) {
pBlk->uid = pTableMeta->id.uid;
pBlk->tid = pTableMeta->id.tid;
fillTablesColumnsNull(stmt->pSql);
int code = fillTablesPayload(stmt->pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false);
code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1269,8 +1475,6 @@ static void insertBatchClean(STscStmt* pStmt) {
static int insertBatchStmtExecute(STscStmt* pStmt) {
int32_t code = 0;
int64_t st1 = taosGetTimestampUs();
if(pStmt->mtb.nameSet == false) {
tscError("0x%"PRIx64" no table name set", pStmt->pSql->self);
return invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "no table name set");
......@@ -1283,37 +1487,25 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return TSDB_CODE_TSC_APP_ERROR;
}
fillTablesColumnsNull(pStmt->pSql);
int64_t st2 = taosGetTimestampUs();
fillTablesPayload(pStmt->pSql);
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
return code;
}
int64_t st3 = taosGetTimestampUs();
code = tscHandleMultivnodeInsert(pStmt->pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int64_t st4 = taosGetTimestampUs();
// wait for the callback function to post the semaphore
tsem_wait(&pStmt->pSql->rspSem);
int64_t st5 = taosGetTimestampUs();
code = pStmt->pSql->res.code;
insertBatchClean(pStmt);
int64_t st6 = taosGetTimestampUs();
tscDebug("use time:%"PRId64 ",%"PRId64 ",%"PRId64 ",%"PRId64 ",%"PRId64, st2-st1, st3-st2, st4-st3, st5-st4, st6-st5);
return code;
}
......
此差异已折叠。
此差异已折叠。
......@@ -566,7 +566,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
if ((pQueryInfo == NULL) || pQueryInfo->globalMerge) {
return true;
}
......@@ -679,7 +679,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
if (!pQueryInfo->globalMerge) {
return;
}
......@@ -730,7 +730,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
if (pQueryInfo->globalMerge) {
assert(pSql->rpcRid <= 0);
tscKillSTableQuery(pSql);
} else {
......@@ -945,28 +945,35 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_DISCONNECTED;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t length = (int32_t)strlen(tableNameList);
if (length == 0) {
return TSDB_CODE_SUCCESS;
}
if (length > MAX_TABLE_NAME_LENGTH) {
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, length, MAX_TABLE_NAME_LENGTH);
tscFreeSqlObj(pSql);
tscError("tableNameList too long, length:%d, maximum allowed:%d", length, MAX_TABLE_NAME_LENGTH);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
char *str = calloc(1, length + 1);
if (str == NULL) {
tscError("0x%"PRIx64" failed to allocate sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
tscError("failed to allocate sql string buffer, size:%d", length);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strtolower(str, tableNameList);
SArray* plist = taosArrayInit(4, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(4, POINTER_BYTES);
if (plist == NULL || vgroupList == NULL) {
tfree(str);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
tscAllocPayload(&pSql->cmd, 1024);
pSql->pTscObj = taos;
pSql->signature = pSql;
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str);
......@@ -976,10 +983,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return code;
}
pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, NULL, loadMultiTableMetaCallback, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
code = TSDB_CODE_SUCCESS;
}
......
......@@ -23,6 +23,7 @@
#include "tscSubquery.h"
#include "qTableMeta.h"
#include "tsclient.h"
#include "qUdf.h"
#include "qUtil.h"
#include "qPlan.h"
......@@ -32,7 +33,7 @@ typedef struct SInsertSupporter {
} SInsertSupporter;
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
//static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
if (left == right) {
......@@ -711,6 +712,15 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
pQueryInfo->window = *win;
}
int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
__compar_fn_t func = getComparFunc(t1->padding, 0);
return func(t1->tag, t2->tag);
}
int32_t tidTagsCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) (p1);
const STidTags* t2 = (const STidTags*) (p2);
......@@ -719,28 +729,7 @@ int32_t tidTagsCompar(const void* p1, const void* p2) {
return (t1->vgId > t2->vgId) ? 1 : -1;
}
tstr* tag1 = (tstr*) t1->tag;
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return strncmp(tag1->data, tag2->data, tag1->len);
}
int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
tstr* tag1 = (tstr*) t1->tag;
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return memcmp(tag1->data, tag2->data, tag1->len);
return tagValCompar(p1, p2);
}
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
......@@ -870,6 +859,12 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
return true;
}
static void setTidTagType(SJoinSupporter* p, uint8_t type) {
for (int32_t i = 0; i < p->num; ++i) {
STidTags * tag = (STidTags*) varDataVal(p->pIdTagList + i * p->tagSize);
tag->padding = type;
}
}
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
int16_t joinNum = pParentSql->subState.numOfSub;
......@@ -889,6 +884,8 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
for (int32_t i = 0; i < joinNum; i++) {
SJoinSupporter* p = pParentSql->pSubs[i]->param;
setTidTagType(p, pColSchema->type);
ctxlist[i].p = p;
ctxlist[i].res = taosArrayInit(p->num, size);
......@@ -1896,7 +1893,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
int16_t type = 0;
int32_t inter = 0;
getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0, NULL);
SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
pSupporter->tagSize = s1.bytes;
......@@ -2295,6 +2292,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SArray* pColList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL;
pNewQueryInfo->fillType = TSDB_FILL_NONE;
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
......@@ -2438,9 +2436,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tOrderDescriptor *pDesc = NULL;
pRes->qId = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 16u); // 64KB
const uint32_t nBufferSize = (1u << 18u); // 256KB
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
......@@ -2801,6 +2799,28 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscFreeRetrieveSup(pSql);
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE;
SQueryInfo *pQueryInfo2 = tscGetQueryInfo(&pParentSql->cmd);
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pExprInfo = tscExprGet(pQueryInfo2, j);
int32_t functionId = pExprInfo->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo2->pUdfInfo, -1 * functionId - 1);
code = initUdfInfo(pUdfInfo);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
}
}
}
}
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
......@@ -3469,20 +3489,20 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd1);
assert(pQueryInfo1->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
/*
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, goes on
......@@ -3493,14 +3513,14 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
break;
}
}
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == 0) {
continue;
}
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(&pSql->pSubs[i]->cmd);
......
......@@ -28,6 +28,7 @@
#include "tconfig.h"
#include "ttimezone.h"
#include "tlocale.h"
#include "qScript.h"
// global, not configurable
#define TSC_VAR_NOT_RELEASE 1
......@@ -148,6 +149,8 @@ void taos_init_imp(void) {
taosInitNotes();
rpcInit();
scriptEnvPoolInit();
tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
}
......@@ -202,7 +205,9 @@ void taos_cleanup(void) {
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return;
}
if (tscEmbedded == 0) {
scriptEnvPoolCleanup();
}
taosHashCleanup(tscTableMetaInfo);
tscTableMetaInfo = NULL;
......
此差异已折叠。
......@@ -41,8 +41,10 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_ACCT, "create-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_USER, "create-user" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_ACCT, "drop-acct" )
......@@ -74,6 +76,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_STABLEVGROUP, "stable-vgroup" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MULTI_META, "multi-meta" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_HB, "heart-beat" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_FUNC, "retrieve-function" )
// SQL below for client local
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
......
......@@ -24,6 +24,35 @@
extern "C" {
#endif
#pragma pack(push, 1)
typedef struct {
VarDataLenT len;
uint8_t data;
} SBinaryNullT;
typedef struct {
VarDataLenT len;
uint32_t data;
} SNCharNullT;
#pragma pack(pop)
extern const uint8_t BoolNull;
extern const uint8_t TinyintNull;
extern const uint16_t SmallintNull;
extern const uint32_t IntNull;
extern const uint64_t BigintNull;
extern const uint64_t TimestampNull;
extern const uint8_t UTinyintNull;
extern const uint16_t USmallintNull;
extern const uint32_t UIntNull;
extern const uint64_t UBigintNull;
extern const uint32_t FloatNull;
extern const uint64_t DoubleNull;
extern const SBinaryNullT BinaryNull;
extern const SNCharNullT NcharNull;
const void *tdGetNullVal(int8_t type);
#define STR_TO_VARSTR(x, str) \
do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \
......@@ -31,6 +60,14 @@ extern "C" {
memcpy(varDataVal(x), (str), __len); \
} while (0);
#define STR_TO_NET_VARSTR(x, str) \
do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \
*(VarDataLenT *)(x) = htons(__len); \
memcpy(varDataVal(x), (str), __len); \
} while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \
......@@ -45,10 +82,10 @@ extern "C" {
// ----------------- TSDB COLUMN DEFINITION
typedef struct {
int8_t type; // Column type
int16_t colId; // column ID
int16_t bytes; // column bytes
int16_t offset; // point offset in SDataRow after the header part
int8_t type; // Column type
int16_t colId; // column ID
uint16_t bytes; // column bytes
uint16_t offset; // point offset in SDataRow after the header part.
} STColumn;
#define colType(col) ((col)->type)
......@@ -159,10 +196,11 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
return 0;
}
}
// ----------------- Data row structure
/* A data row, the format is like below:
* |<--------------------+--------------------------- len ---------------------------------->|
* |<------------------------------------------------ len ---------------------------------->|
* |<-- Head -->|<--------- flen -------------->| |
* +---------------------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
......@@ -176,8 +214,8 @@ typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define dataRowLen(r) (*(uint16_t *)(r))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
#define dataRowLen(r) (*(TDRowLenT *)(r)) // 0~65535
#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
#define dataRowKey(r) tdGetKey(dataRowTKey(r))
......@@ -193,20 +231,19 @@ void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row);
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
if (IS_VAR_DATA_TYPE(type)) {
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
memcpy(ptr, value, varDataTLen(value));
memcpy(POINTER_SHIFT(row, dataRowLen(row)), value, varDataTLen(value));
dataRowLen(row) += varDataTLen(value);
} else {
if (offset == 0) {
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(POINTER_SHIFT(row, toffset), (void *)(&tvalue), TYPE_BYTES[type]);
memcpy(POINTER_SHIFT(row, toffset), (const void *)(&tvalue), TYPE_BYTES[type]);
} else {
memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
}
......@@ -237,17 +274,21 @@ typedef struct SDataCol {
TSKEY ts; // only used in last NULL column
} SDataCol;
#define isAllRowsNull(pCol) ((pCol)->len == 0)
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints);
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
if (isAllRowsNull(pCol)) {
return tdGetNullVal(pCol->type);
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else {
......@@ -279,7 +320,7 @@ typedef struct {
} SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)]
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) {
if (pCols->numOfRows) {
......@@ -323,13 +364,13 @@ void tdResetDataCols(SDataCols *pCols);
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
SDataCols *tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset);
// ----------------- K-V data row structure
/*
/* |<-------------------------------------- len -------------------------------------------->|
* |<----- header ----->|<--------------------------- body -------------------------------->|
* +----------+----------+---------------------------------+---------------------------------+
* | int16_t | int16_t | | |
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+
......@@ -337,14 +378,14 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge
typedef void *SKVRow;
typedef struct {
int16_t colId;
int16_t offset;
int16_t colId;
uint16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t))
#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define kvRowLen(r) (*(int16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define kvRowLen(r) (*(TDRowLenT *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
......@@ -354,6 +395,9 @@ typedef struct {
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) tfree(r)
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r)))
#define kvRowKey(r) tdGetKey(kvRowTKey(r))
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
SKVRow tdKVRowDup(SKVRow row);
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
......@@ -377,13 +421,44 @@ static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) {
return kvRowColVal(row, (SColIdx *)ret);
}
static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
}
// offset here not include kvRow header length
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE;
SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset);
char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row));
pColIdx->colId = colId;
pColIdx->offset = kvRowLen(row); // offset of pColIdx including the TD_KV_ROW_HEAD_SIZE
if (IS_VAR_DATA_TYPE(type)) {
memcpy(ptr, value, varDataTLen(value));
kvRowLen(row) += varDataTLen(value);
} else {
if (offset == 0) {
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]);
} else {
memcpy(ptr, value, TYPE_BYTES[type]);
}
kvRowLen(row) += TYPE_BYTES[type];
}
return 0;
}
// ----------------- K-V data row builder
typedef struct {
int16_t tCols;
int16_t nCols;
SColIdx *pColIdx;
int16_t alloc;
int16_t size;
uint16_t alloc;
uint16_t size;
void * buf;
} SKVRowBuilder;
......@@ -419,8 +494,146 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
return 0;
}
// ----------------- SMemRow appended with sequential data row structure
/*
* |---------|------------------------------------------------- len ---------------------------------->|
* |<-------- Head ------>|<--------- flen -------------->| |
* |---------+---------------------+---------------------------------+---------------------------------+
* | uint8_t | uint16_t | int16_t | | |
* |---------+----------+----------+---------------------------------+---------------------------------+
* | flag | len | sversion | First part | Second part |
* +---------+----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/
// ----------------- SMemRow appended with extended K-V data row structure
/* |--------------------|------------------------------------------------ len ---------------------------------->|
* |<------------- Head ------------>|<--------- flen -------------->| |
* |--------------------+----------+--------------------------------------------+---------------------------------+
* | uint8_t | int16_t | uint16_t | int16_t | | |
* |---------+----------+----------+----------+---------------------------------+---------------------------------+
* | flag | sversion | len | ncols | cols index | data part |
* |---------+----------+----------+----------+---------------------------------+---------------------------------+
*/
typedef void *SMemRow;
#define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t)
#define TD_MEM_ROW_KV_VER_SIZE sizeof(int16_t)
#define TD_MEM_ROW_KV_TYPE_VER_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE)
#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE)
// #define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
#define SMEM_ROW_DATA 0U // SDataRow
#define SMEM_ROW_KV 1U // SKVRow
#define memRowType(r) (*(uint8_t *)(r))
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
#define memRowKvBody(r) \
POINTER_SHIFT(r, TD_MEM_ROW_KV_TYPE_VER_SIZE) // section after flag + sversion as to reuse SKVRow
#define memRowDataLen(r) (*(TDRowLenT *)memRowDataBody(r)) // 0~65535
#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r)) // 0~65535
#define memRowDataTLen(r) \
((TDRowTLenT)(memRowDataLen(r) + TD_MEM_ROW_TYPE_SIZE)) // using uint32_t/int32_t to store the TLen
#define memRowKvTLen(r) ((TDRowTLenT)(memRowKvLen(r) + TD_MEM_ROW_KV_TYPE_VER_SIZE))
#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r))
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen
#define memRowDataVersion(r) dataRowVersion(memRowDataBody(r))
#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version
#define memRowSetKvVersion(r, v) (memRowKvVersion(r) = (v))
#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowDataBody(r)) : kvRowValues(memRowKvBody(r)))
#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowDataBody(r)) : kvRowTKey(memRowKvBody(r)))
#define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowDataBody(r)) : kvRowKey(memRowKvBody(r)))
#define memRowSetTKey(r, k) \
do { \
if (isDataRow(r)) { \
dataRowTKey(memRowDataBody(r)) = (k); \
} else { \
kvRowTKey(memRowKvBody(r)) = (k); \
} \
} while (0)
#define memRowSetType(r, t) (memRowType(r) = (t))
#define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l))
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowKvSetVersion(r, v))
#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r))
#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_DATA_HEAD_SIZE)
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
SMemRow tdMemRowDup(SMemRow row);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols);
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); }
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t offset) {
if (isDataRow(row)) {
return tdGetRowDataOfCol(row, type, offset);
} else if (isKvRow(row)) {
return tdGetKvRowDataOfCol(row, offset);
} else {
ASSERT(0);
}
return NULL;
}
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
* |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... |
* +-----------+----------+----------+--------------------------------------|--------------------|
* | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... |
* +-----------+----------+----------+--------------------------------------+--------------------|
* 1. offset in column data tuple starts from the value part in case of uint16_t overflow.
* 2. dataTLen: total length including the header and body.
*/
#define PAYLOAD_NCOLS_LEN sizeof(uint16_t)
#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowTLenT))
#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN)
#define PAYLOAD_ID_LEN sizeof(int16_t)
#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t))
#define PAYLOAD_COL_HEAD_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(uint16_t))
#define PAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY))
#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN)
#define payloadType(r) (*(uint8_t *)(r))
#define payloadSetType(r, t) (payloadType(r) = (t))
#define payloadTLen(r) (*(TDRowTLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header
#define payloadSetTLen(r, l) (payloadTLen(r) = (l))
#define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET))
#define payloadSetNCols(r, n) (payloadNCols(r) = (n))
#define payloadValuesOffset(r) \
(PAYLOAD_HEADER_LEN + payloadNCols(r) * PAYLOAD_COL_HEAD_LEN) // avoid using the macro in loop
#define payloadValues(r) POINTER_SHIFT(r, payloadValuesOffset(r)) // avoid using the macro in loop
#define payloadColId(c) (*(int16_t *)(c))
#define payloadColType(c) (*(uint8_t *)POINTER_SHIFT(c, PAYLOAD_ID_LEN))
#define payloadColOffset(c) (*(uint16_t *)POINTER_SHIFT(c, PAYLOAD_ID_TYPE_LEN))
#define payloadColValue(c) POINTER_SHIFT(c, payloadColOffset(c))
#define payloadColSetId(c, i) (payloadColId(c) = (i))
#define payloadColSetType(c, t) (payloadColType(c) = (t))
#define payloadColSetOffset(c, o) (payloadColOffset(c) = (o))
#define payloadTSKey(r) (*(TSKEY *)POINTER_SHIFT(r, payloadValuesOffset(r)))
#define payloadTKey(r) (*(TKEY *)POINTER_SHIFT(r, payloadValuesOffset(r)))
#define payloadKey(r) tdGetKey(payloadTKey(r))
static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); }
#ifdef __cplusplus
}
#endif
#endif // _TD_DATA_FORMAT_H_
#endif // _TD_DATA_FORMAT_H_
\ No newline at end of file
......@@ -18,6 +18,21 @@
#include "tcoding.h"
#include "wchar.h"
const uint8_t BoolNull = TSDB_DATA_BOOL_NULL;
const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL;
const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL;
const uint32_t IntNull = TSDB_DATA_INT_NULL;
const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL;
const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL;
const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL;
const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL;
const uint32_t UIntNull = TSDB_DATA_UINT_NULL;
const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL;
const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL;
const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL;
const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL};
const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL};
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
......@@ -198,6 +213,14 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow;
}
SMemRow tdMemRowDup(SMemRow row) {
SMemRow trow = malloc(memRowTLen(row));
if (trow == NULL) return NULL;
memRowCpy(trow, row);
return trow;
}
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
pDataCol->type = colType(pCol);
pDataCol->colId = colColId(pCol);
......@@ -217,11 +240,22 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
}
}
// value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return;
}
if (numOfRows > 0) {
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset
pCol->dataOff[numOfRows] = pCol->len;
......@@ -243,7 +277,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
return true;
}
void dataColSetNullAt(SDataCol *pCol, int index) {
FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
......@@ -399,8 +433,7 @@ void tdResetDataCols(SDataCols *pCols) {
}
}
}
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0;
......@@ -419,7 +452,8 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColSetNullAt(pDataCol, pCols->numOfRows);
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
continue;
}
......@@ -433,7 +467,8 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
dataColSetNullAt(pDataCol, pCols->numOfRows);
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
}
}
......@@ -441,6 +476,62 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
pCols->numOfRows++;
}
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0;
int dcol = 0;
if (kvRowDeleted(row)) {
for (; dcol < pCols->numOfCols; dcol++) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (dcol == 0) {
dataColAppendVal(pDataCol, kvRowValues(row), pCols->numOfRows, pCols->maxPoints);
} else {
dataColSetNullAt(pDataCol, pCols->numOfRows);
}
}
} else {
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol;
continue;
}
SColIdx *colIdx = kvRowColIdxAt(row, rcol);
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol;
}
}
}
pCols->numOfRows++;
}
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) {
if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols);
} else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols);
} else {
ASSERT(0);
}
}
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols);
......@@ -563,7 +654,7 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff);
if (nrow == NULL) return -1;
kvRowSetLen(nrow, kvRowLen(row) + (int16_t)sizeof(SColIdx) + diff);
kvRowSetLen(nrow, kvRowLen(row) + (uint16_t)sizeof(SColIdx) + diff);
kvRowSetNCols(nrow, kvRowNCols(row) + 1);
if (ptr == NULL) {
......@@ -605,8 +696,8 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
memcpy(pOldVal, value, varDataTLen(value));
} else { // need to reallocate the memory
int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
int16_t nlen = kvRowLen(row) + diff;
uint16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
uint16_t nlen = kvRowLen(row) + diff;
ASSERT(nlen > 0);
nrow = malloc(nlen);
if (nrow == NULL) return -1;
......@@ -709,3 +800,39 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
return row;
}
const void *tdGetNullVal(int8_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return &BoolNull;
case TSDB_DATA_TYPE_TINYINT:
return &TinyintNull;
case TSDB_DATA_TYPE_SMALLINT:
return &SmallintNull;
case TSDB_DATA_TYPE_INT:
return &IntNull;
case TSDB_DATA_TYPE_BIGINT:
return &BigintNull;
case TSDB_DATA_TYPE_FLOAT:
return &FloatNull;
case TSDB_DATA_TYPE_DOUBLE:
return &DoubleNull;
case TSDB_DATA_TYPE_BINARY:
return &BinaryNull;
case TSDB_DATA_TYPE_TIMESTAMP:
return &TimestampNull;
case TSDB_DATA_TYPE_NCHAR:
return &NcharNull;
case TSDB_DATA_TYPE_UTINYINT:
return &UTinyintNull;
case TSDB_DATA_TYPE_USMALLINT:
return &USmallintNull;
case TSDB_DATA_TYPE_UINT:
return &UIntNull;
case TSDB_DATA_TYPE_UBIGINT:
return &UBigintNull;
default:
ASSERT(0);
return NULL;
}
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ import com.taosdata.jdbc.utils.NullType;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
......@@ -463,6 +464,25 @@ public class TSDBResultSetRowData {
data.set(col, tsObj);
}
/**
* this implementation is used for TDengine old version
*/
public void setTimestamp(int col, long ts) {
//TODO: this implementation contains logical error
// when precision is us the (long ts) is 16 digital number
// when precision is ms, the (long ts) is 13 digital number
// we need a JNI function like this:
// public void setTimestamp(int col, long epochSecond, long nanoAdjustment)
if (ts < 1_0000_0000_0000_0L) {
data.set(col, new Timestamp(ts));
} else {
long epochSec = ts / 1000_000L;
long nanoAdjustment = ts % 1000_000L * 1000L;
Timestamp timestamp = Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
data.set(col, timestamp);
}
}
public Timestamp getTimestamp(int col, int nativeType) {
Object obj = data.get(col - 1);
if (obj == null)
......
......@@ -5,7 +5,6 @@ import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import com.taosdata.jdbc.enums.TimestampPrecision;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
......@@ -110,15 +109,27 @@ public class Utils {
return rawSql;
// toLowerCase
String preparedSql = rawSql.trim().toLowerCase();
String[] clause = new String[]{"values\\s*\\([\\s\\S]*?\\)", "tags\\s*\\([\\s\\S]*?\\)", "where[\\s\\S]*"};
String[] clause = new String[]{"tags\\s*\\([\\s\\S]*?\\)", "where[\\s\\S]*"};
Map<Integer, Integer> placeholderPositions = new HashMap<>();
RangeSet<Integer> clauseRangeSet = TreeRangeSet.create();
findPlaceholderPosition(preparedSql, placeholderPositions);
// find tags and where clause's position
findClauseRangeSet(preparedSql, clause, clauseRangeSet);
// find values clause's position
findValuesClauseRangeSet(preparedSql, clauseRangeSet);
return transformSql(rawSql, parameters, placeholderPositions, clauseRangeSet);
}
private static void findValuesClauseRangeSet(String preparedSql, RangeSet<Integer> clauseRangeSet) {
Matcher matcher = Pattern.compile("(values|,)\\s*(\\([^)]*\\))").matcher(preparedSql);
while (matcher.find()) {
int start = matcher.start(2);
int end = matcher.end(2);
clauseRangeSet.add(Range.closedOpen(start, end));
}
}
private static void findClauseRangeSet(String preparedSql, String[] regexArr, RangeSet<Integer> clauseRangeSet) {
clauseRangeSet.clear();
for (String regex : regexArr) {
......@@ -126,7 +137,7 @@ public class Utils {
while (matcher.find()) {
int start = matcher.start();
int end = matcher.end();
clauseRangeSet.add(Range.closed(start, end));
clauseRangeSet.add(Range.closedOpen(start, end));
}
}
}
......
......@@ -3,6 +3,8 @@ package com.taosdata.jdbc.utils;
import org.junit.Assert;
import org.junit.Test;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
public class UtilsTest {
......@@ -32,7 +34,7 @@ public class UtilsTest {
}
@Test
public void getNativeSqlReplaceQuestionMarks() {
public void lowerCase() {
// given
String nativeSql = "insert into ?.? (ts, temperature, humidity) using ?.? tags(?,?) values(now, ?, ?)";
Object[] parameters = Stream.of("test", "t1", "test", "weather", "beijing", 1, 12.2, 4).toArray();
......@@ -46,7 +48,7 @@ public class UtilsTest {
}
@Test
public void getNativeSqlReplaceQuestionMarks2() {
public void upperCase() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 123, 3.14, 220, 4).toArray();
......@@ -59,9 +61,49 @@ public class UtilsTest {
Assert.assertEquals(expected, actual);
}
@Test
public void multiValues() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?),(?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4),(200,3.1415,'xyz',5)";
Assert.assertEquals(expected, actual);
}
@Test
public void lineTerminator() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,\r\n?,?),(?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,\r\n'abc',4),(200,3.1415,'xyz',5)";
Assert.assertEquals(expected, actual);
}
@Test
public void lineTerminatorAndMultiValues() {
String nativeSql = "INSERT Into ? TAGS(?) VALUES(?,?,\r\n?,?),(?,? ,\r\n?,?) t? tags (?) Values (?,?,?\r\n,?),(?,?,?,?) t? Tags(?) values (?,?,?,?) , (?,?,?,?)";
Object[] parameters = Stream.of("t1", "abc", 100, 1.1, "xxx", "xxx", 200, 2.2, "xxx", "xxx", 2, "bcd", 300, 3.3, "xxx", "xxx", 400, 4.4, "xxx", "xxx", 3, "cde", 500, 5.5, "xxx", "xxx", 600, 6.6, "xxx", "xxx").toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT Into t1 TAGS('abc') VALUES(100,1.1,\r\n'xxx','xxx'),(200,2.2 ,\r\n'xxx','xxx') t2 tags ('bcd') Values (300,3.3,'xxx'\r\n,'xxx'),(400,4.4,'xxx','xxx') t3 Tags('cde') values (500,5.5,'xxx','xxx') , (600,6.6,'xxx','xxx')";
Assert.assertEquals(expected, actual);
}
@Test
public void getNativeSqlReplaceNothing() {
public void replaceNothing() {
// given
String nativeSql = "insert into test.t1 (ts, temperature, humidity) using test.weather tags('beijing',1) values(now, 12.2, 4)";
......@@ -73,7 +115,7 @@ public class UtilsTest {
}
@Test
public void getNativeSqlReplaceNothing2() {
public void replaceNothing2() {
// given
String nativeSql = "insert into test.t1 (ts, temperature, humidity) using test.weather tags('beijing',1) values(now, 12.2, 4)";
Object[] parameters = Stream.of("test", "t1", "test", "weather", "beijing", 1, 12.2, 4).toArray();
......@@ -86,7 +128,7 @@ public class UtilsTest {
}
@Test
public void getNativeSqlReplaceNothing3() {
public void replaceNothing3() {
// given
String nativeSql = "insert into ?.? (ts, temperature, humidity) using ?.? tags(?,?) values(now, ?, ?)";
......
......@@ -476,15 +476,17 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_DATA_HEAD_SIZE + pObj->rowSize;
char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer;
SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SDataRow trow = (SDataRow)pBlk->data;
tdInitDataRow(trow, pSchema);
SMemRow trow = (SMemRow)pBlk->data;
SDataRow dataRow = (SDataRow)memRowDataBody(trow);
memRowSetType(trow, SMEM_ROW_DATA);
tdInitDataRow(dataRow, pSchema);
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i;
......@@ -500,9 +502,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
memcpy((char *)val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len;
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
tdAppendColVal(dataRow, val, c->type, c->offset);
}
pBlk->dataLen = htonl(dataRowLen(trow));
pBlk->dataLen = htonl(memRowDataTLen(trow));
pBlk->schemaLen = 0;
pBlk->uid = htobe64(pObj->uid);
......@@ -511,7 +513,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow);
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow);
pMsg->header.vgId = htonl(pContext->vgId);
pMsg->header.contLen = htonl(pHead->len);
......
......@@ -18,7 +18,7 @@ ELSE ()
ENDIF ()
ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync ${LINK_JEMALLOC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lua lz4 balance sync ${LINK_JEMALLOC})
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosd taos_static)
......
......@@ -40,6 +40,7 @@
#include "dnodeShell.h"
#include "dnodeTelemetry.h"
#include "module.h"
#include "qScript.h"
#include "mnode.h"
#if !defined(_MODULE) || !defined(_TD_LINUX)
......@@ -84,6 +85,7 @@ static SStep tsDnodeSteps[] = {
{"dnode-shell", dnodeInitShell, dnodeCleanupShell},
{"dnode-statustmr", dnodeInitStatusTimer,dnodeCleanupStatusTimer},
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
{"dnode-script", scriptEnvPoolInit, scriptEnvPoolCleanup},
};
static SStep tsDnodeCompactSteps[] = {
......@@ -266,7 +268,7 @@ static int32_t dnodeInitStorage() {
return -1;
}
}
//TODO(dengyihao): no need to init here
//TODO(dengyihao): no need to init here
if (dnodeCreateDir(tsMnodeDir) < 0) {
dError("failed to create dir: %s, reason: %s", tsMnodeDir, strerror(errno));
return -1;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册