“ca8b7a2760be58670ad9388fddc99a1f4bb0d88a”上不存在“projects/euske”
提交 d3ef5f28 编写于 作者: S shenglian zhou

Merge remote-tracking branch 'origin/develop' into szhou/hotfix/td-11943

......@@ -456,23 +456,54 @@ pipeline {
nohup taosd >/dev/null &
sleep 10
'''
sh '''
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
node test1970.js
cd ${WKC}/tests/connectorTest/nodejsTest/nanosupport
npm install td2.0-connector > /dev/null 2>&1
node nanosecondTest.js
cd ${WKC}/src/connector/python
export PYTHONPATH=$PWD/
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
pip3 install pytest
pytest tests/
python3 examples/bind-multi.py
python3 examples/bind-row.py
python3 examples/demo.py
python3 examples/insert-lines.py
python3 examples/pep-249.py
python3 examples/query-async.py
python3 examples/query-objectively.py
python3 examples/subscribe-sync.py
python3 examples/subscribe-async.py
'''
sh '''
cd ${WKC}/src/connector/nodejs
npm install
npm run test
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
node test1970.js
cd ${WKC}/tests/connectorTest/nodejsTest/nanosupport
npm install td2.0-connector > /dev/null 2>&1
node nanosecondTest.js
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/src/connector/C#
dotnet test
dotnet run --project src/test/Cases/Cases.csproj
cd ${WKC}/tests/examples/C#
dotnet run --project C#checker/C#checker.csproj
dotnet run --project TDengineTest/TDengineTest.csproj
dotnet run --project schemaless/schemaless.csproj
cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1
echo '' |./taosdemo -c /etc/taos
dotnet build -c Release
tree | true
./bin/Release/net5.0/taosdemo -c /etc/taos -y
'''
}
}
sh '''
cd ${WKC}/tests/gotest
bash batchtest.sh
......
[Unit]
Description=TDengine server service
After=network-online.target
Wants=network-online.target
After=network-online.target taosadapter.service
Wants=network-online.target taosadapter.service
[Service]
Type=simple
......
......@@ -3,7 +3,7 @@
# Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os
set -e
#set -x
set -x
# release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
......@@ -414,10 +414,14 @@ fi
if [[ "$httpdBuild" == "true" ]]; then
BUILD_HTTP=true
BUILD_TOOLS=false
else
BUILD_HTTP=false
fi
if [[ "$pagMode" == "full" ]]; then
BUILD_TOOLS=true
else
BUILD_TOOLS=false
fi
# check support cpu type
......@@ -514,15 +518,17 @@ if [ "$osType" != "Darwin" ]; then
cd ${script_dir}/deb
${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
if [ -d ${top_dir}/src/kit/taos-tools/packaging/deb ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/src/kit/taos-tools/packaging/deb ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}')
${csudo} ./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}')
${csudo} ./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
else
echo "==========dpkg command not exist, so not release deb package!!!"
fi
ret='0'
......@@ -537,15 +543,17 @@ if [ "$osType" != "Darwin" ]; then
cd ${script_dir}/rpm
${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
if [ -d ${top_dir}/src/kit/taos-tools/packaging/rpm ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/rpm
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/src/kit/taos-tools/packaging/rpm ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/rpm
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}'|sed -e 's/-/_/g')
${csudo} ./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}'|sed -e 's/-/_/g')
${csudo} ./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
else
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
fi
......
......@@ -188,9 +188,6 @@ function update() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -215,9 +212,6 @@ function install() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -189,9 +189,6 @@ function update() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -216,9 +213,6 @@ function install() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -247,9 +247,6 @@ function update_PowerDB() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -275,9 +272,6 @@ function install_PowerDB() {
install_header
install_lib
install_jemalloc
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -189,9 +189,6 @@ function update_prodb() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -216,9 +213,6 @@ function install_prodb() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -193,9 +193,6 @@ function update_tq() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -220,9 +217,6 @@ function install_tq() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -47,7 +47,8 @@ if [ "$osType" != "Darwin" ]; then
else
bin_files="${script_dir}/remove_client.sh \
${script_dir}/set_core.sh \
${script_dir}/get_client.sh ${script_dir}/taosd-dump-cfg.gdb"
${script_dir}/get_client.sh"
#${script_dir}/get_client.sh ${script_dir}/taosd-dump-cfg.gdb"
fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
else
......
......@@ -69,15 +69,15 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/jh_taos
cp ${script_dir}/remove_jh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_jh.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/jh_taos
cp ${script_dir}/remove_jh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_jh.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/jhdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/jh_taosdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -69,15 +69,15 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/khclient
cp ${script_dir}/remove_kh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_kh.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/khclient
cp ${script_dir}/remove_kh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_kh.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/khdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/khdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -109,15 +109,15 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${script_dir}/remove_client_power.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${script_dir}/remove_client_power.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -69,15 +69,15 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/prodbc
cp ${script_dir}/remove_pro.sh ${install_dir}/bin
cp ${script_dir}/remove_client_pro.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/prodbc
cp ${script_dir}/remove_pro.sh ${install_dir}/bin
cp ${script_dir}/remove_client_pro.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/prodemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/prodump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -69,15 +69,15 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/tq
cp ${script_dir}/remove_tq.sh ${install_dir}/bin
cp ${script_dir}/remove_client_tq.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/tq
cp ${script_dir}/remove_tq.sh ${install_dir}/bin
cp ${script_dir}/remove_client_tq.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/tqdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/tqdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
#cp ${script_dir}/taosd-dump-cfg.gdb ${install_dir}/bin
fi
else
cp ${bin_files} ${install_dir}/bin
......
......@@ -256,7 +256,7 @@ void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson);
void tscDequoteAndTrimToken(SStrToken* pToken);
void tscRmEscapeAndTrimToken(SStrToken* pToken);
......@@ -264,7 +264,7 @@ int32_t tscValidateName(SStrToken* pToken, bool escapeEnabled, bool *dbIncluded)
void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t numOfParams);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid);
......@@ -394,6 +394,9 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId);
int8_t jsonType2DbType(double data, int jsonType);
void getJsonKey(SStrToken *t0);
char* cloneCurrentDBName(SSqlObj* pSql);
......
......@@ -115,8 +115,9 @@ typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
uint16_t flen; // TODO: get from STSchema
uint16_t allNullLen; // TODO: get from STSchema
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
uint16_t extendedVarLen;
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
int32_t * boundedColumns; // bound column idx according to schema
SBoundColumn * cols;
SBoundIdxInfo *colIdxInfo;
......@@ -132,7 +133,7 @@ typedef struct {
typedef struct {
uint8_t memRowType; // default is 0, that is SDataRow
uint8_t compareStat; // 0 no need, 1 need compare
TDRowTLenT kvRowInitLen;
int32_t rowSize;
SMemRowInfo *rowInfo;
} SMemRowBuilder;
......@@ -150,8 +151,7 @@ typedef struct {
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec);
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
int32_t allNullLen);
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo);
void destroyMemRowBuilder(SMemRowBuilder *pBuilder);
/**
......@@ -453,7 +453,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
......@@ -533,16 +533,6 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
}
static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
if (isDataRow(row)) {
if (kvLen < (dataLen * KVRatioConvert)) {
memRowSetConvert(row);
}
} else if (kvLen > dataLen) {
memRowSetConvert(row);
}
}
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
memRowSetType(row, memRowType);
if (isDataRowT(memRowType)) {
......@@ -642,8 +632,7 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str,
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId,
int32_t *dataLen, int32_t *kvLen, uint8_t compareStat) {
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId) {
int64_t iv;
int32_t ret;
char * endptr = NULL;
......@@ -655,26 +644,22 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &TRUE_VALUE, true, colId, pSchema->type, toffset);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, &FALSE_VALUE, true, colId, pSchema->type, toffset);
} else {
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10);
tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
} else {
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
}
......@@ -684,8 +669,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
case TSDB_DATA_TYPE_TINYINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -695,15 +679,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint8_t tmpVal = (uint8_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -713,15 +696,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint8_t tmpVal = (uint8_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -731,15 +713,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
int16_t tmpVal = (int16_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -749,15 +730,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint16_t tmpVal = (uint16_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -767,15 +747,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
int32_t tmpVal = (int32_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -785,15 +764,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint32_t tmpVal = (uint32_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -802,14 +780,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
}
tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &iv, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -819,14 +796,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
uint64_t tmpVal = (uint64_t)iv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_FLOAT:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
......@@ -839,14 +815,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
}
float tmpVal = (float)dv;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
......@@ -857,15 +832,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
}
tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &dv, true, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
......@@ -873,14 +847,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
char *rowEnd = memRowEnd(row);
STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
}
break;
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
......@@ -892,7 +865,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
return tscInvalidOperationMsg(msg, buf, pToken->z);
}
varDataSetLen(rowEnd, output);
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
}
break;
......@@ -901,17 +874,16 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
if (primaryKey) {
// When building SKVRow primaryKey, we should not skip even with NULL value.
int64_t tmpVal = 0;
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
} else {
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
tdAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
}
} else {
int64_t tmpVal;
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
}
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
tdAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
}
break;
......
......@@ -547,6 +547,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
jniFromNCharToByteArray(env, (char *)row[i], length[i]));
break;
}
case TSDB_DATA_TYPE_JSON: {
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
jniFromNCharToByteArray(env, (char *)row[i], length[i]));
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int precision = taos_result_precision(result);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i]), precision);
......
......@@ -85,16 +85,15 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(dst, type, pField->bytes);
int32_t bytes = pSchema[i].bytes;
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY){
bytes -= VARSTR_HEADER_SIZE;
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
}
}
else if(pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
bytes -= VARSTR_HEADER_SIZE;
bytes = bytes / TSDB_NCHAR_SIZE;
}
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
......@@ -222,7 +221,7 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
return -1;
}
uint8_t type = fields[idx].type;
int32_t length = lengths[idx];
int32_t length = lengths[idx];
switch (type) {
case TSDB_DATA_TYPE_BOOL:
......@@ -248,6 +247,7 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_JSON:
memcpy(result, val, length);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......
......@@ -41,9 +41,8 @@ enum {
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows);
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
char *str, char **end);
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
int32_t allNullLen) {
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) {
ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
if (nRows > 0) {
// already init(bind multiple rows by single column)
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
......@@ -51,41 +50,12 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3
}
}
// default compareStat is ROW_COMPARE_NO_NEED
if (nBoundCols == 0) { // file input
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen;
uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen;
if (isUtilizeKVRow(kvLen, dataLen)) {
pBuilder->memRowType = SMEM_ROW_KV;
} else {
float boundRatio = ((float)nBoundCols / (float)nCols);
if (boundRatio < KVRatioKV) {
pBuilder->memRowType = SMEM_ROW_KV;
return TSDB_CODE_SUCCESS;
} else if (boundRatio > KVRatioData) {
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
}
pBuilder->compareStat = ROW_COMPARE_NEED;
if (boundRatio < KVRatioPredict) {
pBuilder->memRowType = SMEM_ROW_KV;
} else {
pBuilder->memRowType = SMEM_ROW_DATA;
}
}
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
if (nRows > 0) {
pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo));
if (pBuilder->rowInfo == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int i = 0; i < nRows; ++i) {
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
}
pBuilder->memRowType = SMEM_ROW_DATA;
}
return TSDB_CODE_SUCCESS;
......@@ -385,6 +355,19 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
}
break;
case TSDB_DATA_TYPE_JSON:
if (pToken->n >= pSchema->bytes) { // reserve 1 byte for select
return tscInvalidOperationMsg(msg, "json tag length too long", pToken->z);
}
if (pToken->type == TK_NULL) {
*(int8_t *)payload = TSDB_DATA_TINYINT_NULL;
} else if (pToken->type != TK_STRING){
tscInvalidOperationMsg(msg, "invalid json data", pToken->z);
} else{
*((int8_t *)payload) = TSDB_DATA_JSON_PLACEHOLDER;
}
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (pToken->type == TK_NULL) {
if (primaryKey) {
......@@ -455,8 +438,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
SSchema * schema = tscGetTableSchema(pTableMeta);
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t kvLen = pBuilder->kvRowInitLen;
bool isParseBindParam = false;
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
......@@ -533,8 +514,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int16_t colId = -1;
tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset,
colId, &dataLen, &kvLen, pBuilder->compareStat);
int32_t ret =
tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -549,13 +530,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
}
if (!isParseBindParam) {
// 2. check and set convert flag
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
checkAndConvertMemRow(row, dataLen, kvLen);
}
// 3. set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) {
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) {
SDataRow dataRow = memRowDataBody(row);
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) {
......@@ -565,7 +541,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
}
}
*len = getExtendedRowSize(pDataBlocks);
*len = pBuilder->rowSize;
return TSDB_CODE_SUCCESS;
}
......@@ -618,11 +594,11 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
if (TSDB_CODE_SUCCESS !=
(code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound,
pDataBlock->boundColumnInfo.allNullLen))) {
if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) {
return code;
}
pDataBlock->rowBuilder.rowSize = extendedRowSize;
while (1) {
index = 0;
sToken = tStrGetToken(*str, &index, false);
......@@ -701,6 +677,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
pColInfo->boundedColumns[i] = i;
}
pColInfo->allNullLen += pColInfo->flen;
pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen
pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
}
......@@ -1094,9 +1071,26 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return code;
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal, false);
if(pSchema->type == TSDB_DATA_TYPE_JSON){
assert(spd.numOfBound == 1);
if(sToken.n > TSDB_MAX_JSON_TAGS_LEN/TSDB_NCHAR_SIZE){
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return tscSQLSyntaxErrMsg(pInsertParam->msg, "json tag too long", NULL);
}
char* json = strndup(sToken.z, sToken.n);
code = parseJsontoTagData(json, &kvRowBuilder, pInsertParam->msg, pTagSchema[spd.boundedColumns[0]].colId);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
tfree(json);
return code;
}
tfree(json);
}
}
tscDestroyBoundColumnInfo(&spd);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
......@@ -1110,7 +1104,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
if (pInsertParam->tagData.dataLen <= 0){
return tscSQLSyntaxErrMsg(pInsertParam->msg, "tag value expected", NULL);
}
char* pTag = realloc(pInsertParam->tagData.data, pInsertParam->tagData.dataLen);
if (pTag == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1224,6 +1218,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
int32_t nCols = pColInfo->numOfCols;
pColInfo->numOfBound = 0;
pColInfo->boundNullLen = 0;
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols);
for (int32_t i = 0; i < nCols; ++i) {
pColInfo->cols[i].valStat = VAL_STAT_NONE;
......@@ -1281,6 +1276,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound;
switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
break;
}
findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) {
isOrdered = false;
......@@ -1304,6 +1310,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound;
switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
break;
}
findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) {
isOrdered = false;
......@@ -1754,13 +1771,18 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error;
}
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows);
int32_t extendedRowSize = getExtendedRowSize(pTableDataBlock);
tscAllocateMemIfNeed(pTableDataBlock, extendedRowSize, &maxRows);
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
if (tokenBuf == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
// insert from .csv means full and ordered columns, thus use SDataRow all the time
ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType);
pTableDataBlock->rowBuilder.rowSize = extendedRowSize;
while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
......
......@@ -887,7 +887,7 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
TAOS_RES* res = taos_query(taos, sql);
free(sql);
code = taos_errno(res);
info->affectedRows = taos_affected_rows(res);
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
return code;
}
......@@ -1303,14 +1303,6 @@ clean_up:
return code;
}
int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
info->id = genLinesSmlId();
int code = tscSmlInsert(taos, points, numPoint, info);
free(info);
return code;
}
//=========================================================================
/* Field Escape charaters
......
......@@ -1533,6 +1533,41 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO
return TSDB_CODE_SUCCESS;
}
int32_t stmtValidateValuesFields(SSqlCmd *pCmd, char * sql) {
int32_t loopCont = 1, index0 = 0, values = 0;
SStrToken sToken;
while (loopCont) {
sToken = tStrGetToken(sql, &index0, false);
if (sToken.n <= 0) {
return TSDB_CODE_SUCCESS;
}
switch (sToken.type) {
case TK_RP:
if (values) {
return TSDB_CODE_SUCCESS;
}
break;
case TK_VALUES:
values = 1;
break;
case TK_QUESTION:
case TK_LP:
break;
default:
if (values) {
tscError("only ? allowed in values");
return tscInvalidOperationMsg(pCmd->payload, "only ? allowed in values", NULL);
}
break;
}
}
return TSDB_CODE_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////
// interface functions
......@@ -1637,6 +1672,11 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STMT_RET(ret);
}
ret = stmtValidateValuesFields(&pSql->cmd, pSql->sqlstr);
if (ret != TSDB_CODE_SUCCESS) {
STMT_RET(ret);
}
if (pStmt->multiTbInsert) {
STMT_RET(TSDB_CODE_SUCCESS);
}
......
此差异已折叠。
......@@ -506,7 +506,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
}
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
if (pRes->code == TSDB_CODE_SUCCESS && pCmd->command < TSDB_SQL_MAX && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
}
......@@ -832,7 +832,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (validateColumn && !tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
if (validateColumn && !tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
tscError("0x%"PRIx64" table schema is not matched with parsed sql", id);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -906,6 +906,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SArray* queryOperator = createExecOperatorPlan(&query);
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
int32_t numOfTags = query.numOfTags;
......@@ -1146,6 +1147,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen;
/*
//MSG EXTEND DEMO
pQueryMsg->extend = 1;
STLV *tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_DUMMY);
tlv->len = htonl(sizeof(int16_t));
*(int16_t *)tlv->value = htons(12345);
pMsg += sizeof(*tlv) + ntohl(tlv->len);
tlv = (STLV *)pMsg;
tlv->len = 0;
pMsg += sizeof(*tlv);
*/
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
tscDebug("0x%"PRIx64" msg built success, len:%d bytes", pSql->self, msgLen);
......@@ -1492,7 +1510,8 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateTableSql *pCreateTableInfo = pInfo->pCreateTableInfo;
if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
int32_t numOfTables = (int32_t)taosArrayGetSize(pInfo->pCreateTableInfo->childTableInfo);
size += numOfTables * (sizeof(SCreateTableMsg) + TSDB_MAX_TAGS_LEN);
size += numOfTables * (sizeof(SCreateTableMsg) +
((TSDB_MAX_TAGS_LEN > TSDB_MAX_JSON_TAGS_LEN)?TSDB_MAX_TAGS_LEN:TSDB_MAX_JSON_TAGS_LEN));
} else {
size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
}
......@@ -1614,9 +1633,6 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
}
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg;
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
......@@ -1645,14 +1661,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSchema++;
}
pMsg = (char *)pSchema;
pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
if (pAlterInfo->tagData.dataLen > 0) {
memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
}
pMsg += pAlterInfo->tagData.dataLen;
msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
int msgLen = sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo);
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
......@@ -1854,7 +1863,9 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
uint64_t localQueryId = pSql->self;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo, pSql->self, true);
bool convertJson = true;
if (pQueryInfo->isStddev == true) convertJson = false;
convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......@@ -2813,7 +2824,11 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
tscAddQueryInfo(&pNew->cmd);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
int payLoadLen = TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen;
if (autocreate && pSql->cmd.insertParam.tagData.dataLen != 0) {
payLoadLen += pSql->cmd.insertParam.tagData.dataLen;
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, payLoadLen)) {
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
tscFreeSqlObj(pNew);
......
......@@ -445,7 +445,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
// revise the length for binary and nchar fields
if (f[j].type == TSDB_DATA_TYPE_BINARY) {
f[j].bytes -= VARSTR_HEADER_SIZE;
} else if (f[j].type == TSDB_DATA_TYPE_NCHAR) {
} else if (f[j].type == TSDB_DATA_TYPE_NCHAR || f[j].type == TSDB_DATA_TYPE_JSON) {
f[j].bytes = (f[j].bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
}
......
......@@ -227,6 +227,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
if (skipped) {
slot = 0;
stackidx = 0;
tVariantDestroy(&tag);
continue;
}
......@@ -334,6 +335,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
}
if (mergeDone) {
tVariantDestroy(&tag);
break;
}
......@@ -341,6 +343,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
stackidx = 0;
skipRemainValue(mainCtx->p->pTSBuf, &tag);
tVariantDestroy(&tag);
}
stackidx = 0;
......@@ -633,16 +636,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the join condition tag column info, todo extract method
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
pExpr->base.numOfParams = 0; // the value is 0 by default. just make sure.
// add json tag key, if there is no json tag key, just hold place.
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
// set the tag column id for executor to extract correct tag value
tVariant* pVariant = &pExpr->base.param[0];
tVariant* pVariant = &pExpr->base.param[pExpr->base.numOfParams];
pVariant->i64 = colId;
pVariant->nType = TSDB_DATA_TYPE_BIGINT;
pVariant->nLen = sizeof(int64_t);
pExpr->base.numOfParams = 1;
pExpr->base.numOfParams++;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
......@@ -729,8 +737,15 @@ int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
__compar_fn_t func = getComparFunc(t1->padding, 0);
if (t1->padding == TSDB_DATA_TYPE_JSON){
bool canReturn = true;
int32_t result = jsonCompareUnit(t1->tag, t2->tag, &canReturn);
if(canReturn) return result;
__compar_fn_t func = getComparFunc(t1->tag[0], 0);
return func(t1->tag + CHAR_BYTES, t2->tag + CHAR_BYTES);
}
__compar_fn_t func = getComparFunc(t1->padding, 0);
return func(t1->tag, t2->tag);
}
......@@ -821,16 +836,21 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
SExprInfo *pExpr = tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SExprInfo *pExpr = tscExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param[0].i64 = tagColId;
pExpr->base.param[0].nLen = sizeof(int64_t);
pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
pExpr->base.numOfParams = 1;
pExpr->base.numOfParams = 0; // the value is 0 by default. just make sure.
// add json tag key, if there is no json tag key, just hold place.
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pSupporter->tagCond.joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pSupporter->tagCond.joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param[pExpr->base.numOfParams].i64 = tagColId;
pExpr->base.param[pExpr->base.numOfParams].nLen = sizeof(int64_t);
pExpr->base.param[pExpr->base.numOfParams].nType = TSDB_DATA_TYPE_BIGINT;
pExpr->base.numOfParams++;
}
// add the filter tag column
......@@ -2012,7 +2032,12 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
SExprInfo* pExpr = tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
if(strlen(pTagCond->joinInfo.joinTables[0]->tagJsonKeyName) > 0){
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pTagCond->joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pTagCond->joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
}
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug(
......@@ -2025,15 +2050,6 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function
SExprInfo *pExpr = tscExprGet(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param->i64 = tagColId;
pExpr->base.numOfParams = 1;
}
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
......@@ -2427,6 +2443,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
tscInitQueryInfo(pNewQueryInfo);
pNewQueryInfo->isStddev = true; // for json tag
// add the group cond
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
......@@ -2490,6 +2507,10 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
}
SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG, getNewResColId(pCmd));
if (schema->type == TSDB_DATA_TYPE_JSON){
p->base.numOfParams = pExpr->base.numOfParams;
tVariantAssign(&p->base.param[0], &pExpr->base.param[0]);
}
p->base.resColId = pExpr->base.resColId;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
......@@ -3681,7 +3702,9 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
if (pQueryInfo->isStddev && type == TSDB_DATA_TYPE_JSON){ // for json tag compare in the second round of stddev
pRes->tsrow[j] = pRes->urow[i];
}else if (!IS_VAR_DATA_TYPE(type) && type != TSDB_DATA_TYPE_JSON) {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i];
} else {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]);
......
......@@ -29,6 +29,7 @@
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "cJSON.h"
#ifdef HTTP_EMBEDDED
#include "httpInt.h"
......@@ -109,6 +110,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
n = bufSize + escapeSize;
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
if (bufSize < 0) {
tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
......@@ -734,34 +736,33 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar) {
static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar, bool convertJson) {
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) {
if (pInfo->pExpr->base.param[0].nType == TSDB_DATA_TYPE_NULL) {
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pExpr->base.param[1].nLen <= pInfo->field.bytes);
assert(pInfo->pExpr->base.param[0].nLen <= pInfo->field.bytes);
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(varDataVal(p), pInfo->pExpr->base.param[1].pz, pInfo->pExpr->base.param[1].nLen);
varDataSetLen(p, pInfo->pExpr->base.param[1].nLen);
memcpy(varDataVal(p), pInfo->pExpr->base.param[0].pz, pInfo->pExpr->base.param[0].nLen);
varDataSetLen(p, pInfo->pExpr->base.param[0].nLen);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pExpr->base.param[1].i64, pInfo->field.bytes);
memcpy(p, &pInfo->pExpr->base.param[0].i64, pInfo->field.bytes);
}
}
}
} else if (convertNchar && pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
} else if (convertNchar && (pInfo->field.type == TSDB_DATA_TYPE_NCHAR)) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
if(buffer == NULL)
return ;
if (buffer == NULL) return;
pRes->buffer[i] = buffer;
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
......@@ -785,8 +786,63 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else if (pInfo->field.type == TSDB_DATA_TYPE_JSON) {
if (convertJson){
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
if (buffer == NULL) return;
pRes->buffer[i] = buffer;
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i];
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
char type = *p;
char* realData = p + CHAR_BYTES;
if (type == TSDB_DATA_TYPE_JSON && isNull(realData, TSDB_DATA_TYPE_JSON)) {
memcpy(dst, realData, varDataTLen(realData));
} else if (type == TSDB_DATA_TYPE_BINARY) {
assert(*(uint32_t*)varDataVal(realData) == TSDB_DATA_JSON_null); // json null value
assert(varDataLen(realData) == INT_BYTES);
sprintf(varDataVal(dst), "%s", "null");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_JSON) {
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}else if (type == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), POINTER_SHIFT(varDataVal(dst), CHAR_BYTES));
*(char*)(POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES)) = '\"';
varDataSetLen(dst, length + CHAR_BYTES*2);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(realData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char *)realData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else {
assert(0);
}
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else{
// if convertJson is false, json data as raw data used for stddev for the second round
}
}
if (convertNchar) {
......@@ -794,6 +850,10 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
}
}
void tscJson2String(char *src, char* dst){
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
assert(pRes->numOfCols > 0);
if (pRes->numOfRows == 0) {
......@@ -807,11 +867,11 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true);
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true, true);
}
}
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar) {
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson) {
assert(pRes->numOfCols > 0);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......@@ -822,7 +882,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
pRes->urow[i] = pColData->pData;
pRes->length[i] = pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, convertNchar);
setResRawPtrImpl(pRes, pInfo, i, convertNchar, convertJson);
/*
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.ColName.flag)) {
......@@ -878,17 +938,6 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
}
}
static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) {
int32_t numOfCols = (int32_t) taosArrayGetSize(pTableCols);
SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pTableCols, i);
pColInfo[i] = pCol->info;//[index].type;
}
return pColInfo;
}
typedef struct SDummyInputInfo {
SSDataBlock *block;
STableQueryInfo *pTableQueryInfo;
......@@ -1278,14 +1327,14 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
return pOperator;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar) {
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson) {
// set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar, convertJson);
}
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
......@@ -1319,8 +1368,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
// handle the following query process
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
STableMeta* pTableMeta = tscGetMetaInfo(px, 0)->pTableMeta;
SSchema* pSchema = tscGetTableSchema(pTableMeta);
......@@ -1435,7 +1482,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
px->pQInfo->runtimeEnv.udfIsCopy = true;
px->pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
tfree(pColumnInfo);
tfree(schema);
// set the pRuntimeEnv for pSourceOperator
......@@ -1444,7 +1490,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
uint64_t qId = pSql->self;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px, pSql->self, false);
convertQueryResult(pOutput, px, pSql->self, false, false);
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
......@@ -2020,18 +2066,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
char* payload = (blkKeyTuple + i)->payloadAddr;
if (isNeedConvertRow(payload)) {
convertSMemRow(pDataBlock, payload, pTableDataBlock);
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
} else {
TDRowTLenT rowTLen = memRowTLen(payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
char* payload = (blkKeyTuple + i)->payloadAddr;
TDRowTLenT rowTLen = memRowTLen(payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
}
......@@ -3095,12 +3134,12 @@ void tscIncStreamExecutionCount(void* pStream) {
ps->num += 1;
}
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t numOfParams) {
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
if (pTableMetaInfo->pTableMeta == NULL) {
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId <= TSDB_UD_COLUMN_INDEX) {
return true;
}
......@@ -4923,16 +4962,17 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI
pse->colInfo.colId = pSource->base.colInfo.colId;
pse->colType = pSource->base.colType;
pse->colBytes = pSource->base.colBytes;
pse->resBytes = sizeof(double);
pse->resType = TSDB_DATA_TYPE_DOUBLE;
pse->functionId = pSource->base.functionId;
pse->numOfParams = pSource->base.numOfParams;
for (int32_t j = 0; j < pSource->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pSource->base.param[j]);
buildArithmeticExprFromMsg(px, NULL);
buildScalarExprFromMsg(px, NULL);
}
pse->resBytes = px->pExpr->resultBytes;
pse->resType = px->pExpr->resultType;
}
}
......@@ -5127,7 +5167,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SCALAR_EXPR) {
for (int32_t j = 0; j < pQueryAttr->pExpr1[i].base.numOfParams; ++j) {
buildArithmeticExprFromMsg(&pQueryAttr->pExpr1[i], NULL);
buildScalarExprFromMsg(&pQueryAttr->pExpr1[i], NULL);
}
}
}
......@@ -5399,3 +5439,152 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
return p;
}
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId){
// set json NULL data
uint8_t nullTypeVal[CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES] = {0};
uint32_t jsonNULL = TSDB_DATA_JSON_NULL;
int jsonIndex = startColId + 1;
char nullTypeKey[VARSTR_HEADER_SIZE + INT_BYTES] = {0};
varDataSetLen(nullTypeKey, INT_BYTES);
nullTypeVal[0] = TSDB_DATA_TYPE_JSON;
varDataSetLen(nullTypeVal + CHAR_BYTES, INT_BYTES);
*(uint32_t*)(varDataVal(nullTypeKey)) = jsonNULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeKey, false); // add json null type
if (strtrim(json) == 0 || strcasecmp(json, "null") == 0){
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeVal, true); // add json null value
return TSDB_CODE_SUCCESS;
}
int32_t jsonNotNull = TSDB_DATA_JSON_NOT_NULL;
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNotNull;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeVal, true); // add json type
// set json real data
cJSON *root = cJSON_Parse(json);
if (root == NULL){
tscError("json parse error");
return tscSQLSyntaxErrMsg(errMsg, "json parse error", NULL);
}
int size = cJSON_GetArraySize(root);
if(!cJSON_IsObject(root)){
tscError("json error invalide value");
return tscSQLSyntaxErrMsg(errMsg, "json error invalide value", NULL);
}
int retCode = 0;
SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
for(int i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) {
tscError("json inner error:%d", i);
retCode = tscSQLSyntaxErrMsg(errMsg, "json inner error", NULL);
goto end;
}
char *jsonKey = item->string;
if(!isValidateTag(jsonKey)){
tscError("json key not validate");
retCode = tscSQLSyntaxErrMsg(errMsg, "json key not validate", NULL);
goto end;
}
if(strlen(jsonKey) > TSDB_MAX_JSON_KEY_LEN){
tscError("json key too long error");
retCode = tscSQLSyntaxErrMsg(errMsg, "json key too long, more than 256", NULL);
goto end;
}
if(strlen(jsonKey) == 0 || taosHashGet(keyHash, jsonKey, strlen(jsonKey)) != NULL){
continue;
}
// json key encode by binary
char tagKey[TSDB_MAX_JSON_KEY_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(tagKey), jsonKey, strlen(jsonKey));
int32_t outLen = (int32_t)strlen(jsonKey);
taosHashPut(keyHash, jsonKey, outLen, &outLen, CHAR_BYTES); // add key to hash to remove dumplicate, value is useless
varDataSetLen(tagKey, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagKey, false); // add json key
if(item->type == cJSON_String){ // add json value format: type|data
char *jsonValue = item->valuestring;
outLen = 0;
char tagVal[TSDB_MAX_JSON_TAGS_LEN] = {0};
*tagVal = jsonType2DbType(0, item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if (!taosMbsToUcs4(jsonValue, strlen(jsonValue), varDataVal(tagData),
TSDB_MAX_JSON_TAGS_LEN - CHAR_BYTES - VARSTR_HEADER_SIZE, &outLen)) {
tscError("json string error:%s|%s", strerror(errno), jsonValue);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
varDataSetLen(tagData, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, true);
}else if(item->type == cJSON_Number){
char tagVal[LONG_BYTES + CHAR_BYTES] = {0};
*tagVal = jsonType2DbType(item->valuedouble, item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if(*tagVal == TSDB_DATA_TYPE_DOUBLE) *((double *)tagData) = item->valuedouble;
else if(*tagVal == TSDB_DATA_TYPE_BIGINT) *((int64_t *)tagData) = item->valueint;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BIGINT, tagVal, true);
}else if(item->type == cJSON_True || item->type == cJSON_False){
char tagVal[CHAR_BYTES + CHAR_BYTES] = {0};
*tagVal = jsonType2DbType((double)(item->valueint), item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
*tagData = (char)(item->valueint);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BOOL, tagVal, true);
}else if(item->type == cJSON_NULL){
char tagVal[CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES] = {0};
*tagVal = jsonType2DbType(0, item->type); // type
int32_t* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
varDataSetLen(tagData, INT_BYTES);
*(uint32_t*)(varDataVal(tagData)) = TSDB_DATA_JSON_null;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BINARY, tagVal, true);
}
else{
retCode = tscSQLSyntaxErrMsg(errMsg, "invalidate json value", NULL);
goto end;
}
}
if(taosHashGetSize(keyHash) == 0){ // set json NULL true
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNULL;
memcpy(POINTER_SHIFT(kvRowBuilder->buf, kvRowBuilder->pColIdx[2].offset), nullTypeVal, CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES);
}
end:
taosHashCleanup(keyHash);
cJSON_Delete(root);
return retCode;
}
int8_t jsonType2DbType(double data, int jsonType){
switch(jsonType){
case cJSON_Number:
if (data - (int64_t)data > 0) return TSDB_DATA_TYPE_DOUBLE; else return TSDB_DATA_TYPE_BIGINT;
case cJSON_String:
return TSDB_DATA_TYPE_NCHAR;
case cJSON_NULL:
return TSDB_DATA_TYPE_BINARY;
case cJSON_True:
case cJSON_False:
return TSDB_DATA_TYPE_BOOL;
}
return TSDB_DATA_TYPE_NULL;
}
// get key from json->'key'
void getJsonKey(SStrToken *t0){
while(true){
t0->n = tGetToken(t0->z, &t0->type);
if (t0->type == TK_STRING){
t0->z++;
t0->n -= 2;
break;
}else if (t0->type == TK_ILLEGAL){
assert(0);
}
t0->z += t0->n;
}
}
......@@ -547,7 +547,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value, bool isJumpJsonVType) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
......@@ -560,9 +560,14 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
pBuilder->nCols++;
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
char* jumpType = (char*)value;
if(isJumpJsonVType) jumpType += CHAR_BYTES;
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(jumpType) : TYPE_BYTES[type];
if(isJumpJsonVType) tlen += CHAR_BYTES; // add type size
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
assert(pBuilder->alloc > 0);
pBuilder->alloc *= 2;
}
void* buf = realloc(pBuilder->buf, pBuilder->alloc);
......@@ -609,22 +614,17 @@ typedef void *SMemRow;
#define SMEM_ROW_DATA 0x0U // SDataRow
#define SMEM_ROW_KV 0x01U // SKVRow
#define SMEM_ROW_CONVERT 0x80U // SMemRow convert flag
#define KVRatioKV (0.2f) // all bool
#define KVRatioPredict (0.4f)
#define KVRatioData (0.75f) // all bigint
#define KVRatioConvert (0.9f)
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
#define memRowSetType(r, t) ((*(uint8_t *)(r)) = (t)) // set the total byte in case of dirty memory
#define memRowSetConvert(r) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0x7F) | SMEM_ROW_CONVERT)) // highest bit
#define isDataRowT(t) (SMEM_ROW_DATA == (((uint8_t)(t)) & 0x01))
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
#define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01))
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
#define isNeedConvertRow(r) (((*(uint8_t *)(r)) & 0x80) == SMEM_ROW_CONVERT)
#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert))
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
#define memRowKvBody(r) \
......
......@@ -42,6 +42,7 @@ extern int8_t tsArbOnline;
extern int64_t tsArbOnlineTimestamp;
extern int32_t tsDnodeId;
extern int64_t tsDnodeStartTime;
extern int8_t tsDnodeNopLoop;
// common
extern int tsRpcTimer;
......
......@@ -25,7 +25,7 @@ extern "C" {
// variant, each number/string/field_id has a corresponding struct during parsing sql
typedef struct tVariant {
uint32_t nType;
int32_t nType; // change uint to int, because in tVariantCreate() pVar->nType = -1; // -1 means error type
int32_t nLen; // only used for string, for number, it is useless
union {
int64_t i64;
......
......@@ -247,6 +247,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
doExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
tfree(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
tfree(pNode->pSchema);
} else if (pNode->nodeType == TSQL_NODE_FUNC) {
......@@ -255,7 +256,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
tfree(pNode->pType);
}
free(pNode);
tfree(pNode);
}
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
......@@ -272,7 +273,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
}
} else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
tfree((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TSQL_NODE_COL) {
free((*pExpr)->pSchema);
} else if ((*pExpr)->nodeType == TSQL_NODE_FUNC) {
......@@ -284,7 +285,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
tfree((*pExpr)->pType);
}
free(*pExpr);
tfree(*pExpr);
*pExpr = NULL;
}
......@@ -502,7 +503,7 @@ static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
tVariant* pVal = expr->pVal;
tbufWriteUint32(bw, pVal->nType);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
if (pVal->nType == TSDB_DATA_TYPE_BINARY || pVal->nType == TSDB_DATA_TYPE_NCHAR) {
tbufWriteInt32(bw, pVal->nLen);
tbufWrite(bw, pVal->pz, pVal->nLen);
} else {
......@@ -571,7 +572,7 @@ static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
pExpr->pVal = pVal;
pVal->nType = tbufReadUint32(br);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
if (pVal->nType == TSDB_DATA_TYPE_BINARY || pVal->nType == TSDB_DATA_TYPE_NCHAR) {
tbufReadToBuffer(br, &pVal->nLen, sizeof(pVal->nLen));
pVal->pz = calloc(1, pVal->nLen + 1);
tbufReadToBuffer(br, pVal->pz, pVal->nLen);
......
......@@ -47,6 +47,7 @@ int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME;
char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0;
int64_t tsDnodeStartTime = 0;
int8_t tsDnodeNopLoop = 0;
// common
int32_t tsRpcTimer = 300;
......@@ -622,6 +623,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "dnodeNopLoop";
cfg.ptr = &tsDnodeNopLoop;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "balance";
cfg.ptr = &tsEnableBalance;
cfg.valType = TAOS_CFG_VTYPE_INT8;
......
......@@ -4,6 +4,7 @@
#include "tname.h"
#include "ttoken.h"
#include "tvariant.h"
#include "tglobal.h"
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
......@@ -251,6 +252,9 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen
int32_t rowLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
if (pSchema[i].type == TSDB_DATA_TYPE_JSON && numOfCols != 1){
return false;
}
// 1. valid types
if (!isValidDataType(pSchema[i].type)) {
return false;
......@@ -301,8 +305,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
return false;
}
int32_t maxTagLen = TSDB_MAX_TAGS_LEN;
if (numOfTags == 1 && pSchema[numOfCols].type == TSDB_DATA_TYPE_JSON){
maxTagLen = TSDB_MAX_JSON_TAGS_LEN;
}
if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
if (!doValidateSchema(&pSchema[numOfCols], numOfTags, maxTagLen)) {
return false;
}
......
......@@ -18,7 +18,7 @@
#include "ttokendef.h"
#include "tscompression.h"
const int32_t TYPE_BYTES[15] = {
const int32_t TYPE_BYTES[16] = {
-1, // TSDB_DATA_TYPE_NULL
sizeof(int8_t), // TSDB_DATA_TYPE_BOOL
sizeof(int8_t), // TSDB_DATA_TYPE_TINYINT
......@@ -34,6 +34,7 @@ const int32_t TYPE_BYTES[15] = {
sizeof(uint16_t), // TSDB_DATA_TYPE_USMALLINT
sizeof(uint32_t), // TSDB_DATA_TYPE_UINT
sizeof(uint64_t), // TSDB_DATA_TYPE_UBIGINT
sizeof(int8_t), // TSDB_DATA_TYPE_JSON
};
#define DO_STATICS(__sum, __min, __max, __minIndex, __maxIndex, _list, _index) \
......@@ -367,8 +368,8 @@ static void getStatics_nchr(const void *pData, int32_t numOfRow, int64_t *min, i
*maxIndex = 0;
}
tDataTypeDescriptor tDataTypes[15] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
tDataTypeDescriptor tDataTypes[16] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", false, true, tsCompressBool, tsDecompressBool, getStatics_bool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", INT8_MIN, INT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", INT16_MIN, INT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
......@@ -376,13 +377,14 @@ tDataTypeDescriptor tDataTypes[15] = {
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", INT64_MIN, INT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_u16},
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt, getStatics_u32},
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_u64},
{TSDB_DATA_TYPE_JSON,4, TSDB_MAX_JSON_TAGS_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
};
char tTokenTypeSwitcher[13] = {
......@@ -428,7 +430,7 @@ FORCE_INLINE void* getDataMax(int32_t type) {
bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT;
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_JSON;
}
void setVardataNull(void* val, int32_t type) {
......@@ -438,6 +440,9 @@ void setVardataNull(void* val, int32_t type) {
} else if (type == TSDB_DATA_TYPE_NCHAR) {
varDataSetLen(val, sizeof(int32_t));
*(uint32_t*) varDataVal(val) = TSDB_DATA_NCHAR_NULL;
} else if (type == TSDB_DATA_TYPE_JSON) {
varDataSetLen(val, sizeof(int32_t));
*(uint32_t*) varDataVal(val) = TSDB_DATA_JSON_NULL;
} else {
assert(0);
}
......@@ -505,6 +510,7 @@ void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems) {
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_JSON:
for (int32_t i = 0; i < numOfElems; ++i) {
setVardataNull(POINTER_SHIFT(val, i * bytes), type);
}
......
......@@ -158,7 +158,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
pVar->dKey = GET_FLOAT_VAL(pz);
break;
}
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
case TSDB_DATA_TYPE_NCHAR:{ // here we get the nchar length from raw binary bits length
size_t lenInwchar = len / TSDB_NCHAR_SIZE;
pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
......@@ -167,7 +167,13 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
break;
}
case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
case TSDB_DATA_TYPE_JSON:{
pVar->pz = calloc(len + 2, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
break;
}
case TSDB_DATA_TYPE_BINARY:{
pVar->pz = calloc(len + 1, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
......@@ -185,7 +191,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
void tVariantDestroy(tVariant *pVar) {
if (pVar == NULL) return;
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) {
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR || pVar->nType == TSDB_DATA_TYPE_JSON) {
tfree(pVar->pz);
pVar->nLen = 0;
}
......@@ -210,11 +216,41 @@ bool tVariantIsValid(tVariant *pVar) {
return isValidDataType(pVar->nType);
}
bool tVariantTypeMatch(tVariant *pVar, int8_t dbType){
switch (dbType) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
if(pVar->nType != TSDB_DATA_TYPE_BINARY && pVar->nType != TSDB_DATA_TYPE_NCHAR){
return false;
}
break;
}
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:{
if(pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR){
return false;
}
break;
}
}
return true;
}
void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return;
pDst->nType = pSrc->nType;
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR || pSrc->nType == TSDB_DATA_TYPE_JSON) {
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
char* p = realloc(pDst->pz, len);
assert(p);
......@@ -249,7 +285,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
}
}
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY) {
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY && isValidDataType(pDst->nType)) { // if pDst->nType=-1, core dump. eg: where intcolumn=999999999999999999999999999
pDst->nLen = tDataTypes[pDst->nType].bytes;
}
}
......@@ -267,7 +303,7 @@ int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) {
return 1;
}
if (p1->nType == TSDB_DATA_TYPE_BINARY || p1->nType == TSDB_DATA_TYPE_NCHAR) {
if (p1->nType == TSDB_DATA_TYPE_BINARY || p1->nType == TSDB_DATA_TYPE_NCHAR || p1->nType == TSDB_DATA_TYPE_JSON) {
if (p1->nLen == p2->nLen) {
return memcmp(p1->pz, p2->pz, p1->nLen);
} else {
......@@ -815,7 +851,7 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
break;
}
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:{
if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*(uint8_t*) payload = TSDB_DATA_BINARY_NULL;
......@@ -852,7 +888,7 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:{
int32_t newlen = 0;
if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
......@@ -888,6 +924,16 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
break;
}
case TSDB_DATA_TYPE_JSON:{
if (pVariant->nType == TSDB_DATA_TYPE_BINARY){
*((int8_t *)payload) = TSDB_DATA_JSON_PLACEHOLDER;
} else if (pVariant->nType == TSDB_DATA_TYPE_JSON){ // select * from stable, set tag type to json,from setTagValue/tag_project_function
memcpy(payload, pVariant->pz, pVariant->nLen);
}else {
return -1;
}
break;
}
}
return 0;
......
......@@ -3,6 +3,7 @@ using TDengineDriver;
using System.Runtime.InteropServices;
using System.Text;
using System.Collections.Generic;
namespace Test.UtilsTools
{
public class UtilsTools
......@@ -163,4 +164,4 @@ namespace Test.UtilsTools
System.Environment.Exit(0);
}
}
}
\ No newline at end of file
}
......@@ -11,6 +11,11 @@ import java.util.Map;
public abstract class AbstractResultSet extends WrapperImpl implements ResultSet {
private int fetchSize;
protected boolean wasNull;
protected int timestampPrecision;
public void setTimestampPrecision(int timestampPrecision) {
this.timestampPrecision = timestampPrecision;
}
protected void checkAvailability(int columnIndex, int bounds) throws SQLException {
if (isClosed())
......
package com.taosdata.jdbc;
import java.sql.*;
public class AbstractStatementWrapper extends AbstractStatement{
protected Statement statement;
public AbstractStatementWrapper(Statement statement) {
this.statement = statement;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
return statement.executeQuery(sql);
}
@Override
public int executeUpdate(String sql) throws SQLException {
return statement.executeUpdate(sql);
}
@Override
public void close() throws SQLException {
statement.close();
}
@Override
public boolean execute(String sql) throws SQLException {
return statement.execute(sql);
}
@Override
public ResultSet getResultSet() throws SQLException {
return statement.getResultSet();
}
@Override
public int getUpdateCount() throws SQLException {
return statement.getUpdateCount();
}
@Override
public Connection getConnection() throws SQLException {
return statement.getConnection();
}
@Override
public boolean isClosed() throws SQLException {
return statement.isClosed();
}
}
......@@ -6,30 +6,30 @@ import com.taosdata.jdbc.rs.RestfulConnection;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
/**
* this class is an extension of {@link Statement}. e.g.:
* Statement statement = conn.createStatement();
* SchemalessStatement schemalessStatement = new SchemalessStatement(statement);
* schemalessStatement.execute(sql);
* schemalessStatement.insert(lines, SchemalessProtocolType, SchemalessTimestampType);
* This class is for schemaless lines(line/telnet/json) write to tdengine.
* e.g.:
* SchemalessWriter writer = new SchemalessWriter(connection);
* writer.write(lines, SchemalessProtocolType, SchemalessTimestampType);
*/
public class SchemalessStatement extends AbstractStatementWrapper {
public SchemalessStatement(Statement statement) {
super(statement);
public class SchemalessWriter {
protected Connection connection;
public SchemalessWriter(Connection connection) {
this.connection = connection;
}
/**
* batch insert schemaless lines
* batch schemaless lines write to db
*
* @param lines schemaless lines
* @param protocolType schemaless type {@link SchemalessProtocolType}
* @param timestampType Time precision {@link SchemalessTimestampType}
* @throws SQLException execute insert exception
* @throws SQLException execute exception
*/
public void insert(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
Connection connection = this.getConnection();
public void write(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
if (connection instanceof TSDBConnection) {
TSDBConnection tsdbConnection = (TSDBConnection) connection;
tsdbConnection.getConnector().insertLines(lines, protocolType, timestampType);
......@@ -41,14 +41,27 @@ public class SchemalessStatement extends AbstractStatementWrapper {
}
/**
* only one insert
* only one line writes to db
*
* @param line schemaless line
* @param protocolType schemaless type {@link SchemalessProtocolType}
* @param timestampType Time precision {@link SchemalessTimestampType}
* @throws SQLException execute insert exception
* @throws SQLException execute exception
*/
public void write(String line, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
write(new String[]{line}, protocolType, timestampType);
}
/**
* batch schemaless lines write to db with list
*
* @param lines schemaless list
* @param protocolType schemaless type {@link SchemalessProtocolType}
* @param timestampType Time precision {@link SchemalessTimestampType}
* @throws SQLException execute exception
*/
public void insert(String line, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
insert(new String[]{line}, protocolType, timestampType);
public void write(List<String> lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
String[] strings = lines.toArray(new String[0]);
write(strings, protocolType, timestampType);
}
}
......@@ -74,9 +74,8 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
public boolean next() throws SQLException {
if (this.getBatchFetch()) {
if (this.blockData.forward()) {
if (this.blockData.forward())
return true;
}
int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
this.blockData.reset();
......@@ -214,7 +213,18 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
if (!lastWasNull) {
Object value = this.rowData.getObject(columnIndex);
if (value instanceof Timestamp) {
res = ((Timestamp) value).getTime();
Timestamp ts = (Timestamp) value;
long epochSec = ts.getTime() / 1000;
long nanoAdjustment = ts.getNanos();
switch (this.timestampPrecision) {
case 0:
default: // ms
return ts.getTime();
case 1: // us
return epochSec * 1000_000L + nanoAdjustment / 1000L;
case 2: // ns
return epochSec * 1000_000_000L + nanoAdjustment;
}
} else {
int nativeType = this.columnMetaDataList.get(columnIndex - 1).getColType();
res = this.rowData.getLong(columnIndex, nativeType);
......
......@@ -47,6 +47,8 @@ public class TSDBStatement extends AbstractStatement {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY);
}
TSDBResultSet res = new TSDBResultSet(this, this.connection.getConnector(), pSql);
int timestampPrecision = this.connection.getConnector().getResultTimePrecision(pSql);
res.setTimestampPrecision(timestampPrecision);
res.setBatchFetch(this.connection.getBatchFetch());
return res;
}
......
package com.taosdata.jdbc.enums;
public enum TimestampPrecision {
MS,
US,
NS,
UNKNOWN
public class TimestampPrecision {
public static final int MS = 0;
public static final int US = 1;
public static final int NS = 2;
}
......@@ -168,11 +168,22 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
case TIMESTAMP: {
Long value = row.getLong(colIndex);
//TODO: this implementation has bug if the timestamp bigger than 9999_9999_9999_9
if (value < 1_0000_0000_0000_0L)
if (value < 1_0000_0000_0000_0L) {
this.timestampPrecision = TimestampPrecision.MS;
return new Timestamp(value);
long epochSec = value / 1000_000L;
long nanoAdjustment = value % 1000_000L * 1000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
if (value >= 1_0000_0000_0000_0L && value < 1_000_000_000_000_000_0l) {
this.timestampPrecision = TimestampPrecision.US;
long epochSec = value / 1000_000L;
long nanoAdjustment = value % 1000_000L * 1000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
if (value >= 1_000_000_000_000_000_0l) {
this.timestampPrecision = TimestampPrecision.NS;
long epochSec = value / 1000_000_000L;
long nanoAdjustment = value % 1000_000_000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
}
case UTC: {
String value = row.getString(colIndex);
......@@ -182,12 +193,15 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
if (value.length() > 31) {
// ns timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSSSSS+0x00
nanoAdjustment = fractionalSec;
this.timestampPrecision = TimestampPrecision.NS;
} else if (value.length() > 28) {
// ms timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSS+0x00
nanoAdjustment = fractionalSec * 1000L;
this.timestampPrecision = TimestampPrecision.US;
} else {
// ms timestamp: yyyy-MM-ddTHH:mm:ss.SSS+0x00
nanoAdjustment = fractionalSec * 1000_000L;
this.timestampPrecision = TimestampPrecision.MS;
}
ZoneOffset zoneOffset = ZoneOffset.of(value.substring(value.length() - 5));
Instant instant = Instant.ofEpochSecond(epochSec, nanoAdjustment).atOffset(zoneOffset).toInstant();
......@@ -196,7 +210,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
case STRING:
default: {
String value = row.getString(colIndex);
TimestampPrecision precision = Utils.guessTimestampPrecision(value);
int precision = Utils.guessTimestampPrecision(value);
this.timestampPrecision = precision;
if (precision == TimestampPrecision.MS) {
// ms timestamp: yyyy-MM-dd HH:mm:ss.SSS
return row.getTimestamp(colIndex);
......@@ -338,8 +354,18 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
wasNull = value == null;
if (value == null)
return 0;
if (value instanceof Timestamp)
return ((Timestamp) value).getTime();
if (value instanceof Timestamp) {
Timestamp ts = (Timestamp) value;
switch (this.timestampPrecision) {
case TimestampPrecision.MS:
default:
return ts.getTime();
case TimestampPrecision.US:
return ts.getTime() * 1000 + ts.getNanos() / 1000 % 1000;
case TimestampPrecision.NS:
return ts.getTime() * 1000_000 + ts.getNanos() % 1000_000;
}
}
long valueAsLong = 0;
try {
valueAsLong = Long.parseLong(value.toString());
......
......@@ -194,14 +194,14 @@ public class Utils {
return timestamp.toLocalDateTime().format(milliSecFormatter);
}
public static TimestampPrecision guessTimestampPrecision(String value) {
public static int guessTimestampPrecision(String value) {
if (isMilliSecFormat(value))
return TimestampPrecision.MS;
if (isMicroSecFormat(value))
return TimestampPrecision.US;
if (isNanoSecFormat(value))
return TimestampPrecision.NS;
return TimestampPrecision.UNKNOWN;
return TimestampPrecision.MS;
}
private static boolean isMilliSecFormat(String timestampStr) {
......
......@@ -10,6 +10,8 @@ import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class SchemalessInsertTest {
private final String dbname = "test_schemaless_insert";
......@@ -27,10 +29,8 @@ public class SchemalessInsertTest {
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"};
// when
try (Statement statement = conn.createStatement();
SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) {
schemalessStatement.insert(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
}
SchemalessWriter writer = new SchemalessWriter(conn);
writer.write(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
// then
Statement statement = conn.createStatement();
......@@ -62,10 +62,9 @@ public class SchemalessInsertTest {
};
// when
try (Statement statement = conn.createStatement();
SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) {
schemalessStatement.insert(lines, SchemalessProtocolType.TELNET, SchemalessTimestampType.NOT_CONFIGURED);
}
SchemalessWriter writer = new SchemalessWriter(conn);
writer.write(lines, SchemalessProtocolType.TELNET, SchemalessTimestampType.NOT_CONFIGURED);
// then
Statement statement = conn.createStatement();
......@@ -114,10 +113,8 @@ public class SchemalessInsertTest {
"]";
// when
try (Statement statement = conn.createStatement();
SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) {
schemalessStatement.insert(json, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
}
SchemalessWriter writer = new SchemalessWriter(conn);
writer.write(json, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
// then
Statement statement = conn.createStatement();
......@@ -135,6 +132,33 @@ public class SchemalessInsertTest {
statement.close();
}
@Test
public void telnetListInsert() throws SQLException {
// given
List<String> list = new ArrayList<>();
list.add("stb0_0 1626006833 4 host=host0 interface=eth0");
list.add("stb0_1 1626006833 4 host=host0 interface=eth0");
list.add("stb0_2 1626006833 4 host=host0 interface=eth0 id=\"special_name\"");
// when
SchemalessWriter writer = new SchemalessWriter(conn);
writer.write(list, SchemalessProtocolType.TELNET, SchemalessTimestampType.NOT_CONFIGURED);
// then
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("show tables");
Assert.assertNotNull(rs);
ResultSetMetaData metaData = rs.getMetaData();
Assert.assertTrue(metaData.getColumnCount() > 0);
int rowCnt = 0;
while (rs.next()) {
rowCnt++;
}
Assert.assertEquals(list.size(), rowCnt);
rs.close();
statement.close();
}
@Before
public void before() {
String host = "127.0.0.1";
......
package com.taosdata.jdbc.cases;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
public class GetLongWithDifferentTimestampPrecision {
private final String host = "127.0.0.1";
@Test
public void testRestful() throws SQLException {
// given
String url = "jdbc:TAOS-RS://" + host + ":6041/";
Connection conn = DriverManager.getConnection(url, "root", "taosdata");
long ts = System.currentTimeMillis();
// when and then
assertResultSet(conn, "ms", ts, ts);
assertResultSet(conn, "us", ts, ts * 1000);
assertResultSet(conn, "ns", ts, ts * 1000_000);
}
@Test
public void testJni() throws SQLException {
// given
String url = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(url, "root", "taosdata");
long ts = System.currentTimeMillis();
// when and then
assertResultSet(conn, "ms", ts, ts);
assertResultSet(conn, "us", ts, ts * 1000);
assertResultSet(conn, "ns", ts, ts * 1000_000);
}
private void assertResultSet(Connection conn, String precision, long timestamp, long expect) throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test");
stmt.execute("create database if not exists test precision '" + precision + "'");
stmt.execute("create table test.weather(ts timestamp, f1 int)");
String dateTimeStr = sdf.format(new Date(timestamp));
stmt.execute("insert into test.weather values('" + dateTimeStr + "', 1)");
ResultSet rs = stmt.executeQuery("select * from test.weather");
rs.next();
long actual = rs.getLong("ts");
Assert.assertEquals(expect, actual);
stmt.execute("drop database if exists test");
}
}
}
......@@ -9,29 +9,29 @@ import java.util.Random;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class RestfulJDBCTest {
// private static final String host = "127.0.0.1";
private static final String host = "master";
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static Connection connection;
private static final String dbname = "restful_test";
@Test
public void testCase001() throws SQLException {
// given
String sql = "drop database if exists restful_test";
String sql = "drop database if exists " + dbname;
// when
boolean execute = execute(connection, sql);
// then
Assert.assertFalse(execute);
// given
sql = "create database if not exists restful_test";
sql = "create database if not exists " + dbname;
// when
execute = execute(connection, sql);
// then
Assert.assertFalse(execute);
// given
sql = "use restful_test";
sql = "use " + dbname;
// when
execute = execute(connection, sql);
// then
......@@ -41,7 +41,7 @@ public class RestfulJDBCTest {
@Test
public void testCase002() throws SQLException {
// given
String sql = "create table weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)";
String sql = "create table " + dbname + ".weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)";
// when
boolean execute = execute(connection, sql);
// then
......@@ -52,7 +52,7 @@ public class RestfulJDBCTest {
public void testCase004() throws SQLException {
for (int i = 1; i <= 100; i++) {
// given
String sql = "create table t" + i + " using weather tags('beijing', '" + i + "')";
String sql = "create table " + dbname + ".t" + i + " using " + dbname + ".weather tags('beijing', '" + i + "')";
// when
boolean execute = execute(connection, sql);
// then
......@@ -68,7 +68,7 @@ public class RestfulJDBCTest {
// given
long currentTimeMillis = System.currentTimeMillis();
String sql = "insert into t" + j + " values(" + currentTimeMillis + "," + (random.nextFloat() * 50) + "," + random.nextInt(100) + ")";
String sql = "insert into " + dbname + ".t" + j + " values(" + currentTimeMillis + "," + (random.nextFloat() * 50) + "," + random.nextInt(100) + ")";
// when
int affectRows = executeUpdate(connection, sql);
// then
......@@ -83,7 +83,7 @@ public class RestfulJDBCTest {
@Test
public void testCase006() throws SQLException {
// given
String sql = "select * from weather";
String sql = "select * from " + dbname + ".weather";
// when
ResultSet rs = executeQuery(connection, sql);
ResultSetMetaData meta = rs.getMetaData();
......@@ -102,7 +102,7 @@ public class RestfulJDBCTest {
@Test
public void testCase007() throws SQLException {
// given
String sql = "drop database restful_test";
String sql = "drop database " + dbname;
// when
boolean execute = execute(connection, sql);
......@@ -143,7 +143,7 @@ public class RestfulJDBCTest {
public static void afterClass() throws SQLException {
if (connection != null) {
Statement stmt = connection.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("drop database if exists " + dbname);
stmt.close();
connection.close();
}
......
......@@ -17,12 +17,25 @@ import java.text.SimpleDateFormat;
public class RestfulResultSetTest {
// private static final String host = "127.0.0.1";
private static final String host = "master";
private static final String host = "127.0.0.1";
private static Connection conn;
private static Statement stmt;
private static ResultSet rs;
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
stmt = conn.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("create database if not exists restful_test");
stmt.execute("use restful_test");
stmt.execute("drop table if exists weather");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
rs = stmt.executeQuery("select * from restful_test.weather");
rs.next();
}
@Test
public void wasNull() throws SQLException {
Assert.assertFalse(rs.wasNull());
......@@ -658,20 +671,6 @@ public class RestfulResultSetTest {
Assert.assertTrue(rs.isWrapperFor(RestfulResultSet.class));
}
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
stmt = conn.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("create database if not exists restful_test");
stmt.execute("use restful_test");
stmt.execute("drop table if exists weather");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
rs = stmt.executeQuery("select * from restful_test.weather");
rs.next();
}
@AfterClass
public static void afterClass() throws SQLException {
if (rs != null)
......
......@@ -3,7 +3,6 @@ var conn = taos.connect();
var c1 = conn.cursor();
let stime = new Date();
let interval = 1000;
function convertDateToTS(date) {
let tsArr = date.toISOString().split("T")
return "\"" + tsArr[0] + " " + tsArr[1].substring(0, tsArr[1].length - 1) + "\"";
......
const taos = require('../tdengine');
var conn = taos.connect({ host: "127.0.0.1", user: "root", password: "taosdata", config: "/etc/taos", port: 10 });
var c1 = conn.cursor();
executeUpdate("create database nodedb;");
executeUpdate("create database if not exists nodedb;");
executeUpdate("use nodedb;");
executeUpdate("create table unsigntest(ts timestamp,ut tinyint unsigned,us smallint unsigned,ui int unsigned,ub bigint unsigned,bi bigint);");
executeUpdate("create table if not exists unsigntest(ts timestamp,ut tinyint unsigned,us smallint unsigned,ui int unsigned,ub bigint unsigned,bi bigint);");
executeUpdate("insert into unsigntest values (now, 254,65534,4294967294,18446744073709551614,9223372036854775807);");
executeUpdate("insert into unsigntest values (now, 0,0,0,0,-9223372036854775807);");
executeQuery("select * from unsigntest;");
executeUpdate("drop database nodedb;");
executeUpdate("drop database if exists nodedb;");
function executeUpdate(sql) {
console.log(sql);
......
......@@ -124,6 +124,21 @@ class TaosBind(ctypes.Structure):
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def json(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_JSON
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def tinyint_unsigned(self, value):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer = cast(pointer(c_uint8(value)), c_void_p)
......@@ -356,6 +371,11 @@ class TaosMultiBind(ctypes.Structure):
self.buffer_type = FieldType.C_NCHAR
self._str_to_buffer(values)
def json(self, values):
# type: (list[str]) -> None
self.buffer_type = FieldType.C_JSON
self._str_to_buffer(values)
def tinyint_unsigned(self, values):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer_length = sizeof(c_uint8)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册