提交 585457f6 编写于 作者: P Ping Xiao

Merge branch 'develop' into xiaoping/add_test_case2

......@@ -4,3 +4,6 @@
[submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin
url = https://github.com/taosdata/grafanaplugin
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = https://github.com/huskar-t/hivemq-tdengine-extension.git
......@@ -86,6 +86,13 @@ pipeline {
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4
./handle_crash_gen_val_log.sh
'''
}
sh '''
date
cd ${WKC}/tests
......@@ -131,14 +138,33 @@ pipeline {
sh'''
cd ${WORKSPACE}
git checkout develop
cd tests/gotest
bash batchtest.sh
cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single >/dev/null
java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/gotest
bash batchtest.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single >/dev/null
java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${JENKINS_HOME}/workspace/C#NET/src/CheckC#
dotnet run
'''
}
}
}
......@@ -146,5 +172,82 @@ pipeline {
}
}
post {
success {
emailext (
subject: "SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
failure {
emailext (
subject: "FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
}
}
\ No newline at end of file
......@@ -58,7 +58,7 @@ cp -r ${top_dir}/src/connector/grafanaplugin ${pkg_dir}${install_home_pat
cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/nodejs ${pkg_dir}${install_home_path}/connector
cp ${compile_dir}/build/lib/taos-jdbcdriver*dist.* ${pkg_dir}${install_home_path}/connector
cp ${compile_dir}/build/lib/taos-jdbcdriver*dist.* ${pkg_dir}${install_home_path}/connector ||:
cp -r ${compile_dir}/../packaging/deb/DEBIAN ${pkg_dir}/
chmod 755 ${pkg_dir}/DEBIAN/*
......
......@@ -156,9 +156,15 @@ build_time=$(date +"%F %R")
# get commint id from git
gitinfo=$(git rev-parse --verify HEAD)
enterprise_dir="${top_dir}/../enterprise"
cd ${enterprise_dir}
gitinfoOfInternal=$(git rev-parse --verify HEAD)
if [[ "$verMode" == "cluster" ]]; then
enterprise_dir="${top_dir}/../enterprise"
cd ${enterprise_dir}
gitinfoOfInternal=$(git rev-parse --verify HEAD)
else
gitinfoOfInternal=NULL
fi
cd ${curr_dir}
# 2. cmake executable file
......@@ -193,23 +199,35 @@ cd ${curr_dir}
# 3. Call the corresponding script for packaging
if [ "$osType" != "Darwin" ]; then
if [[ "$verMode" != "cluster" ]] && [[ "$cpuType" == "x64" ]] && [[ "$dbName" == "taos" ]]; then
echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
ret='0'
command -v dpkg >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/deb
${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
else
echo "==========dpkg command not exist, so not release deb package!!!"
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/deb
${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
ret='0'
command -v rpmbuild >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/rpm
${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
else
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/rpm
${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
fi
echo "====do tar.gz package for all systems===="
......
......@@ -65,7 +65,7 @@ cp -r %{_compiledir}/../src/connector/grafanaplugin %{buildroot}%{homepath}/conn
cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector
cp %{_compiledir}/build/lib/taos-jdbcdriver*dist.* %{buildroot}%{homepath}/connector
cp %{_compiledir}/build/lib/taos-jdbcdriver*dist.* %{buildroot}%{homepath}/connector ||:
cp -r %{_compiledir}/../tests/examples/* %{buildroot}%{homepath}/examples
#Scripts executed before installation
......
......@@ -278,11 +278,11 @@ function install_service_on_sysvinit() {
# Install taosd service
if ((${os_type}==1)); then
${csudo} cp -f ${script_dir}/../deb/init.d/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../deb/init.d/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/../deb/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../deb/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
elif ((${os_type}==2)); then
${csudo} cp -f ${script_dir}/../rpm/init.d/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../rpm/init.d/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/../rpm/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../rpm/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
fi
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
......
......@@ -110,7 +110,7 @@ mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
fi
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
......
......@@ -135,7 +135,7 @@ mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
fi
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
......
......@@ -124,7 +124,7 @@ cp ${lib_files} ${install_dir}/driver
connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
cp -r ${connector_dir}/go ${install_dir}/connector
......
......@@ -156,7 +156,7 @@ cp ${lib_files} ${install_dir}/driver
connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
cp -r ${connector_dir}/go ${install_dir}/connector
......
name: tdengine
base: core18
version: '2.0.5.1'
version: '2.0.6.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.5.1
- usr/lib/libtaos.so.2.0.6.0
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -30,6 +30,7 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
#include "tcache.h"
#include "tref.h"
#include "qExecutor.h"
#include "qSqlparser.h"
......@@ -446,7 +447,7 @@ void tscFreeSqlObj(SSqlObj *pSql);
void tscFreeRegisteredSqlObj(void *pSql);
void tscFreeTableMetaHelper(void *pTableMeta);
void tscCloseTscObj(STscObj *pObj);
void tscCloseTscObj(void *pObj);
// todo move to taos? or create a new file: taos_internal.h
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
......@@ -516,6 +517,7 @@ extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern int tscRefId;
extern SRpcCorEpSet tscMgmtEpSet;
......
......@@ -427,8 +427,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else {
assert(code == TSDB_CODE_SUCCESS);
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL);
// param already freed by other routine and pSql in tscCache when ctrl + c
if (atomic_load_ptr(&pSql->param) == NULL) {
return;
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSql;
......
......@@ -190,18 +190,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle;
if (pObj == NULL || pObj->signature != pObj) {
int ret = taosAcquireRef(tscRefId, pObj);
if (ret < 0) {
tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
return;
}
SSqlObj* pHB = pObj->pHb;
if (pObj->pTimer != tmrId || pHB == NULL) {
return;
}
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
tscWarn("%p HB object has been released already", pHB);
taosReleaseRef(tscRefId, pObj);
return;
}
......@@ -213,6 +214,8 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
if (code != TSDB_CODE_SUCCESS) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
}
taosReleaseRef(tscRefId, pObj);
}
int tscSendMsgToServer(SSqlObj *pSql) {
......
......@@ -161,6 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
taosAddRef(tscRefId, pObj);
return pSql;
}
......@@ -296,7 +297,8 @@ void taos_close(TAOS *taos) {
}
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
tscCloseTscObj(pObj);
taosRemoveRef(tscRefId, pObj);
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
......
......@@ -36,6 +36,7 @@ void * tscTmr;
void * tscQhandle;
void * tscCheckDiskUsageTmr;
int tsInsertHeadSize;
int tscRefId;
int tscNumOfThreads;
......@@ -146,6 +147,8 @@ void taos_init_imp(void) {
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
}
tscRefId = taosOpenRef(200, tscCloseTscObj);
tscDebug("client is initialized successfully");
}
......@@ -165,6 +168,7 @@ void taos_cleanup() {
tscQhandle = NULL;
}
taosCloseRef(tscRefId);
taosCleanupKeywordsTable();
taosCloseLog();
......
......@@ -404,7 +404,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
tscCloseTscObj(pTscObj);
taosRemoveRef(tscRefId, pTscObj);
}
}
......@@ -786,8 +786,8 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
}
// TODO: all subqueries should be freed correctly before close this connection.
void tscCloseTscObj(STscObj* pObj) {
assert(pObj != NULL);
void tscCloseTscObj(void *param) {
STscObj *pObj = param;
pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer));
......
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
Subproject commit b62a26ecc164a310104df57691691b237e091c89
......@@ -28,7 +28,7 @@ extern "C" {
#else
#define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = (0x80000000 | ((mod)<<16) | (code));
#endif
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
#define TAOS_SUCCEEDED(err) ((err) >= 0)
#define TAOS_FAILED(err) ((err) < 0)
......@@ -37,7 +37,7 @@ const char* tstrerror(int32_t err);
int32_t* taosGetErrno();
#define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0
#ifdef TAOS_ERROR_C
......@@ -74,6 +74,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, 0, 0x0101, "Memory cor
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, 0, 0x0102, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, 0, 0x0103, "Invalid config message")
TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, 0, 0x0104, "Data file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, 0, 0x0105, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, 0, 0x0106, "too many Ref Objs")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, 0, 0x0107, "Ref ID is removed")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, 0, 0x0108, "Invalid Ref ID")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, 0, 0x0109, "Ref is already there")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, 0, 0x010A, "Ref is not there")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_SQL, 0, 0x0200, "Invalid SQL statement")
......@@ -182,7 +188,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out
TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "No permission for disk files in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "Invalid message length")
// vnode
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_MSG_NOT_PROCESSED, 0, 0x0501, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_NEED_REPROCESSED, 0, 0x0502, "Action need to be reprocessed")
......
......@@ -106,6 +106,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "network-test" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105
#endif
......
......@@ -22,22 +22,22 @@ extern "C" {
#define TAOS_WAL_NOLOG 0
#define TAOS_WAL_WRITE 1
#define TAOS_WAL_FSYNC 2
typedef struct {
int8_t msgType;
int8_t reserved[3];
int32_t len;
uint64_t version;
uint32_t signature;
uint32_t cksum;
char cont[];
int8_t msgType;
int8_t reserved[3];
int32_t len;
uint64_t version;
uint32_t signature;
uint32_t cksum;
char cont[];
} SWalHead;
typedef struct {
int8_t walLevel; // wal level
int32_t fsyncPeriod; // millisecond
int8_t wals; // number of WAL files;
int8_t keep; // keep the wal file when closed
int8_t walLevel; // wal level
int32_t fsyncPeriod; // millisecond
int8_t wals; // number of WAL files;
int8_t keep; // keep the wal file when closed
} SWalCfg;
typedef void* twalh; // WAL HANDLE
......@@ -53,9 +53,6 @@ int walRestore(twalh, void *pVnode, FWalWrite writeFp);
int walGetWalFile(twalh, char *name, uint32_t *index);
int64_t walGetVersion(twalh);
extern int wDebugFlag;
#ifdef __cplusplus
}
#endif
......
......@@ -80,7 +80,10 @@ int main(int argc, char* argv[]) {
shellParseArgument(argc, argv, &args);
if (args.netTestRole && args.netTestRole[0] != 0) {
taosNetTest(args.host, (uint16_t)args.port, (uint16_t)args.endPort, args.pktLen, args.netTestRole);
taos_init();
CmdArguments cmdArgs;
memcpy(&cmdArgs, &args, sizeof(SShellArguments));
taosNetTest(&cmdArgs);
exit(0);
}
......
......@@ -281,7 +281,14 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
((SSdbTable *)pOper->table)->tableName, pOper->pObj, sdbGetKeyStr(pOper->table, pHead->cont),
pHead->version, action, tstrerror(pOper->retCode));
if (action == SDB_ACTION_INSERT) {
sdbDeleteHash(pOper->table, pOper);
// It's better to create a table in two stages, create it first and then set it success
//sdbDeleteHash(pOper->table, pOper);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = pOper->table,
.pObj = pOper->pObj
};
sdbDeleteRow(&oper);
}
}
......
......@@ -663,13 +663,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "dnode%d", i + 1);
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dDnode", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dstatus", i + 1);
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dStatus", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
......
......@@ -33,15 +33,11 @@ struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SPosInfo {
int32_t pageId:20;
int32_t rowId:12;
} SPosInfo;
typedef struct SGroupResInfo {
int32_t groupId;
int32_t numOfDataPages;
SPosInfo pos;
int32_t pageId;
int32_t rowId;
} SGroupResInfo;
typedef struct SSqlGroupbyExpr {
......@@ -53,9 +49,10 @@ typedef struct SSqlGroupbyExpr {
} SSqlGroupbyExpr;
typedef struct SWindowResult {
SPosInfo pos; // Position of current result in disk-based output buffer
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t rowId:15;
bool closed:1; // this result status: closed or opened
uint16_t numOfRows; // number of rows of current time window
bool closed; // this result status: closed or opened
SResultInfo* resultInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current time window
} SWindowResult;
......
......@@ -51,7 +51,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t realRowId = (int32_t)(pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
}
......
......@@ -557,7 +557,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t sid,
int32_t numOfRowsPerPage) {
if (pWindowRes->pos.pageId != -1) {
if (pWindowRes->pageId != -1) {
return 0;
}
......@@ -590,11 +590,11 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
}
// set the number of rows in current disk page
if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer
pWindowRes->pos.pageId = pageId;
pWindowRes->pos.rowId = (int32_t)(pData->num++);
if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer
pWindowRes->pageId = pageId;
pWindowRes->rowId = (int32_t)(pData->num++);
assert(pWindowRes->pos.pageId >= 0);
assert(pWindowRes->pageId >= 0);
}
return 0;
......@@ -616,7 +616,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
*newWind = true;
// not assign result buffer yet, add new result buffer
if (pWindowRes->pos.pageId == -1) {
if (pWindowRes->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
......@@ -1143,7 +1143,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
assert(pRuntimeEnv->windowResInfo.interval == 0);
if (pWindowRes->pos.pageId == -1) {
if (pWindowRes->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
return -1;
......@@ -2652,7 +2652,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
......@@ -2823,14 +2823,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos);
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pos.pageId);
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
TSKEY leftTimestamp = GET_INT64_VAL(b1);
SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos);
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pos.pageId);
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2);
TSKEY rightTimestamp = GET_INT64_VAL(b2);
......@@ -2867,7 +2867,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
}
SGroupResInfo* info = &pQInfo->groupResInfo;
if (pQInfo->groupIndex == numOfGroups && info->pos.pageId == info->numOfDataPages) {
if (pQInfo->groupIndex == numOfGroups && info->pageId == info->numOfDataPages) {
SET_STABLE_QUERY_OVER(pQInfo);
}
......@@ -2883,10 +2883,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
// all results have been return to client, try next group
if (pGroupResInfo->pos.pageId == pGroupResInfo->numOfDataPages) {
if (pGroupResInfo->pageId == pGroupResInfo->numOfDataPages) {
pGroupResInfo->numOfDataPages = 0;
pGroupResInfo->pos.pageId = 0;
pGroupResInfo->pos.rowId = 0;
pGroupResInfo->pageId = 0;
pGroupResInfo->rowId = 0;
// current results of group has been sent to client, try next group
if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
......@@ -2914,22 +2914,22 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
assert(size == pGroupResInfo->numOfDataPages);
bool done = false;
for (int32_t j = pGroupResInfo->pos.pageId; j < size; ++j) {
for (int32_t j = pGroupResInfo->pageId; j < size; ++j) {
SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j);
tFilePage* pData = getResBufPage(pResultBuf, pi->pageId);
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->pos.rowId < pData->num);
int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->pos.rowId);
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num);
int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId);
if (numOfRes > pQuery->rec.capacity - offset) {
numOfCopiedRows = (int32_t)(pQuery->rec.capacity - offset);
pGroupResInfo->pos.rowId += numOfCopiedRows;
pGroupResInfo->rowId += numOfCopiedRows;
done = true;
} else {
numOfCopiedRows = (int32_t)pData->num;
pGroupResInfo->pos.pageId += 1;
pGroupResInfo->pos.rowId = 0;
pGroupResInfo->pageId += 1;
pGroupResInfo->rowId = 0;
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
......@@ -3020,8 +3020,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
pGroupResInfo->numOfDataPages = (int32_t)taosArrayGetSize(pageList);
pGroupResInfo->groupId = tid;
pGroupResInfo->pos.pageId = 0;
pGroupResInfo->pos.rowId = 0;
pGroupResInfo->pageId = 0;
pGroupResInfo->rowId = 0;
return pGroupResInfo->numOfDataPages;
}
......@@ -3067,7 +3067,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
SWindowResult *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page);
TSKEY ts = GET_INT64_VAL(b);
......@@ -3104,7 +3104,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
lastTimestamp = ts;
// move to the next element of current entry
int32_t currentPageId = pWindowRes->pos.pageId;
int32_t currentPageId = pWindowRes->pageId;
cs.position[pos] += 1;
if (cs.position[pos] >= pWindowResInfo->size) {
......@@ -3117,7 +3117,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
} else {
// current page is not needed anymore
SWindowResult *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
if (pNextWindowRes->pos.pageId != currentPageId) {
if (pNextWindowRes->pageId != currentPageId) {
releaseResBufPage(pRuntimeEnv->pResultBuf, page);
}
}
......@@ -3329,7 +3329,8 @@ int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool is
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
pResultRow->pos = (SPosInfo) {-1, -1};
pResultRow->pageId = -1;
pResultRow->rowId = -1;
char* buf = (char*) pResultRow->resultInfo + numOfCols * sizeof(SResultInfo);
......@@ -3796,7 +3797,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pWindowRes->pos.pageId == -1) {
if (pWindowRes->pageId == -1) {
if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) !=
TSDB_CODE_SUCCESS) {
return;
......@@ -3813,7 +3814,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult
SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
......@@ -3840,7 +3841,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
......@@ -4019,12 +4020,12 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
if (result[i].numOfRows == 0) {
pQInfo->groupIndex += 1;
pGroupResInfo->pos.rowId = 0;
pGroupResInfo->rowId = 0;
continue;
}
int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->pos.rowId;
int32_t oldOffset = pGroupResInfo->pos.rowId;
int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->rowId;
int32_t oldOffset = pGroupResInfo->rowId;
/*
* current output space is not enough to accommodate all data of this page, only partial results
......@@ -4032,13 +4033,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
*/
if (numOfRowsToCopy > pQuery->rec.capacity - numOfResult) {
numOfRowsToCopy = (int32_t) pQuery->rec.capacity - numOfResult;
pGroupResInfo->pos.rowId += numOfRowsToCopy;
pGroupResInfo->rowId += numOfRowsToCopy;
} else {
pGroupResInfo->pos.rowId = 0;
pGroupResInfo->rowId = 0;
pQInfo->groupIndex += 1;
}
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pos.pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pageId);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
......
......@@ -266,7 +266,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
return;
}
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultInfo *pResultInfo = &pWindowRes->resultInfo[i];
......@@ -279,7 +279,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
}
pWindowRes->numOfRows = 0;
pWindowRes->pos = (SPosInfo){-1, -1};
pWindowRes->pageId = -1;
pWindowRes->rowId = -1;
pWindowRes->closed = false;
pWindowRes->win = TSWINDOW_INITIALIZER;
}
......@@ -308,10 +309,10 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen);
// copy the output buffer data from src to dst, the position info keep unchanged
tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pos.pageId);
tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId);
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage);
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pos.pageId);
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src, srcpage);
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
......
......@@ -20,6 +20,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "lz4.h"
#include "tref.h"
#include "taoserror.h"
#include "tsocket.h"
#include "tglobal.h"
......@@ -72,7 +73,6 @@ typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo
SRpcEpSet epSet; // ip list provided by app
void *ahandle; // handle provided by app
void *signature; // for validation
struct SRpcConn *pConn; // pConn allocated
char msgType; // message type
uint8_t *pCont; // content provided by app
......@@ -132,6 +132,10 @@ int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;
static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0;
static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
// server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
......@@ -211,14 +215,26 @@ static void rpcUnlockConn(SRpcConn *pConn);
static void rpcAddRef(SRpcInfo *pRpc);
static void rpcDecRef(SRpcInfo *pRpc);
void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc;
static void rpcFree(void *p) {
tTrace("free mem: %p", p);
free(p);
}
static void rpcInit(void) {
tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree);
}
void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc;
pthread_once(&tsRpcInit, rpcInit);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
......@@ -237,6 +253,8 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc->afp = pInit->afp;
pRpc->refCount = 1;
atomic_add_fetch_32(&tsRpcNum, 1);
size_t size = sizeof(SRpcConn) * pRpc->sessions;
pRpc->connList = (SRpcConn *)calloc(1, size);
if (pRpc->connList == NULL) {
......@@ -363,7 +381,6 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
pContext->ahandle = pMsg->ahandle;
pContext->signature = pContext;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->epSet = *pEpSet;
pContext->contLen = contLen;
......@@ -386,6 +403,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
// set the handle to pContext, so app can cancel the request
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
taosAddRef(tsRpcRefId, pContext);
rpcSendReqToServer(pRpc, pContext);
return;
......@@ -536,14 +554,15 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
void rpcCancelRequest(void *handle) {
SRpcReqContext *pContext = handle;
// signature is used to check if pContext is freed.
// pContext may have been released just before app calls the rpcCancelRequest
if (pContext == NULL || pContext->signature != pContext) return;
int code = taosAcquireRef(tsRpcRefId, pContext);
if (code < 0) return;
if (pContext->pConn) {
tDebug("%s, app tries to cancel request", pContext->pConn->info);
rpcCloseConn(pContext->pConn);
}
taosReleaseRef(tsRpcRefId, pContext);
}
static void rpcFreeMsg(void *msg) {
......@@ -612,7 +631,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
// if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL;
rpcFreeMsg(pConn->pReqMsg);
taosRemoveRef(tsRpcRefId, pConn->pContext);
}
}
......@@ -1057,6 +1076,13 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
}
} else { // msg is passed to app only parsing is ok
if (pHead->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
rpcSendQuickRsp(pConn, TSDB_CODE_SUCCESS);
rpcFreeMsg(pRecv->msg);
return pConn;
}
rpcProcessIncomingMsg(pConn, pHead, pContext);
}
}
......@@ -1068,7 +1094,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
SRpcInfo *pRpc = pContext->pRpc;
pContext->signature = NULL;
pContext->pConn = NULL;
if (pContext->pRsp) {
// for synchronous API
......@@ -1085,7 +1110,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
}
// free the request message
rpcFreeCont(pContext->pCont);
taosRemoveRef(tsRpcRefId, pContext);
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
......@@ -1593,6 +1618,12 @@ static void rpcDecRef(SRpcInfo *pRpc)
pthread_mutex_destroy(&pRpc->mutex);
tDebug("%s rpc resources are released", pRpc->label);
taosTFree(pRpc);
int count = atomic_sub_fetch_32(&tsRpcNum, 1);
if (count == 0) {
taosCloseRef(tsRpcRefId);
// tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error
}
}
}
......@@ -56,6 +56,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
int code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore
bool fileChanged = false;
*fversion = 0;
sinfo.index = 0;
......@@ -114,10 +115,11 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
close(dfd);
if (ret < 0) break;
fileChanged = true;
sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
}
if (code == 0 && (minfo.fversion != sinfo.fversion)) {
if (code == 0 && fileChanged) {
// data file is changed, code shall be set to 1
*fversion = minfo.fversion;
code = 1;
......
......@@ -79,7 +79,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
DIR * dir = NULL;
int fid = 0;
int vid = 0;
regex_t regex1, regex2;
regex_t regex1 = {0}, regex2 = {0};
int code = 0;
char fname[TSDB_FILENAME_LEN] = "\0";
......@@ -95,9 +95,27 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
dir = opendir(tDataDir);
if (dir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
if (errno == ENOENT) {
tsdbError("vgId:%d directory %s not exist", REPO_ID(pRepo), tDataDir);
terrno = TAOS_SYSTEM_ERROR(errno);
if (taosMkDir(tDataDir, 0755) < 0) {
tsdbError("vgId:%d failed to create directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
dir = opendir(tDataDir);
if (dir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} else {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
code = regcomp(&regex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat)$", REG_EXTENDED);
......
......@@ -1263,7 +1263,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
int32_t end = endPos;
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(start >= end);
SWAP(start, end, int32_t);
}
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
......
......@@ -20,7 +20,27 @@
extern "C" {
#endif
void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole);
typedef struct CmdArguments {
char* host;
char* password;
char* user;
char* auth;
char* database;
char* timezone;
bool is_raw_time;
bool is_use_passwd;
char file[TSDB_FILENAME_LEN];
char dir[TSDB_FILENAME_LEN];
int threadNum;
char* commands;
int abort;
int port;
int endPort;
int pktLen;
char* netTestRole;
} CmdArguments;
void taosNetTest(CmdArguments* args);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TREF_H
#define TDENGINE_TREF_H
#ifdef __cplusplus
extern "C" {
#endif
int taosOpenRef(int max, void (*fp)(void *)); // return refId which will be used by other APIs
void taosCloseRef(int refId);
int taosListRef(); // return the number of references in system
int taosAddRef(int refId, void *p);
int taosAcquireRef(int refId, void *p);
void taosReleaseRef(int refId, void *p);
#define taosRemoveRef taosReleaseRef
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TREF_H
......@@ -15,11 +15,16 @@
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tulog.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tsocket.h"
#include "trpc.h"
#include "rpcHead.h"
#include "tutil.h"
#include "tnettest.h"
#define MAX_PKG_LEN (64*1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
......@@ -30,9 +35,15 @@ typedef struct {
uint16_t pktLen;
} info_s;
static char serverFqdn[TSDB_FQDN_LEN];
extern int tsRpcMaxUdpSize;
static char g_user[TSDB_USER_LEN+1] = {0};
static char g_pass[TSDB_PASSWORD_LEN+1] = {0};
static char g_serverFqdn[TSDB_FQDN_LEN] = {0};
static uint16_t g_startPort = 0;
static uint16_t g_endPort = 6042;
static uint32_t g_pktLen = 0;
static void *bindUdpPort(void *sarg) {
info_s *pinfo = (info_s *)sarg;
......@@ -321,19 +332,145 @@ static void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uin
return ;
}
static void taosNetTestClient(const char* serverFqdn, uint16_t startPort, uint16_t endPort, int pktLen) {
uint32_t serverIp = taosGetIpFromFqdn(serverFqdn);
if (serverIp == 0xFFFFFFFF) {
printf("Failed to resolve FQDN:%s", serverFqdn);
exit(-1);
void* tnetInitRpc(char* secretEncrypt, char spi) {
SRpcInit rpcInit;
void* pRpcConn = NULL;
taosEncryptPass((uint8_t *)g_pass, strlen(g_pass), secretEncrypt);
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "NET-TEST";
rpcInit.numOfThreads = 1; // every DB connection has only one thread
rpcInit.cfp = NULL;
rpcInit.sessions = 16;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = g_user;
rpcInit.idleTime = 2000;
rpcInit.ckey = "key";
rpcInit.spi = spi;
rpcInit.secret = secretEncrypt;
pRpcConn = rpcOpen(&rpcInit);
return pRpcConn;
}
static int rpcCheckPortImpl(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi) {
SRpcEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
void* pRpcConn;
char secretEncrypt[32] = {0};
pRpcConn = tnetInitRpc(secretEncrypt, spi);
if (NULL == pRpcConn) {
return -1;
}
checkPort(serverIp, startPort, endPort, pktLen);
memset(&epSet, 0, sizeof(SRpcEpSet));
epSet.inUse = 0;
epSet.numOfEps = 1;
epSet.port[0] = port;
strcpy(epSet.fqdn[0], serverFqdn);
reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST;
reqMsg.pCont = rpcMallocCont(pktLen);
reqMsg.contLen = pktLen;
reqMsg.code = 0;
reqMsg.handle = NULL; // rpc handle returned to app
reqMsg.ahandle = NULL; // app handle set by client
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
// handle response
if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) {
//printf("code:%d[%s]\n", rspMsg.code, tstrerror(rspMsg.code));
return -1;
}
rpcFreeCont(rspMsg.pCont);
return;
rpcClose(pRpcConn);
return 0;
}
static void rpcCheckPort(uint32_t hostIp) {
int ret;
char spi;
for (uint16_t port = g_startPort; port <= g_endPort; port++) {
//printf("test: %s:%d\n", info.host, port);
printf("\n");
//================ check tcp port ================
int32_t pktLen;
if (g_pktLen <= tsRpcMaxUdpSize) {
pktLen = tsRpcMaxUdpSize + 1000;
} else {
pktLen = g_pktLen;
}
spi = 1;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
spi = 0;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
printf("TCP port:%d test fail.\t\t", port);
} else {
//printf("tcp port:%d test ok.\t\t", port);
printf("TCP port:\033[32m%d test OK\033[0m\t\t", port);
}
} else {
//printf("tcp port:%d test ok.\t\t", port);
printf("TCP port:\033[32m%d test OK\033[0m\t\t", port);
}
//================ check udp port ================
if (g_pktLen >= tsRpcMaxUdpSize) {
pktLen = tsRpcMaxUdpSize - 1000;
} else {
pktLen = g_pktLen;
}
spi = 0;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
spi = 1;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
printf("udp port:%d test fail.\t\n", port);
} else {
//printf("udp port:%d test ok.\t\n", port);
printf("UDP port:\033[32m%d test OK\033[0m\t\n", port);
}
} else {
//printf("udp port:%d test ok.\t\n", port);
printf("UDP port:\033[32m%d test OK\033[0m\t\n", port);
}
}
printf("\n");
return ;
}
static void taosNetTestClient(int flag) {
uint32_t serverIp = taosGetIpFromFqdn(g_serverFqdn);
if (serverIp == 0xFFFFFFFF) {
printf("Failed to resolve FQDN:%s", g_serverFqdn);
exit(-1);
}
if (0 == flag) {
checkPort(serverIp, g_startPort, g_endPort, g_pktLen);
} else {
rpcCheckPort(serverIp);
}
return;
}
static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen) {
......@@ -375,49 +512,66 @@ static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen)
}
void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole) {
if (pktLen > MAX_PKG_LEN) {
printf("test packet len overflow: %d, max len not greater than %d bytes\n", pktLen, MAX_PKG_LEN);
exit(-1);
void taosNetTest(CmdArguments *args) {
if (0 == args->pktLen) {
g_pktLen = 1000;
} else {
g_pktLen = args->pktLen;
}
if (port && endPort) {
if (port > endPort) {
printf("endPort[%d] must not lesss port[%d]\n", endPort, port);
if (args->port && args->endPort) {
if (args->port > args->endPort) {
printf("endPort[%d] must not lesss port[%d]\n", args->endPort, args->port);
exit(-1);
}
}
if (host && host[0] != 0) {
if (strlen(host) >= TSDB_EP_LEN) {
printf("host invalid: %s\n", host);
if (args->host && args->host[0] != 0) {
if (strlen(args->host) >= TSDB_EP_LEN) {
printf("host invalid: %s\n", args->host);
exit(-1);
}
taosGetFqdnPortFromEp(host, serverFqdn, &g_startPort);
taosGetFqdnPortFromEp(args->host, g_serverFqdn, &g_startPort);
} else {
tstrncpy(serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN);
tstrncpy(g_serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN);
g_startPort = tsServerPort;
}
if (port) {
g_startPort = port;
if (args->port) {
g_startPort = args->port;
}
if (endPort) {
g_endPort = endPort;
if (args->endPort) {
g_endPort = args->endPort;
}
if (port > endPort) {
if (g_startPort > g_endPort) {
printf("endPort[%d] must not lesss port[%d]\n", g_endPort, g_startPort);
exit(-1);
}
if (args->is_use_passwd) {
if (args->password == NULL) args->password = getpass("Enter password: ");
} else {
args->password = TSDB_DEFAULT_PASS;
}
tstrncpy(g_pass, args->password, TSDB_PASSWORD_LEN);
if (args->user == NULL) {
args->user = TSDB_DEFAULT_USER;
}
tstrncpy(g_user, args->user, TSDB_USER_LEN);
if (0 == strcmp("client", netTestRole)) {
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", serverFqdn, g_startPort, g_endPort, pktLen);
taosNetTestClient(serverFqdn, g_startPort, g_endPort, pktLen);
} else if (0 == strcmp("server", netTestRole)) {
taosNetTestServer(g_startPort, g_endPort, pktLen);
if (0 == strcmp("client", args->netTestRole)) {
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen);
taosNetTestClient(0);
} else if (0 == strcmp("clients", args->netTestRole)) {
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen);
taosNetTestClient(1);
} else if (0 == strcmp("server", args->netTestRole)) {
taosNetTestServer(g_startPort, g_endPort, g_pktLen);
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#include "tutil.h"
#define TSDB_REF_OBJECTS 50
#define TSDB_REF_STATE_EMPTY 0
#define TSDB_REF_STATE_ACTIVE 1
#define TSDB_REF_STATE_DELETED 2
typedef struct SRefNode {
struct SRefNode *prev;
struct SRefNode *next;
void *p;
int32_t count;
} SRefNode;
typedef struct {
SRefNode **nodeList;
int state; // 0: empty, 1: active; 2: deleted
int refId;
int max;
int32_t count; // total number of SRefNodes in this set
int64_t *lockedBy;
void (*fp)(void *);
} SRefSet;
static SRefSet tsRefSetList[TSDB_REF_OBJECTS];
static pthread_once_t tsRefModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tsRefMutex;
static int tsRefSetNum = 0;
static int tsNextId = 0;
static void taosInitRefModule(void);
static int taosHashRef(SRefSet *pSet, void *p);
static void taosLockList(int64_t *lockedBy);
static void taosUnlockList(int64_t *lockedBy);
static void taosIncRefCount(SRefSet *pSet);
static void taosDecRefCount(SRefSet *pSet);
int taosOpenRef(int max, void (*fp)(void *))
{
SRefNode **nodeList;
SRefSet *pSet;
int64_t *lockedBy;
int i, refId;
pthread_once(&tsRefModuleInit, taosInitRefModule);
nodeList = calloc(sizeof(SRefNode *), (size_t)max);
if (nodeList == NULL) {
return TSDB_CODE_REF_NO_MEMORY;
}
lockedBy = calloc(sizeof(int64_t), (size_t)max);
if (lockedBy == NULL) {
free(nodeList);
return TSDB_CODE_REF_NO_MEMORY;
}
pthread_mutex_lock(&tsRefMutex);
for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
}
if (i < TSDB_REF_OBJECTS) {
refId = tsNextId;
pSet = tsRefSetList + refId;
taosIncRefCount(pSet);
pSet->max = max;
pSet->nodeList = nodeList;
pSet->lockedBy = lockedBy;
pSet->fp = fp;
pSet->state = TSDB_REF_STATE_ACTIVE;
pSet->refId = refId;
tsRefSetNum++;
uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum);
} else {
refId = TSDB_CODE_REF_FULL;
free (nodeList);
free (lockedBy);
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
}
pthread_mutex_unlock(&tsRefMutex);
return refId;
}
void taosCloseRef(int refId)
{
SRefSet *pSet;
int deleted = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d is invalid, out of range", refId);
return;
}
pSet = tsRefSetList + refId;
pthread_mutex_lock(&tsRefMutex);
if (pSet->state == TSDB_REF_STATE_ACTIVE) {
pSet->state = TSDB_REF_STATE_DELETED;
deleted = 1;
uTrace("refId:%d is closed, count:%d", refId, pSet->count);
} else {
uTrace("refId:%d is already closed, count:%d", refId, pSet->count);
}
pthread_mutex_unlock(&tsRefMutex);
if (deleted) taosDecRefCount(pSet);
}
int taosAddRef(int refId, void *p)
{
int hash;
SRefNode *pNode;
SRefSet *pSet;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to add, refId not valid", refId, p);
return TSDB_CODE_REF_INVALID_ID;
}
uTrace("refId:%d p:%p try to add", refId, p);
pSet = tsRefSetList + refId;
taosIncRefCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
taosDecRefCount(pSet);
uTrace("refId:%d p:%p failed to add, not active", refId, p);
return TSDB_CODE_REF_ID_REMOVED;
}
int code = 0;
hash = taosHashRef(pSet, p);
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->p == p)
break;
pNode = pNode->next;
}
if (pNode) {
code = TSDB_CODE_REF_ALREADY_EXIST;
uTrace("refId:%d p:%p is already there, faild to add", refId, p);
} else {
pNode = calloc(sizeof(SRefNode), 1);
if (pNode) {
pNode->p = p;
pNode->count = 1;
pNode->prev = 0;
pNode->next = pSet->nodeList[hash];
if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
pSet->nodeList[hash] = pNode;
uTrace("refId:%d p:%p is added, count:%d malloc mem: %p", refId, p, pSet->count, pNode);
} else {
code = TSDB_CODE_REF_NO_MEMORY;
uTrace("refId:%d p:%p is not added, since no memory", refId, p);
}
}
if (code < 0) taosDecRefCount(pSet);
taosUnlockList(pSet->lockedBy+hash);
return code;
}
int taosAcquireRef(int refId, void *p)
{
int hash, code = 0;
SRefNode *pNode;
SRefSet *pSet;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p);
return TSDB_CODE_REF_INVALID_ID;
}
uTrace("refId:%d p:%p try to acquire", refId, p);
pSet = tsRefSetList + refId;
taosIncRefCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
uTrace("refId:%d p:%p failed to acquire, not active", refId, p);
taosDecRefCount(pSet);
return TSDB_CODE_REF_ID_REMOVED;
}
hash = taosHashRef(pSet, p);
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->p == p) {
break;
}
pNode = pNode->next;
}
if (pNode) {
pNode->count++;
uTrace("refId:%d p:%p is acquired", refId, p);
} else {
code = TSDB_CODE_REF_NOT_EXIST;
uTrace("refId:%d p:%p is not there, failed to acquire", refId, p);
}
taosUnlockList(pSet->lockedBy+hash);
taosDecRefCount(pSet);
return code;
}
void taosReleaseRef(int refId, void *p)
{
int hash;
SRefNode *pNode;
SRefSet *pSet;
int released = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to release, refId not valid", refId, p);
return;
}
uTrace("refId:%d p:%p try to release", refId, p);
pSet = tsRefSetList + refId;
if (pSet->state == TSDB_REF_STATE_EMPTY) {
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
return;
}
hash = taosHashRef(pSet, p);
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->p == p)
break;
pNode = pNode->next;
}
if (pNode) {
pNode->count--;
if (pNode->count == 0) {
if ( pNode->prev ) {
pNode->prev->next = pNode->next;
} else {
pSet->nodeList[hash] = pNode->next;
}
if ( pNode->next ) {
pNode->next->prev = pNode->prev;
}
(*pSet->fp)(pNode->p);
free(pNode);
released = 1;
uTrace("refId:%d p:%p is removed, count:%d, free mem: %p", refId, p, pSet->count, pNode);
} else {
uTrace("refId:%d p:%p is released", refId, p);
}
} else {
uTrace("refId:%d p:%p is not there, failed to release", refId, p);
}
taosUnlockList(pSet->lockedBy+hash);
if (released) taosDecRefCount(pSet);
}
int taosListRef() {
SRefSet *pSet;
SRefNode *pNode;
int num = 0;
pthread_mutex_lock(&tsRefMutex);
for (int i = 0; i < TSDB_REF_OBJECTS; ++i) {
pSet = tsRefSetList + i;
if (pSet->state == TSDB_REF_STATE_EMPTY)
continue;
uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count);
for (int j=0; j < pSet->max; ++j) {
pNode = pSet->nodeList[j];
while (pNode) {
uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count);
pNode = pNode->next;
num++;
}
}
}
pthread_mutex_unlock(&tsRefMutex);
return num;
}
static int taosHashRef(SRefSet *pSet, void *p)
{
int hash = 0;
int64_t v = (int64_t)p;
for (int i = 0; i < sizeof(v); ++i) {
hash += (int)(v & 0xFFFF);
v = v >> 16;
i = i + 2;
}
hash = hash % pSet->max;
return hash;
}
static void taosLockList(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId();
int i = 0;
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
if (++i % 100 == 0) {
sched_yield();
}
}
}
static void taosUnlockList(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId();
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
assert(false);
}
}
static void taosInitRefModule(void) {
pthread_mutex_init(&tsRefMutex, NULL);
}
static void taosIncRefCount(SRefSet *pSet) {
atomic_add_fetch_32(&pSet->count, 1);
uTrace("refId:%d inc count:%d", pSet->refId, pSet->count);
}
static void taosDecRefCount(SRefSet *pSet) {
int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
uTrace("refId:%d dec count:%d", pSet->refId, pSet->count);
if (count > 0) return;
pthread_mutex_lock(&tsRefMutex);
if (pSet->state != TSDB_REF_STATE_EMPTY) {
pSet->state = TSDB_REF_STATE_EMPTY;
pSet->max = 0;
pSet->fp = NULL;
taosTFree(pSet->nodeList);
taosTFree(pSet->lockedBy);
tsRefSetNum--;
uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count);
}
pthread_mutex_unlock(&tsRefMutex);
}
......@@ -9,7 +9,22 @@ IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR)
INCLUDE_DIRECTORIES(${HEADER_GTEST_INCLUDE_DIR})
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(utilTest tutil common osdetail gtest pthread gcov)
LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
ADD_EXECUTABLE(trefTest ${BIN_SRC})
TARGET_LINK_LIBRARIES(trefTest common tutil)
ENDIF()
#IF (TD_LINUX)
# ADD_EXECUTABLE(trefTest ./trefTest.c)
# TARGET_LINK_LIBRARIES(trefTest tutil common)
#ENDIF ()
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include "os.h"
#include "tref.h"
#include "tlog.h"
#include "tglobal.h"
#include "taoserror.h"
#include "tulog.h"
typedef struct {
int refNum;
int steps;
int refId;
void **p;
} SRefSpace;
void *takeRefActions(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int code, id;
for (int i=0; i < pSpace->steps; ++i) {
printf("s");
id = random() % pSpace->refNum;
code = taosAddRef(pSpace->refId, pSpace->p[id]);
usleep(1);
id = random() % pSpace->refNum;
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
if (code >= 0) {
usleep(id % 5 + 1);
taosReleaseRef(pSpace->refId, pSpace->p[id]);
}
id = random() % pSpace->refNum;
taosRemoveRef(pSpace->refId, pSpace->p[id]);
usleep(id %5 + 1);
id = random() % pSpace->refNum;
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
if (code >= 0) {
usleep(id % 5 + 1);
taosReleaseRef(pSpace->refId, pSpace->p[id]);
}
}
for (int i=0; i < pSpace->refNum; ++i) {
taosRemoveRef(pSpace->refId, pSpace->p[i]);
}
//uInfo("refId:%d thread exits", pSpace->refId);
return NULL;
}
void myfree(void *p) {
return;
}
void *openRefSpace(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
printf("c");
pSpace->refId = taosOpenRef(10000, myfree);
if (pSpace->refId < 0) {
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId));
return NULL;
}
pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum);
for (int i=0; i<pSpace->refNum; ++i) {
pSpace->p[i] = (void *) malloc(128);
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_t thread1, thread2, thread3;
pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace));
pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace));
pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace));
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
pthread_join(thread3, NULL);
taosCloseRef(pSpace->refId);
for (int i=0; i<pSpace->refNum; ++i) {
free(pSpace->p[i]);
}
uInfo("refId:%d main thread exit", pSpace->refId);
free(pSpace->p);
pSpace->p = NULL;
return NULL;
}
int main(int argc, char *argv[]) {
int refNum = 100;
int threads = 10;
int steps = 10000;
int loops = 1;
uDebugFlag = 143;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-n")==0 && i < argc-1) {
refNum = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
steps = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
loops = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
uDebugFlag = atoi(argv[i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-n]: number of references, default: %d\n", refNum);
printf(" [-s]: steps to run for each reference, default: %d\n", steps);
printf(" [-t]: number of refIds running in parallel, default: %d\n", threads);
printf(" [-l]: number of loops, default: %d\n", loops);
printf(" [-d]: debugFlag, default: %d\n", uDebugFlag);
exit(0);
}
}
taosInitLog("tref.log", 5000000, 10);
SRefSpace *pSpaceList = (SRefSpace *) calloc(sizeof(SRefSpace), threads);
pthread_t *pThreadList = (pthread_t *) calloc(sizeof(pthread_t), threads);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<loops; ++i) {
printf("\nloop: %d\n", i);
for (int j=0; j<threads; ++j) {
pSpaceList[j].steps = steps;
pSpaceList[j].refNum = refNum;
pthread_create(&(pThreadList[j]), &thattr, openRefSpace, (void *)(pSpaceList+j));
}
for (int j=0; j<threads; ++j) {
pthread_join(pThreadList[j], NULL);
}
}
int num = taosListRef();
printf("\nnumber of references:%d\n", num);
free(pSpaceList);
free(pThreadList);
taosCloseLog();
return num;
}
......@@ -291,11 +291,15 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if (pVnode->tsdb == NULL) {
vnodeCleanUp(pVnode);
return terrno;
} else if (terrno != TSDB_CODE_SUCCESS && pVnode->syncCfg.replica <= 1) {
} else if (terrno != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to open tsdb, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode);
return terrno;
if (pVnode->syncCfg.replica <= 1) {
vnodeCleanUp(pVnode);
return terrno;
} else {
pVnode->version = 0;
}
}
sprintf(temp, "%s/wal", rootDir);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_WAL_INT_H
#define TDENGINE_WAL_INT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
extern int32_t wDebugFlag;
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }}
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }}
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }}
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }}
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define walPrefix "wal"
#define walSignature (uint32_t)(0xFAFBFDFE)
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_WAL_MGMT_H
#define TDENGINE_WAL_MGMT_H
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "twal.h"
#include "walInt.h"
#include "walMgmt.h"
\ No newline at end of file
......@@ -14,11 +14,10 @@
*/
#define _DEFAULT_SOURCE
#define TAOS_RANDOM_FILE_FAIL_TEST
#include "os.h"
#include "tlog.h"
#include "twal.h"
#include "walInt.h"
#include "walMgmt.h"
#include "tchecksum.h"
#include "tutil.h"
#include "ttimer.h"
......@@ -26,14 +25,6 @@
#include "twal.h"
#include "tqueue.h"
#define walPrefix "wal"
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }}
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }}
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }}
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }}
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
typedef struct {
uint64_t version;
......@@ -54,7 +45,6 @@ typedef struct {
static void *walTmrCtrl = NULL;
static int tsWalNum = 0;
static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT;
static uint32_t walSignature = 0xFAFBFDFE;
static int walHandleExistingFiles(const char *path);
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
static int walRemoveWalFiles(const char *path);
......@@ -250,11 +240,13 @@ int walWrite(void *handle, SWalHead *pHead) {
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
} else {
pWal->version = pHead->version;
}
ASSERT(contLen == pHead->len + sizeof(SWalHead));
return terrno;
return 0;
}
void walFsync(void *handle) {
......@@ -424,7 +416,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
// ASSERT(false);
ASSERT(false);
break;
}
......
# 如何在 windows环境下使用jdbc进行TDengine应用开发
本文以windows环境为例,介绍java如何进行TDengine开发应用
## 环境准备
(1)安装jdk
官网下载jdk-1.8,下载页面:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
安装,配置环境变量,把jdk加入到环境变量里。
命令行内查看java的版本。
```shell
>java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
```
(2)安装配置maven
官网下载maven,下载地址:http://maven.apache.org/download.cgi
配置环境变量MAVEN_HOME,将MAVEN_HOME/bin添加到PATH
命令行里查看maven的版本
```shell
>mvn --version
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-04T03:39:06+08:00)
Maven home: D:\apache-maven-3.5.0\bin\..
Java version: 1.8.0_131, vendor: Oracle Corporation
Java home: C:\Program Files\Java\jdk1.8.0_131\jre
Default locale: zh_CN, platform encoding: GBK
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
```
为了加快maven下载依赖的速度,可以为maven配置mirror,修改MAVEN_HOME\config\settings.xml文件
```xml
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<!-- 配置本地maven仓库的路径 -->
<localRepository>D:\apache-maven-localRepository</localRepository>
<mirrors>
<!-- 配置阿里云Maven镜像仓库 -->
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>
<profiles>
<!-- 配置jdk,maven会默认使用java1.8 -->
<profile>
<id>jdk-1.8</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
</profile>
</profiles>
</settings>
```
(3)在linux服务器上安装TDengine-server
在taosdata官网下载TDengine-server,下载地址:https://www.taosdata.com/cn/all-downloads/
在linux服务器上安装TDengine-server
```shell
# tar -zxvf package/TDengine-server-2.0.1.1-Linux-x64.tar.gz
# cd TDengine-server/
# ./install.sh
```
启动taosd
```shell
# systemctl start taosd
```
在server上用taos连接taosd
```shell
# taos
taos> show dnodes;
id | end_point | vnodes | cores | status | role | create_time |
==================================================================================================================
1 | td01:6030 | 2 | 4 | ready | any | 2020-08-19 18:40:25.045 |
Query OK, 1 row(s) in set (0.005765s)
```
如果可以正确连接到taosd实例,并打印出databases的信息,说明TDengine的server已经正确启动。这里查看server的hostname
```shell
# hostname -f
td01
```
注意,如果安装TDengine后,使用默认的taos.cfg配置文件,taosd会使用当前server的hostname创建dnode实例。之后,在client也需要使用这个hostname来连接taosd。
(4)在windows上安装TDengine-client
在taosdata官网下载taos客户端,下载地址:
https://www.taosdata.com/cn/all-downloads/
下载后,双击exe安装。
修改client的hosts文件(C:\Windows\System32\drivers\etc\hosts),将server的hostname和ip配置到client的hosts文件中
```
192.168.236.136 td01
```
配置完成后,在命令行内使用taos shell连接server端
```shell
C:\TDengine>taos
Welcome to the TDengine shell from Linux, Client Version:2.0.1.1
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
taos> show databases;
name | created_time | ntables | vgroups | replica | quorum | days | keep1,keep2,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
===================================================================================================================================================================================================================================================================
test | 2020-08-19 18:43:50.731 | 1 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
log | 2020-08-19 18:40:28.064 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
Query OK, 2 row(s) in set (0.068000s)
```
如果windows上的client能够正常连接,并打印database信息,说明client可以正常连接server了。
## 应用开发
(1)新建maven工程,在pom.xml中引入taos-jdbcdriver依赖。
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.demo</groupId>
<artifactId>JdbcDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.8</version>
</dependency>
</dependencies>
</project>
```
(2)使用jdbc查询TDengine数据库
下面是示例代码:
```java
public class JdbcDemo {
public static void main(String[] args) throws Exception {
Connection conn = getConn();
Statement stmt = conn.createStatement();
// create database
stmt.executeUpdate("create database if not exists db");
// use database
stmt.executeUpdate("use db");
// create table
stmt.executeUpdate("create table if not exists tb (ts timestamp, temperature int, humidity float)");
// insert data
int affectedRows = stmt.executeUpdate("insert into tb values(now, 23, 10.3) (now + 1s, 20, 9.3)");
System.out.println("insert " + affectedRows + " rows.");
// query data
ResultSet resultSet = stmt.executeQuery("select * from tb");
Timestamp ts = null;
int temperature = 0;
float humidity = 0;
while(resultSet.next()){
ts = resultSet.getTimestamp(1);
temperature = resultSet.getInt(2);
humidity = resultSet.getFloat("humidity");
System.out.printf("%s, %d, %s\n", ts, temperature, humidity);
}
}
public static Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://td01:0/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
}
```
(3)测试jdbc访问tdengine的sever实例
console输出:
```
insert 2 rows.
2020-08-26 00:06:34.575, 23, 10.3
2020-08-26 00:06:35.575, 20, 9.3
```
## 指南
(1)如何设置主机名和hosts
在server上查看hostname和fqdn
```shell
查看hostname
# hostname
taos-server
查看fqdn
# hostname -f
taos-server
```
windows下hosts文件位于:
C:\\Windows\System32\drivers\etc\hosts
修改hosts文件,添加server的ip和hostname
```s
192.168.56.101 node5
```
(2)什么是fqdn?
> 什么是FQDN?
>
> FQDN(Full qualified domain name)全限定域名,fqdn由2部分组成:hostname+domainname。
>
> 例如,一个邮件服务器的fqdn可能是:mymail.somecollege.edu,其中mymail是hostname(主机名),somcollege.edu是domainname(域名)。本例中,.edu是顶级域名,.somecollege是二级域名。
>
> 当连接服务器时,必须指定fqdn,然后,dns服务器通过查看dns表,将hostname解析为相应的ip地址。如果只指定hostname(不指定domainname),应用程序可能服务解析主机名。因为如果你试图访问不在本地的远程服务器时,本地的dns服务器和可能没有远程服务器的hostname列表。
>
> 参考:https://kb.iu.edu/d/aiuv
......@@ -107,7 +107,7 @@ func main() {
fmt.Scanln()
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
// open connect to taos server
//db, err := sql.Open(taosDriverName, url)
//if err != nil {
......@@ -115,6 +115,7 @@ func main() {
// os.Exit(1)
//}
//defer db.Close()
rand.Seed(time.Now().Unix())
createDatabase(configPara.dbName, configPara.supTblName)
fmt.Printf("======== create database success! ========\n\n")
......
# 如何在windows上使用nodejs进行TDengine应用开发
## 环境准备
(1)安装nodejs-10.22.0
下载链接:https://nodejs.org/dist/v10.22.0/node-v10.22.0-win-x64.zip
解压安装,把node配置到环境变量里
cmd启动命令行,查看node的版本
```shell
> node.exe --version
v10.22.0
> npm --version
6.14.6
```
(2)安装python2.7
下载链接:https://www.python.org/ftp/python/2.7.18/python-2.7.18.amd64.msi
查看python版本
```shell
>python --version
Python 2.7.18
```
(3)安装TDengine-client
下载地址:https://www.taosdata.com/cn/all-downloads/,选择一个合适的windows-client下载(client应该尽量与server端的版本保持一致)
使用client的taos shell连接server
```shell
>taos -h node5
Welcome to the TDengine shell from Linux, Client Version:2.0.6.0
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
taos> show dnodes;
id | end_point | vnodes | cores | status | role | create_time | offline reason |
============================================================================================================================================
1 | node5:6030 | 7 | 1 | ready | any | 2020-10-26 09:45:26.308 | |
Query OK, 1 row(s) in set (0.036000s)
```
注意:
* 检查能否在client的机器上ping通server的fqdn
* 如果你的dns server并没有提供到server的域名解析,可以将server的hostname配置到client的hosts文件中
## 应用开发
(1)建立nodejs项目
```
npm init
```
(2)安装windows-build-tools
```
npm install --global --production windows-build-tools
```
(3)安装td2.0-connector驱动
``` tdshell
npm install td2.0-connector
```
(4)nodejs访问tdengine的示例程序
```javascript
const taos = require('td2.0-connector');
var host = null;
var port = 6030;
for (var i = 2; i < global.process.argv.length; i++) {
var key = global.process.argv[i].split("=")[0];
var value = global.process.argv[i].split("=")[1];
if ("host" == key) {
host = value;
}
if ("port" == key) {
port = value;
}
}
if (host == null) {
console.log("Usage: node nodejsChecker.js host=<hostname> port=<port>");
process.exit(0);
}
// establish connection
var conn = taos.connect({host: host, user: "root", password: "taosdata", port: port});
var cursor = conn.cursor();
// create database
executeSql("create database if not exists testnodejs", 0);
// use db
executeSql("use testnodejs", 0);
// drop table
executeSql("drop table if exists testnodejs.weather", 0);
// create table
executeSql("create table if not exists testnodejs.weather(ts timestamp, temperature float, humidity int)", 0);
// insert
executeSql("insert into testnodejs.weather (ts, temperature, humidity) values(now, 20.5, 34)", 1);
// select
executeQuery("select * from testnodejs.weather");
// close connection
conn.close();
function executeQuery(sql) {
var start = new Date().getTime();
var promise = cursor.query(sql, true);
var end = new Date().getTime();
promise.then(function (result) {
printSql(sql, result != null, (end - start));
result.pretty();
});
}
function executeSql(sql, affectRows) {
var start = new Date().getTime();
var promise = cursor.execute(sql);
var end = new Date().getTime();
printSql(sql, promise == affectRows, (end - start));
}
function printSql(sql, succeed, cost) {
console.log("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
}
```
(5)测试nodejs程序
```shell
>node nodejsChecker.js
Usage: node nodejsChecker.js host=<hostname> port=<port>
# 提示指定host
>node nodejsChecker.js host=node5
Successfully connected to TDengine
Query OK, 0 row(s) affected (0.00997610s)
[ OK ] time cost: 14 ms, execute statement ====> create database if not exists testnodejs
Query OK, 0 row(s) affected (0.00235920s)
[ OK ] time cost: 4 ms, execute statement ====> use testnodejs
Query OK, 0 row(s) affected (0.06604280s)
[ OK ] time cost: 67 ms, execute statement ====> drop table if exists testnodejs.weather
Query OK, 0 row(s) affected (0.59403290s)
[ OK ] time cost: 595 ms, execute statement ====> create table if not exists testnodejs.weather(ts timestamp, temperature float, humidity int)
Query OK, 1 row(s) affected (0.01058950s)
[ OK ] time cost: 12 ms, execute statement ====> insert into testnodejs.weather (ts, temperature, humidity) values(now, 20.5, 34)
Query OK, 1 row(s) in set (0.00401490s)
[ OK ] time cost: 10 ms, execute statement ====> select * from testnodejs.weather
Connection is closed
ts | temperature | humidity |
=====================================================================
2020-10-27 18:49:15.547 | 20.5 | 34 |
```
## 指南
### 如何设置主机名和hosts
在server上查看hostname和fqdn
```shell
查看hostname
# hostname
taos-server
查看fqdn
# hostname -f
taos-server
```
windows下hosts文件位于:
C:\\Windows\System32\drivers\etc\hosts
修改hosts文件,添加server的ip和hostname
```
192.168.56.101 node5
```
> 什么是FQDN?
>
> FQDN(Full qualified domain name)全限定域名,fqdn由2部分组成:hostname+domainname。
>
> 例如,一个邮件服务器的fqdn可能是:mymail.somecollege.edu,其中mymail是hostname(主机名),somcollege.edu是domainname(域名)。本例中,.edu是顶级域名,.somecollege是二级域名。
>
> 当连接服务器时,必须指定fqdn,然后,dns服务器通过查看dns表,将hostname解析为相应的ip地址。如果只指定hostname(不指定domainname),应用程序可能服务解析主机名。因为如果你试图访问不在本地的远程服务器时,本地的dns服务器和可能没有远程服务器的hostname列表。
>
> 参考:https://kb.iu.edu/d/aiuv
......@@ -42,8 +42,8 @@ function executeQuery(sql){
var start = new Date().getTime();
var promise = cursor.query(sql, true);
var end = new Date().getTime();
printSql(sql, promise != null,(end - start));
promise.then(function(result){
printSql(sql, result != null,(end - start));
result.pretty();
});
}
......
......@@ -70,10 +70,12 @@ if [[ $1 == '--valgrind' ]]; then
$CRASH_GEN_EXEC $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
elif [[ $1 == '--helgrind' ]]; then
shift
HELGRIND_OUT=helgrind.out
HELGRIND_ERR=helgrind.err
valgrind \
--tool=helgrind \
$PYTHON_EXEC \
$CRASH_GEN_EXEC $@
$CRASH_GEN_EXEC $@ > $HELGRIND_OUT 2> $HELGRIND_ERR
else
$PYTHON_EXEC $CRASH_GEN_EXEC $@
fi
......
......@@ -1226,6 +1226,11 @@ class Task():
"To be implemeted by child classes, class name: {}".format(
self.__class__.__name__))
def _isServiceStable(self):
if not gSvcMgr:
return True # we don't run service, so let's assume it's stable
return gSvcMgr.isStable() # otherwise let's examine the service
def _isErrAcceptable(self, errno, msg):
if errno in [
0x05, # TSDB_CODE_RPC_NOT_READY
......@@ -1263,7 +1268,7 @@ class Task():
return True
elif msg.find("duplicated column names") != -1: # also alter table tag issues
return True
elif gSvcMgr and (not gSvcMgr.isStable()): # We are managing service, and ...
elif not self._isServiceStable(): # We are managing service, and ...
Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
return True
......@@ -1641,15 +1646,39 @@ class TaskReadData(StateTransitionTask):
def canBeginFrom(cls, state: AnyState):
return state.canReadData()
# def _canRestartService(self):
# if not gSvcMgr:
# return True # always
# return gSvcMgr.isActive() # only if it's running TODO: race condition here
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTable = self._db.getFixedSuperTable()
# 1 in 5 chance, simulate a broken connection.
if random.randrange(5) == 0: # TODO: break connection in all situations
wt.getDbConn().close()
wt.getDbConn().open()
print("_r", end="", flush=True)
# 1 in 5 chance, simulate a broken connection, only if service stable (not restarting)
if random.randrange(20)==0: # and self._canRestartService(): # TODO: break connection in all situations
# Logging.info("Attempting to reconnect to server") # TODO: change to DEBUG
Progress.emit(Progress.SERVICE_RECONNECT_START)
try:
wt.getDbConn().close()
wt.getDbConn().open()
except ConnectionError as err: # may fail
if not gSvcMgr:
Logging.error("Failed to reconnect in client-only mode")
raise # Not OK if we are running in client-only mode
if gSvcMgr.isRunning(): # may have race conditon, but low prob, due to
Logging.error("Failed to reconnect when managed server is running")
raise # Not OK if we are running normally
Progress.emit(Progress.SERVICE_RECONNECT_FAILURE)
# Logging.info("Ignoring DB reconnect error")
# print("_r", end="", flush=True)
Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)
# The above might have taken a lot of time, service might be running
# by now, causing error below to be incorrectly handled due to timing issue
return # TODO: fix server restart status race condtion
dbc = wt.getDbConn()
dbName = self._db.getName()
for rTbName in sTable.getRegTables(dbc, dbName): # regular tables
......
......@@ -163,11 +163,17 @@ class Progress:
BEGIN_THREAD_STEP = 1
END_THREAD_STEP = 2
SERVICE_HEART_BEAT= 3
SERVICE_RECONNECT_START = 4
SERVICE_RECONNECT_SUCCESS = 5
SERVICE_RECONNECT_FAILURE = 6
tokens = {
STEP_BOUNDARY: '.',
BEGIN_THREAD_STEP: '[',
END_THREAD_STEP: '] ',
SERVICE_HEART_BEAT: '.Y.'
SERVICE_HEART_BEAT: '.Y.',
SERVICE_RECONNECT_START: '<r.',
SERVICE_RECONNECT_SUCCESS: '.r>',
SERVICE_RECONNECT_FAILURE: '.xr>',
}
@classmethod
......
......@@ -280,16 +280,18 @@ class TdeSubProcess:
# process still alive, let's interrupt it
print("Terminate running process, send SIG_INT and wait...")
# sub process should end, then IPC queue should end, causing IO thread to end
self.subProcess.send_signal(signal.SIGINT)
# sig = signal.SIGINT
sig = signal.SIGKILL
self.subProcess.send_signal(sig) # SIGNINT or SIGKILL
self.subProcess.wait(20)
retCode = self.subProcess.returncode # should always be there
# May throw subprocess.TimeoutExpired exception above, therefore
# The process is guranteed to have ended by now
self.subProcess = None
if retCode != 0: # != (- signal.SIGINT):
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG_INT, retCode={}".format(retCode))
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(sig, retCode))
else:
Logging.info("TSP.stop(): sub proc successfully terminated with SIG_INT")
Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(sig))
return - retCode
class ServiceManager:
......@@ -395,6 +397,13 @@ class ServiceManager:
return True
return False
def isRunning(self):
for ti in self._tInsts:
if not ti.getStatus().isRunning():
return False
return True
# def isRestarting(self):
# """
# Determine if the service/cluster is being "restarted", i.e., at least
......
......@@ -24,6 +24,7 @@ python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py
# tag
......@@ -150,6 +151,7 @@ python3 ./test.py -f query/select_last_crash.py
python3 ./test.py -f query/queryNullValueTest.py
python3 ./test.py -f query/queryInsertValue.py
python3 ./test.py -f query/queryConnection.py
python3 ./test.py -f query/queryCountCSVData.py
python3 ./test.py -f query/natualInterval.py
python3 ./test.py -f query/bug1471.py
python3 ./test.py -f query/dataLossTest.py
......
#!/bin/bash
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log
for memError in `grep 'ERROR SUMMARY' crash_gen_mem_err.log | awk '{print $4}'`
do
if [ -n "$memError" ]; then
if [ "$memError" -gt 12 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\
More than our threshold! ## ${NC}"
fi
fi
done
grep 'start to execute\|definitely lost:' valgrind.err|grep -v 'grep'|uniq|tee crash_gen-definitely-lost-out.log
for defiMemError in `grep 'definitely lost:' crash_gen-definitely-lost-out.log | awk '{print $7}'`
do
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 3 ]; then
echo -e "${RED} ## Memory errors number valgrind reports \
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
exit 8
fi
fi
done
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.dnodes import tdDnodes
class TDTestCase:
"""
create table and insert data from disordered.csv which timestamp is disordered and
ordered.csv which timestamp is ordered.
then execute 'select count(*) from table xx;'
"""
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute("create database if not exists demo;");
tdSql.execute("use demo;")
tdSql.execute("CREATE TABLE IF NOT EXISTS test1 (ts TIMESTAMP, ValueID int, "
"VariantValue float, Quality int, Flags int);")
tdSql.execute("CREATE TABLE IF NOT EXISTS test2 (ts TIMESTAMP, ValueID int, "
"VariantValue float, Quality int, Flags int);")
ordered_csv = __file__.split('query')[0] + 'test_data/ordered.csv'
disordered_csv = __file__.split('query')[0] + 'test_data/disordered.csv'
tdSql.execute(" insert into test1 file '{file}';".format(file=ordered_csv))
tdSql.execute(" insert into test2 file '{file}';".format(file=disordered_csv))
print("==============insert into test1 and test2 form test file")
print("==============step2")
tdSql.query('select * from test1;')
with open(ordered_csv) as f1:
num1 = len(f1.readlines())
tdSql.checkRows(num1)
tdSql.query('select * from test2;')
with open(disordered_csv) as f2:
num2 = len(f2.readlines())
tdSql.checkRows(num2)
print("=============execute select count(*) from xxx")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
# -*- coding: utf-8 -*-
import sys
import string
import random
import subprocess
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
tdLog.info('=============== step1')
tdLog.info('create table TestSensitiveT(ts timestamp, i int)')
tdSql.execute('create table TestSensitiveT(ts timestamp, i int)')
tdLog.info('create table TestSensitiveSt(ts timestamp,i int) tags(j int)')
tdSql.execute('create table TestSensitiveSt(ts timestamp,i int) tags(j int)')
tdLog.info('create table Abcde using TestSensitiveSt tags(1)')
tdSql.execute('create table AbcdeFgh using TestSensitiveSt tags(1)')
tdLog.info('=============== step2')
tdLog.info('test normal table ')
tdSql.error('create table testsensitivet(ts timestamp, i int)')
tdSql.error('create table testsensitivet(ts timestamp, j int)')
tdSql.error('create table testsensItivet(ts timestamp, j int)')
tdSql.error('create table TESTSENSITIVET(ts timestamp, i int)')
tdLog.info('=============== step3')
tdLog.info('test super table ')
tdSql.error('create table testsensitivest(ts timestamp,i int) tags(j int)')
tdSql.error('create table testsensitivest(ts timestamp,i int) tags(k int)')
tdSql.error('create table TESTSENSITIVEST(ts timestamp,i int) tags(j int)')
tdSql.error('create table Testsensitivest(ts timestamp,i int) tags(j int)')
tdLog.info('=============== step4')
tdLog.info('test subtable ')
tdSql.error('create table abcdefgh using TestSensitiveSt tags(1)')
tdSql.error('create table ABCDEFGH using TestSensitiveSt tags(1)')
tdSql.error('create table Abcdefgh using TestSensitiveSt tags(1)')
tdSql.error('create table abcdeFgh using TestSensitiveSt tags(1)')
tdSql.error('insert into table abcdefgh using TestSensitiveSt tags(1) values(now,1)')
tdSql.error('insert into table ABCDEFGH using TestSensitiveSt tags(1) values(now,1)')
tdSql.error('insert into table Abcdefgh using TestSensitiveSt tags(1) values(now,1)')
tdSql.error('insert into table abcdeFgH using TestSensitiveSt tags(1) values(now,1)')
tdSql.query('show tables')
tdLog.info('tdSql.checkRow(0)')
tdSql.checkRows(2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
"""
this directory contains test data files
"""
\ No newline at end of file
此差异已折叠。
此差异已折叠。
......@@ -25,7 +25,7 @@ class TDSql:
self.queryCols = 0
self.affectedRows = 0
def init(self, cursor, log=True):
def init(self, cursor, log=False):
self.cursor = cursor
if (log):
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 2
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
sleep 3000
print ========== step3
sql create database d1
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 35)
sql insert into d1.t1 values(now+2s, 34)
sql insert into d1.t1 values(now+3s, 33)
sql insert into d1.t1 values(now+4s, 32)
sql insert into d1.t1 values(now+5s, 31)
print ========== step4
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sleep 3000
print ========== step5
sql select * from d1.t1 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 31 then
return -1
endi
if $data11 != 32 then
return -1
endi
if $data21 != 33 then
return -1
endi
if $data31 != 34 then
return -1
endi
if $data41 != 35 then
return -1
endi
print ========== step6
system_content rm -rf ../../../sim/dnode1/data/vnode/vnode2/tsdb/data
print ========== step7
sql select * from d1.t1 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != null then
return -1
endi
if $data11 != null then
return -1
endi
if $data21 != null then
return -1
endi
if $data31 != null then
return -1
endi
if $data41 != null then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -64,6 +64,7 @@ cd ../../../debug; make
./test.sh -f general/db/repeat.sim
./test.sh -f general/db/tables.sim
./test.sh -f general/db/vnodes.sim
./test.sh -f general/db/nosuchfile.sim
./test.sh -f general/field/2.sim
./test.sh -f general/field/3.sim
......@@ -277,6 +278,7 @@ cd ../../../debug; make
./test.sh -f unique/dnode/balance2.sim
./test.sh -f unique/dnode/balance3.sim
./test.sh -f unique/dnode/balancex.sim
./test.sh -f unique/dnode/data1.sim
./test.sh -f unique/dnode/offline1.sim
./test.sh -f unique/dnode/offline2.sim
./test.sh -f unique/dnode/reason.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode2 -c wallevel -v 2
system sh/cfg.sh -n dnode3 -c wallevel -v 2
system sh/cfg.sh -n dnode4 -c wallevel -v 2
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
sleep 3000
print ========== step2
sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start
sql create dnode $hostname4
system sh/exec.sh -n dnode4 -s start
$x = 0
show2:
$x = $x + 1
sleep 3000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
if $data2_1 != 0 then
goto show2
endi
if $data2_2 != 0 then
goto show2
endi
if $data2_3 != 0 then
goto show2
endi
if $data2_4 != 0 then
goto show2
endi
print ========== step3
sql create database d1 replica 3
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 35)
sql insert into d1.t1 values(now+2s, 34)
sql insert into d1.t1 values(now+3s, 33)
sql insert into d1.t1 values(now+4s, 32)
sql insert into d1.t1 values(now+5s, 31)
$x = 0
show3:
$x = $x + 1
sleep 3000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
if $data2_1 != 0 then
goto show3
endi
if $data2_2 != 1 then
goto show3
endi
if $data2_3 != 1 then
goto show3
endi
if $data2_4 != 1 then
goto show3
endi
print ========== step4
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
print ========== step5
system_content rm -rf ../../../sim/dnode4/data/vnode/vnode2/tsdb/data
print ========== step6
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 10000
print ========== step7
sql select * from d1.t1 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 31 then
return -1
endi
if $data11 != 32 then
return -1
endi
if $data21 != 33 then
return -1
endi
if $data31 != 34 then
return -1
endi
if $data41 != 35 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册