提交 9ab8fb5b 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/crash_gen

......@@ -27,6 +27,7 @@ pipeline {
cd debug
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
cd ${WKC}/tests
#./test-all.sh smoke
./test-all.sh pytest
......@@ -1016,9 +1016,9 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER
## TAOS SQL 边界限制
- 数据库名最大长度为33
- 表名最大长度为193,每行数据最大长度16k个字符
- 列名最大长度为65,最多允许1024列,最少需要2列,第一列必须是时间戳
- 数据库名最大长度为32
- 表名最大长度为192,每行数据最大长度16k个字符
- 列名最大长度为64,最多允许1024列,最少需要2列,第一列必须是时间戳
- 标签最多允许128个,可以0个,标签总长度不超过16k个字符
- SQL语句最大长度65480个字符,但可通过系统配置参数maxSQLLength修改,最长可配置为1M
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制
......@@ -48,6 +48,7 @@ cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_pat
cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/../packaging/tools/preun.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
......@@ -58,6 +58,7 @@ cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/scri
cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include
......@@ -134,6 +135,7 @@ if [ $1 -eq 0 ];then
${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${cfg_link_dir}/* || :
${csudo} rm -f ${inc_link_dir}/taos.h || :
${csudo} rm -f ${inc_link_dir}/taoserror.h || :
......@@ -45,7 +45,7 @@ if [ "$osType" != "Darwin" ]; then
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh"
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
......@@ -36,7 +36,7 @@ if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
......@@ -81,8 +81,10 @@ function install_lib() {
${csudo} ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo} ln -s ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
${csudo} ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
${csudo} ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then
${csudo} ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
${csudo} ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
function install_bin() {
......@@ -3978,7 +3978,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
point1.val = data1;
......@@ -3987,7 +3987,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
} else {
......@@ -97,6 +97,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->param[2].i64Key = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
} else if (functionId == TSDB_FUNC_APERCT) {
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
pCtx->param[0].nType = pExpr->param[0].nType;
pCtx->interBufBytes = pExpr->interBytes;
......@@ -2758,6 +2758,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
const char* msg1 = "too many columns in group by clause";
const char* msg2 = "invalid column name in group by clause";
const char* msg3 = "columns from one table allowed as group by columns";
const char* msg4 = "join query does not support group by";
const char* msg7 = "not support group by expression";
const char* msg8 = "not allowed column type for group by";
const char* msg9 = "tags not allowed for table query";
......@@ -2778,6 +2779,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
if (pQueryInfo->numOfTables > 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
STableMeta* pTableMeta = NULL;
SSchema* pSchema = NULL;
SSchema s = tscGetTbnameColumnSchema();
......@@ -882,7 +882,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->tsBuf != NULL) {
// note: here used the index instead of actual vnode id.
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -156,8 +156,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
win->ekey = elem1.ts;
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;//offset apply to projection?
......@@ -193,8 +193,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey,
tsBufGetNumOfVnodes(output1), et - st);
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
tsBufGetNumOfGroup(output1), et - st);
return output1->numOfTotal;
......@@ -282,7 +282,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0;
int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
// The virtual node, of which all tables are disqualified after the timestamp intersection,
// is removed to avoid next stage query.
......@@ -314,7 +314,7 @@ static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0;
int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
size_t numOfGroups = taosArrayGetSize(pVgroupTables);
......@@ -853,11 +853,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
assert(pParentSql->fp != tscJoinQueryCallback);
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
assert(pParentSql->fp != tscJoinQueryCallback);
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
// proceed to for ts_comp query
......@@ -2366,7 +2370,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
pRes->numOfCols = (int32_t)numOfExprs;
pRes->numOfCols = (int16_t)numOfExprs;
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
......@@ -52,7 +52,6 @@ public class TSDBConnection implements Connection {
public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException {
this.dbMetaData = meta;
Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")),
info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
......@@ -197,12 +196,13 @@ public class TSDBConnection implements Connection {
public SQLWarning getWarnings() throws SQLException {
//todo: implement getWarnings according to the warning messages returned from TDengine
public void clearWarnings() throws SQLException {
// left blank to support HikariCP connection
//todo: implement getWarnings according to the warning messages returned from TDengine
//todo: implement clearWarnings according to the warning messages returned from TDengine
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
......@@ -96,7 +96,7 @@ public class TSDBDatabaseMetaData implements java.sql.DatabaseMetaData {
public int getDriverMajorVersion() {
return 0;
return 2;
public int getDriverMinorVersion() {
......@@ -14,13 +14,9 @@
package com.taosdata.jdbc;
import java.io.*;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.logging.Logger;
......@@ -44,76 +40,53 @@ import java.util.logging.Logger;
public class TSDBDriver implements java.sql.Driver {
private static final String URL_PREFIX1 = "jdbc:TSDB://";
private static final String URL_PREFIX = "jdbc:TAOS://";
* Key used to retrieve the database value from the properties instance passed
* to the driver.
public static final String PROPERTY_KEY_DBNAME = "dbname";
* Key used to retrieve the host value from the properties instance passed to
* the driver.
public static final String PROPERTY_KEY_HOST = "host";
* Key used to retrieve the password value from the properties instance passed
* to the driver.
public static final String PROPERTY_KEY_PASSWORD = "password";
* Key used to retrieve the port number value from the properties instance
* passed to the driver.
public static final String PROPERTY_KEY_PORT = "port";
* Key used to retrieve the database value from the properties instance passed
* to the driver.
public static final String PROPERTY_KEY_DBNAME = "dbname";
* Key used to retrieve the user value from the properties instance passed to
* the driver.
public static final String PROPERTY_KEY_USER = "user";
* Key used to retrieve the password value from the properties instance passed
* to the driver.
public static final String PROPERTY_KEY_PASSWORD = "password";
* Key for the configuration file directory of TSDB client in properties instance
public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
* Key for the timezone used by the TSDB client in properties instance
public static final String PROPERTY_KEY_TIME_ZONE = "timezone";
* Key for the locale used by the TSDB client in properties instance
public static final String PROPERTY_KEY_LOCALE = "locale";
* Key for the char encoding used by the TSDB client in properties instance
public static final String PROPERTY_KEY_CHARSET = "charset";
public static final String PROPERTY_KEY_PROTOCOL = "protocol";
* Index for port coming out of parseHostPortPair().
public final static int PORT_NUMBER_INDEX = 1;
* Index for host coming out of parseHostPortPair().
public final static int HOST_NAME_INDEX = 0;
private TSDBDatabaseMetaData dbMetaData = null;
static {
......@@ -169,9 +142,11 @@ public class TSDBDriver implements java.sql.Driver {
public Connection connect(String url, Properties info) throws SQLException {
if (url == null) {
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
if (!acceptsURL(url))
return null;
Properties props = null;
if ((props = parseURL(url, info)) == null) {
......@@ -179,7 +154,10 @@ public class TSDBDriver implements java.sql.Driver {
//load taos.cfg start
if (info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null && info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null) {
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
......@@ -190,7 +168,9 @@ public class TSDBDriver implements java.sql.Driver {
try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE), (String) props.get(PROPERTY_KEY_CHARSET),
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR),
(String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET),
(String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
......@@ -208,43 +188,15 @@ public class TSDBDriver implements java.sql.Driver {
* Parses hostPortPair in the form of [host][:port] into an array, with the
* element of index HOST_NAME_INDEX being the host (or null if not specified),
* and the element of index PORT_NUMBER_INDEX being the port (or null if not
* specified).
* @param hostPortPair host and port in form of of [host][:port]
* @return array containing host and port as Strings
* @throws SQLException if a parse error occurs
* @param url the URL of the database
* @return <code>true</code> if this driver understands the given URL;
* <code>false</code> otherwise
* @throws SQLException if a database access error occurs or the url is {@code null}
protected static String[] parseHostPortPair(String hostPortPair) throws SQLException {
String[] splitValues = new String[2];
int portIndex = hostPortPair.indexOf(":");
String hostname = null;
if (portIndex != -1) {
if ((portIndex + 1) < hostPortPair.length()) {
String portAsString = hostPortPair.substring(portIndex + 1);
hostname = hostPortPair.substring(0, portIndex);
splitValues[HOST_NAME_INDEX] = hostname;
splitValues[PORT_NUMBER_INDEX] = portAsString;
} else {
throw new SQLException(TSDBConstants.WrapErrMsg("port is not proper!"));
} else {
splitValues[HOST_NAME_INDEX] = hostPortPair;
splitValues[PORT_NUMBER_INDEX] = null;
return splitValues;
public boolean acceptsURL(String url) throws SQLException {
return (url != null && url.length() > 0 && url.trim().length() > 0) && url.startsWith(URL_PREFIX);
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is null"));
return (url != null && url.length() > 0 && url.trim().length() > 0) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1));
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
......@@ -252,15 +204,17 @@ public class TSDBDriver implements java.sql.Driver {
info = new Properties();
if ((url != null) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1))) {
if (acceptsURL(url)) {
info = parseURL(url, info);
DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST));
hostProp.required = true;
hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
dbProp.required = false;
......@@ -268,9 +222,11 @@ public class TSDBDriver implements java.sql.Driver {
DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
......@@ -283,20 +239,68 @@ public class TSDBDriver implements java.sql.Driver {
* example: jdbc:TSDB://
* example: jdbc:TAOS://
public Properties parseURL(String url, Properties defaults) throws java.sql.SQLException {
public Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null) {
if (url == null || url.length() <= 0 || url.trim().length() <= 0)
return null;
if (!url.startsWith(URL_PREFIX) && !url.startsWith(URL_PREFIX1)) {
if (!url.startsWith(URL_PREFIX) && !url.startsWith(URL_PREFIX1))
return null;
// parse properties
String urlForMeta = url;
int beginningOfSlashes = url.indexOf("//");
int index = url.indexOf("?");
if (index != -1) {
String paramString = url.substring(index + 1, url.length());
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
String parameterValuePair = queryParams.nextToken();
int indexOfEqual = parameterValuePair.indexOf("=");
String parameter = null;
String value = null;
if (indexOfEqual != -1) {
parameter = parameterValuePair.substring(0, indexOfEqual);
if (indexOfEqual + 1 < parameterValuePair.length()) {
value = parameterValuePair.substring(indexOfEqual + 1);
if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
urlProps.setProperty(parameter, value);
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
// parse dbname
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
if (indexOfSlash + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_DBNAME, url.substring(indexOfSlash + 1));
url = url.substring(0, indexOfSlash);
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
if (indexOfColon + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_PORT, url.substring(indexOfColon + 1));
url = url.substring(0, indexOfColon);
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER));
String urlForMeta = url;
String dbProductName = url.substring(url.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
int beginningOfSlashes = url.indexOf("//");
......@@ -345,11 +349,11 @@ public class TSDBDriver implements java.sql.Driver {
user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
return urlProps;
public void setPropertyValue(Properties property, String[] keyValuePair) {
private void setPropertyValue(Properties property, String[] keyValuePair) {
switch (keyValuePair[0].toLowerCase()) {
property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
......@@ -372,13 +376,12 @@ public class TSDBDriver implements java.sql.Driver {
public int getMajorVersion() {
return 1;
return 2;
public int getMinorVersion() {
return 1;
return 0;
public boolean jdbcCompliant() {
......@@ -389,33 +392,4 @@ public class TSDBDriver implements java.sql.Driver {
return null;
* Returns the host property
* @param props the java.util.Properties instance to retrieve the hostname from.
* @return the host
public String host(Properties props) {
return props.getProperty(PROPERTY_KEY_HOST, "localhost");
* Returns the port number property
* @param props the properties to get the port number from
* @return the port number
public int port(Properties props) {
return Integer.parseInt(props.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
* Returns the database property from <code>props</code>
* @param props the Properties to look for the database property.
* @return the database name.
public String database(Properties props) {
return props.getProperty(PROPERTY_KEY_DBNAME);
package com.taosdata.jdbc;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.SQLException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.*;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
public class TSDBDriverTest {
private static final String[] validURLs = {
private static boolean islibLoaded = false;
private static boolean isTaosdActived;
private Connection conn;
public static void before() {
String osName = System.getProperty("os.name").toLowerCase();
if (!osName.equals("linux"))
// try to load taos lib
try {
islibLoaded = true;
} catch (UnsatisfiedLinkError error) {
System.out.println("load tdengine lib failed.");
// check taosd is activated
try {
String[] cmd = {"/bin/bash", "-c", "ps -ef | grep taosd | grep -v \"grep\""};
Process exec = Runtime.getRuntime().exec(cmd);
BufferedReader reader = new BufferedReader(new InputStreamReader(exec.getInputStream()));
int lineCnt = 0;
while (reader.readLine() != null) {
if (lineCnt > 0)
isTaosdActived = true;
} catch (IOException e) {
try {
} catch (ClassNotFoundException e) {
public void testConnectWithJdbcURL() {
final String url = "jdbc:TAOS://localhost:6030/log?user=root&password=taosdata";
try {
if (islibLoaded && isTaosdActived) {
conn = DriverManager.getConnection(url);
assertNotNull("failure - connection should not be null", conn);
} catch (SQLException e) {
fail("failure - should not throw Exception");
public void testConnectWithProperties() {
final String jdbcUrl = "jdbc:TAOS://localhost:6030/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
try {
if (islibLoaded && isTaosdActived) {
conn = DriverManager.getConnection(jdbcUrl, connProps);
assertNotNull("failure - connection should not be null", conn);
} catch (SQLException e) {
fail("failure - should not throw Exception");
public void urlParserTest() throws SQLException {
public void testConnectWithConfigFile() {
String jdbcUrl = "jdbc:TAOS://:/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
try {
if (islibLoaded && isTaosdActived) {
conn = DriverManager.getConnection(jdbcUrl, connProps);
assertNotNull("failure - connection should not be null", conn);
} catch (SQLException e) {
fail("failure - should not throw Exception");
@Test(expected = SQLException.class)
public void testAcceptsURL() throws SQLException {
Driver driver = new TSDBDriver();
for (String url : validURLs) {
assertTrue("failure - acceptsURL(\" " + url + " \") should be true", driver.acceptsURL(url));
fail("acceptsURL throws exception when parameter is null");
public void testParseURL() {
TSDBDriver driver = new TSDBDriver();
String url = "jdbc:TSDB://";
Properties properties = new Properties();
driver.parseURL(url, properties);
assertEquals(properties.get("host"), "");
assertEquals(properties.get("port"), "0");
assertEquals(properties.get("dbname"), "db");
assertEquals(properties.get("user"), "root");
assertEquals(properties.get("password"), "your_password");
url = "jdbc:TSDB://";
properties = new Properties();
driver.parseURL(url, properties);
assertEquals(properties.get("host"), "");
assertEquals(properties.get("port"), "0");
assertEquals(properties.get("dbname"), "log");
assertEquals(properties.get("charset"), "UTF-8");
url = "jdbc:TSDB://";
properties = new Properties();
driver.parseURL(url, properties);
assertEquals(properties.get("host"), "");
assertEquals(properties.get("port"), "0");
assertEquals(properties.get("dbname"), null);
url = "jdbc:TSDB://";
properties = new Properties();
driver.parseURL(url, properties);
assertEquals(properties.get("host"), "");
assertEquals(properties.get("port"), "0");
assertEquals(properties.get("dbname"), "db");
String url = "jdbc:TAOS://";
Properties config = new Properties();
Properties actual = driver.parseURL(url, config);
assertEquals("failure - host should be", "", actual.get("host"));
assertEquals("failure - port should be 0", "0", actual.get("port"));
assertEquals("failure - dbname should be db", "db", actual.get("dbname"));
assertEquals("failure - user should be root", "root", actual.get("user"));
assertEquals("failure - password should be taosdata", "taosdata", actual.get("password"));
assertEquals("failure - charset should be UTF-8", "UTF-8", actual.get("charset"));
url = "jdbc:TAOS://";
config = new Properties();
actual = driver.parseURL(url, config);
assertEquals("failure - host should be", "", actual.getProperty("host"));
assertEquals("failure - port should be 0", "0", actual.get("port"));
assertNull("failure - dbname should be null", actual.get("dbname"));
url = "jdbc:TAOS://";
config = new Properties();
actual = driver.parseURL(url, config);
assertEquals("failure - host should be", "", actual.getProperty("host"));
assertEquals("failure - port should be 0", "0", actual.get("port"));
assertEquals("failure - dbname should be db", "db", actual.get("dbname"));
url = "jdbc:TAOS://:/?";
config = new Properties();
config.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
config.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
actual = driver.parseURL(url, config);
assertEquals("failure - user should be root", "root", actual.getProperty("user"));
assertEquals("failure - password should be taosdata", "taosdata", actual.getProperty("password"));
assertNull("failure - host should be null", actual.getProperty("host"));
assertNull("failure - port should be null", actual.getProperty("port"));
assertNull("failure - dbname should be null", actual.getProperty("dbname"));
public void testGetPropertyInfo() throws SQLException {
Driver driver = new TSDBDriver();
final String url = "jdbc:TAOS://localhost:6030/log?user=root&password=taosdata";
Properties connProps = new Properties();
DriverPropertyInfo[] propertyInfo = driver.getPropertyInfo(url, connProps);
for (DriverPropertyInfo info : propertyInfo) {
if (info.name.equals(TSDBDriver.PROPERTY_KEY_HOST))
assertEquals("failure - host should be localhost", "localhost", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_PORT))
assertEquals("failure - port should be 6030", "6030", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_DBNAME))
assertEquals("failure - dbname should be test", "log", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_USER))
assertEquals("failure - user should be root", "root", info.value);
if (info.name.equals(TSDBDriver.PROPERTY_KEY_PASSWORD))
assertEquals("failure - password should be root", "taosdata", info.value);
public void testGetMajorVersion() {
assertEquals("failure - getMajorVersion should be 2", 2, new TSDBDriver().getMajorVersion());
public void testGetMinorVersion() {
assertEquals("failure - getMinorVersion should be 0", 0, new TSDBDriver().getMinorVersion());
public void testJdbcCompliant() {
assertFalse("failure - jdbcCompliant should be false", new TSDBDriver().jdbcCompliant());
public void testGetParentLogger() throws SQLFeatureNotSupportedException {
assertNull("failure - getParentLogger should be be null", new TSDBDriver().getParentLogger());
\ No newline at end of file
......@@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead->version = 0;
// write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL);
......@@ -24,7 +24,7 @@
int64_t ver = 0;
void *pCq = NULL;
int writeToQueue(void *pVnode, void *data, int type) {
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
return 0;
......@@ -20,9 +20,11 @@
extern "C" {
int32_t dnodeInitVnodeRead();
void dnodeCleanupVnodeRead();
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg);
int32_t dnodeInitVRead();
void dnodeCleanupVRead();
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
#ifdef __cplusplus
......@@ -61,7 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vread", dnodeInitVRead, dnodeCleanupVRead},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
......@@ -39,8 +39,8 @@ static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write
......@@ -17,225 +17,184 @@
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeVRead.h"
#include "tqueue.h"
#include "vnode.h"
#include "dnodeInt.h"
typedef struct {
pthread_t thread; // thread
int32_t workerId; // worker ID
} SReadWorker;
pthread_t thread; // thread
int32_t workerId; // worker ID
} SVReadWorker;
typedef struct {
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
SReadWorker *readWorker;
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
SVReadWorker * worker;
pthread_mutex_t mutex;
} SReadWorkerPool;
} SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *param);
static void dnodeHandleIdleReadWorker(SReadWorker *);
// module global variable
static SReadWorkerPool readPool;
static taos_qset readQset;
static SVReadWorkerPool tsVReadWP;
static taos_qset tsVReadQset;
int32_t dnodeInitVnodeRead() {
readQset = taosOpenQset();
int32_t dnodeInitVRead() {
tsVReadQset = taosOpenQset();
readPool.min = tsNumOfCores;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max);
pthread_mutex_init(&readPool.mutex, NULL);
tsVReadWP.min = tsNumOfCores;
tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (readPool.readWorker == NULL) return -1;
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (tsVReadWP.worker == NULL) return -1;
for (int i = 0; i < tsVReadWP.max; ++i) {
SVReadWorker *pWorker = tsVReadWP.worker + i;
pWorker->workerId = i;
dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max);
dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max);
return 0;
void dnodeCleanupVnodeRead() {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
void dnodeCleanupVRead() {
for (int i = 0; i < tsVReadWP.max; ++i) {
SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) {
for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
for (int i = 0; i < tsVReadWP.max; ++i) {
SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
dInfo("dnode read is closed");
dInfo("dnode vread is closed");
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char * pCont = (char *)pMsg->pCont;
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
SMsgHead *pHead = (SMsgHead *)pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
taos_queue queue = vnodeAcquireRqueue(pHead->vgId);
if (queue == NULL) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
void *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) {
int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
// put message into queue
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = *pMsg;
pRead->pCont = pCont;
pRead->contLen = pHead->contLen;
// next vnode
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.msgType = 0
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
void *dnodeAllocVReadQueue(void *pVnode) {
taos_queue queue = taosOpenQueue();
if (queue == NULL) {
return NULL;
taosAddIntoQset(readQset, queue, pVnode);
taosAddIntoQset(tsVReadQset, queue, pVnode);
// spawn a thread to process queue
if (readPool.num < readPool.max) {
if (tsVReadWP.num < tsVReadWP.max) {
do {
SReadWorker *pWorker = readPool.readWorker + readPool.num;
SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num;
pthread_attr_t thAttr;
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
dError("failed to create thread to process vread vqueue since %s", strerror(errno));
dDebug("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num);
} while (readPool.num < readPool.min);
dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num);
} while (tsVReadWP.num < tsVReadWP.min);
dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue);
dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue);
return queue;
void dnodeFreeVReadQueue(void *rqueue) {
// dynamically adjust the number of threads
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.handle = pRead->rpcHandle,
.pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len,
.code = code,
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
static void *dnodeProcessReadQueue(void *param) {
SReadMsg *pReadMsg;
int type;
void *pVnode;
SVReadMsg *pRead;
int32_t qtype;
void * pVnode;
while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
dDebug("qset:%p dnode read got no message from qset, exiting", readQset);
if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pRead, &pVnode) == 0) {
dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset);
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type, pReadMsg);
dDebug("%p, msg:%p:%s will be processed in vread queue, qtype:%d", pRead->rpcAhandle, pRead,
taosMsg[pRead->msgType], qtype);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
int32_t code = vnodeProcessRead(pVnode, pRead);
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcVReadRsp(pVnode, pRead, code);
} else {
if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
} else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
dnodeSendRpcVReadRsp(pVnode, pRead, pRead->code);
} else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert(pRead->rpcHandle == NULL || (pRead->rpcHandle != NULL && pRead->msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pRead, code);
return NULL;
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
int32_t num = taosGetQueueNumber(readQset);
if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) {
dDebug("read worker:%d is released, total:%d", pWorker->workerId, readPool.num);
} else {
......@@ -15,13 +15,12 @@
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tqueue.h"
#include "tsdb.h"
#include "twal.h"
#include "tsync.h"
#include "vnode.h"
#include "syncInt.h"
#include "dnodeInt.h"
typedef struct {
......@@ -29,30 +28,21 @@ typedef struct {
taos_qset qset; // queue set
int32_t workerId; // worker ID
pthread_t thread; // thread
} SWriteWorker;
typedef struct {
SRspRet rspRet;
SRpcMsg rpcMsg;
int32_t processedCount;
int32_t code;
int32_t contLen;
void * pCont;
} SWriteMsg;
} SVWriteWorker;
typedef struct {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *worker;
SVWriteWorker * worker;
pthread_mutex_t mutex;
} SWriteWorkerPool;
} SVWriteWorkerPool;
static SWriteWorkerPool tsVWriteWP;
static void *dnodeProcessWriteQueue(void *param);
static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessVWriteQueue(void *param);
int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max);
tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL);
......@@ -66,14 +56,14 @@ int32_t dnodeInitVWrite() {
void dnodeCleanupVWrite() {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i;
SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i;
SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
......@@ -86,44 +76,43 @@ void dnodeCleanupVWrite() {
dInfo("dnode vwrite is closed");
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) {
char *pCont = pMsg->pCont;
void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
int32_t code;
char *pCont = pRpcMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
if (pRpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
SMsgDesc *pDesc = (SMsgDesc *)pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc);
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
SMsgHead *pMsg = (SMsgHead *)pCont;
pMsg->vgId = htonl(pMsg->vgId);
pMsg->contLen = htonl(pMsg->contLen);
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) {
// put message into queue
SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
void *pVnode = vnodeAcquire(pMsg->vgId);
if (pVnode == NULL) {
} else {
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.msgType = 0
SWalHead *pHead = (SWalHead *)(pCont - sizeof(SWalHead));
pHead->msgType = pRpcMsg->msgType;
pHead->version = 0;
pHead->len = pMsg->contLen;
code = vnodeWriteToWQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = code};
void *dnodeAllocVWriteQueue(void *pVnode) {
SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
SVWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
void *queue = taosOpenQueue();
if (queue == NULL) {
......@@ -150,7 +139,7 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessVWriteQueue, pWorker) != 0) {
dError("failed to create thread to process vwrite queue since %s", strerror(errno));
......@@ -179,7 +168,7 @@ void dnodeFreeVWriteQueue(void *wqueue) {
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (param == NULL) return;
SWriteMsg *pWrite = param;
SVWriteMsg *pWrite = param;
if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
......@@ -187,28 +176,24 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (count <= 1) return;
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.handle = pWrite->rpcHandle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->code,
static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param;
SWriteMsg * pWrite;
SWalHead * pHead;
SRspRet * pRspRet;
void * pVnode;
void * pItem;
int32_t numOfMsgs;
int32_t qtype;
static void *dnodeProcessVWriteQueue(void *param) {
SVWriteWorker *pWorker = param;
SVWriteMsg * pWrite;
void * pVnode;
int32_t numOfMsgs;
int32_t qtype;
dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
......@@ -219,55 +204,33 @@ static void *dnodeProcessWriteQueue(void *param) {
bool forceFsync = false;
for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL;
pRspRet = NULL;
taosGetQitem(pWorker->qall, &qtype, &pItem);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
} else if (qtype == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
} else {
pHead = pItem;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite,
taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version);
int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet);
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
pHead->version, tstrerror(code));
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1;
if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
if (pWrite) {
pWrite->rpcMsg.code = code;
if (code <= 0) pWrite->processedCount = 1;
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
walFsync(vnodeGetWal(pVnode), forceFsync);
// browse all items, and process them one by one
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, &pItem);
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else if (qtype == TAOS_QTYPE_FWD) {
pHead = pItem;
vnodeConfirmForward(pVnode, pHead->version, 0);
vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
} else {
......@@ -55,9 +55,9 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue();
......@@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h"
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
typedef struct {
int vgId;
......@@ -70,8 +70,8 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type);
// when a forward pkt is received, call this to handle data
typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
......@@ -43,8 +43,8 @@ typedef struct {
int8_t keep; // keep the wal file when closed
} SWalCfg;
typedef void* twalh; // WAL HANDLE
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type);
typedef void * twalh; // WAL HANDLE
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
int32_t walInit();
void walCleanUp();
......@@ -55,7 +55,7 @@ void walStop(twalh);
void walClose(twalh);
int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh);
void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
int64_t walGetVersion(twalh);
......@@ -20,6 +20,8 @@
extern "C" {
#include "twal.h"
typedef enum _VN_STATUS {
......@@ -29,17 +31,32 @@ typedef enum _VN_STATUS {
} EVnStatus;
typedef struct {
int len;
void *rsp;
void *qhandle; //used by query and retrieve msg
int32_t len;
void * rsp;
void * qhandle; // used by query and retrieve msg
} SRspRet;
typedef struct {
int32_t code;
int32_t contLen;
void * rpcHandle;
void * rpcAhandle;
void * qhandle;
int8_t qtype;
int8_t msgType;
SRspRet rspRet;
char pCont[];
} SVReadMsg;
typedef struct {
int32_t code;
int32_t processedCount;
void * rpcHandle;
void * rpcAhandle;
SRspRet rspRet;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SReadMsg;
char reserveForSync[16];
SWalHead pHead[];
} SVWriteMsg;
extern char *vnodeStatus[];
......@@ -50,13 +67,11 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount
void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue
void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
......@@ -65,8 +80,8 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources();
void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
int32_t vnodeCheckRead(void *pVnode);
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus
......@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall;
static taos_queue tsSdbWriteQueue;
static SSdbWriteWorkerPool tsSdbPool;
static int sdbWrite(void *param, void *data, int type);
static int sdbWriteToQueue(void *param, void *data, int type);
static int32_t sdbWrite(void *param, void *data, int32_t type, void *pMsg);
static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg);
static void * sdbWorkerFp(void *param);
static int32_t sdbInitWriteWorker();
static void sdbCleanupWriteWorker();
......@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
static int sdbWrite(void *param, void *data, int type) {
static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
SSdbOper *pOper = param;
SWalHead *pHead = data;
int32_t tableId = pHead->msgType / 10;
......@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() {
tsSdbWriteQueue = NULL;
int sdbWriteToQueue(void *param, void *data, int type) {
int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) {
SWalHead *pHead = data;
int size = sizeof(SWalHead) + pHead->len;
int32_t size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size);
taosWriteQitem(tsSdbWriteQueue, type, pWal);
taosWriteQitem(tsSdbWriteQueue, qtype, pWal);
return 0;
......@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) {
pOper = NULL;
int32_t code = sdbWrite(pOper, pHead, type);
int32_t code = sdbWrite(pOper, pHead, type, NULL);
if (code > 0) code = 0;
if (pOper) {
pOper->retCode = code;
......@@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) {
walFsync(tsSdbObj.wal, true);
// browse all items, and process them one by one
......@@ -44,7 +44,7 @@ long interlocked_add_fetch_32(long volatile* ptr, long val) {
__int64 interlocked_add_fetch_64(__int64 volatile* ptr, __int64 val) {
//#ifdef _WIN64
return _InterlockedExchangeAdd64(ptr, val) + val;
return InterlockedExchangeAdd64(ptr, val) + val;
// return _InterlockedExchangeAdd(ptr, val) + val;
......@@ -65,12 +65,12 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
int64_t remain = count - writeLen;
if (remain > 0) {
size_t rlen = fread(buffer, 1, remain, in_file);
size_t rlen = fread(buffer, 1, (size_t) remain, in_file);
if (rlen <= 0) {
return writeLen;
else {
fwrite(buffer, 1, remain, out_file);
fwrite(buffer, 1, (size_t) remain, out_file);
writeLen += remain;
......@@ -49,7 +49,6 @@ typedef struct SFillInfo {
int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
// char ** pTags; // tags value for current interpolation
SFillTagColInfo* pTags; // tags value for filling gap
SInterval interval;
char * prevValues; // previous row of data, to generate the interpolation results
......@@ -83,7 +82,7 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRo
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
......@@ -26,7 +26,7 @@ extern "C" {
#define MEM_BUF_SIZE (1 << 20)
typedef struct STSList {
char* rawBuf;
......@@ -38,7 +38,7 @@ typedef struct STSList {
typedef struct STSElem {
tVariant* tag;
int32_t vnode;
int32_t id;
} STSElem;
typedef struct STSCursor {
......@@ -60,17 +60,17 @@ typedef struct STSBlock {
* The size of buffer file should not be greater than 2G,
* and the offset of int32_t type is enough
typedef struct STSVnodeBlockInfo {
int32_t vnode; // vnode id
typedef struct STSGroupBlockInfo {
int32_t id; // group id
int32_t offset; // offset set value in file
int32_t numOfBlocks; // number of total blocks
int32_t compLen; // compressed size
} STSVnodeBlockInfo;
} STSGroupBlockInfo;
typedef struct STSVnodeBlockInfoEx {
STSVnodeBlockInfo info;
typedef struct STSGroupBlockInfoEx {
STSGroupBlockInfo info;
int32_t len; // length before compress
} STSVnodeBlockInfoEx;
} STSGroupBlockInfoEx;
typedef struct STSBuf {
FILE* f;
......@@ -78,9 +78,9 @@ typedef struct STSBuf {
uint32_t fileSize;
// todo use array
STSVnodeBlockInfoEx* pData;
STSGroupBlockInfoEx* pData;
uint32_t numOfAlloc;
uint32_t numOfVnodes;
uint32_t numOfGroups;
char* assistBuf;
int32_t bufSize;
......@@ -94,22 +94,22 @@ typedef struct STSBuf {
typedef struct STSBufFileHeader {
uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file
uint32_t numOfGroup; // number of group stored in current file
int32_t tsOrder; // timestamp order in current file
} STSBufFileHeader;
STSBuf* tsBufCreate(bool autoDelete, int32_t order);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t vnodeId);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t id);
void* tsBufDestroy(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len);
void tsBufAppend(STSBuf* pTSBuf, int32_t id, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
STSBuf* tsBufClone(STSBuf* pTSBuf);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
void tsBufFlush(STSBuf* pTSBuf);
......@@ -118,7 +118,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
......@@ -131,11 +131,11 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
void tsBufDisplay(STSBuf* pTSBuf);
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf);
int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf);
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId);
void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id);
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks);
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t id, void* buf, int32_t* len, int32_t* numOfBlocks);
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag);
......@@ -609,7 +609,7 @@ int32_t intersect(SArray *pLeft, SArray *pRight, SArray *pFinalRes) {
* traverse the result and apply the function to each item to check if the item is qualified or not
static void tArrayTraverse(tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) {
static UNUSED_FUNC void tArrayTraverse(tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) {
assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE && fp != NULL);
// scan the result array list and check for each item in the list
......@@ -660,7 +660,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p
* @param pSchema tag schemas
* @param fp filter callback function
static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
static UNUSED_FUNC void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
size_t size = taosArrayGetSize(pResult);
SArray* array = taosArrayInit(size, POINTER_BYTES);
......@@ -733,10 +733,6 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
param->setupInfoFn(pExpr, param->pExtInfo);
if (pSkipList == NULL) {
tArrayTraverse(pExpr, param->nodeFilterFn, result);
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
......@@ -748,49 +744,14 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
// recursive traverse left child branch
// The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
if (weight == 0 ) {
if (taosArrayGetSize(result) > 0 && pSkipList == NULL) {
* Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
* Since no index presented, the filter operation is done by scan all elements in the result set.
* if the query is a high selectivity filter, only small portion of meters are retrieved.
exprTreeTraverseImpl(pExpr, result, param);
} else {
* apply the hierarchical expression to every node in skiplist for find the qualified nodes
assert(taosArrayGetSize(result) == 0);
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
if (weight == 2 || (weight == 1 && pExpr->_node.optr == TSDB_RELATION_OR)) {
SArray* rLeft = taosArrayInit(10, POINTER_BYTES);
SArray* rRight = taosArrayInit(10, POINTER_BYTES);
tExprTreeTraverse(pLeft, pSkipList, rLeft, param);
tExprTreeTraverse(pRight, pSkipList, rRight, param);
if (pExpr->_node.optr == TSDB_RELATION_AND) { // CROSS
intersect(rLeft, rRight, result);
} else if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
merge(rLeft, rRight, result);
} else {
//apply the hierarchical expression to every node in skiplist for find the qualified nodes
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
#if 0
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
......@@ -819,6 +780,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
* So, we do not set the skip list index as a parameter
tExprTreeTraverse(pSecond, NULL, result, param);
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
......@@ -3843,7 +3843,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag);
// failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) {
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
......@@ -4777,7 +4777,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) {
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
......@@ -4802,7 +4802,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (elem1.vnode < 0) {
if (!tsBufIsValidElem(&elem1)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
......@@ -215,7 +215,7 @@ static double linearInterpolationImpl(double v1, double v2, double k1, double k2
return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) {
*(int32_t*)point->val = (int32_t)linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, (double)point1->key,
......@@ -343,7 +343,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
point = (SPoint){.key = pFillInfo->start, .val = val1};
taosDoLinearInterpolation(type, &point1, &point2, &point);
taosGetLinearInterpolationVal(type, &point1, &point2, &point);
setTagsValue(pFillInfo, data, *num);
......@@ -168,7 +168,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
(*pHisto)->numOfEntries += 1;
} else { /* insert a new slot */
if ((*pHisto)->numOfElems > 1 && idx < (*pHisto)->numOfEntries) {
if ((*pHisto)->numOfElems >= 1 && idx < (*pHisto)->numOfEntries) {
if (idx > 0) {
assert((*pHisto)->elems[idx - 1].val <= val);
......@@ -661,4 +661,4 @@ SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2
return pResHistogram;
\ No newline at end of file
......@@ -42,7 +42,7 @@ void simpleTest() {
EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num);
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
EXPECT_EQ(pTSBuf->numOfVnodes, 1);
EXPECT_EQ(pTSBuf->numOfGroups, 1);
EXPECT_EQ(pTSBuf->tsData.len, 0);
......@@ -69,7 +69,7 @@ void largeTSTest() {
// the data has been flush to disk, no data in cache
EXPECT_EQ(pTSBuf->tsData.len, 0);
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
EXPECT_EQ(pTSBuf->numOfVnodes, 1);
EXPECT_EQ(pTSBuf->numOfGroups, 1);
......@@ -105,7 +105,7 @@ void multiTagsTest() {
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
EXPECT_EQ(pTSBuf->numOfVnodes, 1);
EXPECT_EQ(pTSBuf->numOfGroups, 1);
EXPECT_EQ(pTSBuf->tsData.len, 0);
......@@ -139,7 +139,7 @@ void multiVnodeTagsTest() {
start += step * num;
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1);
EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
......@@ -184,7 +184,7 @@ void loadDataTest() {
start += step * num;
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1);
EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
......@@ -203,7 +203,7 @@ void loadDataTest() {
// create from exists file
STSBuf* pNewBuf = tsBufCreateFromFile(pTSBuf->path, false);
EXPECT_EQ(pNewBuf->tsOrder, pTSBuf->tsOrder);
EXPECT_EQ(pNewBuf->numOfVnodes, numOfVnode);
EXPECT_EQ(pNewBuf->numOfGroups, numOfVnode);
EXPECT_EQ(pNewBuf->fileSize, pTSBuf->fileSize);
EXPECT_EQ(pNewBuf->pData[0].info.offset, pTSBuf->pData[0].info.offset);
......@@ -269,7 +269,7 @@ void TSTraverse() {
start += step * num;
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1);
EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
......@@ -304,7 +304,7 @@ void TSTraverse() {
int32_t totalOutput = 10;
while (1) {
STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) {
......@@ -352,7 +352,7 @@ void TSTraverse() {
totalOutput = 10;
while (1) {
STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) {
......@@ -427,7 +427,7 @@ void mergeDiffVnodeBufferTest() {
tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
EXPECT_EQ(pTSBuf1->numOfGroups, 2);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
......@@ -472,7 +472,7 @@ void mergeIdenticalVnodeBufferTest() {
tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
EXPECT_EQ(pTSBuf1->numOfGroups, 2);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
......@@ -482,12 +482,12 @@ void mergeIdenticalVnodeBufferTest() {
STSElem elem = tsBufGetElem(pTSBuf1);
if (count++ < numOfTags * num) {
EXPECT_EQ(elem.vnode, 12);
EXPECT_EQ(elem.id, 12);
} else {
EXPECT_EQ(elem.vnode, 77);
EXPECT_EQ(elem.id, 77);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
......@@ -1622,7 +1622,7 @@ static void rpcDecRef(SRpcInfo *pRpc)
int count = atomic_sub_fetch_32(&tsRpcNum, 1);
if (count == 0) {
// taosCloseRef(tsRpcRefId);
// tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error
......@@ -863,7 +863,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
......@@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
if (ret < 0) break;
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
if (code < 0) {
......@@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)offset;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);
offset += pHead->len + sizeof(SWalHead);
return offset;
......@@ -2311,7 +2311,11 @@ void filterPrepare(void* expr, void* param) {
if (pInfo->optr == TSDB_RELATION_IN) {
pInfo->q = (char*) pCond->arr;
} else {
pInfo->q = calloc(1, pSchema->bytes + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE;
if (size < (uint32_t)pSchema->bytes) {
size = pSchema->bytes;
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
tVariantDump(pCond, pInfo->q, pSchema->type, true);
......@@ -367,7 +367,14 @@ int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) {
int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE);
tstr* t1 = (tstr*) f1;
tstr* t2 = (tstr*) f2;
if (t1->len != t2->len) {
return t1->len > t2->len? 1:-1;
int32_t ret = wcsncmp((wchar_t*) t1->data, (wchar_t*) t2->data, t2->len/TSDB_NCHAR_SIZE);
if (ret == 0) {
return ret;
......@@ -61,8 +61,6 @@ typedef struct {
char db[TSDB_DB_NAME_LEN];
} SVnodeObj;
int vnodeWriteToQueue(void *param, void *pHead, int type);
int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type);
void vnodeInitWriteFp(void);
void vnodeInitReadFp(void);
......@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteCqMsgToQueue;
cqCfg.cqWrite = vnodeWriteToWQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) {
......@@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return terrno;
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
walRestore(pVnode->wal, pVnode, vnodeProcessWrite);
if (pVnode->version == 0) {
pVnode->version = walGetVersion(pVnode->wal);
......@@ -320,7 +320,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.ahandle = pVnode;
syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.writeToCache = vnodeWriteToWQueue;
syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
......@@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) {
void vnodeRelease(void *pVnodeRaw) {
if (pVnodeRaw == NULL) return;
SVnodeObj *pVnode = pVnodeRaw;
int32_t vgId = pVnode->vgId;
......@@ -467,36 +468,6 @@ void *vnodeAcquire(int32_t vgId) {
return *ppVnode;
void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]);
return NULL;
return pVnode->rqueue;
void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]);
return NULL;
return pVnode->wqueue;
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
......@@ -29,9 +29,9 @@
#include "vnodeInt.h"
#include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
void vnodeInitReadFp(void) {
......@@ -44,9 +44,9 @@ void vnodeInitReadFp(void) {
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
// request enters the queue
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType;
int32_t msgType = pReadMsg->msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
......@@ -56,7 +56,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
int32_t vnodeCheckRead(void *param) {
static int32_t vnodeCheckRead(void *param) {
SVnodeObj *pVnode = param;
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
......@@ -78,24 +78,58 @@ int32_t vnodeCheckRead(void *param) {
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.ahandle = ahandle;
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
atomic_add_fetch_32(&pVnode->refCount, 1);
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) {
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
int32_t size = sizeof(SVReadMsg) + contLen;
SVReadMsg *pRead = taosAllocateQitem(size);
if (pRead == NULL) {
if (rparam != NULL) {
SRpcMsg *pRpcMsg = rparam;
pRead->rpcHandle = pRpcMsg->handle;
pRead->rpcAhandle = pRpcMsg->ahandle;
pRead->msgType = pRpcMsg->msgType;
pRead->code = pRpcMsg->code;
if (contLen != 0) {
pRead->contLen = contLen;
memcpy(pRead->pCont, pCont, contLen);
} else {
pRead->qhandle = pCont;
pRead->qtype = qtype;
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode rqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->rqueue, qtype, pRead);
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
rpcMsg.ahandle = ahandle;
int32_t code = vnodeWriteToRQueue(pVnode, qhandle, 0, TAOS_QTYPE_QUERY, &rpcMsg);
if (code == TSDB_CODE_SUCCESS) {
vDebug("QInfo:%p add to vread queue for exec query", *qhandle);
return code;
* @param pRet response message object
......@@ -146,7 +180,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp->completed = true;
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen;
SRspRet *pRet = &pReadMsg->rspRet;
......@@ -155,18 +189,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet));
// qHandle needs to be freed correctly
if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pReadMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont;
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcHandle);
assert(pReadMsg->contLen > 0 && killQueryMsg->free == 1);
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
} else {
assert(*qhandle == (void *)killQueryMsg->qhandle);
......@@ -208,9 +242,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL &&
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
return pRsp->code;
......@@ -221,7 +255,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcHandle);
if (code != TSDB_CODE_SUCCESS) {
pRsp->code = code;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -230,7 +264,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} else {
assert(pCont != NULL);
void **qhandle = (void **)pCont;
void **qhandle = (void **)pReadMsg->qhandle;
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
......@@ -242,14 +276,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// build query rsp, the retrieve request has reached here already
if (buildRes) {
// update the connection info according to the retrieve connection
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle);
assert(pReadMsg->rpcMsg.handle != NULL);
pReadMsg->rpcHandle = qGetResultRetrieveMsg(*qhandle);
assert(pReadMsg->rpcHandle != NULL);
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
// set the real rsp error code
pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle);
pReadMsg->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcHandle);
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
......@@ -274,7 +308,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
return code;
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont;
SRspRet *pRet = &pReadMsg->rspRet;
......@@ -283,7 +317,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
pRetrieve->free, pReadMsg->rpcMsg.handle);
pRetrieve->free, pReadMsg->rpcHandle);
memset(pRet, 0, sizeof(SRspRet));
......@@ -314,9 +348,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle,
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcHandle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -326,7 +359,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
bool freeHandle = true;
bool buildRes = false;
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcHandle);
if (code != TSDB_CODE_SUCCESS) {
// TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
......@@ -337,7 +370,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
assert(buildRes == true);
if (!buildRes) {
assert(pReadMsg->rpcMsg.handle != NULL);
assert(pReadMsg->rpcHandle != NULL);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
......@@ -345,7 +378,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// ahandle is the sqlObj pointer
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcHandle);
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
......@@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param1;
SWalHead * pHead = param2;
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0;
SVnodeObj * pVnode = vparam;
SWalHead * pHead = wparam;
SRspRet * pRspRet = rparam;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
......@@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
syncCode = syncForwardToPeer(pVnode->sync, pHead, pRspRet, qtype);
if (syncCode < 0) return syncCode;
// write into WAL
......@@ -90,13 +91,13 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
pVnode->version = pHead->version;
// write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
if (code < 0) return code;
return syncCode;
int32_t vnodeCheckWrite(void *param) {
static int32_t vnodeCheckWrite(void *param) {
SVnodeObj *pVnode = param;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
......@@ -204,35 +205,32 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param;
SWalHead * pHead = data;
int size = sizeof(SWalHead) + pHead->len;
SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead));
SWalHead *pWal = (SWalHead *)(pSync + 1);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam;
taosWriteQitem(pVnode->wqueue, type, pSync);
if (qtype == TAOS_QTYPE_RPC) {
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
return 0;
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) {
int vnodeWriteToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param;
SWalHead * pHead = data;
if (rparam != NULL) {
SRpcMsg *pRpcMsg = rparam;
pWrite->rpcHandle = pRpcMsg->handle;
pWrite->rpcAhandle = pRpcMsg->ahandle;
int size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size);
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pWal);
return 0;
taosWriteQitem(pVnode->wqueue, qtype, pWrite);
......@@ -70,7 +70,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL);
pWal->fsyncSeq = pCfg->fsyncPeriod % 1000;
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
if (walInitObj(pWal) != TSDB_CODE_SUCCESS) {
......@@ -111,18 +111,19 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
return code;
void walFsync(void *handle) {
void walFsync(void *handle, bool forceFsync) {
SWal *pWal = handle;
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
if (pWal == NULL || pWal->fd < 0) return;
if (pWal->fsyncPeriod == 0) {
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, file:%s, do fsync", pWal->vgId, pWal->name);
if (fsync(pWal->fd) < 0) {
wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) {
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
if (handle == NULL) return -1;
SWal * pWal = handle;
......@@ -143,12 +144,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
if (!pWal->keep) {
wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName);
} else {
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
......@@ -305,9 +301,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId,
pWal->version, pHead->version, pHead->len);
if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
......@@ -23,7 +23,7 @@
int64_t ver = 0;
void *pWal = NULL;
int writeToQueue(void *pVnode, void *data, int type) {
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
// do nothing
SWalHead *pHead = data;
......@@ -35,10 +35,11 @@ class TDTestCase:
% (self.ts, self.ts + 2000000000, self.ts + 4000000000, self.ts + 5000000000, self.ts + 7000000000))
tdSql.query("select avg(voltage) from st interval(1n)")
tdSql.checkData(0, 1, 221.4)
tdSql.checkData(1, 1, 227.0)
tdSql.checkData(2, 1, 222.0)
tdSql.checkData(0, 1, 220.0)
tdSql.checkData(1, 1, 222.33333333333334)
tdSql.checkData(2, 1, 227.0)
tdSql.checkData(3, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d)")
......@@ -106,7 +106,7 @@ class TDTestCase:
tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.pid")
tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
def run(self):
tdSql.execute("create table cars(ts timestamp, c nchar(2)) tags(t1 nchar(2))")
tdSql.execute("insert into car0 using cars tags('aa') values(now, 'bb');")
tdSql.query("select count(*) from cars where t1 like '%50 90 30 04 00 00%'")
tdSql.execute("create table test_cars(ts timestamp, c nchar(2)) tags(t1 nchar(20))")
tdSql.execute("insert into car1 using test_cars tags('150 90 30 04 00 002') values(now, 'bb');")
tdSql.query("select * from test_cars where t1 like '%50 90 30 04 00 00%'")
def stop(self):
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册