提交 9605658f 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch 'develop' into feature/file

...@@ -27,6 +27,7 @@ pipeline { ...@@ -27,6 +27,7 @@ pipeline {
cd debug cd debug
cmake .. > /dev/null cmake .. > /dev/null
make > /dev/null make > /dev/null
make install > /dev/null
cd ${WKC}/tests cd ${WKC}/tests
#./test-all.sh smoke #./test-all.sh smoke
./test-all.sh pytest ./test-all.sh pytest
......
...@@ -1016,9 +1016,9 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER ...@@ -1016,9 +1016,9 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER
``` ```
## TAOS SQL 边界限制 ## TAOS SQL 边界限制
- 数据库名最大长度为33 - 数据库名最大长度为32
- 表名最大长度为193,每行数据最大长度16k个字符 - 表名最大长度为192,每行数据最大长度16k个字符
- 列名最大长度为65,最多允许1024列,最少需要2列,第一列必须是时间戳 - 列名最大长度为64,最多允许1024列,最少需要2列,第一列必须是时间戳
- 标签最多允许128个,可以0个,标签总长度不超过16k个字符 - 标签最多允许128个,可以0个,标签总长度不超过16k个字符
- SQL语句最大长度65480个字符,但可通过系统配置参数maxSQLLength修改,最长可配置为1M - SQL语句最大长度65480个字符,但可通过系统配置参数maxSQLLength修改,最长可配置为1M
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制 - 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制
...@@ -48,6 +48,7 @@ cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_pat ...@@ -48,6 +48,7 @@ cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_pat
cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/../packaging/tools/preun.sh ${pkg_dir}${install_home_path}/script cp ${compile_dir}/../packaging/tools/preun.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
......
...@@ -58,6 +58,7 @@ cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/scri ...@@ -58,6 +58,7 @@ cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/scri
cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include
...@@ -134,6 +135,7 @@ if [ $1 -eq 0 ];then ...@@ -134,6 +135,7 @@ if [ $1 -eq 0 ];then
${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || : ${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${cfg_link_dir}/* || : ${csudo} rm -f ${cfg_link_dir}/* || :
${csudo} rm -f ${inc_link_dir}/taos.h || : ${csudo} rm -f ${inc_link_dir}/taos.h || :
${csudo} rm -f ${inc_link_dir}/taoserror.h || : ${csudo} rm -f ${inc_link_dir}/taoserror.h || :
......
...@@ -45,7 +45,7 @@ if [ "$osType" != "Darwin" ]; then ...@@ -45,7 +45,7 @@ if [ "$osType" != "Darwin" ]; then
strip ${build_dir}/bin/taos strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh" bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh"
else else
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh" bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
fi fi
lib_files="${build_dir}/lib/libtaos.so.${version}" lib_files="${build_dir}/lib/libtaos.so.${version}"
else else
......
...@@ -36,7 +36,7 @@ if [ "$pagMode" == "lite" ]; then ...@@ -36,7 +36,7 @@ if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh" bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
else else
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh" bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
fi fi
lib_files="${build_dir}/lib/libtaos.so.${version}" lib_files="${build_dir}/lib/libtaos.so.${version}"
......
...@@ -81,8 +81,10 @@ function install_lib() { ...@@ -81,8 +81,10 @@ function install_lib() {
${csudo} ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.so.1 ${csudo} ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo} ln -s ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so ${csudo} ln -s ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then
${csudo} ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.so.1 || : ${csudo} ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
${csudo} ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || : ${csudo} ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
fi
} }
function install_bin() { function install_bin() {
......
...@@ -3978,7 +3978,7 @@ static void interp_function(SQLFunctionCtx *pCtx) { ...@@ -3978,7 +3978,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if (isNull(data1, srcType) || isNull(data2, srcType)) { if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else { } else {
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point); taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
} }
} else if (srcType == TSDB_DATA_TYPE_FLOAT) { } else if (srcType == TSDB_DATA_TYPE_FLOAT) {
point1.val = data1; point1.val = data1;
...@@ -3987,7 +3987,7 @@ static void interp_function(SQLFunctionCtx *pCtx) { ...@@ -3987,7 +3987,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if (isNull(data1, srcType) || isNull(data2, srcType)) { if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes); setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else { } else {
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point); taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
} }
} else { } else {
......
...@@ -2758,6 +2758,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* ...@@ -2758,6 +2758,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
const char* msg1 = "too many columns in group by clause"; const char* msg1 = "too many columns in group by clause";
const char* msg2 = "invalid column name in group by clause"; const char* msg2 = "invalid column name in group by clause";
const char* msg3 = "columns from one table allowed as group by columns"; const char* msg3 = "columns from one table allowed as group by columns";
const char* msg4 = "join query does not support group by";
const char* msg7 = "not support group by expression"; const char* msg7 = "not support group by expression";
const char* msg8 = "not allowed column type for group by"; const char* msg8 = "not allowed column type for group by";
const char* msg9 = "tags not allowed for table query"; const char* msg9 = "tags not allowed for table query";
...@@ -2778,6 +2779,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* ...@@ -2778,6 +2779,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
if (pQueryInfo->numOfTables > 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SSchema* pSchema = NULL; SSchema* pSchema = NULL;
SSchema s = tscGetTbnameColumnSchema(); SSchema s = tscGetTbnameColumnSchema();
......
...@@ -878,7 +878,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -878,7 +878,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->tsBuf != NULL) { if (pQueryInfo->tsBuf != NULL) {
// note: here used the index instead of actual vnode id. // note: here used the index instead of actual vnode id.
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex; int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks); int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -156,8 +156,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -156,8 +156,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
win->ekey = elem1.ts; win->ekey = elem1.ts;
} }
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else { } else {
pLimit->offset -= 1;//offset apply to projection? pLimit->offset -= 1;//offset apply to projection?
} }
...@@ -193,8 +193,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -193,8 +193,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
TSKEY et = taosGetTimestampUs(); TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us", "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey, pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
tsBufGetNumOfVnodes(output1), et - st); tsBufGetNumOfGroup(output1), et - st);
return output1->numOfTotal; return output1->numOfTotal;
} }
...@@ -282,7 +282,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) { ...@@ -282,7 +282,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0; int32_t num = 0;
int32_t* list = NULL; int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list); tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
// The virtual node, of which all tables are disqualified after the timestamp intersection, // The virtual node, of which all tables are disqualified after the timestamp intersection,
// is removed to avoid next stage query. // is removed to avoid next stage query.
...@@ -314,7 +314,7 @@ static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { ...@@ -314,7 +314,7 @@ static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0; int32_t num = 0;
int32_t* list = NULL; int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list); tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
size_t numOfGroups = taosArrayGetSize(pVgroupTables); size_t numOfGroups = taosArrayGetSize(pVgroupTables);
...@@ -853,11 +853,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -853,11 +853,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
} }
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
assert(pParentSql->fp != tscJoinQueryCallback);
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql); tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
// set no result command // set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
assert(pParentSql->fp != tscJoinQueryCallback);
(*pParentSql->fp)(pParentSql->param, pParentSql, 0); (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else { } else {
// proceed to for ts_comp query // proceed to for ts_comp query
...@@ -2366,7 +2370,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -2366,7 +2370,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
pRes->numOfCols = (int32_t)numOfExprs; pRes->numOfCols = (int16_t)numOfExprs;
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES); pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->buffer = calloc(numOfExprs, POINTER_BYTES); pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
......
...@@ -52,7 +52,6 @@ public class TSDBConnection implements Connection { ...@@ -52,7 +52,6 @@ public class TSDBConnection implements Connection {
public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException { public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException {
this.dbMetaData = meta; this.dbMetaData = meta;
connect(info.getProperty(TSDBDriver.PROPERTY_KEY_HOST), connect(info.getProperty(TSDBDriver.PROPERTY_KEY_HOST),
Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")), Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")),
info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER), info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
...@@ -197,12 +196,13 @@ public class TSDBConnection implements Connection { ...@@ -197,12 +196,13 @@ public class TSDBConnection implements Connection {
} }
public SQLWarning getWarnings() throws SQLException { public SQLWarning getWarnings() throws SQLException {
//todo: implement getWarnings according to the warning messages returned from TDengine
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG); throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
public void clearWarnings() throws SQLException { public void clearWarnings() throws SQLException {
// left blank to support HikariCP connection // left blank to support HikariCP connection
//todo: implement getWarnings according to the warning messages returned from TDengine //todo: implement clearWarnings according to the warning messages returned from TDengine
} }
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
......
...@@ -96,7 +96,7 @@ public class TSDBDatabaseMetaData implements java.sql.DatabaseMetaData { ...@@ -96,7 +96,7 @@ public class TSDBDatabaseMetaData implements java.sql.DatabaseMetaData {
} }
public int getDriverMajorVersion() { public int getDriverMajorVersion() {
return 0; return 2;
} }
public int getDriverMinorVersion() { public int getDriverMinorVersion() {
......
...@@ -14,13 +14,9 @@ ...@@ -14,13 +14,9 @@
*****************************************************************************/ *****************************************************************************/
package com.taosdata.jdbc; package com.taosdata.jdbc;
import java.io.*; import java.io.*;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Properties;
import java.util.logging.Logger; import java.util.logging.Logger;
/** /**
...@@ -44,76 +40,53 @@ import java.util.logging.Logger; ...@@ -44,76 +40,53 @@ import java.util.logging.Logger;
*/ */
public class TSDBDriver implements java.sql.Driver { public class TSDBDriver implements java.sql.Driver {
@Deprecated @Deprecated
private static final String URL_PREFIX1 = "jdbc:TSDB://"; private static final String URL_PREFIX1 = "jdbc:TSDB://";
private static final String URL_PREFIX = "jdbc:TAOS://"; private static final String URL_PREFIX = "jdbc:TAOS://";
/**
* Key used to retrieve the database value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_DBNAME = "dbname";
/** /**
* Key used to retrieve the host value from the properties instance passed to * Key used to retrieve the host value from the properties instance passed to
* the driver. * the driver.
*/ */
public static final String PROPERTY_KEY_HOST = "host"; public static final String PROPERTY_KEY_HOST = "host";
/**
* Key used to retrieve the password value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_PASSWORD = "password";
/** /**
* Key used to retrieve the port number value from the properties instance * Key used to retrieve the port number value from the properties instance
* passed to the driver. * passed to the driver.
*/ */
public static final String PROPERTY_KEY_PORT = "port"; public static final String PROPERTY_KEY_PORT = "port";
/**
* Key used to retrieve the database value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_DBNAME = "dbname";
/** /**
* Key used to retrieve the user value from the properties instance passed to * Key used to retrieve the user value from the properties instance passed to
* the driver. * the driver.
*/ */
public static final String PROPERTY_KEY_USER = "user"; public static final String PROPERTY_KEY_USER = "user";
/**
* Key used to retrieve the password value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_PASSWORD = "password";
/** /**
* Key for the configuration file directory of TSDB client in properties instance * Key for the configuration file directory of TSDB client in properties instance
*/ */
public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir"; public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
/** /**
* Key for the timezone used by the TSDB client in properties instance * Key for the timezone used by the TSDB client in properties instance
*/ */
public static final String PROPERTY_KEY_TIME_ZONE = "timezone"; public static final String PROPERTY_KEY_TIME_ZONE = "timezone";
/** /**
* Key for the locale used by the TSDB client in properties instance * Key for the locale used by the TSDB client in properties instance
*/ */
public static final String PROPERTY_KEY_LOCALE = "locale"; public static final String PROPERTY_KEY_LOCALE = "locale";
/** /**
* Key for the char encoding used by the TSDB client in properties instance * Key for the char encoding used by the TSDB client in properties instance
*/ */
public static final String PROPERTY_KEY_CHARSET = "charset"; public static final String PROPERTY_KEY_CHARSET = "charset";
public static final String PROPERTY_KEY_PROTOCOL = "protocol";
/**
* Index for port coming out of parseHostPortPair().
*/
public final static int PORT_NUMBER_INDEX = 1;
/**
* Index for host coming out of parseHostPortPair().
*/
public final static int HOST_NAME_INDEX = 0;
private TSDBDatabaseMetaData dbMetaData = null; private TSDBDatabaseMetaData dbMetaData = null;
static { static {
...@@ -169,9 +142,11 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -169,9 +142,11 @@ public class TSDBDriver implements java.sql.Driver {
} }
public Connection connect(String url, Properties info) throws SQLException { public Connection connect(String url, Properties info) throws SQLException {
if (url == null) { if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!")); throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
}
if (!acceptsURL(url))
return null;
Properties props = null; Properties props = null;
if ((props = parseURL(url, info)) == null) { if ((props = parseURL(url, info)) == null) {
...@@ -179,7 +154,10 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -179,7 +154,10 @@ public class TSDBDriver implements java.sql.Driver {
} }
//load taos.cfg start //load taos.cfg start
if (info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null && info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null) { if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR)); File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0]; File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile); List<String> endpoints = loadConfigEndpoints(cfgFile);
...@@ -190,7 +168,9 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -190,7 +168,9 @@ public class TSDBDriver implements java.sql.Driver {
} }
try { try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE), (String) props.get(PROPERTY_KEY_CHARSET), TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR),
(String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET),
(String) props.get(PROPERTY_KEY_TIME_ZONE)); (String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData); Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn; return newConn;
...@@ -208,43 +188,15 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -208,43 +188,15 @@ public class TSDBDriver implements java.sql.Driver {
} }
/** /**
* Parses hostPortPair in the form of [host][:port] into an array, with the * @param url the URL of the database
* element of index HOST_NAME_INDEX being the host (or null if not specified), * @return <code>true</code> if this driver understands the given URL;
* and the element of index PORT_NUMBER_INDEX being the port (or null if not * <code>false</code> otherwise
* specified). * @throws SQLException if a database access error occurs or the url is {@code null}
*
* @param hostPortPair host and port in form of of [host][:port]
* @return array containing host and port as Strings
* @throws SQLException if a parse error occurs
*/ */
protected static String[] parseHostPortPair(String hostPortPair) throws SQLException {
String[] splitValues = new String[2];
int portIndex = hostPortPair.indexOf(":");
String hostname = null;
if (portIndex != -1) {
if ((portIndex + 1) < hostPortPair.length()) {
String portAsString = hostPortPair.substring(portIndex + 1);
hostname = hostPortPair.substring(0, portIndex);
splitValues[HOST_NAME_INDEX] = hostname;
splitValues[PORT_NUMBER_INDEX] = portAsString;
} else {
throw new SQLException(TSDBConstants.WrapErrMsg("port is not proper!"));
}
} else {
splitValues[HOST_NAME_INDEX] = hostPortPair;
splitValues[PORT_NUMBER_INDEX] = null;
}
return splitValues;
}
public boolean acceptsURL(String url) throws SQLException { public boolean acceptsURL(String url) throws SQLException {
return (url != null && url.length() > 0 && url.trim().length() > 0) && url.startsWith(URL_PREFIX); if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is null"));
return (url != null && url.length() > 0 && url.trim().length() > 0) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1));
} }
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
...@@ -252,15 +204,17 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -252,15 +204,17 @@ public class TSDBDriver implements java.sql.Driver {
info = new Properties(); info = new Properties();
} }
if ((url != null) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1))) { if (acceptsURL(url)) {
info = parseURL(url, info); info = parseURL(url, info);
} }
DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST)); DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST));
hostProp.required = true; hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT)); DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false; portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME)); DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
dbProp.required = false; dbProp.required = false;
...@@ -268,9 +222,11 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -268,9 +222,11 @@ public class TSDBDriver implements java.sql.Driver {
DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER)); DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
userProp.required = true; userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD)); DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD));
passwordProp.required = true; passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5]; DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp; propertyInfo[0] = hostProp;
...@@ -283,20 +239,68 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -283,20 +239,68 @@ public class TSDBDriver implements java.sql.Driver {
} }
/** /**
* example: jdbc:TSDB://127.0.0.1:0/db?user=root&password=your_password * example: jdbc:TAOS://127.0.0.1:0/db?user=root&password=your_password
*/ */
public Properties parseURL(String url, Properties defaults) throws java.sql.SQLException { public Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties(); Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null) { if (url == null || url.length() <= 0 || url.trim().length() <= 0)
return null; return null;
} if (!url.startsWith(URL_PREFIX) && !url.startsWith(URL_PREFIX1))
if (!url.startsWith(URL_PREFIX) && !url.startsWith(URL_PREFIX1)) {
return null; return null;
}
// parse properties
String urlForMeta = url; String urlForMeta = url;
int beginningOfSlashes = url.indexOf("//");
int index = url.indexOf("?");
if (index != -1) {
String paramString = url.substring(index + 1, url.length());
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
String parameterValuePair = queryParams.nextToken();
int indexOfEqual = parameterValuePair.indexOf("=");
String parameter = null;
String value = null;
if (indexOfEqual != -1) {
parameter = parameterValuePair.substring(0, indexOfEqual);
if (indexOfEqual + 1 < parameterValuePair.length()) {
value = parameterValuePair.substring(indexOfEqual + 1);
}
}
if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
urlProps.setProperty(parameter, value);
}
}
}
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
// parse dbname
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
if (indexOfSlash + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_DBNAME, url.substring(indexOfSlash + 1));
}
url = url.substring(0, indexOfSlash);
}
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
if (indexOfColon + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_PORT, url.substring(indexOfColon + 1));
}
url = url.substring(0, indexOfColon);
}
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER));
/*
String urlForMeta = url;
String dbProductName = url.substring(url.indexOf(":") + 1); String dbProductName = url.substring(url.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":")); dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
int beginningOfSlashes = url.indexOf("//"); int beginningOfSlashes = url.indexOf("//");
...@@ -345,11 +349,11 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -345,11 +349,11 @@ public class TSDBDriver implements java.sql.Driver {
user = urlProps.getProperty(PROPERTY_KEY_USER).toString(); user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user); this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
*/
return urlProps; return urlProps;
} }
public void setPropertyValue(Properties property, String[] keyValuePair) { private void setPropertyValue(Properties property, String[] keyValuePair) {
switch (keyValuePair[0].toLowerCase()) { switch (keyValuePair[0].toLowerCase()) {
case PROPERTY_KEY_USER: case PROPERTY_KEY_USER:
property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]); property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
...@@ -372,13 +376,12 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -372,13 +376,12 @@ public class TSDBDriver implements java.sql.Driver {
} }
} }
public int getMajorVersion() { public int getMajorVersion() {
return 1; return 2;
} }
public int getMinorVersion() { public int getMinorVersion() {
return 1; return 0;
} }
public boolean jdbcCompliant() { public boolean jdbcCompliant() {
...@@ -389,33 +392,4 @@ public class TSDBDriver implements java.sql.Driver { ...@@ -389,33 +392,4 @@ public class TSDBDriver implements java.sql.Driver {
return null; return null;
} }
/**
* Returns the host property
*
* @param props the java.util.Properties instance to retrieve the hostname from.
* @return the host
*/
public String host(Properties props) {
return props.getProperty(PROPERTY_KEY_HOST, "localhost");
}
/**
* Returns the port number property
*
* @param props the properties to get the port number from
* @return the port number
*/
public int port(Properties props) {
return Integer.parseInt(props.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
}
/**
* Returns the database property from <code>props</code>
*
* @param props the Properties to look for the database property.
* @return the database name.
*/
public String database(Properties props) {
return props.getProperty(PROPERTY_KEY_DBNAME);
}
} }
package com.taosdata.jdbc; package com.taosdata.jdbc;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.sql.SQLException; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.*;
import java.util.Properties; import java.util.Properties;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
public class TSDBDriverTest { public class TSDBDriverTest {
private static final String[] validURLs = {
"jdbc:TAOS://localhost:0",
"jdbc:TAOS://localhost",
"jdbc:TAOS://localhost:6030/test",
"jdbc:TAOS://localhost:6030",
"jdbc:TAOS://localhost:6030/",
"jdbc:TSDB://localhost:6030",
"jdbc:TSDB://localhost:6030/",
"jdbc:TAOS://127.0.0.1:0/db?user=root&password=taosdata",
"jdbc:TAOS://:",
"jdbc:TAOS://:/",
"jdbc:TAOS://:/test",
"jdbc:TAOS://localhost:0/?user=root&password=taosdata"
};
private static boolean islibLoaded = false;
private static boolean isTaosdActived;
private Connection conn;
@BeforeClass
public static void before() {
String osName = System.getProperty("os.name").toLowerCase();
if (!osName.equals("linux"))
return;
// try to load taos lib
try {
System.loadLibrary("taos");
islibLoaded = true;
} catch (UnsatisfiedLinkError error) {
System.out.println("load tdengine lib failed.");
error.printStackTrace();
}
// check taosd is activated
try {
String[] cmd = {"/bin/bash", "-c", "ps -ef | grep taosd | grep -v \"grep\""};
Process exec = Runtime.getRuntime().exec(cmd);
BufferedReader reader = new BufferedReader(new InputStreamReader(exec.getInputStream()));
int lineCnt = 0;
while (reader.readLine() != null) {
lineCnt++;
}
if (lineCnt > 0)
isTaosdActived = true;
} catch (IOException e) {
e.printStackTrace();
}
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
@Test
public void testConnectWithJdbcURL() {
final String url = "jdbc:TAOS://localhost:6030/log?user=root&password=taosdata";
try {
if (islibLoaded && isTaosdActived) {
conn = DriverManager.getConnection(url);
assertNotNull("failure - connection should not be null", conn);
}
} catch (SQLException e) {
e.printStackTrace();
fail("failure - should not throw Exception");
}
}
@Test
public void testConnectWithProperties() {
final String jdbcUrl = "jdbc:TAOS://localhost:6030/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
try {
if (islibLoaded && isTaosdActived) {
conn = DriverManager.getConnection(jdbcUrl, connProps);
assertNotNull("failure - connection should not be null", conn);
}
} catch (SQLException e) {
e.printStackTrace();
fail("failure - should not throw Exception");
}
}
@Test
public void testConnectWithConfigFile() {
String jdbcUrl = "jdbc:TAOS://:/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
try {
if (islibLoaded && isTaosdActived) {
conn = DriverManager.getConnection(jdbcUrl, connProps);
assertNotNull("failure - connection should not be null", conn);
}
} catch (SQLException e) {
e.printStackTrace();
fail("failure - should not throw Exception");
}
}
@Test(expected = SQLException.class)
public void testAcceptsURL() throws SQLException {
Driver driver = new TSDBDriver();
for (String url : validURLs) {
assertTrue("failure - acceptsURL(\" " + url + " \") should be true", driver.acceptsURL(url));
}
driver.acceptsURL(null);
fail("acceptsURL throws exception when parameter is null");
}
@Test @Test
public void urlParserTest() throws SQLException { public void testParseURL() {
TSDBDriver driver = new TSDBDriver(); TSDBDriver driver = new TSDBDriver();
String url = "jdbc:TSDB://127.0.0.1:0/db";
String url = "jdbc:TAOS://127.0.0.1:0/db?user=root&password=taosdata&charset=UTF-8";
Properties properties = new Properties(); Properties config = new Properties();
driver.parseURL(url, properties); Properties actual = driver.parseURL(url, config);
assertEquals(properties.get("host"), "127.0.0.1"); assertEquals("failure - host should be 127.0.0.1", "127.0.0.1", actual.get("host"));
assertEquals(properties.get("port"), "0"); assertEquals("failure - port should be 0", "0", actual.get("port"));
assertEquals(properties.get("dbname"), "db"); assertEquals("failure - dbname should be db", "db", actual.get("dbname"));
assertEquals(properties.get("user"), "root"); assertEquals("failure - user should be root", "root", actual.get("user"));
assertEquals(properties.get("password"), "your_password"); assertEquals("failure - password should be taosdata", "taosdata", actual.get("password"));
assertEquals("failure - charset should be UTF-8", "UTF-8", actual.get("charset"));
url = "jdbc:TSDB://127.0.0.1:0/log?charset=UTF-8";
properties = new Properties(); url = "jdbc:TAOS://127.0.0.1:0";
driver.parseURL(url, properties); config = new Properties();
assertEquals(properties.get("host"), "127.0.0.1"); actual = driver.parseURL(url, config);
assertEquals(properties.get("port"), "0"); assertEquals("failure - host should be 127.0.0.1", "127.0.0.1", actual.getProperty("host"));
assertEquals(properties.get("dbname"), "log"); assertEquals("failure - port should be 0", "0", actual.get("port"));
assertEquals(properties.get("charset"), "UTF-8"); assertNull("failure - dbname should be null", actual.get("dbname"));
url = "jdbc:TSDB://127.0.0.1:0/"; url = "jdbc:TAOS://127.0.0.1:0/db";
properties = new Properties(); config = new Properties();
driver.parseURL(url, properties); actual = driver.parseURL(url, config);
assertEquals(properties.get("host"), "127.0.0.1"); assertEquals("failure - host should be 127.0.0.1", "127.0.0.1", actual.getProperty("host"));
assertEquals(properties.get("port"), "0"); assertEquals("failure - port should be 0", "0", actual.get("port"));
assertEquals(properties.get("dbname"), null); assertEquals("failure - dbname should be db", "db", actual.get("dbname"));
url = "jdbc:TSDB://127.0.0.1:0/db"; url = "jdbc:TAOS://:/?";
properties = new Properties(); config = new Properties();
driver.parseURL(url, properties); config.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
assertEquals(properties.get("host"), "127.0.0.1"); config.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
assertEquals(properties.get("port"), "0"); actual = driver.parseURL(url, config);
assertEquals(properties.get("dbname"), "db"); assertEquals("failure - user should be root", "root", actual.getProperty("user"));
assertEquals("failure - password should be taosdata", "taosdata", actual.getProperty("password"));
assertNull("failure - host should be null", actual.getProperty("host"));
assertNull("failure - port should be null", actual.getProperty("port"));
assertNull("failure - dbname should be null", actual.getProperty("dbname"));
}
@Test
public void testGetPropertyInfo() throws SQLException {
Driver driver = new TSDBDriver();
final String url = "jdbc:TAOS://localhost:6030/log?user=root&password=taosdata";
Properties connProps = new Properties();
DriverPropertyInfo[] propertyInfo = driver.getPropertyInfo(url, connProps);
for (DriverPropertyInfo info : propertyInfo) {
if (info.name.equals(TSDBDriver.PROPERTY_KEY_HOST))
assertEquals("failure - host should be localhost", "localhost", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_PORT))
assertEquals("failure - port should be 6030", "6030", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_DBNAME))
assertEquals("failure - dbname should be test", "log", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_USER))
assertEquals("failure - user should be root", "root", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_PASSWORD))
assertEquals("failure - password should be root", "taosdata", info.value);
}
}
@Test
public void testGetMajorVersion() {
assertEquals("failure - getMajorVersion should be 2", 2, new TSDBDriver().getMajorVersion());
}
@Test
public void testGetMinorVersion() {
assertEquals("failure - getMinorVersion should be 0", 0, new TSDBDriver().getMinorVersion());
}
@Test
public void testJdbcCompliant() {
assertFalse("failure - jdbcCompliant should be false", new TSDBDriver().jdbcCompliant());
}
@Test
public void testGetParentLogger() throws SQLFeatureNotSupportedException {
assertNull("failure - getParentLogger should be be null", new TSDBDriver().getParentLogger());
} }
} }
\ No newline at end of file
...@@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead->version = 0; pHead->version = 0;
// write into vnode write queue // write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer); free(buffer);
} }
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
int64_t ver = 0; int64_t ver = 0;
void *pCq = NULL; void *pCq = NULL;
int writeToQueue(void *pVnode, void *data, int type) { int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
return 0; return 0;
} }
......
...@@ -20,9 +20,11 @@ ...@@ -20,9 +20,11 @@
extern "C" { extern "C" {
#endif #endif
int32_t dnodeInitVnodeRead(); int32_t dnodeInitVRead();
void dnodeCleanupVnodeRead(); void dnodeCleanupVRead();
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg); void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -61,7 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = { ...@@ -61,7 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos}, {"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp}, {"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vread", dnodeInitVRead, dnodeCleanupVRead},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite}, {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
......
...@@ -39,8 +39,8 @@ static int32_t tsDnodeSubmitReqNum = 0; ...@@ -39,8 +39,8 @@ static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
......
...@@ -17,83 +17,78 @@ ...@@ -17,83 +17,78 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnodeInt.h" #include "tqueue.h"
#include "dnodeMgmt.h"
#include "dnodeVRead.h"
#include "vnode.h" #include "vnode.h"
#include "dnodeInt.h"
typedef struct { typedef struct {
pthread_t thread; // thread pthread_t thread; // thread
int32_t workerId; // worker ID int32_t workerId; // worker ID
} SReadWorker; } SVReadWorker;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t min; // min number of workers int32_t min; // min number of workers
int32_t num; // current number of workers int32_t num; // current number of workers
SReadWorker *readWorker; SVReadWorker * worker;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SReadWorkerPool; } SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *param);
static void dnodeHandleIdleReadWorker(SReadWorker *);
// module global variable // module global variable
static SReadWorkerPool readPool; static SVReadWorkerPool tsVReadWP;
static taos_qset readQset; static taos_qset tsVReadQset;
int32_t dnodeInitVnodeRead() { int32_t dnodeInitVRead() {
readQset = taosOpenQset(); tsVReadQset = taosOpenQset();
readPool.min = tsNumOfCores; tsVReadWP.min = tsNumOfCores;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore; tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min; if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max); tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&readPool.mutex, NULL); pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (readPool.readWorker == NULL) return -1; if (tsVReadWP.worker == NULL) return -1;
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
pWorker->workerId = i; pWorker->workerId = i;
} }
dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max); dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max);
return 0; return 0;
} }
void dnodeCleanupVnodeRead() { void dnodeCleanupVRead() {
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(readQset); taosQsetThreadResume(tsVReadQset);
} }
} }
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }
free(readPool.readWorker); free(tsVReadWP.worker);
taosCloseQset(readQset); taosCloseQset(tsVReadQset);
pthread_mutex_destroy(&readPool.mutex); pthread_mutex_destroy(&tsVReadWP.mutex);
dInfo("dnode read is closed"); dInfo("dnode vread is closed");
} }
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char * pCont = (char *)pMsg->pCont;
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *)pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
...@@ -106,7 +101,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -106,7 +101,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SVReadMsg *pRead = taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
pRead->pCont = pCont; pRead->pCont = pCont;
pRead->contLen = pHead->contLen; pRead->contLen = pHead->contLen;
...@@ -120,60 +115,52 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -120,60 +115,52 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
if (queuedMsgNum == 0) { if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_VND_INVALID_VGROUP_ID,
.msgType = 0
};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
} }
void *dnodeAllocVReadQueue(void *pVnode) { void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex); pthread_mutex_lock(&tsVReadWP.mutex);
taos_queue queue = taosOpenQueue(); taos_queue queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&readPool.mutex); pthread_mutex_unlock(&tsVReadWP.mutex);
return NULL; return NULL;
} }
taosAddIntoQset(readQset, queue, pVnode); taosAddIntoQset(tsVReadQset, queue, pVnode);
// spawn a thread to process queue // spawn a thread to process queue
if (readPool.num < readPool.max) { if (tsVReadWP.num < tsVReadWP.max) {
do { do {
SReadWorker *pWorker = readPool.readWorker + readPool.num; SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num;
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno)); dError("failed to create thread to process vread vqueue since %s", strerror(errno));
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
readPool.num++; tsVReadWP.num++;
dDebug("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num); dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num);
} while (readPool.num < readPool.min); } while (tsVReadWP.num < tsVReadWP.min);
} }
pthread_mutex_unlock(&readPool.mutex); pthread_mutex_unlock(&tsVReadWP.mutex);
dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue); dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue);
return queue; return queue;
} }
void dnodeFreeVReadQueue(void *rqueue) { void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue); taosCloseQueue(rqueue);
// dynamically adjust the number of threads
} }
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp, .pCont = pRead->rspRet.rsp,
...@@ -186,32 +173,32 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { ...@@ -186,32 +173,32 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont); rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
SReadMsg *pReadMsg; SVReadMsg *pReadMsg;
int type; int32_t qtype;
void *pVnode; void * pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pReadMsg, &pVnode) == 0) {
dDebug("qset:%p dnode read got no message from qset, exiting", readQset); dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset);
break; break;
} }
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle, dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type, pReadMsg); taosMsg[pReadMsg->rpcMsg.msgType], qtype, pReadMsg);
int32_t code = vnodeProcessRead(pVnode, pReadMsg); int32_t code = vnodeProcessRead(pVnode, pReadMsg);
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcVReadRsp(pVnode, pReadMsg, code);
} else { } else {
if (code == TSDB_CODE_QRY_HAS_RSP) { if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); dnodeSendRpcVReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
} else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5)); assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
...@@ -223,19 +210,3 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -223,19 +210,3 @@ static void *dnodeProcessReadQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
int32_t num = taosGetQueueNumber(readQset);
if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) {
readPool.num--;
dDebug("read worker:%d is released, total:%d", pWorker->workerId, readPool.num);
pthread_exit(NULL);
} else {
usleep(30000);
sched_yield();
}
}
...@@ -15,13 +15,12 @@ ...@@ -15,13 +15,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "tsync.h"
#include "vnode.h" #include "vnode.h"
#include "syncInt.h"
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct { typedef struct {
...@@ -29,30 +28,21 @@ typedef struct { ...@@ -29,30 +28,21 @@ typedef struct {
taos_qset qset; // queue set taos_qset qset; // queue set
int32_t workerId; // worker ID int32_t workerId; // worker ID
pthread_t thread; // thread pthread_t thread; // thread
} SWriteWorker; } SVWriteWorker;
typedef struct {
SRspRet rspRet;
SRpcMsg rpcMsg;
int32_t processedCount;
int32_t code;
int32_t contLen;
void * pCont;
} SWriteMsg;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *worker; SVWriteWorker * worker;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWriteWorkerPool; } SVWriteWorkerPool;
static SWriteWorkerPool tsVWriteWP; static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessVWriteQueue(void *param);
int32_t dnodeInitVWrite() { int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores; tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max); tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1; if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL); pthread_mutex_init(&tsVWriteWP.mutex, NULL);
...@@ -66,14 +56,14 @@ int32_t dnodeInitVWrite() { ...@@ -66,14 +56,14 @@ int32_t dnodeInitVWrite() {
void dnodeCleanupVWrite() { void dnodeCleanupVWrite() {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset); taosQsetThreadResume(pWorker->qset);
} }
} }
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
...@@ -86,44 +76,43 @@ void dnodeCleanupVWrite() { ...@@ -86,44 +76,43 @@ void dnodeCleanupVWrite() {
dInfo("dnode vwrite is closed"); dInfo("dnode vwrite is closed");
} }
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) { void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
char *pCont = pMsg->pCont; int32_t code;
char *pCont = pRpcMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { if (pRpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
SMsgDesc *pDesc = (SMsgDesc *)pCont; SMsgDesc *pDesc = (SMsgDesc *)pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc); pCont += sizeof(SMsgDesc);
} }
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pMsg = (SMsgHead *)pCont;
pHead->vgId = htonl(pHead->vgId); pMsg->vgId = htonl(pMsg->vgId);
pHead->contLen = htonl(pHead->contLen); pMsg->contLen = htonl(pMsg->contLen);
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) {
// put message into queue
SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); void *pVnode = vnodeAcquire(pMsg->vgId);
if (pVnode == NULL) {
code = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else { } else {
SRpcMsg rpcRsp = { SWalHead *pHead = (SWalHead *)(pCont - sizeof(SWalHead));
.handle = pMsg->handle, pHead->msgType = pRpcMsg->msgType;
.pCont = NULL, pHead->version = 0;
.contLen = 0, pHead->len = pMsg->contLen;
.code = TSDB_CODE_VND_INVALID_VGROUP_ID, code = vnodeWriteToWQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg);
.msgType = 0 }
};
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
} }
vnodeRelease(pVnode);
rpcFreeCont(pRpcMsg->pCont);
} }
void *dnodeAllocVWriteQueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_mutex_lock(&tsVWriteWP.mutex); pthread_mutex_lock(&tsVWriteWP.mutex);
SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId; SVWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
void *queue = taosOpenQueue(); void *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&tsVWriteWP.mutex); pthread_mutex_unlock(&tsVWriteWP.mutex);
...@@ -150,7 +139,7 @@ void *dnodeAllocVWriteQueue(void *pVnode) { ...@@ -150,7 +139,7 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessVWriteQueue, pWorker) != 0) {
dError("failed to create thread to process vwrite queue since %s", strerror(errno)); dError("failed to create thread to process vwrite queue since %s", strerror(errno));
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
...@@ -179,7 +168,7 @@ void dnodeFreeVWriteQueue(void *wqueue) { ...@@ -179,7 +168,7 @@ void dnodeFreeVWriteQueue(void *wqueue) {
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (param == NULL) return; if (param == NULL) return;
SWriteMsg *pWrite = param; SVWriteMsg *pWrite = param;
if (code < 0) pWrite->code = code; if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
...@@ -187,26 +176,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -187,26 +176,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (count <= 1) return; if (count <= 1) return;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle, .handle = pWrite->rpcHandle,
.pCont = pWrite->rspRet.rsp, .pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len, .contLen = pWrite->rspRet.len,
.code = pWrite->code, .code = pWrite->code,
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pWrite->rpcMsg.pCont);
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessVWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param; SVWriteWorker *pWorker = param;
SWriteMsg * pWrite; SVWriteMsg * pWrite;
SWalHead * pHead;
SRspRet * pRspRet;
void * pVnode; void * pVnode;
void * pItem;
int32_t numOfMsgs; int32_t numOfMsgs;
int32_t qtype; int32_t qtype;
...@@ -219,55 +204,33 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -219,55 +204,33 @@ static void *dnodeProcessWriteQueue(void *param) {
break; break;
} }
bool forceFsync = false;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL; taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
pRspRet = NULL; dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite,
taosGetQitem(pWorker->qall, &qtype, &pItem); taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]);
} else if (qtype == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
} else {
pHead = pItem;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
}
int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType], if (pWrite->code <= 0) pWrite->processedCount = 1;
pHead->version, tstrerror(code)); if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
if (pWrite) { dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
pWrite->rpcMsg.code = code;
if (code <= 0) pWrite->processedCount = 1;
}
} }
walFsync(vnodeGetWal(pVnode)); walFsync(vnodeGetWal(pVnode), forceFsync);
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(pWorker->qall); taosResetQitems(pWorker->qall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, &pItem); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
if (qtype == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem; dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
} else if (qtype == TAOS_QTYPE_FWD) { } else if (qtype == TAOS_QTYPE_FWD) {
pHead = pItem; vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
vnodeConfirmForward(pVnode, pHead->version, 0); taosFreeQitem(pWrite);
taosFreeQitem(pItem);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} else { } else {
taosFreeQitem(pItem); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
} }
......
...@@ -55,9 +55,9 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); ...@@ -55,9 +55,9 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *rqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue(); void dnodeFreeMnodePqueue();
......
...@@ -21,7 +21,7 @@ extern "C" { ...@@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h" #include "tdataformat.h"
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
typedef struct { typedef struct {
int vgId; int vgId;
......
...@@ -71,7 +71,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin ...@@ -71,7 +71,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId); typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data // when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app // when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
......
...@@ -43,8 +43,8 @@ typedef struct { ...@@ -43,8 +43,8 @@ typedef struct {
int8_t keep; // keep the wal file when closed int8_t keep; // keep the wal file when closed
} SWalCfg; } SWalCfg;
typedef void* twalh; // WAL HANDLE typedef void * twalh; // WAL HANDLE
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type); typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
int32_t walInit(); int32_t walInit();
void walCleanUp(); void walCleanUp();
...@@ -55,7 +55,7 @@ void walStop(twalh); ...@@ -55,7 +55,7 @@ void walStop(twalh);
void walClose(twalh); void walClose(twalh);
int32_t walRenew(twalh); int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *); int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh); void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
int64_t walGetVersion(twalh); int64_t walGetVersion(twalh);
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "twal.h"
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT, TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_READY, TAOS_VN_STATUS_READY,
...@@ -29,17 +31,27 @@ typedef enum _VN_STATUS { ...@@ -29,17 +31,27 @@ typedef enum _VN_STATUS {
} EVnStatus; } EVnStatus;
typedef struct { typedef struct {
int len; int32_t len;
void *rsp; void * rsp;
void *qhandle; //used by query and retrieve msg void * qhandle; // used by query and retrieve msg
} SRspRet; } SRspRet;
typedef struct { typedef struct {
SRspRet rspRet; SRspRet rspRet;
void *pCont; void * pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
} SReadMsg; } SVReadMsg;
typedef struct {
int32_t code;
int32_t processedCount;
void * rpcHandle;
void * rpcAhandle;
SRspRet rspRet;
char reserveForSync[16];
SWalHead pHead[];
} SVWriteMsg;
extern char *vnodeStatus[]; extern char *vnodeStatus[];
...@@ -51,11 +63,11 @@ int32_t vnodeClose(int32_t vgId); ...@@ -51,11 +63,11 @@ int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount void* vnodeAcquire(int32_t vgId); // add refcount
void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *param);
...@@ -65,7 +77,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); ...@@ -65,7 +77,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); int32_t vnodeInitResources();
void vnodeCleanupResources(); void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pReadMsg);
int32_t vnodeCheckRead(void *pVnode); int32_t vnodeCheckRead(void *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall; ...@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall;
static taos_queue tsSdbWriteQueue; static taos_queue tsSdbWriteQueue;
static SSdbWriteWorkerPool tsSdbPool; static SSdbWriteWorkerPool tsSdbPool;
static int sdbWrite(void *param, void *data, int type); static int32_t sdbWrite(void *param, void *data, int32_t type, void *pMsg);
static int sdbWriteToQueue(void *param, void *data, int type); static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg);
static void * sdbWorkerFp(void *param); static void * sdbWorkerFp(void *param);
static int32_t sdbInitWriteWorker(); static int32_t sdbInitWriteWorker();
static void sdbCleanupWriteWorker(); static void sdbCleanupWriteWorker();
...@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int sdbWrite(void *param, void *data, int type) { static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
SSdbOper *pOper = param; SSdbOper *pOper = param;
SWalHead *pHead = data; SWalHead *pHead = data;
int32_t tableId = pHead->msgType / 10; int32_t tableId = pHead->msgType / 10;
...@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() { ...@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() {
tsSdbWriteQueue = NULL; tsSdbWriteQueue = NULL;
} }
int sdbWriteToQueue(void *param, void *data, int type) { int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) {
SWalHead *pHead = data; SWalHead *pHead = data;
int size = sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
taosWriteQitem(tsSdbWriteQueue, type, pWal); taosWriteQitem(tsSdbWriteQueue, qtype, pWal);
return 0; return 0;
} }
...@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) {
pOper = NULL; pOper = NULL;
} }
int32_t code = sdbWrite(pOper, pHead, type); int32_t code = sdbWrite(pOper, pHead, type, NULL);
if (code > 0) code = 0; if (code > 0) code = 0;
if (pOper) { if (pOper) {
pOper->retCode = code; pOper->retCode = code;
...@@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) {
} }
} }
walFsync(tsSdbObj.wal); walFsync(tsSdbObj.wal, true);
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(tsSdbWriteQall); taosResetQitems(tsSdbWriteQall);
......
...@@ -44,7 +44,7 @@ long interlocked_add_fetch_32(long volatile* ptr, long val) { ...@@ -44,7 +44,7 @@ long interlocked_add_fetch_32(long volatile* ptr, long val) {
__int64 interlocked_add_fetch_64(__int64 volatile* ptr, __int64 val) { __int64 interlocked_add_fetch_64(__int64 volatile* ptr, __int64 val) {
//#ifdef _WIN64 //#ifdef _WIN64
return _InterlockedExchangeAdd64(ptr, val) + val; return InterlockedExchangeAdd64(ptr, val) + val;
//#else //#else
// return _InterlockedExchangeAdd(ptr, val) + val; // return _InterlockedExchangeAdd(ptr, val) + val;
//#endif //#endif
......
...@@ -65,12 +65,12 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co ...@@ -65,12 +65,12 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
int64_t remain = count - writeLen; int64_t remain = count - writeLen;
if (remain > 0) { if (remain > 0) {
size_t rlen = fread(buffer, 1, remain, in_file); size_t rlen = fread(buffer, 1, (size_t) remain, in_file);
if (rlen <= 0) { if (rlen <= 0) {
return writeLen; return writeLen;
} }
else { else {
fwrite(buffer, 1, remain, out_file); fwrite(buffer, 1, (size_t) remain, out_file);
writeLen += remain; writeLen += remain;
} }
} }
......
...@@ -49,7 +49,6 @@ typedef struct SFillInfo { ...@@ -49,7 +49,6 @@ typedef struct SFillInfo {
int32_t numOfTags; // number of tags int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row int32_t rowSize; // size of each row
// char ** pTags; // tags value for current interpolation
SFillTagColInfo* pTags; // tags value for filling gap SFillTagColInfo* pTags; // tags value for filling gap
SInterval interval; SInterval interval;
char * prevValues; // previous row of data, to generate the interpolation results char * prevValues; // previous row of data, to generate the interpolation results
...@@ -83,7 +82,7 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRo ...@@ -83,7 +82,7 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRo
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo); int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); int32_t taosGetLinearInterpolationVal(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity); int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
......
...@@ -26,7 +26,7 @@ extern "C" { ...@@ -26,7 +26,7 @@ extern "C" {
#define MEM_BUF_SIZE (1 << 20) #define MEM_BUF_SIZE (1 << 20)
#define TS_COMP_FILE_MAGIC 0x87F5EC4C #define TS_COMP_FILE_MAGIC 0x87F5EC4C
#define TS_COMP_FILE_VNODE_MAX 512 #define TS_COMP_FILE_GROUP_MAX 512
typedef struct STSList { typedef struct STSList {
char* rawBuf; char* rawBuf;
...@@ -38,7 +38,7 @@ typedef struct STSList { ...@@ -38,7 +38,7 @@ typedef struct STSList {
typedef struct STSElem { typedef struct STSElem {
TSKEY ts; TSKEY ts;
tVariant* tag; tVariant* tag;
int32_t vnode; int32_t id;
} STSElem; } STSElem;
typedef struct STSCursor { typedef struct STSCursor {
...@@ -60,17 +60,17 @@ typedef struct STSBlock { ...@@ -60,17 +60,17 @@ typedef struct STSBlock {
* The size of buffer file should not be greater than 2G, * The size of buffer file should not be greater than 2G,
* and the offset of int32_t type is enough * and the offset of int32_t type is enough
*/ */
typedef struct STSVnodeBlockInfo { typedef struct STSGroupBlockInfo {
int32_t vnode; // vnode id int32_t id; // group id
int32_t offset; // offset set value in file int32_t offset; // offset set value in file
int32_t numOfBlocks; // number of total blocks int32_t numOfBlocks; // number of total blocks
int32_t compLen; // compressed size int32_t compLen; // compressed size
} STSVnodeBlockInfo; } STSGroupBlockInfo;
typedef struct STSVnodeBlockInfoEx { typedef struct STSGroupBlockInfoEx {
STSVnodeBlockInfo info; STSGroupBlockInfo info;
int32_t len; // length before compress int32_t len; // length before compress
} STSVnodeBlockInfoEx; } STSGroupBlockInfoEx;
typedef struct STSBuf { typedef struct STSBuf {
FILE* f; FILE* f;
...@@ -78,9 +78,9 @@ typedef struct STSBuf { ...@@ -78,9 +78,9 @@ typedef struct STSBuf {
uint32_t fileSize; uint32_t fileSize;
// todo use array // todo use array
STSVnodeBlockInfoEx* pData; STSGroupBlockInfoEx* pData;
uint32_t numOfAlloc; uint32_t numOfAlloc;
uint32_t numOfVnodes; uint32_t numOfGroups;
char* assistBuf; char* assistBuf;
int32_t bufSize; int32_t bufSize;
...@@ -94,22 +94,22 @@ typedef struct STSBuf { ...@@ -94,22 +94,22 @@ typedef struct STSBuf {
typedef struct STSBufFileHeader { typedef struct STSBufFileHeader {
uint32_t magic; // file magic number uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file uint32_t numOfGroup; // number of group stored in current file
int32_t tsOrder; // timestamp order in current file int32_t tsOrder; // timestamp order in current file
} STSBufFileHeader; } STSBufFileHeader;
STSBuf* tsBufCreate(bool autoDelete, int32_t order); STSBuf* tsBufCreate(bool autoDelete, int32_t order);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t vnodeId); STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t id);
void* tsBufDestroy(STSBuf* pTSBuf); void* tsBufDestroy(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len); void tsBufAppend(STSBuf* pTSBuf, int32_t id, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
STSBuf* tsBufClone(STSBuf* pTSBuf); STSBuf* tsBufClone(STSBuf* pTSBuf);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId); STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
void tsBufFlush(STSBuf* pTSBuf); void tsBufFlush(STSBuf* pTSBuf);
...@@ -118,7 +118,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf); ...@@ -118,7 +118,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf); bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag); STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf); STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
...@@ -131,11 +131,11 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur); ...@@ -131,11 +131,11 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
*/ */
void tsBufDisplay(STSBuf* pTSBuf); void tsBufDisplay(STSBuf* pTSBuf);
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf); int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf);
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId); void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id);
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks); int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t id, void* buf, int32_t* len, int32_t* numOfBlocks);
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag); STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag);
......
...@@ -609,7 +609,7 @@ int32_t intersect(SArray *pLeft, SArray *pRight, SArray *pFinalRes) { ...@@ -609,7 +609,7 @@ int32_t intersect(SArray *pLeft, SArray *pRight, SArray *pFinalRes) {
/* /*
* traverse the result and apply the function to each item to check if the item is qualified or not * traverse the result and apply the function to each item to check if the item is qualified or not
*/ */
static void tArrayTraverse(tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) { static UNUSED_FUNC void tArrayTraverse(tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) {
assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE && fp != NULL); assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE && fp != NULL);
// scan the result array list and check for each item in the list // scan the result array list and check for each item in the list
...@@ -660,7 +660,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p ...@@ -660,7 +660,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p
* @param pSchema tag schemas * @param pSchema tag schemas
* @param fp filter callback function * @param fp filter callback function
*/ */
static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) { static UNUSED_FUNC void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
size_t size = taosArrayGetSize(pResult); size_t size = taosArrayGetSize(pResult);
SArray* array = taosArrayInit(size, POINTER_BYTES); SArray* array = taosArrayInit(size, POINTER_BYTES);
...@@ -733,10 +733,6 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -733,10 +733,6 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY)); assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
param->setupInfoFn(pExpr, param->pExtInfo); param->setupInfoFn(pExpr, param->pExtInfo);
if (pSkipList == NULL) {
tArrayTraverse(pExpr, param->nodeFilterFn, result);
return;
}
tQueryInfo *pQueryInfo = pExpr->_node.info; tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) { if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
...@@ -748,49 +744,14 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -748,49 +744,14 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
return; return;
} }
// recursive traverse left child branch // The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK; uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
if (weight == 0 ) { //apply the hierarchical expression to every node in skiplist for find the qualified nodes
if (taosArrayGetSize(result) > 0 && pSkipList == NULL) {
/**
* Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
* Since no index presented, the filter operation is done by scan all elements in the result set.
*
* if the query is a high selectivity filter, only small portion of meters are retrieved.
*/
exprTreeTraverseImpl(pExpr, result, param);
} else {
/**
* apply the hierarchical expression to every node in skiplist for find the qualified nodes
*/
assert(taosArrayGetSize(result) == 0);
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param); tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
}
return;
}
if (weight == 2 || (weight == 1 && pExpr->_node.optr == TSDB_RELATION_OR)) {
SArray* rLeft = taosArrayInit(10, POINTER_BYTES);
SArray* rRight = taosArrayInit(10, POINTER_BYTES);
tExprTreeTraverse(pLeft, pSkipList, rLeft, param);
tExprTreeTraverse(pRight, pSkipList, rRight, param);
if (pExpr->_node.optr == TSDB_RELATION_AND) { // CROSS
intersect(rLeft, rRight, result);
} else if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
merge(rLeft, rRight, result);
} else {
assert(false);
}
taosArrayDestroy(rLeft);
taosArrayDestroy(rRight);
return;
}
#if 0
/* /*
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here * (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
* *
...@@ -819,6 +780,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -819,6 +780,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
* So, we do not set the skip list index as a parameter * So, we do not set the skip list index as a parameter
*/ */
tExprTreeTraverse(pSecond, NULL, result, param); tExprTreeTraverse(pSecond, NULL, result, param);
#endif
} }
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
......
...@@ -3843,7 +3843,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ ...@@ -3843,7 +3843,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag); STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) { if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else { } else {
...@@ -4777,7 +4777,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4777,7 +4777,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
if (pRuntimeEnv->cur.vgroupIndex == -1) { if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag); STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) { if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else { } else {
...@@ -4802,7 +4802,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4802,7 +4802,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag); STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem1.vnode < 0) { if (!tsBufIsValidElem(&elem1)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else { } else {
......
...@@ -215,7 +215,7 @@ static double linearInterpolationImpl(double v1, double v2, double k1, double k2 ...@@ -215,7 +215,7 @@ static double linearInterpolationImpl(double v1, double v2, double k1, double k2
return v1 + (v2 - v1) * (k - k1) / (k2 - k1); return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
} }
int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) { int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
*(int32_t*)point->val = (int32_t)linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, (double)point1->key, *(int32_t*)point->val = (int32_t)linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, (double)point1->key,
...@@ -343,7 +343,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu ...@@ -343,7 +343,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset}; point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes}; point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
point = (SPoint){.key = pFillInfo->start, .val = val1}; point = (SPoint){.key = pFillInfo->start, .val = val1};
taosDoLinearInterpolation(type, &point1, &point2, &point); taosGetLinearInterpolationVal(type, &point1, &point2, &point);
} }
setTagsValue(pFillInfo, data, *num); setTagsValue(pFillInfo, data, *num);
......
此差异已折叠。
...@@ -42,7 +42,7 @@ void simpleTest() { ...@@ -42,7 +42,7 @@ void simpleTest() {
EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num); EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num);
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0); EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
EXPECT_EQ(pTSBuf->numOfVnodes, 1); EXPECT_EQ(pTSBuf->numOfGroups, 1);
tsBufFlush(pTSBuf); tsBufFlush(pTSBuf);
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
...@@ -69,7 +69,7 @@ void largeTSTest() { ...@@ -69,7 +69,7 @@ void largeTSTest() {
// the data has been flush to disk, no data in cache // the data has been flush to disk, no data in cache
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0); EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
EXPECT_EQ(pTSBuf->numOfVnodes, 1); EXPECT_EQ(pTSBuf->numOfGroups, 1);
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
tsBufFlush(pTSBuf); tsBufFlush(pTSBuf);
...@@ -105,7 +105,7 @@ void multiTagsTest() { ...@@ -105,7 +105,7 @@ void multiTagsTest() {
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t)); EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1); EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
EXPECT_EQ(pTSBuf->numOfVnodes, 1); EXPECT_EQ(pTSBuf->numOfGroups, 1);
tsBufFlush(pTSBuf); tsBufFlush(pTSBuf);
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
...@@ -139,7 +139,7 @@ void multiVnodeTagsTest() { ...@@ -139,7 +139,7 @@ void multiVnodeTagsTest() {
start += step * num; start += step * num;
} }
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1); EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
} }
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
...@@ -184,7 +184,7 @@ void loadDataTest() { ...@@ -184,7 +184,7 @@ void loadDataTest() {
start += step * num; start += step * num;
} }
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1); EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
} }
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC); EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
...@@ -203,7 +203,7 @@ void loadDataTest() { ...@@ -203,7 +203,7 @@ void loadDataTest() {
// create from exists file // create from exists file
STSBuf* pNewBuf = tsBufCreateFromFile(pTSBuf->path, false); STSBuf* pNewBuf = tsBufCreateFromFile(pTSBuf->path, false);
EXPECT_EQ(pNewBuf->tsOrder, pTSBuf->tsOrder); EXPECT_EQ(pNewBuf->tsOrder, pTSBuf->tsOrder);
EXPECT_EQ(pNewBuf->numOfVnodes, numOfVnode); EXPECT_EQ(pNewBuf->numOfGroups, numOfVnode);
EXPECT_EQ(pNewBuf->fileSize, pTSBuf->fileSize); EXPECT_EQ(pNewBuf->fileSize, pTSBuf->fileSize);
EXPECT_EQ(pNewBuf->pData[0].info.offset, pTSBuf->pData[0].info.offset); EXPECT_EQ(pNewBuf->pData[0].info.offset, pTSBuf->pData[0].info.offset);
...@@ -269,7 +269,7 @@ void TSTraverse() { ...@@ -269,7 +269,7 @@ void TSTraverse() {
start += step * num; start += step * num;
} }
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1); EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
} }
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
...@@ -304,7 +304,7 @@ void TSTraverse() { ...@@ -304,7 +304,7 @@ void TSTraverse() {
int32_t totalOutput = 10; int32_t totalOutput = 10;
while (1) { while (1) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) { if (!tsBufNextPos(pTSBuf)) {
break; break;
...@@ -352,7 +352,7 @@ void TSTraverse() { ...@@ -352,7 +352,7 @@ void TSTraverse() {
totalOutput = 10; totalOutput = 10;
while (1) { while (1) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) { if (!tsBufNextPos(pTSBuf)) {
break; break;
...@@ -427,7 +427,7 @@ void mergeDiffVnodeBufferTest() { ...@@ -427,7 +427,7 @@ void mergeDiffVnodeBufferTest() {
tsBufFlush(pTSBuf2); tsBufFlush(pTSBuf2);
tsBufMerge(pTSBuf1, pTSBuf2); tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 2); EXPECT_EQ(pTSBuf1->numOfGroups, 2);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num); EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
tsBufDisplay(pTSBuf1); tsBufDisplay(pTSBuf1);
...@@ -472,7 +472,7 @@ void mergeIdenticalVnodeBufferTest() { ...@@ -472,7 +472,7 @@ void mergeIdenticalVnodeBufferTest() {
tsBufFlush(pTSBuf2); tsBufFlush(pTSBuf2);
tsBufMerge(pTSBuf1, pTSBuf2); tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 2); EXPECT_EQ(pTSBuf1->numOfGroups, 2);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num); EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
tsBufResetPos(pTSBuf1); tsBufResetPos(pTSBuf1);
...@@ -482,12 +482,12 @@ void mergeIdenticalVnodeBufferTest() { ...@@ -482,12 +482,12 @@ void mergeIdenticalVnodeBufferTest() {
STSElem elem = tsBufGetElem(pTSBuf1); STSElem elem = tsBufGetElem(pTSBuf1);
if (count++ < numOfTags * num) { if (count++ < numOfTags * num) {
EXPECT_EQ(elem.vnode, 12); EXPECT_EQ(elem.id, 12);
} else { } else {
EXPECT_EQ(elem.vnode, 77); EXPECT_EQ(elem.id, 77);
} }
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
} }
tsBufDestroy(pTSBuf1); tsBufDestroy(pTSBuf1);
......
...@@ -1623,7 +1623,7 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1623,7 +1623,7 @@ static void rpcDecRef(SRpcInfo *pRpc)
int count = atomic_sub_fetch_32(&tsRpcNum, 1); int count = atomic_sub_fetch_32(&tsRpcNum, 1);
if (count == 0) { if (count == 0) {
taosCloseRef(tsRpcRefId); // taosCloseRef(tsRpcRefId);
// tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error // tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error
} }
} }
......
...@@ -854,7 +854,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -854,7 +854,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
} else { } else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead); syncSaveIntoBuffer(pPeer, pHead);
......
...@@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) { ...@@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
if (ret < 0) break; if (ret < 0) break;
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
} }
if (code < 0) { if (code < 0) {
...@@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { ...@@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)offset; SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
offset += pHead->len + sizeof(SWalHead); offset += pHead->len + sizeof(SWalHead);
return offset; return offset;
......
...@@ -2311,7 +2311,11 @@ void filterPrepare(void* expr, void* param) { ...@@ -2311,7 +2311,11 @@ void filterPrepare(void* expr, void* param) {
if (pInfo->optr == TSDB_RELATION_IN) { if (pInfo->optr == TSDB_RELATION_IN) {
pInfo->q = (char*) pCond->arr; pInfo->q = (char*) pCond->arr;
} else { } else {
pInfo->q = calloc(1, pSchema->bytes + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space. uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE;
if (size < (uint32_t)pSchema->bytes) {
size = pSchema->bytes;
}
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
tVariantDump(pCond, pInfo->q, pSchema->type, true); tVariantDump(pCond, pInfo->q, pSchema->type, true);
} }
} }
......
...@@ -367,7 +367,14 @@ int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) { ...@@ -367,7 +367,14 @@ int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2)); case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2));
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE); tstr* t1 = (tstr*) f1;
tstr* t2 = (tstr*) f2;
if (t1->len != t2->len) {
return t1->len > t2->len? 1:-1;
}
int32_t ret = wcsncmp((wchar_t*) t1->data, (wchar_t*) t2->data, t2->len/TSDB_NCHAR_SIZE);
if (ret == 0) { if (ret == 0) {
return ret; return ret;
} }
......
...@@ -61,8 +61,6 @@ typedef struct { ...@@ -61,8 +61,6 @@ typedef struct {
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, void *pHead, int type);
int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type);
void vnodeInitWriteFp(void); void vnodeInitWriteFp(void);
void vnodeInitReadFp(void); void vnodeInitReadFp(void);
......
...@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteCqMsgToQueue; cqCfg.cqWrite = vnodeWriteToWQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) { if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
...@@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return terrno; return terrno;
} }
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); walRestore(pVnode->wal, pVnode, vnodeProcessWrite);
if (pVnode->version == 0) { if (pVnode->version == 0) {
pVnode->version = walGetVersion(pVnode->wal); pVnode->version = walGetVersion(pVnode->wal);
} }
...@@ -320,7 +320,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -320,7 +320,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.ahandle = pVnode; syncInfo.ahandle = pVnode;
syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToWQueue;
syncInfo.confirmForward = dnodeSendRpcVWriteRsp; syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
...@@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) { ...@@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) {
} }
void vnodeRelease(void *pVnodeRaw) { void vnodeRelease(void *pVnodeRaw) {
if (pVnodeRaw == NULL) return;
SVnodeObj *pVnode = pVnodeRaw; SVnodeObj *pVnode = pVnodeRaw;
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
...@@ -482,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) { ...@@ -482,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) {
return pVnode->rqueue; return pVnode->rqueue;
} }
void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->wqueue;
}
void *vnodeGetWal(void *pVnode) { void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal; return ((SVnodeObj *)pVnode)->wal;
} }
......
...@@ -29,9 +29,9 @@ ...@@ -29,9 +29,9 @@
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tqueue.h" #include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
void vnodeInitReadFp(void) { void vnodeInitReadFp(void) {
...@@ -44,7 +44,7 @@ void vnodeInitReadFp(void) { ...@@ -44,7 +44,7 @@ void vnodeInitReadFp(void) {
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the // still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
// request enters the queue // request enters the queue
// //
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType; int msgType = pReadMsg->rpcMsg.msgType;
...@@ -82,7 +82,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void ...@@ -82,7 +82,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void
int32_t code = vnodeCheckRead(pVnode); int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SVReadMsg *pRead = (SVReadMsg *)taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
...@@ -146,7 +146,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { ...@@ -146,7 +146,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp->completed = true; pRsp->completed = true;
} }
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen; int32_t contLen = pReadMsg->contLen;
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
...@@ -274,7 +274,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -274,7 +274,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
return code; return code;
} }
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
......
...@@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) { ...@@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
} }
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param1; SVnodeObj * pVnode = vparam;
SWalHead * pHead = param2; SWalHead * pHead = wparam;
SRspRet * pRspRet = rparam;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
...@@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); syncCode = syncForwardToPeer(pVnode->sync, pHead, pRspRet, qtype);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write into WAL // write into WAL
...@@ -90,7 +91,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -90,7 +91,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
pVnode->version = pHead->version; pVnode->version = pHead->version;
// write data locally // write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
if (code < 0) return code; if (code < 0) return code;
return syncCode; return syncCode;
...@@ -204,35 +205,32 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -204,35 +205,32 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = vparam;
SWalHead * pHead = data; SWalHead * pHead = wparam;
int size = sizeof(SWalHead) + pHead->len;
SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead));
SWalHead *pWal = (SWalHead *)(pSync + 1);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pSync); if (qtype == TAOS_QTYPE_RPC) {
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
}
return 0; int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
} SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
int vnodeWriteToQueue(void *param, void *data, int type) { if (rparam != NULL) {
SVnodeObj *pVnode = param; SRpcMsg *pRpcMsg = rparam;
SWalHead * pHead = data; pWrite->rpcHandle = pRpcMsg->handle;
pWrite->rpcAhandle = pRpcMsg->ahandle;
}
int size = sizeof(SWalHead) + pHead->len; memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pWal); taosWriteQitem(pVnode->wqueue, qtype, pWrite);
return TSDB_CODE_SUCCESS;
return 0;
} }
...@@ -70,7 +70,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { ...@@ -70,7 +70,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL); pthread_mutex_init(&pWal->mutex, NULL);
pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
if (walInitObj(pWal) != TSDB_CODE_SUCCESS) { if (walInitObj(pWal) != TSDB_CODE_SUCCESS) {
......
...@@ -111,18 +111,19 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -111,18 +111,19 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
return code; return code;
} }
void walFsync(void *handle) { void walFsync(void *handle, bool forceFsync) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return; if (pWal == NULL || pWal->fd < 0) return;
if (pWal->fsyncPeriod == 0) { if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, file:%s, do fsync", pWal->vgId, pWal->name);
if (fsync(pWal->fd) < 0) { if (fsync(pWal->fd) < 0) {
wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
} }
} }
} }
int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
if (handle == NULL) return -1; if (handle == NULL) return -1;
SWal * pWal = handle; SWal * pWal = handle;
...@@ -143,12 +144,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * ...@@ -143,12 +144,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
continue; continue;
} }
if (!pWal->keep) {
wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName);
remove(walName);
} else {
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
}
count++; count++;
} }
...@@ -305,9 +301,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -305,9 +301,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId, wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId,
pWal->version, pHead->version, pHead->len); pWal->version, pHead->version, pHead->len);
if (pWal->keep) pWal->version = pHead->version; pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
} }
tclose(fd); tclose(fd);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
int64_t ver = 0; int64_t ver = 0;
void *pWal = NULL; void *pWal = NULL;
int writeToQueue(void *pVnode, void *data, int type) { int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
// do nothing // do nothing
SWalHead *pHead = data; SWalHead *pHead = data;
......
...@@ -35,10 +35,11 @@ class TDTestCase: ...@@ -35,10 +35,11 @@ class TDTestCase:
% (self.ts, self.ts + 2000000000, self.ts + 4000000000, self.ts + 5000000000, self.ts + 7000000000)) % (self.ts, self.ts + 2000000000, self.ts + 4000000000, self.ts + 5000000000, self.ts + 7000000000))
tdSql.query("select avg(voltage) from st interval(1n)") tdSql.query("select avg(voltage) from st interval(1n)")
tdSql.checkRows(3) tdSql.checkRows(4)
tdSql.checkData(0, 1, 221.4) tdSql.checkData(0, 1, 220.0)
tdSql.checkData(1, 1, 227.0) tdSql.checkData(1, 1, 222.33333333333334)
tdSql.checkData(2, 1, 222.0) tdSql.checkData(2, 1, 227.0)
tdSql.checkData(3, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d)") tdSql.query("select avg(voltage) from st interval(1n, 15d)")
tdSql.checkRows(4) tdSql.checkRows(4)
......
...@@ -106,7 +106,7 @@ class TDTestCase: ...@@ -106,7 +106,7 @@ class TDTestCase:
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.pid") tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.pid")
tdSql.checkRows(3) tdSql.checkRows(6)
tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
tdSql.checkRows(6) tdSql.checkRows(6)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
tdSql.execute("create table cars(ts timestamp, c nchar(2)) tags(t1 nchar(2))")
tdSql.execute("insert into car0 using cars tags('aa') values(now, 'bb');")
tdSql.query("select count(*) from cars where t1 like '%50 90 30 04 00 00%'")
tdSql.checkRows(0)
tdSql.execute("create table test_cars(ts timestamp, c nchar(2)) tags(t1 nchar(20))")
tdSql.execute("insert into car1 using test_cars tags('150 90 30 04 00 002') values(now, 'bb');")
tdSql.query("select * from test_cars where t1 like '%50 90 30 04 00 00%'")
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册