提交 9d9d9460 编写于 作者: S Shengliang Guan

Merge from master

......@@ -15,7 +15,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -26,22 +26,21 @@ steps:
- 2.0
---
kind: pipeline
name: test_arm64
name: test_arm64_bionic
platform:
os: linux
arch: arm64
steps:
- name: build
image: gcc
image: arm64v8/ubuntu:bionic
commands:
- apt-get update
- apt-get install -y cmake build-essential
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
- make -j4
trigger:
event:
- pull_request
......@@ -52,7 +51,84 @@ steps:
- 2.0
---
kind: pipeline
name: test_arm
name: test_arm64_focal
platform:
os: linux
arch: arm64
steps:
- name: build
image: arm64v8/ubuntu:focal
commands:
- echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
- apt-get update
- apt-get install -y -qq cmake build-essential
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make -j4
trigger:
event:
- pull_request
when:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm64_centos7
platform:
os: linux
arch: arm64
steps:
- name: build
image: arm64v8/centos:7
commands:
- yum install -y gcc gcc-c++ make cmake git
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make -j4
trigger:
event:
- pull_request
when:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm64_centos8
platform:
os: linux
arch: arm64
steps:
- name: build
image: arm64v8/centos:8
commands:
- dnf install -y gcc gcc-c++ make cmake epel-release git libarchive
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make -j4
trigger:
event:
- pull_request
when:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm_bionic
platform:
os: linux
......@@ -67,7 +143,7 @@ steps:
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch32 > /dev/null
- make
- make -j4
trigger:
event:
- pull_request
......@@ -76,7 +152,6 @@ steps:
- develop
- master
- 2.0
---
kind: pipeline
name: build_trusty
......@@ -95,7 +170,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -121,7 +196,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -130,7 +205,6 @@ steps:
- develop
- master
- 2.0
---
kind: pipeline
name: build_bionic
......@@ -147,7 +221,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -171,7 +245,7 @@ steps:
- mkdir debug
- cd debug
- cmake ..
- make
- make -j4
trigger:
event:
- pull_request
......@@ -179,28 +253,4 @@ steps:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: goodbye
platform:
os: linux
arch: amd64
steps:
- name: 64-bit
image: alpine
commands:
- echo 64-bit is good.
when:
branch:
- develop
- master
- 2.0
depends_on:
- test_arm64
- test_amd64
\ No newline at end of file
- 2.0
\ No newline at end of file
......@@ -5,7 +5,7 @@ node {
git url: 'https://github.com/taosdata/TDengine.git'
}
def skipstage=0
def skipbuild=0
def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME
......@@ -33,8 +33,7 @@ def abort_previous(){
milestone(buildNumber)
}
def pre_test(){
sh'hostname'
sh '''
sudo rmtaos || echo "taosd has not installed"
'''
......@@ -107,19 +106,17 @@ def pre_test(){
make > /dev/null
make install > /dev/null
cd ${WKC}/tests
pip3 install ${WKC}/src/connector/python
pip3 install ${WKC}/src/connector/python/
'''
return 1
}
pipeline {
agent none
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community'
}
stages {
stage('pre_build'){
agent{label 'master'}
......@@ -135,19 +132,22 @@ pipeline {
rm -rf ${WORKSPACE}.tes
cp -r ${WORKSPACE} ${WORKSPACE}.tes
cd ${WORKSPACE}.tes
git fetch
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
git checkout master
git pull origin master
'''
}
else {
else if(env.CHANGE_TARGET == '2.0'){
sh '''
git checkout 2.0
'''
}
else{
sh '''
git checkout develop
git pull origin develop
'''
}
}
......@@ -155,32 +155,34 @@ pipeline {
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
'''
script{
env.skipstage=sh(script:"cd ${WORKSPACE}.tes && git --no-pager diff --name-only FETCH_HEAD ${env.CHANGE_TARGET}|grep -v -E '.*md|//src//connector|Jenkinsfile|test-all.sh' || echo 0 ",returnStdout:true)
script{
skipbuild='2'
skipbuild=sh(script: "git log -2 --pretty=%B | fgrep -ie '[skip ci]' -e '[ci skip]' && echo 1 || echo 2", returnStdout:true)
println skipbuild
}
println env.skipstage
sh'''
rm -rf ${WORKSPACE}.tes
'''
}
}
stage('Parallel test stage') {
//only build pr
when {
allOf{
changeRequest()
expression {
env.skipstage != 0
expression{
return skipbuild.trim() == '2'
}
}
}
parallel {
stage('python_1_s1') {
agent{label 'p1'}
agent{label " slave1 || slave11 "}
steps {
pre_test()
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
sh '''
date
cd ${WKC}/tests
......@@ -191,11 +193,11 @@ pipeline {
}
}
stage('python_2_s5') {
agent{label 'p2'}
agent{label " slave5 || slave15 "}
steps {
pre_test()
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
sh '''
date
cd ${WKC}/tests
......@@ -205,9 +207,9 @@ pipeline {
}
}
stage('python_3_s6') {
agent{label 'p3'}
agent{label " slave6 || slave16 "}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
pre_test()
sh '''
date
......@@ -218,10 +220,30 @@ pipeline {
}
}
stage('test_b1_s2') {
agent{label 'b1'}
agent{label " slave2 || slave12 "}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
pre_test()
sh '''
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
nohup taosd >/dev/null &
sleep 10
'''
sh '''
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
'''
sh '''
cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1
echo '' |./taosdemo
'''
sh '''
cd ${WKC}/tests/gotest
bash batchtest.sh
'''
sh '''
cd ${WKC}/tests
./test-all.sh b1fq
......@@ -229,9 +251,8 @@ pipeline {
}
}
}
stage('test_crash_gen_s3') {
agent{label "b2"}
agent{label " slave3 || slave13 "}
steps {
pre_test()
......@@ -257,20 +278,18 @@ pipeline {
./handle_taosd_val_log.sh
'''
}
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
sh '''
date
cd ${WKC}/tests
./test-all.sh b2fq
date
'''
}
}
}
}
stage('test_valgrind_s4') {
agent{label "b3"}
agent{label " slave4 || slave14 "}
steps {
pre_test()
......@@ -281,7 +300,7 @@ pipeline {
./handle_val_log.sh
'''
}
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
sh '''
date
cd ${WKC}/tests
......@@ -296,9 +315,9 @@ pipeline {
}
}
stage('test_b4_s7') {
agent{label 'b4'}
agent{label " slave7 || slave17 "}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
pre_test()
sh '''
date
......@@ -315,9 +334,9 @@ pipeline {
}
}
stage('test_b5_s8') {
agent{label 'b5'}
agent{label " slave8 || slave18 "}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
pre_test()
sh '''
date
......@@ -328,9 +347,9 @@ pipeline {
}
}
stage('test_b6_s9') {
agent{label 'b6'}
agent{label " slave9 || slave19 "}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
pre_test()
sh '''
date
......@@ -341,9 +360,9 @@ pipeline {
}
}
stage('test_b7_s10') {
agent{label 'b7'}
agent{label " slave10 || slave20 "}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 55, unit: 'MINUTES'){
pre_test()
sh '''
date
......@@ -433,6 +452,5 @@ pipeline {
from: "support@taosdata.com"
)
}
}
}
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.1.6.0")
SET(TD_VER_NUMBER "2.1.7.1")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -98,6 +98,7 @@ int ConvertString(char *buf, size_t nBytes, UINT cpFrom, UINT cpTo, LPCSTR lpDef
char *DupAndConvert(const char *string, UINT cpFrom, UINT cpTo, LPCSTR lpDefaultChar) {
int nBytes;
char *pBuf;
char *pBuf1;
nBytes = 4 * ((int)lstrlen(string) + 1); /* Worst case for the size needed */
pBuf = (char *)malloc(nBytes);
if (!pBuf) {
......@@ -110,8 +111,9 @@ char *DupAndConvert(const char *string, UINT cpFrom, UINT cpTo, LPCSTR lpDefault
free(pBuf);
return NULL;
}
pBuf = realloc(pBuf, nBytes+1);
return pBuf;
pBuf1 = realloc(pBuf, nBytes+1);
if(pBuf1 == NULL && pBuf != NULL) free(pBuf);
return pBuf1;
}
int CountCharacters(const char *string, UINT cp) {
......
......@@ -68,6 +68,7 @@ int BreakArgLine(LPSTR pszCmdLine, char ***pppszArg) {
int iString = FALSE; /* TRUE = string mode; FALSE = non-string mode */
int nBackslash = 0;
char **ppszArg;
char **ppszArg1;
int iArg = FALSE; /* TRUE = inside an argument; FALSE = between arguments */
ppszArg = (char **)malloc((argc+1)*sizeof(char *));
......@@ -89,7 +90,10 @@ int BreakArgLine(LPSTR pszCmdLine, char ***pppszArg) {
if ((!iArg) && (c != ' ') && (c != '\t')) { /* Beginning of a new argument */
iArg = TRUE;
ppszArg[argc++] = pszCopy+j;
ppszArg = (char **)realloc(ppszArg, (argc+1)*sizeof(char *));
ppszArg1 = (char **)realloc(ppszArg, (argc+1)*sizeof(char *));
if(ppszArg1 == NULL && ppszArg != NULL)
free(ppszArg);
ppszArg = ppszArg1;
if (!ppszArg) return -1;
pszCopy[j] = c0 = '\0';
}
......@@ -212,7 +216,7 @@ int _initU(void) {
fprintf(stderr, "Warning: Can't convert the argument line to UTF-8\n");
_acmdln[0] = '\0';
}
realloc(_acmdln, n+1); /* Resize the memory block to fit the UTF-8 line */
//realloc(_acmdln, n+1); /* Resize the memory block to fit the UTF-8 line */
/* Should not fail since we make it smaller */
/* Record the console code page, to allow converting the output accordingly */
......
......@@ -196,6 +196,7 @@ not_compact_enough:
/* Normally defined in stdlib.h. Output buf must contain PATH_MAX bytes */
char *realpath(const char *path, char *outbuf) {
char *pOutbuf = outbuf;
char *pOutbuf1 = NULL;
int iErr;
const char *pc;
......@@ -242,8 +243,11 @@ realpath_failed:
return NULL;
}
if (!outbuf) pOutbuf = realloc(pOutbuf, strlen(pOutbuf) + 1);
return pOutbuf;
if (!outbuf) {
pOutbuf1 = realloc(pOutbuf, strlen(pOutbuf) + 1);
if(pOutbuf1 == NULL && pOutbuf) free(pOutbuf);
}
return pOutbuf1;
}
#endif
......@@ -517,6 +521,7 @@ int ResolveLinksA(const char *path, char *buf, size_t bufsize) {
/* Normally defined in stdlib.h. Output buf must contain PATH_MAX bytes */
char *realpathU(const char *path, char *outbuf) {
char *pOutbuf = outbuf;
char *pOutbuf1 = NULL;
char *pPath1 = NULL;
char *pPath2 = NULL;
int iErr;
......@@ -590,10 +595,13 @@ realpathU_failed:
}
DEBUG_LEAVE(("return 0x%p; // \"%s\"\n", pOutbuf, pOutbuf));
if (!outbuf) pOutbuf = realloc(pOutbuf, strlen(pOutbuf) + 1);
if (!outbuf) {
pOutbuf1 = realloc(pOutbuf, strlen(pOutbuf) + 1);
if(pOutbuf1 == NULL && pOutbuf) free(pOutbuf);
}
free(pPath1);
free(pPath2);
return pOutbuf;
return pOutbuf1;
}
#endif /* defined(_WIN32) */
......
Subproject commit 0ca5b15a8eac40327dd737be52c926fa5675712c
Subproject commit ceda5bf9fcd7836509ac97dcc0056b3f1dd48cc5
......@@ -144,6 +144,9 @@ keepColumnName 1
# max length of an SQL
# maxSQLLength 65480
# max length of WildCards
# maxWildCardsLength 100
# the maximum number of records allowed for super table time sorting
# maxNumOfOrderedRes 100000
......
......@@ -35,7 +35,7 @@ fi
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taosd
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh ${script_dir}/startPre.sh"
else
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator\
${script_dir}/remove.sh ${script_dir}/set_core.sh ${script_dir}/startPre.sh ${script_dir}/taosd-dump-cfg.gdb"
......
name: tdengine
base: core18
version: '2.1.6.0'
version: '2.1.7.1'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.1.6.0
- usr/lib/libtaos.so.2.1.7.1
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -116,8 +116,17 @@ void bnCleanupDnodes() {
static void bnCheckDnodesSize(int32_t dnodesNum) {
if (tsBnDnodes.maxSize <= dnodesNum) {
tsBnDnodes.maxSize = dnodesNum * 2;
tsBnDnodes.list = realloc(tsBnDnodes.list, tsBnDnodes.maxSize * sizeof(SDnodeObj *));
int32_t maxSize = dnodesNum * 2;
SDnodeObj** list1 = NULL;
int32_t retry = 0;
while(list1 == NULL && retry++ < 3) {
list1 = realloc(tsBnDnodes.list, maxSize * sizeof(SDnodeObj *));
}
if(list1) {
tsBnDnodes.list = list1;
tsBnDnodes.maxSize = maxSize;
}
}
}
......
......@@ -50,6 +50,12 @@ void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(SSqlObj *pSql);
#ifdef __cplusplus
}
#endif
......
......@@ -29,15 +29,16 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
......@@ -142,6 +143,7 @@ bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo);
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
......
此差异已折叠。
此差异已折叠。
......@@ -299,7 +299,7 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
SSchema *schema = (SSchema*)pBlock->pTableMeta->schema;
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (!spd->cols[i].hasVal) { // current column do not have any value to insert, set it to null
if (spd->cols[i].valStat == VAL_STAT_NONE) { // current column do not have any value to insert, set it to null
for (int32_t n = 0; n < rowNum; ++n) {
char *ptr = pBlock->pData + sizeof(SSubmitBlk) + pBlock->rowSize * n + offset;
......@@ -1527,8 +1527,9 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pCmd->insertParam.insertType = TSDB_QUERY_TYPE_STMT_INSERT;
pCmd->insertParam.objectId = pSql->self;
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
char* sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if(sqlstr == NULL && pSql->sqlstr) free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
STMT_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
......
此差异已折叠。
......@@ -409,7 +409,7 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY))) {
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
// do nothing in case of super table subquery
} else {
pSql->retry += 1;
......@@ -880,16 +880,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryAttr query = {{0}};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
query.vgId = pTableMeta->vgId;
SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
......
......@@ -887,7 +887,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
}
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
char* sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if(sqlstr == NULL && pSql->sqlstr) free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tfree(pSql);
......
......@@ -2034,17 +2034,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscAsyncResultOnError(pSql);
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);
for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL);
SRetrieveSupport* pSupport = pSub->param;
tfree(pSupport->localBuffer);
tfree(pSupport);
tscFreeRetrieveSup(pSub);
taos_free_result(pSub);
}
......@@ -2395,8 +2392,12 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1);
tscColumnCopy(x, pCol);
} else {
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
SSchema ss = {.type = (uint8_t)pCol->info.type, .bytes = pCol->info.bytes, .colId = (int16_t)pCol->columnIndex};
tscColumnListInsert(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid, &ss);
int32_t ti = tscColumnExists(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid);
assert(ti >= 0);
SColumn* x = taosArrayGetP(pNewQueryInfo->colList, ti);
tscColumnCopy(x, pCol);
}
}
}
......@@ -2558,7 +2559,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
static void tscFreeRetrieveSup(SSqlObj *pSql) {
void tscFreeRetrieveSup(SSqlObj *pSql) {
SRetrieveSupport *trsupport = pSql->param;
void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0);
......@@ -2716,33 +2717,43 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
// remove the cached tableMeta and vgroup id list, and then parse the sql again
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
SSqlObj *userSql = NULL;
if (pParentSql->param) {
userSql = ((SRetrieveSupport*)pParentSql->param)->pParentSql;
}
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (userSql == NULL) {
userSql = pParentSql;
}
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) {
if (userSql != pParentSql) {
tscFreeRetrieveSup(pParentSql);
}
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
tscFreeSubobj(userSql);
tfree(userSql->pSubs);
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
userSql->res.code = TSDB_CODE_SUCCESS;
userSql->retry++;
code = tsParseSql(pParentSql, true);
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", userSql->self,
tstrerror(code), userSql->retry);
tscResetSqlCmd(&userSql->cmd, true);
code = tsParseSql(userSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
userSql->res.code = code;
tscAsyncResultOnError(userSql);
return;
}
executeQuery(pParentSql, pQueryInfo);
pQueryInfo = tscGetQueryInfo(&userSql->cmd);
executeQuery(userSql, pQueryInfo);
} else {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
}
......@@ -2812,7 +2823,6 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
pParentSql->self, pState->numOfSub, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
tscClearInterpInfo(pPQueryInfo);
code = tscCreateGlobalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pMerger, pParentSql->self);
pParentSql->res.code = code;
......
......@@ -369,6 +369,27 @@ bool tscGroupbyColumn(SQueryInfo* pQueryInfo) {
return false;
}
int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
if (pExpr->base.functionId == TSDB_FUNC_TS) {
continue;
}
if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM) {
return i;
}
}
return -1;
}
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
......@@ -625,8 +646,10 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
} else if (convertNchar && pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
if(buffer == NULL)
return ;
pRes->buffer[i] = buffer;
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
......@@ -1206,7 +1229,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols);
pOutput->precision = pSqlObjList[0]->res.precision;
SSchema* schema = NULL;
......@@ -1502,7 +1524,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeSqlResult(pSql);
tscResetSqlCmd(pCmd, false);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload);
pCmd->allocSize = 0;
......@@ -1776,101 +1797,6 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
return TSDB_CODE_SUCCESS;
}
static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf;
int toffset = 0;
uint16_t nCols = pBuilder->nCols;
uint8_t memRowType = payloadType(p);
uint16_t nColsBound = payloadNCols(p);
if (pBuilder->nCols <= 0 || nColsBound <= 0) {
return NULL;
}
char* pVals = POINTER_SHIFT(p, payloadValuesOffset(p));
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType);
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
* |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... |
* +-----------+----------+----------+--------------------------------------|--------------------|
* | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... |
* +-----------+----------+----------+--------------------------------------+--------------------|
* 1. offset in column data tuple starts from the value part in case of uint16_t overflow.
* 2. dataTLen: total length including the header and body.
*/
if (memRowType == SMEM_ROW_DATA) {
SDataRow trow = (SDataRow)memRowDataBody(memRow);
dataRowSetLen(trow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion);
p = (char*)payloadBody(pBuilder->buf);
uint16_t i = 0, j = 0;
while (j < nCols) {
if (i >= nColsBound) {
break;
}
int16_t colId = payloadColId(p);
if (colId == pSchema[j].colId) {
// ASSERT(payloadColType(p) == pSchema[j].type);
tdAppendColVal(trow, POINTER_SHIFT(pVals, payloadColOffset(p)), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p = payloadNextCol(p);
++i;
++j;
} else if (colId < pSchema[j].colId) {
p = payloadNextCol(p);
++i;
} else {
tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
++j;
}
}
while (j < nCols) {
tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
++j;
}
#if 0 // no need anymore
while (i < nColsBound) {
p = payloadNextCol(p);
++i;
}
#endif
} else if (memRowType == SMEM_ROW_KV) {
SKVRow kvRow = (SKVRow)memRowKvBody(memRow);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsBound));
kvRowSetNCols(kvRow, nColsBound);
memRowSetKvVersion(memRow, pBuilder->sversion);
p = (char*)payloadBody(pBuilder->buf);
int i = 0;
while (i < nColsBound) {
int16_t colId = payloadColId(p);
uint8_t colType = payloadColType(p);
tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, &toffset);
//toffset += sizeof(SColIdx);
p = payloadNextCol(p);
++i;
}
} else {
ASSERT(0);
}
int32_t rowTLen = memRowTLen(memRow);
pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + rowTLen; // next row
pBuilder->pSubmitBlk->dataLen += rowTLen;
return memRow;
}
// Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam,
SBlockKeyTuple* blkKeyTuple) {
......@@ -1902,10 +1828,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
int32_t schemaSize = sizeof(STColumn) * numOfCols;
pBlock->schemaLen = schemaSize;
} else {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
if (IS_RAW_PAYLOAD(insertParam->payloadType)) {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
}
pBlock->schemaLen = 0;
}
......@@ -1932,18 +1859,19 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
pBlock->dataLen += memRowTLen(memRow);
}
} else {
SMemRowBuilder rowBuilder;
rowBuilder.pSchema = pSchema;
rowBuilder.sversion = pTableMeta->sversion;
rowBuilder.flen = flen;
rowBuilder.nCols = tinfo.numOfColumns;
rowBuilder.pDataBlock = pDataBlock;
rowBuilder.pSubmitBlk = pBlock;
rowBuilder.buf = p;
for (int32_t i = 0; i < numOfRows; ++i) {
rowBuilder.buf = (blkKeyTuple + i)->payloadAddr;
tdGenMemRowFromBuilder(&rowBuilder);
char* payload = (blkKeyTuple + i)->payloadAddr;
if (isNeedConvertRow(payload)) {
convertSMemRow(pDataBlock, payload, pTableDataBlock);
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
} else {
TDRowTLenT rowTLen = memRowTLen(payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
}
}
......@@ -1956,9 +1884,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t columns = tscGetNumOfColumns(pTableMeta);
int32_t columns = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
for(int32_t i = 0; i < columns; i++) {
for (int32_t i = 0; i < columns; i++) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
}
......@@ -2004,7 +1932,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
STableDataBlocks* dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
......@@ -2017,7 +1945,8 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
return ret;
}
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
......@@ -2061,7 +1990,9 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
pBlocks->numOfRows, pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey);
}
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
int32_t len = pBlocks->numOfRows *
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid);
......@@ -3622,8 +3553,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
pNewQueryInfo->distinct = pQueryInfo->distinct;
if (pNewQueryInfo->buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
......@@ -3834,8 +3767,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
int32_t index = ps->subqueryIndex;
bool ret = subAndCheckDone(pSql, pParentSql, index);
tfree(ps);
pSql->param = NULL;
tscFreeRetrieveSup(pSql);
if (!ret) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);
......@@ -3845,12 +3777,13 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) {
tscAsyncResultOnError(pParentSql);
return;
}
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
......@@ -3858,6 +3791,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
tscResetSqlCmd(&pParentSql->cmd, true);
code = tsParseSql(pParentSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
......@@ -3869,7 +3805,8 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
executeQuery(pParentSql, pQueryInfo);
return;
}
......@@ -3892,9 +3829,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
}
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
assert(pSql->subState.numOfSub == 0);
pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream);
assert(pSql->pSubs == NULL);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
assert(pSql->subState.states == NULL);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
code = pthread_mutex_init(&pSql->subState.mutex, NULL);
......@@ -3920,6 +3859,9 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->fp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry;
pNew->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pNew->rspSem, 0, 0);
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
......@@ -4454,6 +4396,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
STableMeta* p = NULL;
size_t sz = 0;
STableMeta* pChild = *ppChild;
STableMeta* pChild1;
taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz);
......@@ -4464,7 +4407,10 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
int32_t totalBytes = (p->tableInfo.numOfColumns + p->tableInfo.numOfTags) * sizeof(SSchema);
int32_t tableMetaSize = sizeof(STableMeta) + totalBytes;
if (*tableMetaCapacity < tableMetaSize) {
pChild = realloc(pChild, tableMetaSize);
pChild1 = realloc(pChild, tableMetaSize);
if(pChild1 == NULL)
return -1;
pChild = pChild1;
*tableMetaCapacity = (size_t)tableMetaSize;
}
......
......@@ -186,6 +186,7 @@ typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define dataRowLen(r) (*(TDRowLenT *)(r)) // 0~65535
#define dataRowEnd(r) POINTER_SHIFT(r, dataRowLen(r))
#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
......@@ -201,14 +202,18 @@ void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row);
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool isCopyVarData, int8_t type,
int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
if (IS_VAR_DATA_TYPE(type)) {
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
memcpy(POINTER_SHIFT(row, dataRowLen(row)), value, varDataTLen(value));
if (isCopyVarData) {
memcpy(POINTER_SHIFT(row, dataRowLen(row)), value, varDataTLen(value));
}
dataRowLen(row) += varDataTLen(value);
} else {
if (offset == 0) {
......@@ -223,6 +228,12 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t
return 0;
}
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
return tdAppendDataColVal(row, value, true, type, offset);
}
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
if (IS_VAR_DATA_TYPE(type)) {
......@@ -328,11 +339,10 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
......@@ -357,13 +367,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
}
typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int numOfRows;
int numOfCols; // Total number of cols
int sversion; // TODO: set sversion
int maxCols; // max number of columns
int maxPoints; // max number of points
int numOfRows;
int numOfCols; // Total number of cols
int sversion; // TODO: set sversion
SDataCol *cols;
} SDataCols;
......@@ -407,7 +415,7 @@ static FORCE_INLINE TSKEY dataColsKeyLast(SDataCols *pCols) {
}
}
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
SDataCols *tdNewDataCols(int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols);
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
......@@ -475,9 +483,10 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
}
// offset here not include kvRow header length
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t *offset) {
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, bool isCopyValData, int16_t colId, int8_t type,
int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = *offset + TD_KV_ROW_HEAD_SIZE;
int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE;
SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset);
char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row));
......@@ -485,10 +494,12 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
pColIdx->offset = kvRowLen(row); // offset of pColIdx including the TD_KV_ROW_HEAD_SIZE
if (IS_VAR_DATA_TYPE(type)) {
memcpy(ptr, value, varDataTLen(value));
if (isCopyValData) {
memcpy(ptr, value, varDataTLen(value));
}
kvRowLen(row) += varDataTLen(value);
} else {
if (*offset == 0) {
if (offset == 0) {
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]);
......@@ -497,7 +508,6 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
}
kvRowLen(row) += TYPE_BYTES[type];
}
*offset += sizeof(SColIdx);
return 0;
}
......@@ -537,8 +547,9 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pColIdx == NULL) return -1;
pBuilder->pColIdx = pColIdx;
}
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
......@@ -551,8 +562,9 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc);
if (pBuilder->buf == NULL) return -1;
void* buf = realloc(pBuilder->buf, pBuilder->alloc);
if (buf == NULL) return -1;
pBuilder->buf = buf;
}
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
......@@ -592,12 +604,24 @@ typedef void *SMemRow;
#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE)
#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
#define SMEM_ROW_DATA 0U // SDataRow
#define SMEM_ROW_KV 1U // SKVRow
#define SMEM_ROW_DATA 0x0U // SDataRow
#define SMEM_ROW_KV 0x01U // SKVRow
#define SMEM_ROW_CONVERT 0x80U // SMemRow convert flag
#define KVRatioKV (0.2f) // all bool
#define KVRatioPredict (0.4f)
#define KVRatioData (0.75f) // all bigint
#define KVRatioConvert (0.9f)
#define memRowType(r) (*(uint8_t *)(r))
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
#define memRowSetType(r, t) ((*(uint8_t *)(r)) = (t)) // set the total byte in case of dirty memory
#define memRowSetConvert(r) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0x7F) | SMEM_ROW_CONVERT)) // highest bit
#define isDataRowT(t) (SMEM_ROW_DATA == (((uint8_t)(t)) & 0x01))
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
#define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01))
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
#define isNeedConvertRow(r) (((*(uint8_t *)(r)) & 0x80) == SMEM_ROW_CONVERT)
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
#define memRowKvBody(r) \
......@@ -614,6 +638,14 @@ typedef void *SMemRow;
#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r))
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen
static FORCE_INLINE char *memRowEnd(SMemRow row) {
if (isDataRow(row)) {
return (char *)dataRowEnd(memRowDataBody(row));
} else {
return (char *)kvRowEnd(memRowKvBody(row));
}
}
#define memRowDataVersion(r) dataRowVersion(memRowDataBody(r))
#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version
......@@ -631,7 +663,6 @@ typedef void *SMemRow;
} \
} while (0)
#define memRowSetType(r, t) (memRowType(r) = (t))
#define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l))
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowSetKvVersion(r, v))
#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r))
......@@ -664,12 +695,12 @@ static FORCE_INLINE void *tdGetMemRowDataOfColEx(void *row, int16_t colId, int8_
}
}
static FORCE_INLINE int tdAppendMemColVal(SMemRow row, const void *value, int16_t colId, int8_t type, int32_t offset,
int32_t *kvOffset) {
static FORCE_INLINE int tdAppendMemRowColVal(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
int8_t type, int32_t offset) {
if (isDataRow(row)) {
tdAppendColVal(memRowDataBody(row), value, type, offset);
tdAppendDataColVal(memRowDataBody(row), value, isCopyVarData, type, offset);
} else {
tdAppendKvColVal(memRowKvBody(row), value, colId, type, kvOffset);
tdAppendKvColVal(memRowKvBody(row), value, isCopyVarData, colId, type, offset);
}
return 0;
}
......@@ -691,6 +722,30 @@ static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value
return len;
}
/**
* 1. calculate the delta of AllNullLen for SDataRow.
* 2. calculate the real len for SKVRow.
*/
static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colType, int32_t *dataLen, int32_t *kvLen) {
switch (colType) {
case TSDB_DATA_TYPE_BINARY: {
int32_t varLen = varDataLen(value);
*dataLen += (varLen - CHAR_BYTES);
*kvLen += (varLen + sizeof(SColIdx));
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t varLen = varDataLen(value);
*dataLen += (varLen - TSDB_NCHAR_SIZE);
*kvLen += (varLen + sizeof(SColIdx));
break;
}
default: {
*kvLen += (TYPE_BYTES[colType] + sizeof(SColIdx));
break;
}
}
}
typedef struct {
int16_t colId;
......@@ -706,7 +761,7 @@ static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t c
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2);
#if 0
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
......@@ -752,6 +807,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); }
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -59,6 +59,7 @@ extern char tsLocale[];
extern char tsCharset[]; // default encode string
extern int8_t tsEnableCoreFile;
extern int32_t tsCompressMsgSize;
extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[];
//query buffer management
......@@ -70,6 +71,7 @@ extern int8_t tsKeepOriginalColumnName;
// client
extern int32_t tsMaxSQLStringLen;
extern int32_t tsMaxWildCardsLen;
extern int8_t tsTscEnableRecordSql;
extern int32_t tsMaxNumOfOrderedResults;
extern int32_t tsMinSlidingTime;
......
......@@ -19,10 +19,10 @@
#include "wchar.h"
#include "tarray.h"
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows, bool forceSetNull);
//TODO: change caller to use return val
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
int spaceNeeded = pCol->bytes * maxPoints;
if(IS_VAR_DATA_TYPE(pCol->type)) {
......@@ -31,7 +31,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
if(pCol->spaceSize < spaceNeeded) {
void* ptr = realloc(pCol->pData, spaceNeeded);
if(ptr == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize,
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded,
strerror(errno));
return -1;
} else {
......@@ -138,8 +138,9 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int1
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
STColumn* columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
if (columns == NULL) return -1;
pBuilder->columns = columns;
}
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
......@@ -239,20 +240,19 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->len = 0;
}
// value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return;
return 0;
}
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
if (numOfRows > 0) {
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints);
} else {
tdAllocMemForCol(pCol, maxPoints);
dataColSetNEleNull(pCol, numOfRows);
}
}
......@@ -268,12 +268,21 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
}
return 0;
}
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else {
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
}
}
bool isNEleNull(SDataCol *pCol, int nEle) {
if(isAllRowsNull(pCol)) return true;
for (int i = 0; i < nEle; i++) {
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
}
return true;
}
......@@ -290,9 +299,7 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
}
}
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
tdAllocMemForCol(pCol, maxPoints);
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->len = 0;
for (int i = 0; i < nEle; i++) {
......@@ -318,7 +325,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
}
}
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
if (pCols == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
......@@ -326,6 +333,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
}
pCols->maxPoints = maxRows;
pCols->maxCols = maxCols;
pCols->numOfRows = 0;
pCols->numOfCols = 0;
if (maxCols > 0) {
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
......@@ -342,13 +352,8 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
}
pCols->maxCols = maxCols;
}
pCols->maxRowSize = maxRowSize;
return pCols;
}
......@@ -357,8 +362,9 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
int oldMaxCols = pCols->maxCols;
if (schemaNCols(pSchema) > oldMaxCols) {
pCols->maxCols = schemaNCols(pSchema);
pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
if (pCols->cols == NULL) return -1;
void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
if (ptr == NULL) return -1;
pCols->cols = ptr;
for(i = oldMaxCols; i < pCols->maxCols; i++) {
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
......@@ -366,10 +372,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
}
}
if (schemaTLen(pSchema) > pCols->maxRowSize) {
pCols->maxRowSize = schemaTLen(pSchema);
}
tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema);
......@@ -398,7 +400,7 @@ SDataCols *tdFreeDataCols(SDataCols *pCols) {
}
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
if (pRet == NULL) return NULL;
pRet->numOfCols = pDataCols->numOfCols;
......@@ -413,7 +415,10 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
if (keepData) {
if (pDataCols->cols[i].len > 0) {
tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints);
if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) {
tdFreeDataCols(pRet);
return NULL;
}
pRet->cols[i].len = pDataCols->cols[i].len;
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
......@@ -584,9 +589,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0 && (forceSetNull || (!forceSetNull && !isNull(src2->cols[i].pData, src2->cols[i].type)))) {
if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints);
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
}
target->numOfRows++;
......@@ -844,7 +852,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
int16_t k;
for (k = 0; k < nKvNCols; ++k) {
SColInfo *pColInfo = taosArrayGet(stashRow, k);
tdAppendKvColVal(kvRow, pColInfo->colVal, pColInfo->colId, pColInfo->colType, &toffset);
tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
toffset += sizeof(SColIdx);
}
ASSERT(kvLen == memRowTLen(tRow));
}
......
......@@ -25,6 +25,10 @@
#include "tutil.h"
#include "tlocale.h"
#include "ttimezone.h"
#include "tcompare.h"
// TSDB
bool tsdbForceKeepFile = false;
// cluster
char tsFirst[TSDB_EP_LEN] = {0};
......@@ -75,6 +79,7 @@ int32_t tsCompressMsgSize = -1;
// client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN;
int8_t tsTscEnableRecordSql = 0;
// the maximum number of results for projection query on super table that are returned from
......@@ -84,6 +89,9 @@ int32_t tsMaxNumOfOrderedResults = 100000;
// 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10;
// the maxinum number of distict query result
int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 us for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1;
......@@ -541,6 +549,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "maxNumOfDistinctRes";
cfg.ptr = &tsMaxNumOfDistinctResults;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 10*10000;
cfg.maxValue = 10000*10000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "numOfMnodes";
cfg.ptr = &tsNumOfMnodes;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......@@ -984,6 +1002,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg);
cfg.option = "maxWildCardsLength";
cfg.ptr = &tsMaxWildCardsLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = TSDB_MAX_FIELD_LEN;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg);
cfg.option = "maxNumOfOrderedRes";
cfg.ptr = &tsMaxNumOfOrderedResults;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......@@ -1531,6 +1559,7 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum <= TSDB_CFG_MAX_NUM);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
......
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit ce5201014136503d34fecbd56494b67b4961056c
Subproject commit b62a26ecc164a310104df57691691b237e091c89
......@@ -113,7 +113,6 @@
</includes>
<excludes>
<exclude>**/AppMemoryLeakTest.java</exclude>
<exclude>**/AuthenticationTest.java</exclude>
<exclude>**/ConnectMultiTaosdByRestfulWithDifferentTokenTest.java</exclude>
<exclude>**/DatetimeBefore1970Test.java</exclude>
<exclude>**/FailOverTest.java</exclude>
......
......@@ -14,6 +14,8 @@
*****************************************************************************/
package com.taosdata.jdbc;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;
import java.util.logging.Logger;
......@@ -127,6 +129,11 @@ public class TSDBDriver extends AbstractDriver {
return null;
}
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_USER))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED);
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_PASSWORD))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED);
try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET), (String) props.get(PROPERTY_KEY_TIME_ZONE));
......
......@@ -33,6 +33,8 @@ public class TSDBError {
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_NUMERIC_VALUE_OUT_OF_RANGE, "numeric value out of range");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNKNOWN_TAOS_TYPE, "unknown taos type in tdengine");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNKNOWN_TIMESTAMP_PRECISION, "unknown timestamp precision");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED, "user is required");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED, "password is required");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNKNOWN, "unknown error");
......
......@@ -29,6 +29,9 @@ public class TSDBErrorNumbers {
public static final int ERROR_UNKNOWN_TIMESTAMP_PRECISION = 0x2316; // unknown timestamp precision
public static final int ERROR_RESTFul_Client_Protocol_Exception = 0x2317;
public static final int ERROR_RESTFul_Client_IOException = 0x2318;
public static final int ERROR_USER_IS_REQUIRED = 0x2319; // user is required
public static final int ERROR_PASSWORD_IS_REQUIRED = 0x231a; // password is required
public static final int ERROR_UNKNOWN = 0x2350; //unknown error
......@@ -67,6 +70,8 @@ public class TSDBErrorNumbers {
errorNumbers.add(ERROR_UNKNOWN_TAOS_TYPE);
errorNumbers.add(ERROR_UNKNOWN_TIMESTAMP_PRECISION);
errorNumbers.add(ERROR_RESTFul_Client_IOException);
errorNumbers.add(ERROR_USER_IS_REQUIRED);
errorNumbers.add(ERROR_PASSWORD_IS_REQUIRED);
errorNumbers.add(ERROR_RESTFul_Client_Protocol_Exception);
......
......@@ -36,7 +36,6 @@ public class TSDBJNIConnector {
static {
System.loadLibrary("taos");
System.out.println("java.library.path:" + System.getProperty("java.library.path"));
}
public boolean isClosed() {
......
......@@ -7,6 +7,7 @@ import com.taosdata.jdbc.utils.HttpClientPoolUtil;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.Properties;
import java.util.logging.Logger;
......@@ -40,8 +41,13 @@ public class RestfulDriver extends AbstractDriver {
String loginUrl = "http://" + host + ":" + port + "/rest/login/" + props.getProperty(TSDBDriver.PROPERTY_KEY_USER) + "/" + props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD) + "";
try {
String user = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_USER), "UTF-8");
String password = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD), "UTF-8");
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_USER))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED);
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_PASSWORD))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED);
String user = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_USER), StandardCharsets.UTF_8.displayName());
String password = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD), StandardCharsets.UTF_8.displayName());
loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + "";
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
......
......@@ -7,6 +7,7 @@ import com.taosdata.jdbc.AbstractStatement;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.enums.TimestampFormat;
import com.taosdata.jdbc.utils.HttpClientPoolUtil;
import com.taosdata.jdbc.utils.SqlSyntaxValidator;
......@@ -45,9 +46,7 @@ public class RestfulStatement extends AbstractStatement {
if (!SqlSyntaxValidator.isValidForExecuteUpdate(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_UPDATE, "not a valid sql for executeUpdate: " + sql);
final String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
return executeOneUpdate(url, sql);
return executeOneUpdate(sql);
}
@Override
......@@ -62,34 +61,25 @@ public class RestfulStatement extends AbstractStatement {
public boolean execute(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
if (!SqlSyntaxValidator.isValidForExecute(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE, "not a valid sql for execute: " + sql);
//如果执行了use操作应该将当前Statement的catalog设置为新的database
boolean result = true;
String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
if (conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).equals("TIMESTAMP")) {
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
}
if (conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).equals("UTC")) {
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
}
if (SqlSyntaxValidator.isUseSql(sql)) {
HttpClientPoolUtil.execute(url, sql, this.conn.getToken());
HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
this.database = sql.trim().replace("use", "").trim();
this.conn.setCatalog(this.database);
result = false;
} else if (SqlSyntaxValidator.isDatabaseUnspecifiedQuery(sql)) {
executeOneQuery(sql);
} else if (SqlSyntaxValidator.isDatabaseUnspecifiedUpdate(sql)) {
executeOneUpdate(url, sql);
executeOneUpdate(sql);
result = false;
} else {
if (SqlSyntaxValidator.isValidForExecuteQuery(sql)) {
executeQuery(sql);
executeOneQuery(sql);
} else {
executeUpdate(sql);
executeOneUpdate(sql);
result = false;
}
}
......@@ -97,19 +87,25 @@ public class RestfulStatement extends AbstractStatement {
return result;
}
private ResultSet executeOneQuery(String sql) throws SQLException {
if (!SqlSyntaxValidator.isValidForExecuteQuery(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_QUERY, "not a valid sql for executeQuery: " + sql);
private String getUrl() throws SQLException {
TimestampFormat timestampFormat = TimestampFormat.valueOf(conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).trim().toUpperCase());
String url;
switch (timestampFormat) {
case TIMESTAMP:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
break;
case UTC:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
break;
default:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
}
return url;
}
private ResultSet executeOneQuery(String sql) throws SQLException {
// row data
String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
String timestampFormat = conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT);
if ("TIMESTAMP".equalsIgnoreCase(timestampFormat))
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
if ("UTC".equalsIgnoreCase(timestampFormat))
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
String result = HttpClientPoolUtil.execute(url, sql, this.conn.getToken());
String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
JSONObject resultJson = JSON.parseObject(result);
if (resultJson.getString("status").equals("error")) {
throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc"));
......@@ -119,11 +115,8 @@ public class RestfulStatement extends AbstractStatement {
return resultSet;
}
private int executeOneUpdate(String url, String sql) throws SQLException {
if (!SqlSyntaxValidator.isValidForExecuteUpdate(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_UPDATE, "not a valid sql for executeUpdate: " + sql);
String result = HttpClientPoolUtil.execute(url, sql, this.conn.getToken());
private int executeOneUpdate(String sql) throws SQLException {
String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) {
throw TSDBError.createSQLException(jsonObject.getInteger("code"), jsonObject.getString("desc"));
......@@ -134,7 +127,7 @@ public class RestfulStatement extends AbstractStatement {
}
private int getAffectedRows(JSONObject jsonObject) throws SQLException {
// create ... SQLs should return 0 , and Restful result is this:
// create ... SQLs should return 0 , and Restful result like this:
// {"status": "succ", "head": ["affected_rows"], "data": [[0]], "rows": 1}
JSONArray head = jsonObject.getJSONArray("head");
if (head.size() != 1 || !"affected_rows".equals(head.getString(0)))
......
......@@ -16,8 +16,7 @@ package com.taosdata.jdbc.utils;
public class SqlSyntaxValidator {
private static final String[] SQL = {"select", "insert", "import", "create", "use", "alter", "drop", "set", "show", "describe", "reset"};
private static final String[] updateSQL = {"insert", "import", "create", "use", "alter", "drop", "set"};
private static final String[] updateSQL = {"insert", "import", "create", "use", "alter", "drop", "set", "reset"};
private static final String[] querySQL = {"select", "show", "describe"};
private static final String[] databaseUnspecifiedShow = {"databases", "dnodes", "mnodes", "variables"};
......@@ -38,14 +37,6 @@ public class SqlSyntaxValidator {
return false;
}
public static boolean isValidForExecute(String sql) {
for (String prefix : SQL) {
if (sql.trim().toLowerCase().startsWith(prefix))
return true;
}
return false;
}
public static boolean isDatabaseUnspecifiedQuery(String sql) {
for (String databaseObj : databaseUnspecifiedShow) {
if (sql.trim().toLowerCase().matches("show\\s+" + databaseObj + ".*"))
......@@ -63,9 +54,5 @@ public class SqlSyntaxValidator {
return sql.trim().toLowerCase().startsWith("use");
}
public static boolean isSelectSql(String sql) {
return sql.trim().toLowerCase().startsWith("select");
}
}
......@@ -69,6 +69,8 @@ public class SubscribeTest {
@Before
public void createDatabase() throws SQLException {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBErrorNumbers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
......@@ -12,6 +15,47 @@ public class AuthenticationTest {
private static final String password = "taos?data";
private Connection conn;
@Test
public void connectWithoutUserByJni() {
try {
DriverManager.getConnection("jdbc:TAOS://" + host + ":0/?");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (2319): user is required", e.getMessage());
}
}
@Test
public void connectWithoutUserByRestful() {
try {
DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (2319): user is required", e.getMessage());
}
}
@Test
public void connectWithoutPasswordByJni() {
try {
DriverManager.getConnection("jdbc:TAOS://" + host + ":0/?user=root");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (231a): password is required", e.getMessage());
}
}
@Test
public void connectWithoutPasswordByRestful() {
try {
DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (231a): password is required", e.getMessage());
}
}
@Ignore
@Test
public void test() {
// change password
......
......@@ -29,6 +29,8 @@ public class BatchInsertTest {
public void before() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
......@@ -21,6 +21,8 @@ public class ImportTest {
public static void before() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
......@@ -270,6 +270,41 @@ public class InsertSpecialCharacterJniTest {
}
}
@Ignore
@Test
public void testSingleQuotaEscape() throws SQLException {
final long now = System.currentTimeMillis();
final String sql = "insert into t? using ? tags(?) values(?, ?, ?) t? using " + tbname2 + " tags(?) values(?,?,?) ";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
// t1
pstmt.setInt(1, 1);
pstmt.setString(2, tbname2);
pstmt.setString(3, special_character_str_5);
pstmt.setTimestamp(4, new Timestamp(now));
pstmt.setBytes(5, special_character_str_5.getBytes());
// t2
pstmt.setInt(7, 2);
pstmt.setString(8, special_character_str_5);
pstmt.setTimestamp(9, new Timestamp(now));
pstmt.setString(11, special_character_str_5);
int ret = pstmt.executeUpdate();
Assert.assertEquals(2, ret);
}
String query = "select * from ?.t? where ? < ? and ts >= ? and f1 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, dbName);
pstmt.setInt(2, 1);
pstmt.setString(3, "ts");
pstmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(5, new Timestamp(0));
ResultSet rs = pstmt.executeQuery();
Assert.assertNotNull(rs);
}
}
@Test
public void testCase10() throws SQLException {
final long now = System.currentTimeMillis();
......@@ -293,13 +328,12 @@ public class InsertSpecialCharacterJniTest {
Assert.assertEquals(2, ret);
}
//query t1
String query = "select * from ?.t? where ts < ? and ts >= ? and ? is not null";
String query = "select * from ?.t? where ts < ? and ts >= ? and f1 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, dbName);
pstmt.setInt(2, 1);
pstmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(4, new Timestamp(0));
pstmt.setString(5, "f1");
ResultSet rs = pstmt.executeQuery();
rs.next();
......@@ -311,12 +345,11 @@ public class InsertSpecialCharacterJniTest {
Assert.assertNull(f2);
}
// query t2
query = "select * from t? where ts < ? and ts >= ? and ? is not null";
query = "select * from t? where ts < ? and ts >= ? and f2 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setInt(1, 2);
pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(3, new Timestamp(0));
pstmt.setString(4, "f2");
ResultSet rs = pstmt.executeQuery();
rs.next();
......
......@@ -293,13 +293,12 @@ public class InsertSpecialCharacterRestfulTest {
Assert.assertEquals(2, ret);
}
//query t1
String query = "select * from ?.t? where ts < ? and ts >= ? and ? is not null";
String query = "select * from ?.t? where ts < ? and ts >= ? and f1 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, dbName);
pstmt.setInt(2, 1);
pstmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(4, new Timestamp(0));
pstmt.setString(5, "f1");
ResultSet rs = pstmt.executeQuery();
rs.next();
......@@ -311,12 +310,11 @@ public class InsertSpecialCharacterRestfulTest {
Assert.assertNull(f2);
}
// query t2
query = "select * from t? where ts < ? and ts >= ? and ? is not null";
query = "select * from t? where ts < ? and ts >= ? and f2 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setInt(1, 2);
pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(3, new Timestamp(0));
pstmt.setString(4, "f2");
ResultSet rs = pstmt.executeQuery();
rs.next();
......
......@@ -22,6 +22,8 @@ public class QueryDataTest {
public void createDatabase() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.Properties;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class ResetQueryCacheTest {
static Connection connection;
static Statement statement;
static String host = "127.0.0.1";
@Before
public void init() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/", properties);
statement = connection.createStatement();
} catch (SQLException e) {
return;
}
@Test
public void jni() throws SQLException {
// given
Connection connection = DriverManager.getConnection("jdbc:TAOS://127.0.0.1:0/?user=root&password=taosdata&timezone=UTC-8&charset=UTF-8&locale=en_US.UTF-8");
Statement statement = connection.createStatement();
// when
boolean execute = statement.execute("reset query cache");
// then
assertFalse(execute);
assertEquals(0, statement.getUpdateCount());
statement.close();
connection.close();
}
@Test
public void testResetQueryCache() throws SQLException {
String resetSql = "reset query cache";
statement.execute(resetSql);
}
public void restful() throws SQLException {
// given
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&timezone=UTC-8&charset=UTF-8&locale=en_US.UTF-8");
Statement statement = connection.createStatement();
// when
boolean execute = statement.execute("reset query cache");
// then
assertFalse(execute);
assertEquals(0, statement.getUpdateCount());
@After
public void close() {
try {
if (statement != null)
statement.close();
if (connection != null)
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
statement.close();
connection.close();
}
}
\ No newline at end of file
......@@ -20,6 +20,8 @@ public class SelectTest {
public void createDatabaseAndTable() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
......@@ -24,6 +24,8 @@ public class StableTest {
public static void createDatabase() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
package com.taosdata.jdbc.utils;
import org.junit.Assert;
import org.junit.Test;
public class SqlSyntaxValidatorTest {
@Test
public void isSelectSQL() {
Assert.assertTrue(SqlSyntaxValidator.isSelectSql("select * from test.weather"));
Assert.assertTrue(SqlSyntaxValidator.isSelectSql(" select * from test.weather"));
Assert.assertTrue(SqlSyntaxValidator.isSelectSql(" select * from test.weather "));
Assert.assertFalse(SqlSyntaxValidator.isSelectSql("insert into test.weather values(now, 1.1, 2)"));
}
@Test
public void isUseSQL() {
Assert.assertTrue(SqlSyntaxValidator.isUseSql("use database test"));
}
}
\ No newline at end of file
......@@ -109,6 +109,24 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, precision = 0)
return res;
}
function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
let len = data.readIntLE(currOffset, 2);
let dataEntry = data.slice(currOffset + 2, currOffset + len + 2); //one entry in a row under a column;
if (dataEntry[0] == 255) {
res.push(null)
} else {
res.push(dataEntry.toString("utf-8"));
}
currOffset += nbytes;
}
return res;
}
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
......@@ -117,7 +135,11 @@ function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, precision = 0)
while (currOffset < data.length) {
let len = data.readIntLE(currOffset, 2);
let dataEntry = data.slice(currOffset + 2, currOffset + len + 2); //one entry in a row under a column;
res.push(dataEntry.toString("utf-8"));
if (dataEntry[0] == 255 && dataEntry[1] == 255) {
res.push(null)
} else {
res.push(dataEntry.toString("utf-8"));
}
currOffset += nbytes;
}
return res;
......@@ -132,7 +154,7 @@ let convertFunctions = {
[FieldTypes.C_BIGINT]: convertBigint,
[FieldTypes.C_FLOAT]: convertFloat,
[FieldTypes.C_DOUBLE]: convertDouble,
[FieldTypes.C_BINARY]: convertNchar,
[FieldTypes.C_BINARY]: convertBinary,
[FieldTypes.C_TIMESTAMP]: convertTimestamp,
[FieldTypes.C_NCHAR]: convertNchar
}
......
......@@ -47,7 +47,8 @@ class TaosTimestamp extends Date {
super(Math.floor(date / 1000));
this.precisionExtras = date % 1000;
} else if (precision === 2) {
super(parseInt(date / 1000000));
// use BigInt to fix: 1623254400999999999 / 1000000 = 1623254401000 which not expected
super(parseInt(BigInt(date) / 1000000n));
// use BigInt to fix: 1625801548423914405 % 1000000 = 914496 which not expected (914405)
this.precisionExtras = parseInt(BigInt(date) % 1000000n);
} else {
......
{
"name": "td2.0-connector",
"version": "2.0.9",
"version": "2.0.10",
"description": "A Node.js connector for TDengine.",
"main": "tdengine.js",
"directories": {
......
const taos = require('../tdengine');
var conn = taos.connect({ host: "localhost" });
var c1 = conn.cursor();
function checkData(data, row, col, expect) {
let checkdata = data[row][col];
if (checkdata == expect) {
// console.log('check pass')
}
else {
console.log('check failed, expect ' + expect + ', but is ' + checkdata)
}
}
c1.execute('drop database if exists testnodejsnchar')
c1.execute('create database testnodejsnchar')
c1.execute('use testnodejsnchar');
c1.execute('create table tb (ts timestamp, value float, text binary(200))')
c1.execute("insert into tb values('2021-06-10 00:00:00', 24.7, '中文10000000000000000000000');") -
c1.execute('insert into tb values(1623254400150, 24.7, NULL);')
c1.execute('import into tb values(1623254400300, 24.7, "中文3中文10000000000000000000000中文10000000000000000000000中文10000000000000000000000中文10000000000000000000000");')
sql = 'select * from tb;'
console.log('*******************************************')
c1.execute(sql);
data = c1.fetchall();
console.log(data)
//check data about insert data
checkData(data, 0, 2, '中文10000000000000000000000')
checkData(data, 1, 2, null)
checkData(data, 2, 2, '中文3中文10000000000000000000000中文10000000000000000000000中文10000000000000000000000中文10000000000000000000000')
\ No newline at end of file
......@@ -166,7 +166,6 @@ int32_t dnodeInitSystem() {
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump();
taosInitNotes();
dnodeInitTmr();
if (dnodeCreateDir(tsLogDir) < 0) {
......@@ -188,6 +187,8 @@ int32_t dnodeInitSystem() {
dInfo("start to initialize TDengine");
taosInitNotes();
if (dnodeInitComponents() != 0) {
return -1;
}
......
......@@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) {
}
} else if (strcmp(argv[i], "-C") == 0) {
dump_config = 1;
} else if (strcmp(argv[i], "--force-keep-file") == 0) {
tsdbForceKeepFile = true;
} else if (strcmp(argv[i], "--compact-mnode-wal") == 0) {
tsCompactMnodeWal = 1;
} else if (strcmp(argv[i], "-V") == 0) {
......
......@@ -32,7 +32,7 @@ typedef enum {
typedef struct {
int8_t msgType;
int8_t sver;
int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility
int8_t reserved[2];
int32_t len;
uint64_t version;
......
......@@ -72,12 +72,13 @@ static int32_t shellShowTables(TAOS *con, char *db) {
int32_t tbIndex = tbNum++;
if (tbMallocNum < tbNum) {
tbMallocNum = (tbMallocNum * 2 + 1);
tbNames = realloc(tbNames, tbMallocNum * sizeof(char *));
if (tbNames == NULL) {
char** tbNames1 = realloc(tbNames, tbMallocNum * sizeof(char *));
if (tbNames1 == NULL) {
fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum);
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
break;
}
tbNames = tbNames1;
}
tbNames[tbIndex] = malloc(TSDB_TABLE_NAME_LEN);
......
......@@ -64,6 +64,10 @@ void printHelp() {
exit(EXIT_SUCCESS);
}
char DARWINCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
char g_password[MAX_PASSWORD_SIZE];
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
wordexp_t full_path;
for (int i = 1; i < argc; i++) {
......@@ -77,10 +81,21 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
}
}
// for password
else if (strcmp(argv[i], "-p") == 0) {
arguments->is_use_passwd = true;
else if (strncmp(argv[i], "-p", 2) == 0) {
strcpy(tsOsName, "Darwin");
printf(DARWINCLIENT_VERSION, tsOsName, taos_get_client_info());
if (strlen(argv[i]) == 2) {
printf("Enter password: ");
if (scanf("%s", g_password) > 1) {
fprintf(stderr, "password read error\n");
}
getchar();
} else {
tstrncpy(g_password, (char *)(argv[i] + 2), MAX_PASSWORD_SIZE);
}
arguments->password = g_password;
}
// for management port
// for management port
else if (strcmp(argv[i], "-P") == 0) {
if (i < argc - 1) {
arguments->port = atoi(argv[++i]);
......@@ -98,7 +113,7 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= TSDB_FILENAME_LEN) {
fprintf(stderr, "config file path: %s overflow max len %d\n", argv[i], TSDB_FILENAME_LEN - 1);
exit(EXIT_FAILURE);
......
......@@ -65,7 +65,15 @@ extern TAOS *taos_connect_auth(const char *ip, const char *user, const char *aut
*/
TAOS *shellInit(SShellArguments *_args) {
printf("\n");
printf(CLIENT_VERSION, tsOsName, taos_get_client_info());
if (!_args->is_use_passwd) {
#ifdef TD_WINDOWS
strcpy(tsOsName, "Windows");
#elif defined(TD_DARWIN)
strcpy(tsOsName, "Darwin");
#endif
printf(CLIENT_VERSION, tsOsName, taos_get_client_info());
}
fflush(stdout);
// set options before initializing
......@@ -73,9 +81,7 @@ TAOS *shellInit(SShellArguments *_args) {
taos_options(TSDB_OPTION_TIMEZONE, _args->timezone);
}
if (_args->is_use_passwd) {
if (_args->password == NULL) _args->password = getpass("Enter password: ");
} else {
if (!_args->is_use_passwd) {
_args->password = TSDB_DEFAULT_PASS;
}
......@@ -170,7 +176,7 @@ static int32_t shellRunSingleCommand(TAOS *con, char *command) {
system("clear");
return 0;
}
if (regex_match(command, "^[\t ]*set[ \t]+max_binary_display_width[ \t]+(default|[1-9][0-9]*)[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
strtok(command, " \t");
strtok(NULL, " \t");
......@@ -182,7 +188,7 @@ static int32_t shellRunSingleCommand(TAOS *con, char *command) {
}
return 0;
}
if (regex_match(command, "^[ \t]*source[\t ]+[^ ]+[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
/* If source file. */
char *c_ptr = strtok(command, " ;");
......@@ -247,7 +253,7 @@ int32_t shellRunCommand(TAOS* con, char* command) {
esc = false;
continue;
}
if (c == '\\') {
esc = true;
continue;
......@@ -336,8 +342,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
}
if (!tscIsUpdateQuery(pSql)) { // select and show kinds of commands
int error_no = 0;
int error_no = 0;
int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
if (numOfRows < 0) {
atomic_store_64(&result, 0);
......@@ -530,7 +536,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* tres) {
fprintf(fp, "%s", fields[col].name);
}
fputc('\n', fp);
int numOfRows = 0;
do {
int32_t* length = taos_fetch_lengths(tres);
......@@ -716,7 +722,7 @@ static int verticalPrintResult(TAOS_RES* tres) {
int numOfRows = 0;
int showMore = 1;
do {
do {
if (numOfRows < resShowMaxNum) {
printf("*************************** %d.row ***************************\n", numOfRows + 1);
......@@ -851,7 +857,7 @@ static int horizontalPrintResult(TAOS_RES* tres) {
int numOfRows = 0;
int showMore = 1;
do {
int32_t* length = taos_fetch_lengths(tres);
if (numOfRows < resShowMaxNum) {
......@@ -867,7 +873,7 @@ static int horizontalPrintResult(TAOS_RES* tres) {
printf("[You can add limit statement to show more or redirect results to specific file to get all.]\n");
showMore = 0;
}
numOfRows++;
row = taos_fetch_row(tres);
} while(row != NULL);
......@@ -909,7 +915,7 @@ void read_history() {
if (errno != ENOENT) {
fprintf(stderr, "Failed to open file %s, reason:%s\n", f_history, strerror(errno));
}
#endif
#endif
return;
}
......@@ -934,9 +940,9 @@ void write_history() {
FILE *f = fopen(f_history, "w");
if (f == NULL) {
#ifndef WINDOWS
#ifndef WINDOWS
fprintf(stderr, "Failed to open file %s for write, reason:%s\n", f_history, strerror(errno));
#endif
#endif
return;
}
......@@ -982,13 +988,13 @@ void source_file(TAOS *con, char *fptr) {
/*
if (access(fname, F_OK) != 0) {
fprintf(stderr, "ERROR: file %s is not exist\n", fptr);
wordfree(&full_path);
free(cmd);
return;
}
*/
FILE *f = fopen(fname, "r");
if (f == NULL) {
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
......
......@@ -34,7 +34,7 @@ static char doc[] = "";
static char args_doc[] = "";
static struct argp_option options[] = {
{"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."},
{"password", 'p', "PASSWORD", OPTION_ARG_OPTIONAL, "The password to use when connecting to the server."},
{"password", 'p', 0, 0, "The password to use when connecting to the server."},
{"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."},
{"user", 'u', "USER", 0, "The user name to use when connecting to the server."},
{"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."},
......@@ -63,8 +63,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
arguments->host = arg;
break;
case 'p':
arguments->is_use_passwd = true;
if (arg) arguments->password = arg;
break;
case 'P':
if (arg) {
......@@ -160,12 +158,41 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Our argp parser. */
static struct argp argp = {options, parse_opt, args_doc, doc};
char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
char g_password[MAX_PASSWORD_SIZE];
static void parse_password(
int argc, char *argv[], SShellArguments *arguments) {
for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-p", 2) == 0) {
strcpy(tsOsName, "Linux");
printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info());
if (strlen(argv[i]) == 2) {
printf("Enter password: ");
if (scanf("%20s", g_password) > 1) {
fprintf(stderr, "password reading error\n");
}
getchar();
} else {
tstrncpy(g_password, (char *)(argv[i] + 2), MAX_PASSWORD_SIZE);
}
arguments->password = g_password;
arguments->is_use_passwd = true;
}
}
}
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
static char verType[32] = {0};
sprintf(verType, "version: %s\n", version);
argp_program_version = verType;
if (argc > 1) {
parse_password(argc, argv, arguments);
}
argp_parse(&argp, argc, argv, 0, 0, arguments);
if (arguments->abort) {
#ifndef _ALPINE
......
......@@ -71,7 +71,9 @@ int checkVersion() {
// Global configurations
SShellArguments args = {
.host = NULL,
#ifndef TD_WINDOWS
.password = NULL,
#endif
.user = NULL,
.database = NULL,
.timezone = NULL,
......
......@@ -19,6 +19,9 @@
extern char configDir[];
char WINCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
void printVersion() {
printf("version: %s\n", version);
}
......@@ -61,6 +64,8 @@ void printHelp() {
exit(EXIT_SUCCESS);
}
char g_password[MAX_PASSWORD_SIZE];
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
for (int i = 1; i < argc; i++) {
// for host
......@@ -73,11 +78,20 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
}
}
// for password
else if (strcmp(argv[i], "-p") == 0) {
arguments->is_use_passwd = true;
if (i < argc - 1 && argv[i + 1][0] != '-') {
arguments->password = argv[++i];
}
else if (strncmp(argv[i], "-p", 2) == 0) {
arguments->is_use_passwd = true;
strcpy(tsOsName, "Windows");
printf(WINCLIENT_VERSION, tsOsName, taos_get_client_info());
if (strlen(argv[i]) == 2) {
printf("Enter password: ");
if (scanf("%s", g_password) > 1) {
fprintf(stderr, "password read error!\n");
}
getchar();
} else {
tstrncpy(g_password, (char *)(argv[i] + 2), MAX_PASSWORD_SIZE);
}
arguments->password = g_password;
}
// for management port
else if (strcmp(argv[i], "-P") == 0) {
......@@ -104,7 +118,7 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (i < argc - 1) {
char *tmp = argv[++i];
if (strlen(tmp) >= TSDB_FILENAME_LEN) {
fprintf(stderr, "config file path: %s overflow max len %d\n", tmp, TSDB_FILENAME_LEN - 1);
......@@ -265,7 +279,7 @@ void *shellLoopQuery(void *arg) {
if (command == NULL) return NULL;
int32_t err = 0;
do {
memset(command, 0, MAX_COMMAND_SIZE);
shellPrintPrompt();
......@@ -274,7 +288,7 @@ void *shellLoopQuery(void *arg) {
err = shellReadCommand(con, command);
if (err) {
break;
}
}
} while (shellRunCommand(con, command) == 0);
return NULL;
......
此差异已折叠。
......@@ -206,9 +206,9 @@ static struct argp_option options[] = {
{"host", 'h', "HOST", 0, "Server host dumping data from. Default is localhost.", 0},
{"user", 'u', "USER", 0, "User name used to connect to server. Default is root.", 0},
#ifdef _TD_POWER_
{"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is powerdb.", 0},
{"password", 'p', 0, 0, "User password to connect to server. Default is powerdb.", 0},
#else
{"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is taosdata.", 0},
{"password", 'p', 0, 0, "User password to connect to server. Default is taosdata.", 0},
#endif
{"port", 'P', "PORT", 0, "Port to connect", 0},
{"cversion", 'v', "CVERION", 0, "client version", 0},
......@@ -248,12 +248,14 @@ static struct argp_option options[] = {
{0}
};
#define MAX_PASSWORD_SIZE 20
/* Used by main to communicate with parse_opt. */
typedef struct arguments {
// connection option
char *host;
char *user;
char *password;
char password[MAX_PASSWORD_SIZE];
uint16_t port;
char cversion[12];
uint16_t mysqlFlag;
......@@ -376,7 +378,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
g_args.user = arg;
break;
case 'p':
g_args.password = arg;
break;
case 'P':
g_args.port = atoi(arg);
......@@ -554,6 +555,25 @@ static void parse_precision_first(
}
}
static void parse_password(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-p", 2) == 0) {
if (strlen(argv[i]) == 2) {
printf("Enter password: ");
taosSetConsoleEcho(false);
if(scanf("%20s", arguments->password) > 1) {
errorPrint("%s() LN%d, password read error!\n", __func__, __LINE__);
}
taosSetConsoleEcho(true);
} else {
tstrncpy(arguments->password, (char *)(argv[i] + 2), MAX_PASSWORD_SIZE);
}
argv[i] = "";
}
}
}
static void parse_timestamp(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
......@@ -616,9 +636,10 @@ int main(int argc, char *argv[]) {
int ret = 0;
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
if (argc > 2) {
if (argc > 1) {
parse_precision_first(argc, argv, &g_args);
parse_timestamp(argc, argv, &g_args);
parse_password(argc, argv, &g_args);
}
argp_parse(&argp, argc, argv, 0, 0, &g_args);
......
......@@ -18,6 +18,7 @@
#include <stdio.h>
#include <stdlib.h>
#if defined(WINDOWS)
int main(int argc, char *argv[]) {
printf("welcome to use taospack tools v1.3 for windows.\n");
......@@ -148,7 +149,10 @@ float* read_float(const char* inFile, int* pcount){
//printf(" buff=%s float=%.50f \n ", buf, floats[fi]);
if ( ++fi == malloc_cnt ) {
malloc_cnt += 100000;
floats = realloc(floats, malloc_cnt*sizeof(float));
float* floats1 = realloc(floats, malloc_cnt*sizeof(float));
if(floats1 == NULL)
break;
floats = floats1;
}
memset(buf, 0, sizeof(buf));
}
......@@ -601,7 +605,6 @@ void test_threadsafe_double(int thread_count){
}
void unitTestFloat() {
float ft1 [] = {1.11, 2.22, 3.333};
......@@ -662,7 +665,50 @@ void unitTestFloat() {
free(ft2);
free(buff);
free(output);
}
void leakFloat() {
int cnt = sizeof(g_ft1)/sizeof(float);
float* floats = g_ft1;
int algorithm = 2;
// compress
const char* input = (const char*)floats;
int input_len = cnt * sizeof(float);
int output_len = input_len + 1024;
char* output = (char*) malloc(output_len);
char* buff = (char*) malloc(input_len);
int buff_len = input_len;
int ret_len = 0;
ret_len = tsCompressFloatLossy(input, input_len, cnt, output, output_len, algorithm, buff, buff_len);
if(ret_len == 0) {
printf(" compress float error.\n");
free(buff);
free(output);
return ;
}
float* ft2 = (float*)malloc(input_len);
ret_len = tsDecompressFloatLossy(output, ret_len, cnt, (char*)ft2, input_len, algorithm, buff, buff_len);
if(ret_len == 0) {
printf(" decompress float error.\n");
}
free(ft2);
free(buff);
free(output);
}
void leakTest(){
for(int i=0; i< 90000000000000; i++){
if(i%10000==0)
printf(" ---------- %d ---------------- \n", i);
leakFloat();
}
}
#define DB_CNT 500
......@@ -689,7 +735,7 @@ extern char Compressor [];
// ----------------- main ----------------------
//
int main(int argc, char *argv[]) {
printf("welcome to use taospack tools v1.3\n");
printf("welcome to use taospack tools v1.6\n");
//printf(" sizeof(int)=%d\n", (int)sizeof(int));
//printf(" sizeof(long)=%d\n", (int)sizeof(long));
......@@ -753,6 +799,9 @@ int main(int argc, char *argv[]) {
if(strcmp(argv[1], "-mem") == 0) {
memTest();
}
else if(strcmp(argv[1], "-leak") == 0) {
leakTest();
}
}
else{
unitTestFloat();
......
......@@ -2921,10 +2921,11 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray
(*totalMallocLen) *= 2;
}
pMultiMeta = realloc(pMultiMeta, *totalMallocLen);
if (pMultiMeta == NULL) {
SMultiTableMeta* pMultiMeta1 = realloc(pMultiMeta, *totalMallocLen);
if (pMultiMeta1 == NULL) {
return NULL;
}
pMultiMeta = pMultiMeta1;
}
return pMultiMeta;
......
......@@ -24,6 +24,8 @@ void* taosLoadDll(const char *filename);
void* taosLoadSym(void* handle, char* name);
void taosCloseDll(void *handle);
int taosSetConsoleEcho(bool on);
#ifdef __cplusplus
}
#endif
......
......@@ -29,4 +29,30 @@ void* taosLoadSym(void* handle, char* name) {
void taosCloseDll(void *handle) {
}
int taosSetConsoleEcho(bool on)
{
#if 0
#define ECHOFLAGS (ECHO | ECHOE | ECHOK | ECHONL)
int err;
struct termios term;
if (tcgetattr(STDIN_FILENO, &term) == -1) {
perror("Cannot get the attribution of the terminal");
return -1;
}
if (on)
term.c_lflag|=ECHOFLAGS;
else
term.c_lflag &=~ECHOFLAGS;
err = tcsetattr(STDIN_FILENO,TCSAFLUSH,&term);
if (err == -1 && err == EINTR) {
perror("Cannot set the attribution of the terminal");
return -1;
}
#endif
return 0;
}
......@@ -504,8 +504,9 @@ void * taosTRealloc(void *ptr, size_t size) {
void * tptr = (void *)((char *)ptr - sizeof(size_t));
size_t tsize = size + sizeof(size_t);
tptr = realloc(tptr, tsize);
if (tptr == NULL) return NULL;
void* tptr1 = realloc(tptr, tsize);
if (tptr1 == NULL) return NULL;
tptr = tptr1;
*(size_t *)tptr = size;
......
......@@ -51,4 +51,28 @@ void taosCloseDll(void *handle) {
}
}
int taosSetConsoleEcho(bool on)
{
#define ECHOFLAGS (ECHO | ECHOE | ECHOK | ECHONL)
int err;
struct termios term;
if (tcgetattr(STDIN_FILENO, &term) == -1) {
perror("Cannot get the attribution of the terminal");
return -1;
}
if (on)
term.c_lflag|=ECHOFLAGS;
else
term.c_lflag &=~ECHOFLAGS;
err = tcsetattr(STDIN_FILENO,TCSAFLUSH,&term);
if (err == -1 && err == EINTR) {
perror("Cannot set the attribution of the terminal");
return -1;
}
return 0;
}
......@@ -81,11 +81,13 @@ int32_t getstr(char **lineptr, size_t *n, FILE *stream, char terminator, int32_t
*n += MIN_CHUNK;
nchars_avail = (int32_t)(*n + *lineptr - read_pos);
*lineptr = realloc(*lineptr, *n);
if (!*lineptr) {
char* lineptr1 = realloc(*lineptr, *n);
if (!lineptr1) {
errno = ENOMEM;
return -1;
}
*lineptr = lineptr1;
read_pos = *n - nchars_avail + *lineptr;
assert((*lineptr + *n) == (read_pos + nchars_avail));
}
......
......@@ -30,3 +30,17 @@ void taosCloseDll(void *handle) {
}
int taosSetConsoleEcho(bool on)
{
HANDLE hStdin = GetStdHandle(STD_INPUT_HANDLE);
DWORD mode = 0;
GetConsoleMode(hStdin, &mode );
if (on) {
mode |= ENABLE_ECHO_INPUT;
} else {
mode &= ~ENABLE_ECHO_INPUT;
}
SetConsoleMode(hStdin, mode);
return 0;
}
......@@ -101,13 +101,17 @@ char *httpGetStatusDesc(int32_t statusCode) {
}
static void httpCleanupString(HttpString *str) {
free(str->str);
str->str = NULL;
str->pos = 0;
str->size = 0;
if (str->str) {
free(str->str);
str->str = NULL;
str->pos = 0;
str->size = 0;
}
}
static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) {
char *new_str = NULL;
if (str->size == 0) {
str->pos = 0;
str->size = len + 1;
......@@ -115,7 +119,16 @@ static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) {
} else if (str->pos + len + 1 >= str->size) {
str->size += len;
str->size *= 4;
str->str = realloc(str->str, str->size);
new_str = realloc(str->str, str->size);
if (new_str == NULL && str->str) {
// if str->str was not NULL originally,
// the old allocated memory was left unchanged,
// see man 3 realloc
free(str->str);
}
str->str = new_str;
} else {
}
......@@ -317,7 +330,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const
static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
HttpContext *pContext = parser->pContext;
HttpString * buf = &parser->body;
HttpString *buf = &parser->body;
if (parser->parseCode != TSDB_CODE_SUCCESS) return -1;
if (buf->size <= 0) {
......@@ -326,6 +339,7 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
}
int32_t newSize = buf->pos + len + 1;
char *newStr = NULL;
if (newSize >= buf->size) {
if (buf->size >= HTTP_BUFFER_SIZE) {
httpError("context:%p, fd:%d, failed parse body, exceeding buffer size %d", pContext, pContext->fd, buf->size);
......@@ -336,7 +350,12 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
newSize = MAX(newSize, HTTP_BUFFER_INIT);
newSize *= 4;
newSize = MIN(newSize, HTTP_BUFFER_SIZE);
buf->str = realloc(buf->str, newSize);
newStr = realloc(buf->str, newSize);
if (newStr == NULL && buf->str) {
free(buf->str);
}
buf->str = newStr;
buf->size = newSize;
if (buf->str == NULL) {
......@@ -374,13 +393,20 @@ static HTTP_PARSER_STATE httpTopStack(HttpParser *parser) {
static int32_t httpPushStack(HttpParser *parser, HTTP_PARSER_STATE state) {
HttpStack *stack = &parser->stacks;
int8_t *newStacks = NULL;
if (stack->size == 0) {
stack->pos = 0;
stack->size = 32;
stack->stacks = malloc(stack->size * sizeof(int8_t));
} else if (stack->pos + 1 > stack->size) {
stack->size *= 2;
stack->stacks = realloc(stack->stacks, stack->size * sizeof(int8_t));
newStacks = realloc(stack->stacks, stack->size * sizeof(int8_t));
if (newStacks == NULL && stack->stacks) {
free(stack->stacks);
}
stack->stacks = newStacks;
} else {
}
......
......@@ -188,13 +188,17 @@ bool httpMallocMultiCmds(HttpContext *pContext, int32_t cmdSize, int32_t bufferS
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int32_t cmdSize) {
HttpSqlCmds *multiCmds = pContext->multiCmds;
if (cmdSize > HTTP_MAX_CMD_SIZE) {
if (cmdSize <= 0 || cmdSize > HTTP_MAX_CMD_SIZE) {
httpError("context:%p, fd:%d, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, pContext->user,
cmdSize, HTTP_MAX_CMD_SIZE);
return false;
}
multiCmds->cmds = (HttpSqlCmd *)realloc(multiCmds->cmds, (size_t)cmdSize * sizeof(HttpSqlCmd));
HttpSqlCmd *new_cmds = (HttpSqlCmd *)realloc(multiCmds->cmds, (size_t)cmdSize * sizeof(HttpSqlCmd));
if (new_cmds == NULL && multiCmds->cmds) {
free(multiCmds->cmds);
}
multiCmds->cmds = new_cmds;
if (multiCmds->cmds == NULL) {
httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->user, cmdSize);
return false;
......@@ -208,13 +212,17 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int32_t cmdSize) {
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int32_t bufferSize) {
HttpSqlCmds *multiCmds = pContext->multiCmds;
if (bufferSize > HTTP_MAX_BUFFER_SIZE) {
if (bufferSize <= 0 || bufferSize > HTTP_MAX_BUFFER_SIZE) {
httpError("context:%p, fd:%d, user:%s, mulitcmd buffer size:%d large then %d", pContext, pContext->fd,
pContext->user, bufferSize, HTTP_MAX_BUFFER_SIZE);
return false;
}
multiCmds->buffer = (char *)realloc(multiCmds->buffer, (size_t)bufferSize);
char *new_buffer = (char *)realloc(multiCmds->buffer, (size_t)bufferSize);
if (new_buffer == NULL && multiCmds->buffer) {
free(multiCmds->buffer);
}
multiCmds->buffer = new_buffer;
if (multiCmds->buffer == NULL) {
httpError("context:%p, fd:%d, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->user, bufferSize);
return false;
......
......@@ -333,6 +333,7 @@ enum OPERATOR_TYPE_E {
OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
};
typedef struct SOperatorInfo {
......@@ -417,7 +418,6 @@ typedef struct STableScanInfo {
int32_t *rowCellInfoOffset;
SExprInfo *pExpr;
SSDataBlock block;
bool loadExternalRows; // load external rows (prev & next rows)
int32_t numOfOutput;
int64_t elapsedTime;
......@@ -510,13 +510,21 @@ typedef struct SStateWindowOperatorInfo {
bool reptScan;
} SStateWindowOperatorInfo ;
typedef struct SDistinctDataInfo {
int32_t index;
int32_t type;
int32_t bytes;
} SDistinctDataInfo;
typedef struct SDistinctOperatorInfo {
SHashObj *pSet;
SSDataBlock *pRes;
bool recordNullVal; //has already record the null value, no need to try again
int64_t threshold;
int64_t outputCapacity;
int32_t colIndex;
int32_t totalBytes;
char* buf;
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
struct SGlobalMerger;
......@@ -541,6 +549,13 @@ typedef struct SMultiwayMergeInfo {
SArray *udfInfo;
} SMultiwayMergeInfo;
// todo support the disk-based sort
typedef struct SOrderOperatorInfo {
int32_t colIndex;
int32_t order;
SSDataBlock *pDataBlock;
} SOrderOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
......@@ -570,6 +585,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -220,6 +220,8 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
void tOrderDescDestroy(tOrderDescriptor *pDesc);
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t srcStartRows,
int32_t numOfRowsToWrite, int32_t srcCapacity);
......
......@@ -39,7 +39,6 @@
#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.param[0].i64:1)
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr);
......@@ -60,6 +59,7 @@ SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
......@@ -70,7 +70,7 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage*
int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL);
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
}
......
此差异已折叠。
此差异已折叠。
......@@ -1102,3 +1102,57 @@ void tOrderDescDestroy(tOrderDescriptor *pDesc) {
destroyColumnModel(pDesc->pColumnModel);
tfree(pDesc);
}
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) {
assert(numOfRows > 0 && numOfCols > 0 && index >= 0 && index < numOfCols);
int32_t bytes = pSchema[index].bytes;
int32_t size = bytes + sizeof(int32_t);
char* buf = calloc(1, size * numOfRows);
for(int32_t i = 0; i < numOfRows; ++i) {
char* dest = buf + size * i;
memcpy(dest, ((char*) pCols[index]) + bytes * i, bytes);
*(int32_t*)(dest+bytes) = i;
}
qsort(buf, numOfRows, size, compareFn);
int32_t prevLength = 0;
char* p = NULL;
for(int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes1 = pSchema[i].bytes;
if (i == index) {
for(int32_t j = 0; j < numOfRows; ++j){
char* src = buf + (j * size);
char* dest = ((char*)pCols[i]) + (j * bytes1);
memcpy(dest, src, bytes1);
}
} else {
// make sure memory buffer is enough
if (prevLength < bytes1) {
char *tmp = realloc(p, bytes1 * numOfRows);
assert(tmp);
p = tmp;
prevLength = bytes1;
}
memcpy(p, pCols[i], bytes1 * numOfRows);
for(int32_t j = 0; j < numOfRows; ++j){
char* dest = ((char*)pCols[i]) + bytes1 * j;
int32_t newPos = *(int32_t*)(buf + (j * size) + bytes);
char* src = p + (newPos * bytes1);
memcpy(dest, src, bytes1);
}
}
}
tfree(buf);
tfree(p);
}
\ No newline at end of file
......@@ -206,6 +206,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} else {
assert(pFillInfo->currentKey == ts);
initBeforeAfterDataBuf(pFillInfo, prev);
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
initBeforeAfterDataBuf(pFillInfo, next);
++pFillInfo->index;
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
--pFillInfo->index;
}
// assign rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
......@@ -227,6 +233,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) {
assignVal(output, *next + pCol->col.offset, pCol->col.bytes, pCol->col.type);
} else {
setNull(output, pCol->col.type, pCol->col.bytes);
}
} else {
assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
}
......
......@@ -237,7 +237,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
}
pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
pBucket->comparFn = getKeyComparFunc(pBucket->type);
pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC);
pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -296,7 +296,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
return -1;
}
pComph->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
pComph->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (pComph->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册