提交 48d307b4 编写于 作者: D dapan1121

Merge branch 'master' into hotfix/TD-11584

...@@ -70,7 +70,6 @@ def pre_test(){ ...@@ -70,7 +70,6 @@ def pre_test(){
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git clean -dfx git clean -dfx
git ls-files --stage | grep 160000 | awk '{print $4}' | xargs git rm --cached
git submodule update --init --recursive git submodule update --init --recursive
cd ${WK} cd ${WK}
git reset --hard HEAD~10 git reset --hard HEAD~10
...@@ -144,7 +143,6 @@ def pre_test_noinstall(){ ...@@ -144,7 +143,6 @@ def pre_test_noinstall(){
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git clean -dfx git clean -dfx
git ls-files --stage | grep 160000 | awk '{print $4}' | xargs git rm --cached
git submodule update --init --recursive git submodule update --init --recursive
cd ${WK} cd ${WK}
git reset --hard HEAD~10 git reset --hard HEAD~10
...@@ -215,7 +213,6 @@ def pre_test_ningsi(){ ...@@ -215,7 +213,6 @@ def pre_test_ningsi(){
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git clean -dfx git clean -dfx
git ls-files --stage | grep 160000 | awk '{print $4}' | xargs git rm --cached
git submodule update --init --recursive git submodule update --init --recursive
cd ${WK} cd ${WK}
git reset --hard HEAD~10 git reset --hard HEAD~10
......
...@@ -24,19 +24,19 @@ ELSEIF (TD_WINDOWS) ...@@ -24,19 +24,19 @@ ELSEIF (TD_WINDOWS)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver) INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
IF (TD_POWER) IF (TD_POWER)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .) INSTALL(FILES ${TD_COMMUNITY_DIR}/packaging/cfg/taos.cfg DESTINATION cfg)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/power.exe DESTINATION .) INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/power.exe DESTINATION .)
ELSEIF (TD_TQ) ELSEIF (TD_TQ)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .) INSTALL(FILES ${TD_COMMUNITY_DIR}/packaging/cfg/taos.cfg DESTINATION cfg)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/tq.exe DESTINATION .) INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/tq.exe DESTINATION .)
ELSEIF (TD_PRO) ELSEIF (TD_PRO)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .) INSTALL(FILES ${TD_COMMUNITY_DIR}/packaging/cfg/taos.cfg DESTINATION cfg)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/prodbc.exe DESTINATION .) INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/prodbc.exe DESTINATION .)
ELSEIF (TD_KH) ELSEIF (TD_KH)
INSTALL(FILES ${TD_COMMUNITY_DIR}/packaging/cfg/kinghistorian.cfg DESTINATION cfg) INSTALL(FILES ${TD_COMMUNITY_DIR}/packaging/cfg/kinghistorian.cfg DESTINATION cfg)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/khclient.exe DESTINATION .) INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/khclient.exe DESTINATION .)
ELSEIF (TD_JH) ELSEIF (TD_JH)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .) INSTALL(FILES ${TD_COMMUNITY_DIR}/packaging/cfg/taos.cfg DESTINATION cfg)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/jh_taos.exe DESTINATION .) INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/jh_taos.exe DESTINATION .)
ELSE () ELSE ()
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/go DESTINATION connector) INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/go DESTINATION connector)
...@@ -44,7 +44,8 @@ ELSEIF (TD_WINDOWS) ...@@ -44,7 +44,8 @@ ELSEIF (TD_WINDOWS)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/python DESTINATION connector) INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/python DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/C\# DESTINATION connector) INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/C\# DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/tests/examples DESTINATION .) INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/tests/examples DESTINATION .)
INSTALL(FILES ${TD_COMMUNITY_DIR}/packaging/cfg/taos.cfg DESTINATION cfg)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .) INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosdemo.exe DESTINATION .) INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosdemo.exe DESTINATION .)
ENDIF () ENDIF ()
......
avro @ a1fce29d
Subproject commit a1fce29d9675b4dd95dfee9db32cc505d0b2227c
googletest @ e2239ee6
Subproject commit e2239ee6043f73722e7aa812a459f54a28552929
rocksdb @ c3034fce
Subproject commit c3034fce329017036c807e01261729bfc11a5d62
zlib @ cacf7f1d
Subproject commit cacf7f1d4e3d44d871b605da3b647f07d718623f
#!/bin/bash
#Startup script for the nginx Web Server
# chkconfig: 2345 99 01
# description: Nginx For TDengine Service.
#
#
### BEGIN INIT INFO
# Provides: nginx
# Required-Start: $local_fs $network $syslog
# Required-Stop: $local_fs $network $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Starts nginx
# Description: Starts nginx.
### END INIT INFO
nginx=/usr/local/nginxd/sbin/nginx
case $1 in
start)
echo -n "Starting Nginx"
$nginx
echo " done."
;;
stop)
echo -n "Stopping Nginx"
$nginx -s stop
echo " done."
;;
test)
$nginx -t
echo "Success."
;;
reload)
echo -n "Reloading Nginx"
$nginx -s reload
echo " done."
;;
restart)
$nginx -s reload
echo "reload done."
;;
*)
echo "Usage: $0 {start|restart|reload|stop|test|show}"
;;
esac
#!/bin/bash
#Startup script for the nginx Web Server
# chkconfig: 2345 99 01
#
#
### BEGIN INIT INFO
# Provides: nginx
# Provides: nginx
# Required-Start: $local_fs $network $syslog
# Required-Stop: $local_fs $network $syslog
# Short-Description: Starts nginx
# Description: Starts nginx.
### END INIT INFO
nginx=/usr/local/nginxd/sbin/nginx
case $1 in
start)
echo -n "Starting Nginx"
$nginx
echo " done."
;;
stop)
echo -n "Stopping Nginx"
$nginx -s stop
echo " done."
;;
test)
$nginx -t
echo "Success."
;;
reload)
echo -n "Reloading Nginx"
$nginx -s reload
echo " done."
;;
restart)
$nginx -s reload
echo "reload done."
;;
*)
echo "Usage: $0 {start|restart|reload|stop|test|show}"
;;
esac
...@@ -582,6 +582,26 @@ function clean_service_on_sysvinit() { ...@@ -582,6 +582,26 @@ function clean_service_on_sysvinit() {
${csudo} rm -f ${service_config_dir}/taosd || : ${csudo} rm -f ${service_config_dir}/taosd || :
${csudo} rm -f ${service_config_dir}/tarbitratord || : ${csudo} rm -f ${service_config_dir}/tarbitratord || :
if [ "$verMode" == "cluster" ]; then\
if pidof nginxd ; then
${csudo} service nginxd stop || :
fi
if ((${initd_mod}==1)); then
if [ -e ${service_config_dir}/nginxd ]; then
${csudo} chkconfig --del nginxd || :
fi
elif ((${initd_mod}==2)); then
if [ -e ${service_config_dir}/nginxd ]; then
${csudo} insserv -r nginxd || :
fi
elif ((${initd_mod}==3)); then
if [ -e ${service_config_dir}/nginxd ]; then
${csudo} update-rc.d -f nginxd remove || :
fi
fi
${csudo} rm -f ${service_config_dir}/nginxd || :
fi
if $(which init &> /dev/null); then if $(which init &> /dev/null); then
${csudo} init q || : ${csudo} init q || :
fi fi
...@@ -598,11 +618,19 @@ function install_service_on_sysvinit() { ...@@ -598,11 +618,19 @@ function install_service_on_sysvinit() {
${csudo} cp ${script_dir}/init.d/taosd.deb ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd ${csudo} cp ${script_dir}/init.d/taosd.deb ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord ${csudo} cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord
${csudo} cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord ${csudo} cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord
if [ "$verMode" == "cluster" ]; then
${csudo} cp -f ${script_dir}/init.d/nginxd.deb ${install_main_dir}/init.d/nginxd
${csudo} cp ${script_dir}/init.d/nginxd.deb ${service_config_dir}/nginxd && ${csudo} chmod a+x ${service_config_dir}/nginxd
fi
elif ((${os_type}==2)); then elif ((${os_type}==2)); then
${csudo} cp -f ${script_dir}/init.d/taosd.rpm ${install_main_dir}/init.d/taosd ${csudo} cp -f ${script_dir}/init.d/taosd.rpm ${install_main_dir}/init.d/taosd
${csudo} cp ${script_dir}/init.d/taosd.rpm ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd ${csudo} cp ${script_dir}/init.d/taosd.rpm ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord ${csudo} cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord
${csudo} cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord ${csudo} cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord
if [ "$verMode" == "cluster" ]; then
${csudo} cp -f ${script_dir}/init.d/nginxd.rpm ${install_main_dir}/init.d/nginxd
${csudo} cp ${script_dir}/init.d/nginxd.rpm ${service_config_dir}/nginxd && ${csudo} chmod a+x ${service_config_dir}/nginxd
fi
fi fi
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start" #restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
...@@ -613,14 +641,26 @@ function install_service_on_sysvinit() { ...@@ -613,14 +641,26 @@ function install_service_on_sysvinit() {
${csudo} chkconfig --level 2345 taosd on || : ${csudo} chkconfig --level 2345 taosd on || :
${csudo} chkconfig --add tarbitratord || : ${csudo} chkconfig --add tarbitratord || :
${csudo} chkconfig --level 2345 tarbitratord on || : ${csudo} chkconfig --level 2345 tarbitratord on || :
if [ "$verMode" == "cluster" ]; then
${csudo} chkconfig --add nginxd || :
${csudo} chkconfig --level 0123456 nginxd on || :
${csudo} service nginxd start
fi
elif ((${initd_mod}==2)); then elif ((${initd_mod}==2)); then
${csudo} insserv taosd || : ${csudo} insserv taosd || :
${csudo} insserv -d taosd || : ${csudo} insserv -d taosd || :
${csudo} insserv tarbitratord || : ${csudo} insserv tarbitratord || :
${csudo} insserv -d tarbitratord || : ${csudo} insserv -d tarbitratord || :
if [ "$verMode" == "cluster" ]; then
${csudo} insserv nginxd || :
${csudo} insserv -d nginxd || :
fi
elif ((${initd_mod}==3)); then elif ((${initd_mod}==3)); then
${csudo} update-rc.d taosd defaults || : ${csudo} update-rc.d taosd defaults || :
${csudo} update-rc.d tarbitratord defaults || : ${csudo} update-rc.d tarbitratord defaults || :
if [ "$verMode" == "cluster" ]; then
${csudo} update-rc.d nginxd defaults || :
fi
fi fi
} }
...@@ -779,7 +819,7 @@ vercomp () { ...@@ -779,7 +819,7 @@ vercomp () {
function is_version_compatible() { function is_version_compatible() {
curr_version=`ls ${script_dir}/driver/libtaos.so* |cut -d '.' -f 3-6` curr_version=`ls ${script_dir}/driver/libtaos.so* | awk -F 'libtaos.so.' '{print $2}'`
if [ -f ${script_dir}/driver/vercomp.txt ]; then if [ -f ${script_dir}/driver/vercomp.txt ]; then
min_compatible_version=`cat ${script_dir}/driver/vercomp.txt` min_compatible_version=`cat ${script_dir}/driver/vercomp.txt`
......
...@@ -63,6 +63,8 @@ init_file_deb=${script_dir}/../deb/taosd ...@@ -63,6 +63,8 @@ init_file_deb=${script_dir}/../deb/taosd
init_file_rpm=${script_dir}/../rpm/taosd init_file_rpm=${script_dir}/../rpm/taosd
init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord
init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord
init_file_nginx_deb=${script_dir}/../deb/nginxd
init_file_nginx_rpm=${script_dir}/../rpm/nginxd
# make directories. # make directories.
mkdir -p ${install_dir} mkdir -p ${install_dir}
...@@ -73,6 +75,8 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/taos ...@@ -73,6 +75,8 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/taos
mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/taosd.rpm mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/taosd.rpm
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || : mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || :
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || : mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || :
mkdir -p ${install_dir}/init.d && cp ${init_file_nginx_deb} ${install_dir}/init.d/nginxd.deb || :
mkdir -p ${install_dir}/init.d && cp ${init_file_nginx_rpm} ${install_dir}/init.d/nginxd.rpm || :
if [ -f ${build_dir}/bin/jemalloc-config ]; then if [ -f ${build_dir}/bin/jemalloc-config ]; then
mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3} mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
......
...@@ -358,19 +358,28 @@ static int32_t handlePassword(SSqlCmd* pCmd, SStrToken* pPwd) { ...@@ -358,19 +358,28 @@ static int32_t handlePassword(SSqlCmd* pCmd, SStrToken* pPwd) {
// validate the out put field type for "UNION ALL" subclause // validate the out put field type for "UNION ALL" subclause
static int32_t normalizeVarDataTypeLength(SSqlCmd* pCmd) { static int32_t normalizeVarDataTypeLength(SSqlCmd* pCmd) {
const char* msg1 = "columns in select clause not identical"; const char* msg1 = "columns in select clause not identical";
const char* msg2 = "too many select clause siblings, at most 100 allowed";
int32_t siblings = 0;
int32_t diffSize = 0; int32_t diffSize = 0;
// if there is only one element, the limit of clause is the limit of global result. // if there is only one element, the limit of clause is the limit of global result.
SQueryInfo* pQueryInfo1 = pCmd->pQueryInfo; SQueryInfo* pQueryInfo1 = pCmd->pQueryInfo;
SQueryInfo* pSibling = pQueryInfo1->sibling; SQueryInfo* pSibling = pQueryInfo1->sibling;
// pQueryInfo1 itself
++siblings;
while(pSibling != NULL) { while(pSibling != NULL) {
int32_t ret = tscFieldInfoCompare(&pQueryInfo1->fieldsInfo, &pSibling->fieldsInfo, &diffSize); int32_t ret = tscFieldInfoCompare(&pQueryInfo1->fieldsInfo, &pSibling->fieldsInfo, &diffSize);
if (ret != 0) { if (ret != 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
if (++siblings > 100) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
pSibling = pSibling->sibling; pSibling = pSibling->sibling;
} }
...@@ -6550,17 +6559,15 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlN ...@@ -6550,17 +6559,15 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlN
// todo refactor // todo refactor
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); }
}
// for projection query on super table, all queries are subqueries // for projection query on super table, all queries are subqueries
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY)) { !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY)) {
pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
} }
} }
...@@ -7535,6 +7542,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -7535,6 +7542,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg3 = "tag value too long"; const char* msg3 = "tag value too long";
const char* msg4 = "illegal value or data overflow"; const char* msg4 = "illegal value or data overflow";
const char* msg5 = "tags number not matched"; const char* msg5 = "tags number not matched";
const char* msg6 = "create table only from super table is allowed";
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
...@@ -7575,6 +7583,10 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -7575,6 +7583,10 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return code; return code;
} }
if (!UTIL_TABLE_IS_SUPER_TABLE(pStableMetaInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
size_t valSize = taosArrayGetSize(pValList); size_t valSize = taosArrayGetSize(pValList);
// too long tag values will return invalid sql, not be truncated automatically // too long tag values will return invalid sql, not be truncated automatically
...@@ -8685,7 +8697,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8685,7 +8697,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column"; const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
const char* msg5 = "only tag query not compatible with normal column filter"; const char* msg5 = "only tag query not compatible with normal column filter";
const char* msg6 = "not support stddev/percentile/interp in the outer query yet"; const char* msg6 = "not support stddev/percentile/interp in the outer query yet";
const char* msg7 = "derivative/twa/irate requires timestamp column exists in subquery"; const char* msg7 = "derivative/twa/rate/irate/diff requires timestamp column exists in subquery";
const char* msg8 = "condition missing for join query"; const char* msg8 = "condition missing for join query";
const char* msg9 = "not support 3 level select"; const char* msg9 = "not support 3 level select";
...@@ -8769,7 +8781,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8769,7 +8781,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
SExprInfo* pExpr = tscExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId; int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) { if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE ||
f == TSDB_FUNC_RATE || f == TSDB_FUNC_DIFF) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
} }
} }
......
...@@ -980,7 +980,11 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, ...@@ -980,7 +980,11 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
assert(pExpr->resColId < 0); if (pExpr->resColId >= 0) {
tscError("result column id underflowed: %d", pExpr->resColId);
return TSDB_CODE_TSC_RES_TOO_MANY;
}
SSqlExpr* pSqlExpr = (SSqlExpr *)(*pMsg); SSqlExpr* pSqlExpr = (SSqlExpr *)(*pMsg);
SColIndex* pIndex = &pSqlExpr->colInfo; SColIndex* pIndex = &pSqlExpr->colInfo;
...@@ -3142,15 +3146,19 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -3142,15 +3146,19 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd* pCmd2 = &pSql->rootObj->cmd; SSqlCmd* pCmd2 = &pSql->rootObj->cmd;
pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap); pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap);
pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pSql->rootObj->retryReason = pSql->retryReason; pSql->rootObj->retryReason = pSql->retryReason;
SSqlObj *tmpSql = pSql->rootObj;
tscFreeSubobj(pSql->rootObj);
tfree(tmpSql->pSubs);
SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
char* n = strdup(name); char* n = strdup(name);
taosArrayPush(pNameList, &n); taosArrayPush(pNameList, &n);
code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); code = getMultiTableMetaFromMnode(tmpSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
taosArrayDestroyEx(pNameList, freeElem); taosArrayDestroyEx(pNameList, freeElem);
taosArrayDestroyEx(vgroupList, freeElem); taosArrayDestroyEx(vgroupList, freeElem);
......
...@@ -448,13 +448,14 @@ typedef struct { ...@@ -448,13 +448,14 @@ typedef struct {
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n) #define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE) #define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r)) #define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r))
#define kvRowKeys(r) POINTER_SHIFT(r, *(uint16_t *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(int16_t)))
#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r)) #define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset) #define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) tfree(r) #define kvRowFree(r) tfree(r)
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r)) #define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#define kvRowValLen(r) (kvRowLen(r) - TD_KV_ROW_HEAD_SIZE - sizeof(SColIdx) * kvRowNCols(r)) #define kvRowValLen(r) (kvRowLen(r) - TD_KV_ROW_HEAD_SIZE - sizeof(SColIdx) * kvRowNCols(r))
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r))) #define kvRowTKey(r) (*(TKEY *)(kvRowKeys(r)))
#define kvRowKey(r) tdGetKey(kvRowTKey(r)) #define kvRowKey(r) tdGetKey(kvRowTKey(r))
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r)) #define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
...@@ -652,7 +653,7 @@ static FORCE_INLINE char *memRowEnd(SMemRow row) { ...@@ -652,7 +653,7 @@ static FORCE_INLINE char *memRowEnd(SMemRow row) {
#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) #define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version #define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version
#define memRowSetKvVersion(r, v) (memRowKvVersion(r) = (v)) #define memRowSetKvVersion(r, v) (memRowKvVersion(r) = (v))
#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowDataBody(r)) : kvRowValues(memRowKvBody(r))) #define memRowKeys(r) (isDataRow(r) ? dataRowTuple(memRowDataBody(r)) : kvRowKeys(memRowKvBody(r)))
#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowDataBody(r)) : kvRowTKey(memRowKvBody(r))) #define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowDataBody(r)) : kvRowTKey(memRowKvBody(r)))
#define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowDataBody(r)) : kvRowKey(memRowKvBody(r))) #define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowDataBody(r)) : kvRowKey(memRowKvBody(r)))
......
...@@ -223,6 +223,8 @@ extern uint32_t curRange; ...@@ -223,6 +223,8 @@ extern uint32_t curRange;
extern char Compressor[]; extern char Compressor[];
#endif #endif
// long query
extern int8_t tsDeadLockKillQuery;
typedef struct { typedef struct {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
......
...@@ -271,6 +271,9 @@ uint32_t curRange = 100; // range ...@@ -271,6 +271,9 @@ uint32_t curRange = 100; // range
char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
#endif #endif
// long query death-lock
int8_t tsDeadLockKillQuery = 1;
int32_t (*monStartSystemFp)() = NULL; int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL; void (*monStopSystemFp)() = NULL;
void (*monExecuteSQLFp)(char *sql) = NULL; void (*monExecuteSQLFp)(char *sql) = NULL;
...@@ -1606,6 +1609,17 @@ static void doInitGlobalConfig(void) { ...@@ -1606,6 +1609,17 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// enable kill long query
cfg.option = "deadLockKillQuery";
cfg.ptr = &tsDeadLockKillQuery;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -66,7 +66,11 @@ public class RestfulStatement extends AbstractStatement { ...@@ -66,7 +66,11 @@ public class RestfulStatement extends AbstractStatement {
boolean result = true; boolean result = true;
if (SqlSyntaxValidator.isUseSql(sql)) { if (SqlSyntaxValidator.isUseSql(sql)) {
HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken()); String ret = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
JSONObject resultJson = JSON.parseObject(ret);
if (resultJson.getString("status").equals("error")) {
throw TSDBError.createSQLException(resultJson.getInteger("code"), "sql: " + sql + ", desc: " + resultJson.getString("desc"));
}
this.database = sql.trim().replace("use", "").trim(); this.database = sql.trim().replace("use", "").trim();
this.conn.setCatalog(this.database); this.conn.setCatalog(this.database);
result = false; result = false;
...@@ -115,7 +119,7 @@ public class RestfulStatement extends AbstractStatement { ...@@ -115,7 +119,7 @@ public class RestfulStatement extends AbstractStatement {
String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken()); String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
JSONObject resultJson = JSON.parseObject(result); JSONObject resultJson = JSON.parseObject(result);
if (resultJson.getString("status").equals("error")) { if (resultJson.getString("status").equals("error")) {
throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc")); throw TSDBError.createSQLException(resultJson.getInteger("code"), "sql: " + sql + ", desc: " + resultJson.getString("desc"));
} }
this.resultSet = new RestfulResultSet(database, this, resultJson); this.resultSet = new RestfulResultSet(database, this, resultJson);
this.affectedRows = 0; this.affectedRows = 0;
...@@ -126,7 +130,7 @@ public class RestfulStatement extends AbstractStatement { ...@@ -126,7 +130,7 @@ public class RestfulStatement extends AbstractStatement {
String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken()); String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
JSONObject jsonObject = JSON.parseObject(result); JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) { if (jsonObject.getString("status").equals("error")) {
throw TSDBError.createSQLException(jsonObject.getInteger("code"), jsonObject.getString("desc")); throw TSDBError.createSQLException(jsonObject.getInteger("code"), "sql: " + sql + ", desc: " + jsonObject.getString("desc"));
} }
this.resultSet = null; this.resultSet = null;
this.affectedRows = getAffectedRows(jsonObject); this.affectedRows = getAffectedRows(jsonObject);
...@@ -134,16 +138,13 @@ public class RestfulStatement extends AbstractStatement { ...@@ -134,16 +138,13 @@ public class RestfulStatement extends AbstractStatement {
} }
private int getAffectedRows(JSONObject jsonObject) throws SQLException { private int getAffectedRows(JSONObject jsonObject) throws SQLException {
// create ... SQLs should return 0 , and Restful result like this:
// {"status": "succ", "head": ["affected_rows"], "data": [[0]], "rows": 1}
JSONArray head = jsonObject.getJSONArray("head"); JSONArray head = jsonObject.getJSONArray("head");
if (head.size() != 1 || !"affected_rows".equals(head.getString(0))) if (head.size() != 1 || !"affected_rows".equals(head.getString(0)))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE, "invalid variable: [" + head.toJSONString() + "]");
JSONArray data = jsonObject.getJSONArray("data"); JSONArray data = jsonObject.getJSONArray("data");
if (data != null) if (data != null)
return data.getJSONArray(0).getInteger(0); return data.getJSONArray(0).getInteger(0);
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE, "invalid variable: [" + jsonObject.toJSONString() + "]");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE);
} }
@Override @Override
......
...@@ -57,53 +57,33 @@ public class AuthenticationTest { ...@@ -57,53 +57,33 @@ public class AuthenticationTest {
@Ignore @Ignore
@Test @Test
public void test() { public void test() throws SQLException {
// change password // change password
try { conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=" + user + "&password=taosdata");
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/restful_test?user=" + user + "&password=taosdata"); Statement stmt = conn.createStatement();
Statement stmt = conn.createStatement(); stmt.execute("alter user " + user + " pass '" + password + "'");
stmt.execute("alter user " + user + " pass '" + password + "'"); stmt.close();
stmt.close(); conn.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
// use new to login and execute query // use new to login and execute query
try { conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=" + user + "&password=" + password);
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/restful_test?user=" + user + "&password=" + password); stmt = conn.createStatement();
Statement stmt = conn.createStatement(); stmt.execute("show databases");
stmt.execute("show databases"); ResultSet rs = stmt.getResultSet();
ResultSet rs = stmt.getResultSet(); ResultSetMetaData meta = rs.getMetaData();
ResultSetMetaData meta = rs.getMetaData(); while (rs.next()) {
while (rs.next()) { for (int i = 1; i <= meta.getColumnCount(); i++) {
for (int i = 1; i <= meta.getColumnCount(); i++) { System.out.print(meta.getColumnLabel(i) + ":" + rs.getString(i) + "\t");
System.out.print(meta.getColumnLabel(i) + ":" + rs.getString(i) + "\t");
}
System.out.println();
} }
} catch (SQLException e) { System.out.println();
e.printStackTrace();
} }
// change password back
try {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/restful_test?user=" + user + "&password=" + password);
Statement stmt = conn.createStatement();
stmt.execute("alter user " + user + " pass 'taosdata'");
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
@Before // change password back
public void before() { conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=" + user + "&password=" + password);
try { stmt = conn.createStatement();
Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); stmt.execute("alter user " + user + " pass 'taosdata'");
} catch (ClassNotFoundException e) { stmt.close();
e.printStackTrace(); conn.close();
}
} }
} }
package com.taosdata.jdbc.cases; package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBErrorNumbers;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.sql.DriverManager; import java.sql.DriverManager;
...@@ -9,16 +7,14 @@ import java.sql.SQLException; ...@@ -9,16 +7,14 @@ import java.sql.SQLException;
public class ConnectWrongDatabaseTest { public class ConnectWrongDatabaseTest {
@Test @Test(expected = SQLException.class)
public void connect() { public void connectByJni() throws SQLException {
try { DriverManager.getConnection("jdbc:TAOS://localhost:6030/wrong_db?user=root&password=taosdata");
Class.forName("com.taosdata.jdbc.TSDBDriver"); }
DriverManager.getConnection("jdbc:TAOS://localhost:6030/wrong_db?user=root&password=taosdata");
} catch (ClassNotFoundException e) { @Test(expected = SQLException.class)
e.printStackTrace(); public void connectByRestful() throws SQLException {
} catch (SQLException e) { DriverManager.getConnection("jdbc:TAOS-RS://localhost:6041/wrong_db?user=root&password=taosdata");
Assert.assertEquals(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL, e.getErrorCode());
}
} }
} }
...@@ -18,9 +18,8 @@ public class InsertDbwithoutUseDbTest { ...@@ -18,9 +18,8 @@ public class InsertDbwithoutUseDbTest {
private static final Random random = new Random(System.currentTimeMillis()); private static final Random random = new Random(System.currentTimeMillis());
@Test @Test
public void case001() throws ClassNotFoundException, SQLException { public void case001() throws SQLException {
// prepare schema // prepare schema
Class.forName("com.taosdata.jdbc.TSDBDriver");
String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata"; String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(url, properties); Connection conn = DriverManager.getConnection(url, properties);
try (Statement stmt = conn.createStatement()) { try (Statement stmt = conn.createStatement()) {
...@@ -51,9 +50,8 @@ public class InsertDbwithoutUseDbTest { ...@@ -51,9 +50,8 @@ public class InsertDbwithoutUseDbTest {
} }
@Test @Test
public void case002() throws ClassNotFoundException, SQLException { public void case002() throws SQLException {
// prepare the schema // prepare the schema
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
final String url = "jdbc:TAOS-RS://" + host + ":6041/inWithoutDb?user=root&password=taosdata"; final String url = "jdbc:TAOS-RS://" + host + ":6041/inWithoutDb?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(url, properties); Connection conn = DriverManager.getConnection(url, properties);
try (Statement stmt = conn.createStatement()) { try (Statement stmt = conn.createStatement()) {
......
...@@ -374,22 +374,17 @@ public class RestfulConnectionTest { ...@@ -374,22 +374,17 @@ public class RestfulConnectionTest {
} }
@BeforeClass @BeforeClass
public static void beforeClass() { public static void beforeClass() throws SQLException {
try { Properties properties = new Properties();
Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
Properties properties = new Properties(); properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata", properties);
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); // create test database for test cases
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/log?user=root&password=taosdata", properties); try (Statement stmt = conn.createStatement()) {
// create test database for test cases stmt.execute("create database if not exists test");
try (Statement stmt = conn.createStatement()) {
stmt.execute("create database if not exists test");
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} }
} }
@AfterClass @AfterClass
......
...@@ -10,11 +10,11 @@ import java.util.Random; ...@@ -10,11 +10,11 @@ import java.util.Random;
public class RestfulJDBCTest { public class RestfulJDBCTest {
private static final String host = "127.0.0.1"; private static final String host = "127.0.0.1";
private final Random random = new Random(System.currentTimeMillis()); private static final Random random = new Random(System.currentTimeMillis());
private Connection connection; private static Connection connection;
@Test @Test
public void testCase001() { public void testCase001() throws SQLException {
// given // given
String sql = "drop database if exists restful_test"; String sql = "drop database if exists restful_test";
// when // when
...@@ -38,7 +38,7 @@ public class RestfulJDBCTest { ...@@ -38,7 +38,7 @@ public class RestfulJDBCTest {
} }
@Test @Test
public void testCase002() { public void testCase002() throws SQLException {
// given // given
String sql = "create table weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)"; String sql = "create table weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)";
// when // when
...@@ -48,7 +48,7 @@ public class RestfulJDBCTest { ...@@ -48,7 +48,7 @@ public class RestfulJDBCTest {
} }
@Test @Test
public void testCase004() { public void testCase004() throws SQLException {
for (int i = 1; i <= 100; i++) { for (int i = 1; i <= 100; i++) {
// given // given
String sql = "create table t" + i + " using weather tags('beijing', '" + i + "')"; String sql = "create table t" + i + " using weather tags('beijing', '" + i + "')";
...@@ -60,7 +60,7 @@ public class RestfulJDBCTest { ...@@ -60,7 +60,7 @@ public class RestfulJDBCTest {
} }
@Test @Test
public void testCase005() { public void testCase005() throws SQLException {
int rows = 0; int rows = 0;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
for (int j = 1; j <= 100; j++) { for (int j = 1; j <= 100; j++) {
...@@ -99,7 +99,7 @@ public class RestfulJDBCTest { ...@@ -99,7 +99,7 @@ public class RestfulJDBCTest {
} }
@Test @Test
public void testCase007() { public void testCase007() throws SQLException {
// given // given
String sql = "drop database restful_test"; String sql = "drop database restful_test";
...@@ -110,50 +110,41 @@ public class RestfulJDBCTest { ...@@ -110,50 +110,41 @@ public class RestfulJDBCTest {
Assert.assertFalse(execute); Assert.assertFalse(execute);
} }
private int executeUpdate(Connection connection, String sql) { private int executeUpdate(Connection connection, String sql) throws SQLException {
try (Statement stmt = connection.createStatement()) { try (Statement stmt = connection.createStatement()) {
return stmt.executeUpdate(sql); return stmt.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
} }
return 0;
} }
private boolean execute(Connection connection, String sql) { private boolean execute(Connection connection, String sql) throws SQLException {
try (Statement stmt = connection.createStatement()) { try (Statement stmt = connection.createStatement()) {
return stmt.execute(sql); return stmt.execute(sql);
} catch (SQLException e) {
e.printStackTrace();
} }
return false;
} }
private ResultSet executeQuery(Connection connection, String sql) { private ResultSet executeQuery(Connection connection, String sql) throws SQLException {
try (Statement statement = connection.createStatement()) { try (Statement statement = connection.createStatement()) {
return statement.executeQuery(sql); return statement.executeQuery(sql);
} catch (SQLException e) {
e.printStackTrace();
} }
return null;
} }
@Before @BeforeClass
public void before() { public static void beforeClass() {
try { try {
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/restful_test?user=root&password=taosdata"); connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@After @AfterClass
public void after() { public static void afterClass() throws SQLException {
try { if (connection != null) {
if (connection != null) Statement stmt = connection.createStatement();
connection.close(); stmt.execute("drop database if exists restful_test");
} catch (SQLException e) { stmt.close();
e.printStackTrace(); connection.close();
} }
} }
......
...@@ -186,22 +186,17 @@ public class RestfulResultSetMetaDataTest { ...@@ -186,22 +186,17 @@ public class RestfulResultSetMetaDataTest {
} }
@BeforeClass @BeforeClass
public static void beforeClass() { public static void beforeClass() throws SQLException {
try { conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); stmt = conn.createStatement();
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/restful_test?user=root&password=taosdata"); stmt.execute("create database if not exists restful_test");
stmt = conn.createStatement(); stmt.execute("use restful_test");
stmt.execute("create database if not exists restful_test"); stmt.execute("drop table if exists weather");
stmt.execute("use restful_test"); stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("drop table if exists weather"); stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))"); rs = stmt.executeQuery("select * from restful_test.weather");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')"); rs.next();
rs = stmt.executeQuery("select * from restful_test.weather"); meta = rs.getMetaData();
rs.next();
meta = rs.getMetaData();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
} }
@AfterClass @AfterClass
......
...@@ -658,36 +658,29 @@ public class RestfulResultSetTest { ...@@ -658,36 +658,29 @@ public class RestfulResultSetTest {
} }
@BeforeClass @BeforeClass
public static void beforeClass() { public static void beforeClass() throws SQLException {
try { conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); stmt = conn.createStatement();
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/restful_test?user=root&password=taosdata"); stmt.execute("drop database if exists restful_test");
stmt = conn.createStatement(); stmt.execute("create database if not exists restful_test");
stmt.execute("create database if not exists restful_test"); stmt.execute("use restful_test");
stmt.execute("use restful_test"); stmt.execute("drop table if exists weather");
stmt.execute("drop table if exists weather"); stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))"); stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')"); rs = stmt.executeQuery("select * from restful_test.weather");
rs = stmt.executeQuery("select * from restful_test.weather"); rs.next();
rs.next();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
} }
@AfterClass @AfterClass
public static void afterClass() { public static void afterClass() throws SQLException {
try { if (rs != null)
if (rs != null) rs.close();
rs.close(); if (stmt != null) {
if (stmt != null) stmt.execute("drop database if exists restful_test");
stmt.close(); stmt.close();
if (conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
} }
if (conn != null)
conn.close();
} }
} }
\ No newline at end of file
...@@ -581,11 +581,14 @@ public class SQLTest { ...@@ -581,11 +581,14 @@ public class SQLTest {
@BeforeClass @BeforeClass
public static void before() throws SQLException { public static void before() throws SQLException {
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/restful_test?user=root&password=taosdata"); connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
} }
@AfterClass @AfterClass
public static void after() throws SQLException { public static void after() throws SQLException {
Statement stmt = connection.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.close();
connection.close(); connection.close();
} }
......
...@@ -103,7 +103,7 @@ _libtaos.taos_get_client_info.restype = c_char_p ...@@ -103,7 +103,7 @@ _libtaos.taos_get_client_info.restype = c_char_p
def taos_get_client_info(): def taos_get_client_info():
# type: () -> str # type: () -> str
"""Get client version info.""" """Get client version info."""
return _libtaos.taos_get_client_info().decode() return _libtaos.taos_get_client_info().decode("utf-8")
_libtaos.taos_get_server_info.restype = c_char_p _libtaos.taos_get_server_info.restype = c_char_p
...@@ -113,7 +113,7 @@ _libtaos.taos_get_server_info.argtypes = (c_void_p,) ...@@ -113,7 +113,7 @@ _libtaos.taos_get_server_info.argtypes = (c_void_p,)
def taos_get_server_info(connection): def taos_get_server_info(connection):
# type: (c_void_p) -> str # type: (c_void_p) -> str
"""Get server version as string.""" """Get server version as string."""
return _libtaos.taos_get_server_info(connection).decode() return _libtaos.taos_get_server_info(connection).decode("utf-8")
_libtaos.taos_close.restype = None _libtaos.taos_close.restype = None
......
...@@ -144,7 +144,7 @@ def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_ ...@@ -144,7 +144,7 @@ def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_
try: try:
if num_of_rows >= 0: if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data) tmpstr = ctypes.c_char_p(data)
res.append(tmpstr.value.decode()) res.append(tmpstr.value.decode("utf-8"))
else: else:
res.append( res.append(
( (
...@@ -172,7 +172,7 @@ def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=Field ...@@ -172,7 +172,7 @@ def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=Field
if rbyte == 1 and buffer[0] == b'\xff': if rbyte == 1 and buffer[0] == b'\xff':
res.append(None) res.append(None)
else: else:
res.append(cast(buffer, c_char_p).value.decode()) res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res return res
...@@ -188,7 +188,7 @@ def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldT ...@@ -188,7 +188,7 @@ def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldT
if rbyte == 4 and buffer[:4] == b'\xff'*4: if rbyte == 4 and buffer[:4] == b'\xff'*4:
res.append(None) res.append(None)
else: else:
res.append(cast(buffer, c_char_p).value.decode()) res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res return res
......
...@@ -3,6 +3,8 @@ from .cinterface import * ...@@ -3,6 +3,8 @@ from .cinterface import *
# from .connection import TaosConnection # from .connection import TaosConnection
from .error import * from .error import *
from ctypes import c_void_p
class TaosResult(object): class TaosResult(object):
"""TDengine result interface""" """TDengine result interface"""
...@@ -12,7 +14,11 @@ class TaosResult(object): ...@@ -12,7 +14,11 @@ class TaosResult(object):
# to make the __del__ order right # to make the __del__ order right
self._conn = conn self._conn = conn
self._close_after = close_after self._close_after = close_after
self._result = result if isinstance(result, c_void_p):
self._result = result
else:
self._result = c_void_p(result)
self._fields = None self._fields = None
self._field_count = None self._field_count = None
self._precision = None self._precision = None
......
...@@ -36,7 +36,6 @@ def test_insert_lines(conn): ...@@ -36,7 +36,6 @@ def test_insert_lines(conn):
conn.insert_lines(lines) conn.insert_lines(lines)
print("inserted") print("inserted")
result = conn.query("select * from st") result = conn.query("select * from st")
print(*result.fields)
all = result.rows_iter() all = result.rows_iter()
for row in all: for row in all:
print(row) print(row)
......
# encoding:UTF-8
from taos import * from taos import *
from ctypes import * from ctypes import *
......
...@@ -20,7 +20,8 @@ def stream_callback(p_param, p_result, p_row): ...@@ -20,7 +20,8 @@ def stream_callback(p_param, p_result, p_row):
result = TaosResult(p_result) result = TaosResult(p_result)
row = TaosRow(result, p_row) row = TaosRow(result, p_row)
try: try:
ts, count = row() ts, count = row.as_tuple()
print(ts, count)
p = cast(p_param, POINTER(Counter)) p = cast(p_param, POINTER(Counter))
p.contents.count += count p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
......
...@@ -76,6 +76,11 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo); ...@@ -76,6 +76,11 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo);
*/ */
int32_t qKillQuery(qinfo_t qinfo); int32_t qKillQuery(qinfo_t qinfo);
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt);
int32_t qQueryCompleted(qinfo_t qinfo); int32_t qQueryCompleted(qinfo_t qinfo);
/** /**
......
...@@ -35,6 +35,7 @@ int32_t* taosGetErrno(); ...@@ -35,6 +35,7 @@ int32_t* taosGetErrno();
#define terrno (*taosGetErrno()) #define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0 #define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
// rpc // rpc
#define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //"Action in progress") #define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //"Action in progress")
...@@ -106,6 +107,7 @@ int32_t* taosGetErrno(); ...@@ -106,6 +107,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_DUP_COL_NAMES TAOS_DEF_ERROR_CODE(0, 0x021D) //"duplicated column names") #define TSDB_CODE_TSC_DUP_COL_NAMES TAOS_DEF_ERROR_CODE(0, 0x021D) //"duplicated column names")
#define TSDB_CODE_TSC_INVALID_TAG_LENGTH TAOS_DEF_ERROR_CODE(0, 0x021E) //"Invalid tag length") #define TSDB_CODE_TSC_INVALID_TAG_LENGTH TAOS_DEF_ERROR_CODE(0, 0x021E) //"Invalid tag length")
#define TSDB_CODE_TSC_INVALID_COLUMN_LENGTH TAOS_DEF_ERROR_CODE(0, 0x021F) //"Invalid column length") #define TSDB_CODE_TSC_INVALID_COLUMN_LENGTH TAOS_DEF_ERROR_CODE(0, 0x021F) //"Invalid column length")
#define TSDB_CODE_TSC_RES_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0227) //"Result set too large to be output")
// mnode // mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed") #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
......
...@@ -39,6 +39,7 @@ extern "C" { ...@@ -39,6 +39,7 @@ extern "C" {
#define TSDB_STATUS_COMMIT_START 1 #define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2 #define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
// TSDB STATE DEFINITION // TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0 #define TSDB_STATE_OK 0x0
...@@ -413,6 +414,11 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd); ...@@ -413,6 +414,11 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact // For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo); int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Health Monitor
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "taos.h" #include "taos.h"
#include "shellCommand.h" #include "shellCommand.h"
#define SHELL_INPUT_MAX_COMMAND_SIZE 500000 #define SHELL_INPUT_MAX_COMMAND_SIZE 10000
extern char configDir[]; extern char configDir[];
......
blm3 @ f56aa0f4
Subproject commit f56aa0f485d7bb6aebbcefc2007eeecdccb767c8
taosadapter @ 6397bf59
Subproject commit 6397bf5963f62f0aa5c4b9b961b16ed5c62579f1
...@@ -1005,11 +1005,11 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -1005,11 +1005,11 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
if ((*retVal < pData[i]) ^ isMin) { if ((*retVal < pData[i]) ^ isMin) {
*retVal = pData[i]; *retVal = pData[i];
TSKEY k = tsList[i]; if(tsList) {
TSKEY k = tsList[i];
DO_UPDATE_TAG_COLUMNS(pCtx, k); DO_UPDATE_TAG_COLUMNS(pCtx, k);
}
} }
*notNullElems += 1; *notNullElems += 1;
} }
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
......
...@@ -8416,6 +8416,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -8416,6 +8416,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
} }
pQInfo->qId = qId; pQInfo->qId = qId;
pQInfo->startExecTs = 0;
pQInfo->runtimeEnv.pUdfInfo = pUdfInfo; pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
......
...@@ -35,7 +35,7 @@ typedef struct SQueryMgmt { ...@@ -35,7 +35,7 @@ typedef struct SQueryMgmt {
bool closed; bool closed;
} SQueryMgmt; } SQueryMgmt;
static void queryMgmtKillQueryFn(void* handle) { static void queryMgmtKillQueryFn(void* handle, void* param1) {
void** fp = (void**)handle; void** fp = (void**)handle;
qKillQuery(*fp); qKillQuery(*fp);
} }
...@@ -452,7 +452,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) { ...@@ -452,7 +452,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
pQueryMgmt->closed = true; pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock); pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn, NULL);
} }
void qQueryMgmtReOpen(void *pQMgmt) { void qQueryMgmtReOpen(void *pQMgmt) {
...@@ -547,3 +547,148 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) { ...@@ -547,3 +547,148 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle); taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0; return 0;
} }
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t err = TSDB_CODE_SUCCESS;
void** handle = qAcquireQInfo(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
err = TSDB_CODE_FAILED;
break;
}
}
qReleaseQInfo(pMgmt, (void **)&handle, true);
return err;
}
// local struct
typedef struct {
int64_t qId;
int64_t startExecTs;
} SLongQuery;
// callbark for sort compare
static int compareLongQuery(const void* p1, const void* p2) {
// sort desc
SLongQuery* plq1 = *(SLongQuery**)p1;
SLongQuery* plq2 = *(SLongQuery**)p2;
if(plq1->startExecTs == plq2->startExecTs) {
return 0;
} else if(plq1->startExecTs > plq2->startExecTs) {
return 1;
} else {
return -1;
}
}
// callback for taosCacheRefresh
static void cbFoundItem(void* handle, void* param1) {
SQInfo * qInfo = *(SQInfo**) handle;
if(qInfo == NULL) return ;
SArray* qids = (SArray*) param1;
if(qids == NULL) return ;
bool usedMem = true;
bool usedIMem = true;
SMemTable* mem = qInfo->query.memRef.snapshot.omem;
SMemTable* imem = qInfo->query.memRef.snapshot.imem;
if(mem == NULL || T_REF_VAL_GET(mem) == 0)
usedMem = false;
if(imem == NULL || T_REF_VAL_GET(mem) == 0)
usedIMem = false ;
if(!usedMem && !usedIMem)
return ;
// push to qids
SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
plq->qId = qInfo->qId;
plq->startExecTs = qInfo->startExecTs;
taosArrayPush(qids, &plq);
}
// longquery
void* qObtainLongQuery(void* param){
SQueryMgmt* qMgmt = (SQueryMgmt*)param;
if(qMgmt == NULL || qMgmt->qinfoPool == NULL)
return NULL;
SArray* qids = taosArrayInit(4, sizeof(int64_t*));
if(qids == NULL) return NULL;
// Get each item
taosCacheRefresh(qMgmt->qinfoPool, cbFoundItem, qids);
size_t cnt = taosArrayGetSize(qids);
if(cnt == 0) {
taosArrayDestroy(qids);
return NULL;
}
if(cnt > 1)
taosArraySort(qids, compareLongQuery);
return qids;
}
//solve tsdb no block to commit
bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) {
SQueryMgmt *pQueryMgmt = pMgmt;
bool fixed = false;
// qid top list
SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt);
if(qids == NULL) return false;
// kill Query
int64_t now = taosGetTimestampMs();
size_t cnt = taosArrayGetSize(qids);
size_t i;
SLongQuery* plq;
for(i=0; i < cnt; i++) {
plq = (SLongQuery* )taosArrayGetP(qids, i);
if(plq->startExecTs > now) continue;
if(now - plq->startExecTs >= longQueryMs) {
qKillQueryByQId(pMgmt, plq->qId, 500, 10); // wait 50*100 ms
if(tsdbNoProblem(pRepo)) {
fixed = true;
qWarn("QId:0x%"PRIx64" fixed problem after kill this query.", plq->qId);
break;
}
}
}
// free qids
for(i=0; i < cnt; i++) {
free(taosArrayGetP(qids, i));
}
taosArrayDestroy(qids);
return fixed;
}
//solve tsdb no block to commit
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
qWarn("pRepo=%p start solve problem.", pRepo);
if(qFixedNoBlock(pRepo, pMgmt, 10*60*1000)) {
return true;
}
if(qFixedNoBlock(pRepo, pMgmt, 2*60*1000)){
return true;
}
if(qFixedNoBlock(pRepo, pMgmt, 30*1000)){
return true;
}
qWarn("pRepo=%p solve problem failed.", pRepo);
return false;
}
\ No newline at end of file
...@@ -29,6 +29,7 @@ typedef struct { ...@@ -29,6 +29,7 @@ typedef struct {
int tBufBlocks; int tBufBlocks;
int nBufBlocks; int nBufBlocks;
int nRecycleBlocks; int nRecycleBlocks;
int nElasticBlocks;
int64_t index; int64_t index;
SList* bufBlockList; SList* bufBlockList;
} STsdbBufPool; } STsdbBufPool;
...@@ -41,6 +42,10 @@ int tsdbOpenBufPool(STsdbRepo* pRepo); ...@@ -41,6 +42,10 @@ int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
// health cite
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
#endif /* _TD_TSDB_BUFFER_H_ */ #endif /* _TD_TSDB_BUFFER_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_HEALTH_H_
#define _TD_TSDB_HEALTH_H_
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough();
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
#endif /* _TD_TSDB_BUFFER_H_ */
...@@ -97,6 +97,7 @@ struct STsdbRepo { ...@@ -97,6 +97,7 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2 SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
pthread_t* pthread;
}; };
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
......
...@@ -14,11 +14,10 @@ ...@@ -14,11 +14,10 @@
*/ */
#include "tsdbint.h" #include "tsdbint.h"
#include "tsdbHealth.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0) #define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
STsdbBufPool *tsdbNewBufPool() { STsdbBufPool *tsdbNewBufPool() {
...@@ -69,6 +68,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { ...@@ -69,6 +68,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks; pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0; pPool->nBufBlocks = 0;
pPool->nElasticBlocks = 0;
pPool->index = 0; pPool->index = 0;
pPool->nRecycleBlocks = 0; pPool->nRecycleBlocks = 0;
...@@ -120,6 +120,18 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { ...@@ -120,6 +120,18 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) { while (POOL_IS_EMPTY(pBufPool)) {
if(tsDeadLockKillQuery) {
// supply new Block
if(tsdbInsertNewBlock(pRepo) > 0) {
tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), pBufPool->nElasticBlocks, pBufPool->bufBlockList->numOfEles);
break;
} else {
// no newBlock, kill query free
if(!tsdbUrgeQueryFree(pRepo))
tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
}
}
pRepo->repoLocked = false; pRepo->repoLocked = false;
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
pRepo->repoLocked = true; pRepo->repoLocked = true;
...@@ -139,11 +151,11 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { ...@@ -139,11 +151,11 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize); STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
if (pBufBlock == NULL) { if (pBufBlock == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; return NULL;
} }
pBufBlock->blockId = 0; pBufBlock->blockId = 0;
...@@ -151,13 +163,9 @@ static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { ...@@ -151,13 +163,9 @@ static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
pBufBlock->remain = bufBlockSize; pBufBlock->remain = bufBlockSize;
return pBufBlock; return pBufBlock;
_err:
tsdbFreeBufBlock(pBufBlock);
return NULL;
} }
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) {
...@@ -193,10 +201,15 @@ err: ...@@ -193,10 +201,15 @@ err:
return err; return err;
} }
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) {
STsdbBufBlock *pBufBlock = NULL; STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock); tsdbFreeBufBlock(pBufBlock);
free(pNode); free(pNode);
pPool->nBufBlocks--; if(bELastic) {
pPool->nElasticBlocks--;
tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, pPool->bufBlockList->numOfEles);
}
else
pPool->nBufBlocks--;
} }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tarray.h"
#include "query.h"
#include "tglobal.h"
#include "tlist.h"
#include "tsdbint.h"
#include "tsdbBuffer.h"
#include "tsdbLog.h"
#include "tsdbHealth.h"
#include "ttimer.h"
#include "tthread.h"
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0;
if(tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nElasticBlocks ++;
cnt ++ ;
}
}
}
return cnt;
}
// switch anther thread to run
void* cbKillQueryFree(void* param) {
STsdbRepo* pRepo = (STsdbRepo*)param;
// vnode
if(pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free
if(pRepo->pthread){
void* p = pRepo->pthread;
pRepo->pthread = NULL;
free(p);
}
return NULL;
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// check previous running
if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
return false;
}
// create new
pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
if(pRepo->pthread == NULL) {
tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
return false;
}
return true;
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
int32_t nMaxElastic = pRepo->config.totalBlocks/3;
STsdbBufPool* pPool = pRepo->pPool;
if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
return false;
}
return true;
}
bool tsdbNoProblem(STsdbRepo* pRepo) {
if(listNEles(pRepo->pPool->bufBlockList) == 0)
return false;
return true;
}
\ No newline at end of file
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
// no test file errors here // no test file errors here
#include "taosdef.h" #include "taosdef.h"
#include "tsdbint.h" #include "tsdbint.h"
#include "ttimer.h"
#include "tthread.h"
#define IS_VALID_PRECISION(precision) \ #define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
...@@ -126,6 +128,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { ...@@ -126,6 +128,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo); tsdbStopStream(pRepo);
if(pRepo->pthread){
taosDestoryThread(pRepo->pthread);
pRepo->pthread = NULL;
}
if (toCommit) { if (toCommit) {
tsdbSyncCommit(repo); tsdbSyncCommit(repo);
...@@ -547,6 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -547,6 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->appH = *pAppH; pRepo->appH = *pAppH;
} }
pRepo->repoLocked = false; pRepo->repoLocked = false;
pRepo->pthread = NULL;
int code = pthread_mutex_init(&(pRepo->mutex), NULL); int code = pthread_mutex_init(&(pRepo->mutex), NULL);
if (code != 0) { if (code != 0) {
......
...@@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL; SListNode *pNode = NULL;
bool recycleBlocks = pBufPool->nRecycleBlocks > 0; bool addNew = false;
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
if (pBufPool->nRecycleBlocks > 0) { if (pBufPool->nRecycleBlocks > 0) {
tsdbRecycleBufferBlock(pBufPool, pNode); tsdbRecycleBufferBlock(pBufPool, pNode, false);
pBufPool->nRecycleBlocks -= 1; pBufPool->nRecycleBlocks -= 1;
} else { } else {
tdListAppendNode(pBufPool->bufBlockList, pNode); if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) {
tsdbRecycleBufferBlock(pBufPool, pNode, true);
} else {
tdListAppendNode(pBufPool->bufBlockList, pNode);
addNew = true;
}
} }
} }
if (!recycleBlocks) { if (addNew) {
int code = pthread_cond_signal(&pBufPool->poolNotEmpty); int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) { if (code != 0) {
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
...@@ -555,7 +560,7 @@ static void tsdbFreeTableData(STableData *pTableData) { ...@@ -555,7 +560,7 @@ static void tsdbFreeTableData(STableData *pTableData) {
} }
} }
static char *tsdbGetTsTupleKey(const void *data) { return memRowTuple((SMemRow)data); } static char *tsdbGetTsTupleKey(const void *data) { return memRowKeys((SMemRow)data); }
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
ASSERT(pMemTable->maxTables < maxTables); ASSERT(pMemTable->maxTables < maxTables);
......
...@@ -33,6 +33,7 @@ extern "C" { ...@@ -33,6 +33,7 @@ extern "C" {
#endif #endif
typedef void (*__cache_free_fn_t)(void*); typedef void (*__cache_free_fn_t)(void*);
typedef void (*__cache_trav_fn_t)(void*, void*);
typedef struct SCacheStatis { typedef struct SCacheStatis {
int64_t missCount; int64_t missCount;
...@@ -176,7 +177,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj); ...@@ -176,7 +177,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
* @param fp * @param fp
* @return * @return
*/ */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp); void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1);
/** /**
* stop background refresh worker thread * stop background refresh worker thread
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 121 #define TSDB_CFG_MAX_NUM 122
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TTHREAD_H
#define TDENGINE_TTHREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
// destory thread
bool taosDestoryThread(pthread_t* pthread);
// thread running return true
bool taosThreadRunning(pthread_t* pthread);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TTHREAD_H
...@@ -505,7 +505,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -505,7 +505,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
typedef struct SHashTravSupp { typedef struct SHashTravSupp {
SCacheObj* pCacheObj; SCacheObj* pCacheObj;
int64_t time; int64_t time;
__cache_free_fn_t fp; __cache_trav_fn_t fp;
void* param1;
} SHashTravSupp; } SHashTravSupp;
static bool travHashTableEmptyFn(void* param, void* data) { static bool travHashTableEmptyFn(void* param, void* data) {
...@@ -667,17 +668,17 @@ bool travHashTableFn(void* param, void* data) { ...@@ -667,17 +668,17 @@ bool travHashTableFn(void* param, void* data) {
} }
if (ps->fp) { if (ps->fp) {
(ps->fp)(pNode->data); (ps->fp)(pNode->data, ps->param1);
} }
// do not remove element in hash table // do not remove element in hash table
return true; return true;
} }
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t utl_time, __cache_free_fn_t fp) { static void doCacheRefresh(SCacheObj* pCacheObj, int64_t utl_time, __cache_trav_fn_t fp, void* param1) {
assert(pCacheObj != NULL); assert(pCacheObj != NULL);
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = utl_time}; SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = utl_time, .param1 = param1};
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
} }
...@@ -748,7 +749,7 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -748,7 +749,7 @@ void* taosCacheTimedRefresh(void *handle) {
// refresh data in hash table // refresh data in hash table
if (elemInHash > 0) { if (elemInHash > 0) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, NULL); doCacheRefresh(pCacheObj, now, NULL, NULL);
} }
taosTrashcanEmpty(pCacheObj, false); taosTrashcanEmpty(pCacheObj, false);
...@@ -766,13 +767,13 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -766,13 +767,13 @@ void* taosCacheTimedRefresh(void *handle) {
return NULL; return NULL;
} }
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) { void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) {
if (pCacheObj == NULL) { if (pCacheObj == NULL) {
return; return;
} }
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, fp); doCacheRefresh(pCacheObj, now, fp, param1);
} }
void taosStopCacheRefreshWorker(void) { void taosStopCacheRefreshWorker(void) {
......
...@@ -112,9 +112,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too lon ...@@ -112,9 +112,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too lon
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_META_CACHED, "No table meta cached") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_META_CACHED, "No table meta cached")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_COL_NAMES, "duplicated column names") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_COL_NAMES, "duplicated column names")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TAG_LENGTH, "Invalid tag length") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TAG_LENGTH, "Invalid tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_COLUMN_LENGTH, "Invalid column length") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_COLUMN_LENGTH, "Invalid column length")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_RES_TOO_MANY, "Result set too large to be output")
// mnode // mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tthread.h"
#include "tglobal.h"
#include "taosdef.h"
#include "tutil.h"
#include "tulog.h"
#include "taoserror.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t));
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(pthread, &thattr, __start_routine, param);
pthread_attr_destroy(&thattr);
if (ret != 0) {
free(pthread);
return NULL;
}
return pthread;
}
// destory thread
bool taosDestoryThread(pthread_t* pthread) {
if(pthread == NULL) return false;
if(taosThreadRunning(pthread)) {
pthread_cancel(*pthread);
pthread_join(*pthread, NULL);
}
free(pthread);
return true;
}
// thread running return true
bool taosThreadRunning(pthread_t* pthread) {
if(pthread == NULL) return false;
int ret = pthread_kill(*pthread, 0);
if(ret == ESRCH)
return false;
if(ret == EINVAL)
return false;
// alive
return true;
}
...@@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { ...@@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return vnodeSaveVersion(pVnode); return vnodeSaveVersion(pVnode);
} }
// timer thread callback
if(status == TSDB_STATUS_COMMIT_NOBLOCK) {
qSolveCommitNoBlock(pVnode->tsdb, pVnode->qMgmt);
}
return 0; return 0;
} }
...@@ -21,78 +21,91 @@ import json ...@@ -21,78 +21,91 @@ import json
import random import random
import time import time
import datetime import datetime
import multiprocessing
from multiprocessing import Manager, Pool, Lock from multiprocessing import Manager, Pool, Lock
from multipledispatch import dispatch from multipledispatch import dispatch
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
@dispatch(str, str) @dispatch(str, str)
def v_print(msg: str, arg: str): def v_print(msg, arg):
# type: (str, str) -> None
if verbose: if verbose:
print(msg % arg) print(msg % arg)
@dispatch(str, str, str) @dispatch(str, str, str)
def v_print(msg: str, arg1: str, arg2: str): def v_print(msg, arg1, arg2):
# type: (str, str, str) -> None
if verbose: if verbose:
print(msg % (arg1, arg2)) print(msg % (arg1, arg2))
@dispatch(str, str, str, str) @dispatch(str, str, str, str)
def v_print(msg: str, arg1: str, arg2: str, arg3: str): def v_print(msg, arg1, arg2, arg3):
# type: (str, str, str, str) -> None
if verbose: if verbose:
print(msg % (arg1, arg2, arg3)) print(msg % (arg1, arg2, arg3))
@dispatch(str, str, str, str, str) @dispatch(str, str, str, str, str)
def v_print(msg: str, arg1: str, arg2: str, arg3: str, arg4: str): def v_print(msg, arg1, arg2, arg3, arg4):
# type: (str, str, str, str, str) -> None
if verbose: if verbose:
print(msg % (arg1, arg2, arg3, arg4)) print(msg % (arg1, arg2, arg3, arg4))
@dispatch(str, int) @dispatch(str, int)
def v_print(msg: str, arg: int): def v_print(msg, arg):
# type: (str, int) -> None
if verbose: if verbose:
print(msg % int(arg)) print(msg % int(arg))
@dispatch(str, int, str) @dispatch(str, int, str)
def v_print(msg: str, arg1: int, arg2: str): def v_print(msg, arg1, arg2):
# type: (str, int, str) -> None
if verbose: if verbose:
print(msg % (int(arg1), str(arg2))) print(msg % (int(arg1), str(arg2)))
@dispatch(str, str, int) @dispatch(str, str, int)
def v_print(msg: str, arg1: str, arg2: int): def v_print(msg, arg1, arg2):
# type: (str, str, int) -> None
if verbose: if verbose:
print(msg % (arg1, int(arg2))) print(msg % (arg1, int(arg2)))
@dispatch(str, int, int) @dispatch(str, int, int)
def v_print(msg: str, arg1: int, arg2: int): def v_print(msg, arg1, arg2):
# type: (str, int, int) -> None
if verbose: if verbose:
print(msg % (int(arg1), int(arg2))) print(msg % (int(arg1), int(arg2)))
@dispatch(str, int, int, str) @dispatch(str, int, int, str)
def v_print(msg: str, arg1: int, arg2: int, arg3: str): def v_print(msg, arg1, arg2, arg3):
# type: (str, int, int, str) -> None
if verbose: if verbose:
print(msg % (int(arg1), int(arg2), str(arg3))) print(msg % (int(arg1), int(arg2), str(arg3)))
@dispatch(str, int, int, int) @dispatch(str, int, int, int)
def v_print(msg: str, arg1: int, arg2: int, arg3: int): def v_print(msg, arg1, arg2, arg3):
# type: (str, int, int, int) -> None
if verbose: if verbose:
print(msg % (int(arg1), int(arg2), int(arg3))) print(msg % (int(arg1), int(arg2), int(arg3)))
@dispatch(str, int, int, int, int) @dispatch(str, int, int, int, int)
def v_print(msg: str, arg1: int, arg2: int, arg3: int, arg4: int): def v_print(msg, arg1, arg2, arg3, arg4):
# type: (str, int, int, int, int) -> None
if verbose: if verbose:
print(msg % (int(arg1), int(arg2), int(arg3), int(arg4))) print(msg % (int(arg1), int(arg2), int(arg3), int(arg4)))
def restful_execute(host: str, port: int, user: str, password: str, cmd: str): def restful_execute(host, port, user, password, cmd):
# type: (str, int, str, str, str) -> None
url = "http://%s:%d/rest/sql" % (host, restPort) url = "http://%s:%d/rest/sql" % (host, restPort)
v_print("restful_execute - cmd: %s", cmd) v_print("restful_execute - cmd: %s", cmd)
...@@ -112,7 +125,8 @@ def restful_execute(host: str, port: int, user: str, password: str, cmd: str): ...@@ -112,7 +125,8 @@ def restful_execute(host: str, port: int, user: str, password: str, cmd: str):
print("resp: %s" % json.dumps(resp.json())) print("resp: %s" % json.dumps(resp.json()))
def query_func(process: int, thread: int, cmd: str): def query_func(process, thread, cmd):
# type: (int, int, str) -> None
v_print("%d process %d thread cmd: %s", process, thread, cmd) v_print("%d process %d thread cmd: %s", process, thread, cmd)
if oneMoreHost != "NotSupported" and random.randint( if oneMoreHost != "NotSupported" and random.randint(
...@@ -133,7 +147,8 @@ def query_func(process: int, thread: int, cmd: str): ...@@ -133,7 +147,8 @@ def query_func(process: int, thread: int, cmd: str):
host, port, user, password, cmd) host, port, user, password, cmd)
def query_data_process(cmd: str): def query_data_process(cmd):
# type: (str) -> None
# establish connection if native # establish connection if native
if native: if native:
v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir) v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
...@@ -256,7 +271,8 @@ def drop_databases(): ...@@ -256,7 +271,8 @@ def drop_databases():
(dbName, i)) (dbName, i))
def insert_func(process: int, thread: int): def insert_func(process, thread):
# type: (int, int) -> None
v_print("%d process %d thread, insert_func ", process, thread) v_print("%d process %d thread, insert_func ", process, thread)
# generate uuid # generate uuid
...@@ -374,7 +390,8 @@ def create_tb(): ...@@ -374,7 +390,8 @@ def create_tb():
(tbName, j)) (tbName, j))
def insert_data_process(lock, i: int, begin: int, end: int): def insert_data_process(lock, i, begin, end):
# type: (multiprocessing._LockType, int, int, int) -> None
lock.acquire() lock.acquire()
tasks = end - begin tasks = end - begin
v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks) v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks)
...@@ -675,7 +692,10 @@ if __name__ == "__main__": ...@@ -675,7 +692,10 @@ if __name__ == "__main__":
printConfig() printConfig()
if not skipPrompt: if not skipPrompt:
input("Press any key to continue..") try:
input("Press any key to continue..")
except SyntaxError:
pass
# establish connection first if native # establish connection first if native
if native: if native:
......
...@@ -19,4 +19,3 @@ python .\test.py -f query\filterFloatAndDouble.py ...@@ -19,4 +19,3 @@ python .\test.py -f query\filterFloatAndDouble.py
python .\test.py -f query\filterOtherTypes.py python .\test.py -f query\filterOtherTypes.py
python .\test.py -f query\querySort.py python .\test.py -f query\querySort.py
python .\test.py -f query\queryJoin.py python .\test.py -f query\queryJoin.py
python .\test.py -f tools\windows_input.py
\ No newline at end of file
...@@ -42,6 +42,7 @@ python3 ./test.py -f table/alter_column.py ...@@ -42,6 +42,7 @@ python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py python3 ./test.py -f table/boundary.py
python3 ./test.py -f table/create.py python3 ./test.py -f table/create.py
python3 ./test.py -f table/del_stable.py python3 ./test.py -f table/del_stable.py
python3 ./test.py -f table/create_db_from_normal_db.py
#stable #stable
python3 ./test.py -f stable/insert.py python3 ./test.py -f stable/insert.py
......
...@@ -103,10 +103,27 @@ class TDTestCase: ...@@ -103,10 +103,27 @@ class TDTestCase:
select count(*) as count, loc from st where ts between 1600000000000 and 1600000000010 group by loc''') select count(*) as count, loc from st where ts between 1600000000000 and 1600000000010 group by loc''')
tdSql.checkRows(6) tdSql.checkRows(6)
# https://jira.taosdata.com:18080/browse/TS-715
tdLog.info("test case for TS-715")
sql = ""
tdSql.execute("create table st2(ts timestamp, c1 int, c2 int, c3 int) tags(loc nchar(20))")
for i in range(101):
if i == 0:
sql = "select last(*) from sub0 "
else:
sql += f"union all select last(*) from sub{i} "
tdSql.execute("create table sub%d using st2 tags('nchar%d')" % (i, i))
tdSql.execute("insert into sub%d values(%d, %d, %d, %d)" % (i, self.ts + i, i, i, i))
tdSql.error(sql)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("test case for TS-783")
tdSql.execute("drop table if exists db.state1;")
tdSql.execute("create table db.state1 (ts timestamp, c1 int);")
tdSql.error("create table db.test1 using db.state1 tags('tt');")
tdSql.execute("drop table if exists db.state2;")
tdSql.execute("create table db.state2 (ts timestamp, c1 int) tags (t binary(20));")
tdSql.query("create table db.test2 using db.state2 tags('tt');")
tdSql.error("create table db.test22 using db.test2 tags('tt');")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -15,6 +15,7 @@ import os ...@@ -15,6 +15,7 @@ import os
from uiautomation import WindowControl from uiautomation import WindowControl
from util.cases import * from util.cases import *
from util.sql import * from util.sql import *
import clipboard
class TDTestCase: class TDTestCase:
...@@ -55,16 +56,22 @@ class TDTestCase: ...@@ -55,16 +56,22 @@ class TDTestCase:
sql = "insert into db.tb values(now,'%s');" % temp sql = "insert into db.tb values(now,'%s');" % temp
window.SendKeys(sql) window.SendKeys(sql)
window.SendKeys('{Enter}') window.SendKeys('{Enter}')
window.SendKeys('{Ctrl}A')
window.SendKeys('{Ctrl}C')
# 获取剪切板里面的复制内容
result = clipboard.paste()
window.SendKeys('{Ctrl}C') window.SendKeys('{Ctrl}C')
window.SendKeys('exit') window.SendKeys('exit')
window.SendKeys('{Enter}') window.SendKeys('{Enter}')
return result
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
ret = tdSql.execute('create table tb (ts timestamp, i binary(300))') ret = tdSql.execute('create table tb (ts timestamp, i binary(300))')
self.win_input_test() result = self.win_input_test()
tdLog.info(result)
tdSql.query("select * from tb") tdSql.query("select * from tb")
tdSql.checkRows(1) tdSql.checkRows(1)
......
...@@ -330,9 +330,9 @@ if $data12 != 71680.000000000 then ...@@ -330,9 +330,9 @@ if $data12 != 71680.000000000 then
return -1 return -1
endi endi
sql select top(x, 20) from (select c1 x from nest_tb0); sql select top(x, 20) from (select ts,c1 x from nest_tb0);
sql select bottom(x, 20) from (select c1 x from nest_tb0) sql select bottom(x, 20) from (select ts,c1 x from nest_tb0)
print ===================> group by + having print ===================> group by + having
...@@ -407,12 +407,14 @@ if $data03 != @20-09-15 00:00:00.000@ then ...@@ -407,12 +407,14 @@ if $data03 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
sql select diff(val) from (select c1 val from nest_tb0); sql_error select diff(val) from (select c1 val from nest_tb0);
sql select diff(val) from (select ts,c1 val from nest_tb0);
if $rows != 9999 then if $rows != 9999 then
return -1 return -1
endi endi
if $data00 != @70-01-01 08:00:00.000@ then if $data00 != @20-09-15 00:01:00.000@ then
return -1 return -1
endi endi
......
...@@ -31,6 +31,8 @@ $tsu = $tsu + $ts0 ...@@ -31,6 +31,8 @@ $tsu = $tsu + $ts0
#sql_error select top(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu slimit 5 offset 1 #sql_error select top(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu slimit 5 offset 1
#sql_error select bottom(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu slimit 5 offset 1 #sql_error select bottom(c1, 1) from $stb where ts >= $ts0 and ts <= $tsu slimit 5 offset 1
sql_error select t1 from $stb slimit 5 offset 1;
### select from stb + group by + slimit offset ### select from stb + group by + slimit offset
sql select max(c1), min(c2), avg(c3), sum(c4), spread(c5), sum(c6), count(c7), first(c8), last(c9) from $stb group by t1 slimit 5 soffset 0 sql select max(c1), min(c2), avg(c3), sum(c4), spread(c5), sum(c6), count(c7), first(c8), last(c9) from $stb group by t1 slimit 5 soffset 0
if $rows != 5 then if $rows != 5 then
......
...@@ -32,33 +32,48 @@ sql create dnode $hostname3 ...@@ -32,33 +32,48 @@ sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode3 -s start
sleep 3000 sleep 3000
$x = 0
show1:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show dnodes sql show dnodes
print dnode1 $data5_1 print dnode1 $data5_1
print dnode1 $data5_2 print dnode2 $data5_2
print dnode1 $data5_3 print dnode3 $data5_3
if $data5_1 != mnode then if $data5_1 != mnode then
return -1 goto show1
endi endi
if $data5_2 != vnode then if $data5_2 != vnode then
return -1 goto show1
endi endi
if $data5_3 != any then if $data5_3 != any then
return -1 goto show1
endi endi
show2:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show mnodes sql show mnodes
print dnode1 ==> $data2_1 print dnode1 ==> $data2_1
print dnode2 ==> $data2_2 print dnode2 ==> $data2_2
print dnode3 ==> $data2_3 print dnode3 ==> $data2_3
if $data2_1 != master then if $data2_1 != master then
return -1 goto show2
endi endi
if $data2_2 != null then if $data2_2 != null then
return -1 goto show2
endi endi
if $data2_3 != slave then if $data2_3 != slave then
return -1 goto show2
endi endi
print ========== step2 print ========== step2
...@@ -72,26 +87,28 @@ sql create table d1.t6 (ts timestamp, i int) ...@@ -72,26 +87,28 @@ sql create table d1.t6 (ts timestamp, i int)
sql create table d1.t7 (ts timestamp, i int) sql create table d1.t7 (ts timestamp, i int)
sql create table d1.t8 (ts timestamp, i int) sql create table d1.t8 (ts timestamp, i int)
show3:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show dnodes sql show dnodes
print dnode1 $data2_1 print dnode1 $data2_1
print dnode2 $data2_2 print dnode2 $data2_2
print dnode3 $data2_3 print dnode3 $data2_3
if $data2_1 != 0 then if $data2_1 != 0 then
return -1 goto show3
endi endi
if $data2_2 != 1 then if $data2_2 != 1 then
return -1 goto show3
endi endi
if $data2_3 != 1 then if $data2_3 != 1 then
return -1 goto show3
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT \ No newline at end of file
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册