提交 93af9fbc 编写于 作者: C Cary Xu

Merge branch 'develop' into hotfix/TS-854-D

...@@ -456,23 +456,42 @@ pipeline { ...@@ -456,23 +456,42 @@ pipeline {
nohup taosd >/dev/null & nohup taosd >/dev/null &
sleep 10 sleep 10
''' '''
sh ''' sh '''
cd ${WKC}/tests/examples/nodejs cd ${WKC}/src/connector/python
npm install td2.0-connector > /dev/null 2>&1 export PYTHONPATH=$PWD/
node nodejsChecker.js host=localhost export LD_LIBRARY_PATH=${WKC}/debug/build/lib
node test1970.js pip3 install pytest
cd ${WKC}/tests/connectorTest/nodejsTest/nanosupport pytest tests/
npm install td2.0-connector > /dev/null 2>&1
node nanosecondTest.js
python3 examples/bind-multi.py
python3 examples/bind-row.py
python3 examples/demo.py
python3 examples/insert-lines.py
python3 examples/pep-249.py
python3 examples/query-async.py
python3 examples/query-objectively.py
python3 examples/subscribe-sync.py
python3 examples/subscribe-async.py
'''
sh '''
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
node test1970.js
cd ${WKC}/tests/connectorTest/nodejsTest/nanosupport
npm install td2.0-connector > /dev/null 2>&1
node nanosecondTest.js
''' '''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh ''' sh '''
cd ${WKC}/tests/examples/C#/taosdemo cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1 dotnet build -c Release
echo '' |./taosdemo -c /etc/taos tree | true
./bin/Release/net5.0/taosdemo -c /etc/taos -y
''' '''
} }
sh ''' sh '''
cd ${WKC}/tests/gotest cd ${WKC}/tests/gotest
bash batchtest.sh bash batchtest.sh
......
[Unit] [Unit]
Description=TDengine server service Description=TDengine server service
After=network-online.target After=network-online.target taosadapter.service
Wants=network-online.target Wants=network-online.target taosadapter.service
[Service] [Service]
Type=simple Type=simple
......
...@@ -886,7 +886,7 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa ...@@ -886,7 +886,7 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
TAOS_RES* res = taos_query(taos, sql); TAOS_RES* res = taos_query(taos, sql);
free(sql); free(sql);
code = taos_errno(res); code = taos_errno(res);
info->affectedRows = taos_affected_rows(res); info->affectedRows += taos_affected_rows(res);
taos_free_result(res); taos_free_result(res);
return code; return code;
} }
...@@ -1302,14 +1302,6 @@ clean_up: ...@@ -1302,14 +1302,6 @@ clean_up:
return code; return code;
} }
int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
info->id = genLinesSmlId();
int code = tscSmlInsert(taos, points, numPoint, info);
free(info);
return code;
}
//========================================================================= //=========================================================================
/* Field Escape charaters /* Field Escape charaters
......
...@@ -1533,6 +1533,41 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO ...@@ -1533,6 +1533,41 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t stmtValidateValuesFields(SSqlCmd *pCmd, char * sql) {
int32_t loopCont = 1, index0 = 0, values = 0;
SStrToken sToken;
while (loopCont) {
sToken = tStrGetToken(sql, &index0, false);
if (sToken.n <= 0) {
return TSDB_CODE_SUCCESS;
}
switch (sToken.type) {
case TK_RP:
if (values) {
return TSDB_CODE_SUCCESS;
}
break;
case TK_VALUES:
values = 1;
break;
case TK_QUESTION:
case TK_LP:
break;
default:
if (values) {
tscError("only ? allowed in values");
return tscInvalidOperationMsg(pCmd->payload, "only ? allowed in values", NULL);
}
break;
}
}
return TSDB_CODE_SUCCESS;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// interface functions // interface functions
...@@ -1637,6 +1672,11 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -1637,6 +1672,11 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STMT_RET(ret); STMT_RET(ret);
} }
ret = stmtValidateValuesFields(&pSql->cmd, pSql->sqlstr);
if (ret != TSDB_CODE_SUCCESS) {
STMT_RET(ret);
}
if (pStmt->multiTbInsert) { if (pStmt->multiTbInsert) {
STMT_RET(TSDB_CODE_SUCCESS); STMT_RET(TSDB_CODE_SUCCESS);
} }
......
...@@ -3457,7 +3457,9 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -3457,7 +3457,9 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
strncpy(pCmd->payload, idStr->z, idStr->n); SKillQueryMsg* msg = (SKillQueryMsg*)pCmd->payload;
strncpy(msg->queryId, idStr->z, idStr->n);
const char delim = ':'; const char delim = ':';
char* connIdStr = strtok(idStr->z, &delim); char* connIdStr = strtok(idStr->z, &delim);
...@@ -3465,7 +3467,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -3465,7 +3467,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
int32_t connId = (int32_t)strtol(connIdStr, NULL, 10); int32_t connId = (int32_t)strtol(connIdStr, NULL, 10);
if (connId <= 0) { if (connId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload)); memset(msg, 0, sizeof(*msg));
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
...@@ -3475,7 +3477,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -3475,7 +3477,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
if (queryId <= 0) { if (queryId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload)); memset(msg, 0, sizeof(*msg));
if (killType == TSDB_SQL_KILL_QUERY) { if (killType == TSDB_SQL_KILL_QUERY) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} else { } else {
......
...@@ -506,7 +506,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -506,7 +506,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
} }
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { if (pRes->code == TSDB_CODE_SUCCESS && pCmd->command < TSDB_SQL_MAX && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -906,6 +906,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -906,6 +906,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SArray* queryOperator = createExecOperatorPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query);
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
int32_t numOfTags = query.numOfTags; int32_t numOfTags = query.numOfTags;
...@@ -1146,6 +1147,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1146,6 +1147,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, pSql->sqlstr, sqlLen); memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen; pMsg += sqlLen;
/*
//MSG EXTEND DEMO
pQueryMsg->extend = 1;
STLV *tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_DUMMY);
tlv->len = htonl(sizeof(int16_t));
*(int16_t *)tlv->value = htons(12345);
pMsg += sizeof(*tlv) + ntohl(tlv->len);
tlv = (STLV *)pMsg;
tlv->len = 0;
pMsg += sizeof(*tlv);
*/
int32_t msgLen = (int32_t)(pMsg - pCmd->payload); int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
tscDebug("0x%"PRIx64" msg built success, len:%d bytes", pSql->self, msgLen); tscDebug("0x%"PRIx64" msg built success, len:%d bytes", pSql->self, msgLen);
...@@ -2813,7 +2831,11 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn ...@@ -2813,7 +2831,11 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
tscAddQueryInfo(&pNew->cmd); tscAddQueryInfo(&pNew->cmd);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd); SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) { int payLoadLen = TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen;
if (autocreate && pSql->cmd.insertParam.tagData.dataLen != 0) {
payLoadLen += pSql->cmd.insertParam.tagData.dataLen;
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, payLoadLen)) {
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self); tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
......
...@@ -11,6 +11,11 @@ import java.util.Map; ...@@ -11,6 +11,11 @@ import java.util.Map;
public abstract class AbstractResultSet extends WrapperImpl implements ResultSet { public abstract class AbstractResultSet extends WrapperImpl implements ResultSet {
private int fetchSize; private int fetchSize;
protected boolean wasNull; protected boolean wasNull;
protected int timestampPrecision;
public void setTimestampPrecision(int timestampPrecision) {
this.timestampPrecision = timestampPrecision;
}
protected void checkAvailability(int columnIndex, int bounds) throws SQLException { protected void checkAvailability(int columnIndex, int bounds) throws SQLException {
if (isClosed()) if (isClosed())
......
...@@ -74,9 +74,8 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet { ...@@ -74,9 +74,8 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
public boolean next() throws SQLException { public boolean next() throws SQLException {
if (this.getBatchFetch()) { if (this.getBatchFetch()) {
if (this.blockData.forward()) { if (this.blockData.forward())
return true; return true;
}
int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData); int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
this.blockData.reset(); this.blockData.reset();
...@@ -214,7 +213,18 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet { ...@@ -214,7 +213,18 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
if (!lastWasNull) { if (!lastWasNull) {
Object value = this.rowData.getObject(columnIndex); Object value = this.rowData.getObject(columnIndex);
if (value instanceof Timestamp) { if (value instanceof Timestamp) {
res = ((Timestamp) value).getTime(); Timestamp ts = (Timestamp) value;
long epochSec = ts.getTime() / 1000;
long nanoAdjustment = ts.getNanos();
switch (this.timestampPrecision) {
case 0:
default: // ms
return ts.getTime();
case 1: // us
return epochSec * 1000_000L + nanoAdjustment / 1000L;
case 2: // ns
return epochSec * 1000_000_000L + nanoAdjustment;
}
} else { } else {
int nativeType = this.columnMetaDataList.get(columnIndex - 1).getColType(); int nativeType = this.columnMetaDataList.get(columnIndex - 1).getColType();
res = this.rowData.getLong(columnIndex, nativeType); res = this.rowData.getLong(columnIndex, nativeType);
......
...@@ -47,6 +47,8 @@ public class TSDBStatement extends AbstractStatement { ...@@ -47,6 +47,8 @@ public class TSDBStatement extends AbstractStatement {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY);
} }
TSDBResultSet res = new TSDBResultSet(this, this.connection.getConnector(), pSql); TSDBResultSet res = new TSDBResultSet(this, this.connection.getConnector(), pSql);
int timestampPrecision = this.connection.getConnector().getResultTimePrecision(pSql);
res.setTimestampPrecision(timestampPrecision);
res.setBatchFetch(this.connection.getBatchFetch()); res.setBatchFetch(this.connection.getBatchFetch());
return res; return res;
} }
......
package com.taosdata.jdbc.enums; package com.taosdata.jdbc.enums;
public enum TimestampPrecision { public class TimestampPrecision {
MS, public static final int MS = 0;
US, public static final int US = 1;
NS, public static final int NS = 2;
UNKNOWN
} }
...@@ -168,11 +168,22 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet { ...@@ -168,11 +168,22 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
case TIMESTAMP: { case TIMESTAMP: {
Long value = row.getLong(colIndex); Long value = row.getLong(colIndex);
//TODO: this implementation has bug if the timestamp bigger than 9999_9999_9999_9 //TODO: this implementation has bug if the timestamp bigger than 9999_9999_9999_9
if (value < 1_0000_0000_0000_0L) if (value < 1_0000_0000_0000_0L) {
this.timestampPrecision = TimestampPrecision.MS;
return new Timestamp(value); return new Timestamp(value);
long epochSec = value / 1000_000L; }
long nanoAdjustment = value % 1000_000L * 1000L; if (value >= 1_0000_0000_0000_0L && value < 1_000_000_000_000_000_0l) {
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment)); this.timestampPrecision = TimestampPrecision.US;
long epochSec = value / 1000_000L;
long nanoAdjustment = value % 1000_000L * 1000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
if (value >= 1_000_000_000_000_000_0l) {
this.timestampPrecision = TimestampPrecision.NS;
long epochSec = value / 1000_000_000L;
long nanoAdjustment = value % 1000_000_000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
} }
case UTC: { case UTC: {
String value = row.getString(colIndex); String value = row.getString(colIndex);
...@@ -182,12 +193,15 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet { ...@@ -182,12 +193,15 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
if (value.length() > 31) { if (value.length() > 31) {
// ns timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSSSSS+0x00 // ns timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSSSSS+0x00
nanoAdjustment = fractionalSec; nanoAdjustment = fractionalSec;
this.timestampPrecision = TimestampPrecision.NS;
} else if (value.length() > 28) { } else if (value.length() > 28) {
// ms timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSS+0x00 // ms timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSS+0x00
nanoAdjustment = fractionalSec * 1000L; nanoAdjustment = fractionalSec * 1000L;
this.timestampPrecision = TimestampPrecision.US;
} else { } else {
// ms timestamp: yyyy-MM-ddTHH:mm:ss.SSS+0x00 // ms timestamp: yyyy-MM-ddTHH:mm:ss.SSS+0x00
nanoAdjustment = fractionalSec * 1000_000L; nanoAdjustment = fractionalSec * 1000_000L;
this.timestampPrecision = TimestampPrecision.MS;
} }
ZoneOffset zoneOffset = ZoneOffset.of(value.substring(value.length() - 5)); ZoneOffset zoneOffset = ZoneOffset.of(value.substring(value.length() - 5));
Instant instant = Instant.ofEpochSecond(epochSec, nanoAdjustment).atOffset(zoneOffset).toInstant(); Instant instant = Instant.ofEpochSecond(epochSec, nanoAdjustment).atOffset(zoneOffset).toInstant();
...@@ -196,7 +210,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet { ...@@ -196,7 +210,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
case STRING: case STRING:
default: { default: {
String value = row.getString(colIndex); String value = row.getString(colIndex);
TimestampPrecision precision = Utils.guessTimestampPrecision(value); int precision = Utils.guessTimestampPrecision(value);
this.timestampPrecision = precision;
if (precision == TimestampPrecision.MS) { if (precision == TimestampPrecision.MS) {
// ms timestamp: yyyy-MM-dd HH:mm:ss.SSS // ms timestamp: yyyy-MM-dd HH:mm:ss.SSS
return row.getTimestamp(colIndex); return row.getTimestamp(colIndex);
...@@ -338,8 +354,18 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet { ...@@ -338,8 +354,18 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
wasNull = value == null; wasNull = value == null;
if (value == null) if (value == null)
return 0; return 0;
if (value instanceof Timestamp) if (value instanceof Timestamp) {
return ((Timestamp) value).getTime(); Timestamp ts = (Timestamp) value;
switch (this.timestampPrecision) {
case TimestampPrecision.MS:
default:
return ts.getTime();
case TimestampPrecision.US:
return ts.getTime() * 1000 + ts.getNanos() / 1000 % 1000;
case TimestampPrecision.NS:
return ts.getTime() * 1000_000 + ts.getNanos() % 1000_000;
}
}
long valueAsLong = 0; long valueAsLong = 0;
try { try {
valueAsLong = Long.parseLong(value.toString()); valueAsLong = Long.parseLong(value.toString());
......
...@@ -194,14 +194,14 @@ public class Utils { ...@@ -194,14 +194,14 @@ public class Utils {
return timestamp.toLocalDateTime().format(milliSecFormatter); return timestamp.toLocalDateTime().format(milliSecFormatter);
} }
public static TimestampPrecision guessTimestampPrecision(String value) { public static int guessTimestampPrecision(String value) {
if (isMilliSecFormat(value)) if (isMilliSecFormat(value))
return TimestampPrecision.MS; return TimestampPrecision.MS;
if (isMicroSecFormat(value)) if (isMicroSecFormat(value))
return TimestampPrecision.US; return TimestampPrecision.US;
if (isNanoSecFormat(value)) if (isNanoSecFormat(value))
return TimestampPrecision.NS; return TimestampPrecision.NS;
return TimestampPrecision.UNKNOWN; return TimestampPrecision.MS;
} }
private static boolean isMilliSecFormat(String timestampStr) { private static boolean isMilliSecFormat(String timestampStr) {
......
package com.taosdata.jdbc.cases;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
public class GetLongWithDifferentTimestampPrecision {
private final String host = "127.0.0.1";
@Test
public void testRestful() throws SQLException {
// given
String url = "jdbc:TAOS-RS://" + host + ":6041/";
Connection conn = DriverManager.getConnection(url, "root", "taosdata");
long ts = System.currentTimeMillis();
// when and then
assertResultSet(conn, "ms", ts, ts);
assertResultSet(conn, "us", ts, ts * 1000);
assertResultSet(conn, "ns", ts, ts * 1000_000);
}
@Test
public void testJni() throws SQLException {
// given
String url = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(url, "root", "taosdata");
long ts = System.currentTimeMillis();
// when and then
assertResultSet(conn, "ms", ts, ts);
assertResultSet(conn, "us", ts, ts * 1000);
assertResultSet(conn, "ns", ts, ts * 1000_000);
}
private void assertResultSet(Connection conn, String precision, long timestamp, long expect) throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test");
stmt.execute("create database if not exists test precision '" + precision + "'");
stmt.execute("create table test.weather(ts timestamp, f1 int)");
String dateTimeStr = sdf.format(new Date(timestamp));
stmt.execute("insert into test.weather values('" + dateTimeStr + "', 1)");
ResultSet rs = stmt.executeQuery("select * from test.weather");
rs.next();
long actual = rs.getLong("ts");
Assert.assertEquals(expect, actual);
stmt.execute("drop database if exists test");
}
}
}
...@@ -9,29 +9,29 @@ import java.util.Random; ...@@ -9,29 +9,29 @@ import java.util.Random;
@FixMethodOrder(MethodSorters.NAME_ASCENDING) @FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class RestfulJDBCTest { public class RestfulJDBCTest {
// private static final String host = "127.0.0.1"; private static final String host = "127.0.0.1";
private static final String host = "master";
private static final Random random = new Random(System.currentTimeMillis()); private static final Random random = new Random(System.currentTimeMillis());
private static Connection connection; private static Connection connection;
private static final String dbname = "restful_test";
@Test @Test
public void testCase001() throws SQLException { public void testCase001() throws SQLException {
// given // given
String sql = "drop database if exists restful_test"; String sql = "drop database if exists " + dbname;
// when // when
boolean execute = execute(connection, sql); boolean execute = execute(connection, sql);
// then // then
Assert.assertFalse(execute); Assert.assertFalse(execute);
// given // given
sql = "create database if not exists restful_test"; sql = "create database if not exists " + dbname;
// when // when
execute = execute(connection, sql); execute = execute(connection, sql);
// then // then
Assert.assertFalse(execute); Assert.assertFalse(execute);
// given // given
sql = "use restful_test"; sql = "use " + dbname;
// when // when
execute = execute(connection, sql); execute = execute(connection, sql);
// then // then
...@@ -41,7 +41,7 @@ public class RestfulJDBCTest { ...@@ -41,7 +41,7 @@ public class RestfulJDBCTest {
@Test @Test
public void testCase002() throws SQLException { public void testCase002() throws SQLException {
// given // given
String sql = "create table weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)"; String sql = "create table " + dbname + ".weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)";
// when // when
boolean execute = execute(connection, sql); boolean execute = execute(connection, sql);
// then // then
...@@ -52,7 +52,7 @@ public class RestfulJDBCTest { ...@@ -52,7 +52,7 @@ public class RestfulJDBCTest {
public void testCase004() throws SQLException { public void testCase004() throws SQLException {
for (int i = 1; i <= 100; i++) { for (int i = 1; i <= 100; i++) {
// given // given
String sql = "create table t" + i + " using weather tags('beijing', '" + i + "')"; String sql = "create table " + dbname + ".t" + i + " using " + dbname + ".weather tags('beijing', '" + i + "')";
// when // when
boolean execute = execute(connection, sql); boolean execute = execute(connection, sql);
// then // then
...@@ -68,7 +68,7 @@ public class RestfulJDBCTest { ...@@ -68,7 +68,7 @@ public class RestfulJDBCTest {
// given // given
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
String sql = "insert into t" + j + " values(" + currentTimeMillis + "," + (random.nextFloat() * 50) + "," + random.nextInt(100) + ")"; String sql = "insert into " + dbname + ".t" + j + " values(" + currentTimeMillis + "," + (random.nextFloat() * 50) + "," + random.nextInt(100) + ")";
// when // when
int affectRows = executeUpdate(connection, sql); int affectRows = executeUpdate(connection, sql);
// then // then
...@@ -83,7 +83,7 @@ public class RestfulJDBCTest { ...@@ -83,7 +83,7 @@ public class RestfulJDBCTest {
@Test @Test
public void testCase006() throws SQLException { public void testCase006() throws SQLException {
// given // given
String sql = "select * from weather"; String sql = "select * from " + dbname + ".weather";
// when // when
ResultSet rs = executeQuery(connection, sql); ResultSet rs = executeQuery(connection, sql);
ResultSetMetaData meta = rs.getMetaData(); ResultSetMetaData meta = rs.getMetaData();
...@@ -102,7 +102,7 @@ public class RestfulJDBCTest { ...@@ -102,7 +102,7 @@ public class RestfulJDBCTest {
@Test @Test
public void testCase007() throws SQLException { public void testCase007() throws SQLException {
// given // given
String sql = "drop database restful_test"; String sql = "drop database " + dbname;
// when // when
boolean execute = execute(connection, sql); boolean execute = execute(connection, sql);
...@@ -143,7 +143,7 @@ public class RestfulJDBCTest { ...@@ -143,7 +143,7 @@ public class RestfulJDBCTest {
public static void afterClass() throws SQLException { public static void afterClass() throws SQLException {
if (connection != null) { if (connection != null) {
Statement stmt = connection.createStatement(); Statement stmt = connection.createStatement();
stmt.execute("drop database if exists restful_test"); stmt.execute("drop database if exists " + dbname);
stmt.close(); stmt.close();
connection.close(); connection.close();
} }
......
...@@ -17,12 +17,25 @@ import java.text.SimpleDateFormat; ...@@ -17,12 +17,25 @@ import java.text.SimpleDateFormat;
public class RestfulResultSetTest { public class RestfulResultSetTest {
// private static final String host = "127.0.0.1"; private static final String host = "127.0.0.1";
private static final String host = "master";
private static Connection conn; private static Connection conn;
private static Statement stmt; private static Statement stmt;
private static ResultSet rs; private static ResultSet rs;
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
stmt = conn.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("create database if not exists restful_test");
stmt.execute("use restful_test");
stmt.execute("drop table if exists weather");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
rs = stmt.executeQuery("select * from restful_test.weather");
rs.next();
}
@Test @Test
public void wasNull() throws SQLException { public void wasNull() throws SQLException {
Assert.assertFalse(rs.wasNull()); Assert.assertFalse(rs.wasNull());
...@@ -658,20 +671,6 @@ public class RestfulResultSetTest { ...@@ -658,20 +671,6 @@ public class RestfulResultSetTest {
Assert.assertTrue(rs.isWrapperFor(RestfulResultSet.class)); Assert.assertTrue(rs.isWrapperFor(RestfulResultSet.class));
} }
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
stmt = conn.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("create database if not exists restful_test");
stmt.execute("use restful_test");
stmt.execute("drop table if exists weather");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
rs = stmt.executeQuery("select * from restful_test.weather");
rs.next();
}
@AfterClass @AfterClass
public static void afterClass() throws SQLException { public static void afterClass() throws SQLException {
if (rs != null) if (rs != null)
......
...@@ -7,7 +7,7 @@ import time ...@@ -7,7 +7,7 @@ import time
def subscribe_callback(p_sub, p_result, p_param, errno): def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback") print("# fetch in callback")
result = TaosResult(p_result) result = TaosResult(c_void_p(p_result))
result.check_error(errno) result.check_error(errno)
for row in result.rows_iter(): for row in result.rows_iter():
ts, n = row() ts, n = row()
...@@ -18,18 +18,21 @@ def test_subscribe_callback(conn): ...@@ -18,18 +18,21 @@ def test_subscribe_callback(conn):
# type: (TaosConnection) -> None # type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback" dbname = "pytest_taos_subscribe_callback"
try: try:
print("drop if exists")
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
print("create database")
conn.execute("create database if not exists %s" % dbname) conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname) print("create table")
conn.execute("create table if not exists log(ts timestamp, n int)") # conn.execute("use %s" % dbname)
conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname)
print("# subscribe with callback") print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback) sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback)
for i in range(10): for i in range(10):
conn.execute("insert into log values(now, %d)" % i) conn.execute("insert into %s.log values(now, %d)" % (dbname, i))
time.sleep(0.7) time.sleep(0.7)
# sub.close() sub.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
# conn.close() # conn.close()
......
...@@ -110,7 +110,7 @@ _libtaos.taos_get_client_info.restype = c_char_p ...@@ -110,7 +110,7 @@ _libtaos.taos_get_client_info.restype = c_char_p
def taos_get_client_info(): def taos_get_client_info():
# type: () -> str # type: () -> str
"""Get client version info.""" """Get client version info."""
return _libtaos.taos_get_client_info().decode() return _libtaos.taos_get_client_info().decode("utf-8")
_libtaos.taos_get_server_info.restype = c_char_p _libtaos.taos_get_server_info.restype = c_char_p
...@@ -120,7 +120,7 @@ _libtaos.taos_get_server_info.argtypes = (c_void_p,) ...@@ -120,7 +120,7 @@ _libtaos.taos_get_server_info.argtypes = (c_void_p,)
def taos_get_server_info(connection): def taos_get_server_info(connection):
# type: (c_void_p) -> str # type: (c_void_p) -> str
"""Get server version as string.""" """Get server version as string."""
return _libtaos.taos_get_server_info(connection).decode() return _libtaos.taos_get_server_info(connection).decode("utf-8")
_libtaos.taos_close.restype = None _libtaos.taos_close.restype = None
...@@ -308,16 +308,14 @@ def taos_subscribe(connection, restart, topic, sql, interval, callback=None, par ...@@ -308,16 +308,14 @@ def taos_subscribe(connection, restart, topic, sql, interval, callback=None, par
""" """
if callback != None: if callback != None:
callback = subscribe_callback_type(callback) callback = subscribe_callback_type(callback)
if param != None:
param = c_void_p(param)
return c_void_p( return c_void_p(
_libtaos.taos_subscribe( _libtaos.taos_subscribe(
connection, connection,
1 if restart else 0, 1 if restart else 0,
c_char_p(topic.encode("utf-8")), c_char_p(topic.encode("utf-8")),
c_char_p(sql.encode("utf-8")), c_char_p(sql.encode("utf-8")),
callback or None, callback,
param, c_void_p(param),
interval, interval,
) )
) )
......
...@@ -144,7 +144,7 @@ def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_ ...@@ -144,7 +144,7 @@ def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_
try: try:
if num_of_rows >= 0: if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data) tmpstr = ctypes.c_char_p(data)
res.append(tmpstr.value.decode()) res.append(tmpstr.value.decode("utf-8"))
else: else:
res.append( res.append(
( (
...@@ -172,7 +172,7 @@ def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=Field ...@@ -172,7 +172,7 @@ def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=Field
if rbyte == 1 and buffer[0] == b'\xff': if rbyte == 1 and buffer[0] == b'\xff':
res.append(None) res.append(None)
else: else:
res.append(cast(buffer, c_char_p).value.decode()) res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res return res
...@@ -188,7 +188,7 @@ def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldT ...@@ -188,7 +188,7 @@ def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldT
if rbyte == 4 and buffer[:4] == b'\xff'*4: if rbyte == 4 and buffer[:4] == b'\xff'*4:
res.append(None) res.append(None)
else: else:
res.append(cast(buffer, c_char_p).value.decode()) res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res return res
......
...@@ -3,6 +3,8 @@ from .cinterface import * ...@@ -3,6 +3,8 @@ from .cinterface import *
# from .connection import TaosConnection # from .connection import TaosConnection
from .error import * from .error import *
from ctypes import c_void_p
class TaosResult(object): class TaosResult(object):
"""TDengine result interface""" """TDengine result interface"""
...@@ -12,7 +14,11 @@ class TaosResult(object): ...@@ -12,7 +14,11 @@ class TaosResult(object):
# to make the __del__ order right # to make the __del__ order right
self._conn = conn self._conn = conn
self._close_after = close_after self._close_after = close_after
self._result = result if isinstance(result, c_void_p):
self._result = result
else:
self._result = c_void_p(result)
self._fields = None self._fields = None
self._field_count = None self._field_count = None
self._precision = None self._precision = None
......
...@@ -20,7 +20,8 @@ def stream_callback(p_param, p_result, p_row): ...@@ -20,7 +20,8 @@ def stream_callback(p_param, p_result, p_row):
result = TaosResult(p_result) result = TaosResult(p_result)
row = TaosRow(result, p_row) row = TaosRow(result, p_row)
try: try:
ts, count = row() ts, count = row.as_tuple()
print(ts, count)
p = cast(p_param, POINTER(Counter)) p = cast(p_param, POINTER(Counter))
p.contents.count += count p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
......
...@@ -63,7 +63,7 @@ def test_subscribe(conn): ...@@ -63,7 +63,7 @@ def test_subscribe(conn):
def subscribe_callback(p_sub, p_result, p_param, errno): def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("callback") print("callback")
result = TaosResult(p_result) result = TaosResult(c_void_p(p_result))
result.check_error(errno) result.check_error(errno)
for row in result.rows_iter(): for row in result.rows_iter():
ts, n = row() ts, n = row()
...@@ -76,7 +76,7 @@ def test_subscribe_callback(conn): ...@@ -76,7 +76,7 @@ def test_subscribe_callback(conn):
try: try:
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname) conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname) conn.execute("use %s" % dbname)
conn.execute("create table if not exists log(ts timestamp, n int)") conn.execute("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback") print("# subscribe with callback")
......
...@@ -120,6 +120,14 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -120,6 +120,14 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
if (pMsg->msgType >= TSDB_MSG_TYPE_MAX) {
dError("RPC %p, shell msg type:%d is not processed", pMsg->handle, pMsg->msgType);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return;
}
SRunStatus dnodeStatus = dnodeGetRunStatus(); SRunStatus dnodeStatus = dnodeGetRunStatus();
if (dnodeStatus == TSDB_RUN_STATUS_STOPPED) { if (dnodeStatus == TSDB_RUN_STATUS_STOPPED) {
dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]);
......
...@@ -230,6 +230,7 @@ typedef struct SSubmitBlk { ...@@ -230,6 +230,7 @@ typedef struct SSubmitBlk {
// Submit message for this TSDB // Submit message for this TSDB
typedef struct SSubmitMsg { typedef struct SSubmitMsg {
SMsgHead header; SMsgHead header;
int8_t extend;
int32_t length; int32_t length;
int32_t numOfBlocks; int32_t numOfBlocks;
char blocks[]; char blocks[];
...@@ -243,6 +244,7 @@ typedef struct { ...@@ -243,6 +244,7 @@ typedef struct {
} SShellSubmitRspBlock; } SShellSubmitRspBlock;
typedef struct { typedef struct {
int8_t extend;
int32_t code; // 0-success, > 0 error code int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written int32_t affectedRows; // number of records actually written
...@@ -278,6 +280,7 @@ typedef struct { ...@@ -278,6 +280,7 @@ typedef struct {
} SMDCreateTableMsg; } SMDCreateTableMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t len; // one create table message int32_t len; // one create table message
char tableName[TSDB_TABLE_FNAME_LEN]; char tableName[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
...@@ -290,11 +293,13 @@ typedef struct { ...@@ -290,11 +293,13 @@ typedef struct {
} SCreateTableMsg; } SCreateTableMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t numOfTables; int32_t numOfTables;
int32_t contLen; int32_t contLen;
} SCMCreateTableMsg; } SCMCreateTableMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
// if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table // if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table
int8_t supertable; int8_t supertable;
...@@ -302,6 +307,7 @@ typedef struct { ...@@ -302,6 +307,7 @@ typedef struct {
} SCMDropTableMsg; } SCMDropTableMsg;
typedef struct { typedef struct {
int8_t extend;
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int16_t type; /* operation type */ int16_t type; /* operation type */
...@@ -314,6 +320,7 @@ typedef struct { ...@@ -314,6 +320,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int8_t extend;
int64_t uid; int64_t uid;
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
...@@ -327,6 +334,7 @@ typedef struct { ...@@ -327,6 +334,7 @@ typedef struct {
} SUpdateTableTagValMsg; } SUpdateTableTagValMsg;
typedef struct { typedef struct {
int8_t extend;
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
...@@ -335,6 +343,7 @@ typedef struct { ...@@ -335,6 +343,7 @@ typedef struct {
} SConnectMsg; } SConnectMsg;
typedef struct { typedef struct {
int8_t extend;
char acctId[TSDB_ACCT_ID_LEN]; char acctId[TSDB_ACCT_ID_LEN];
char serverVersion[TSDB_VERSION_LEN]; char serverVersion[TSDB_VERSION_LEN];
char clusterId[TSDB_CLUSTER_ID_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN];
...@@ -361,16 +370,19 @@ typedef struct { ...@@ -361,16 +370,19 @@ typedef struct {
} SAcctCfg; } SAcctCfg;
typedef struct { typedef struct {
int8_t extend;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
SAcctCfg cfg; SAcctCfg cfg;
} SCreateAcctMsg, SAlterAcctMsg; } SCreateAcctMsg, SAlterAcctMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; int8_t extend;
char user[TSDB_USER_LEN];
} SDropUserMsg, SDropAcctMsg; } SDropUserMsg, SDropAcctMsg;
typedef struct { typedef struct {
int8_t extend;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
int8_t privilege; int8_t privilege;
...@@ -462,6 +474,7 @@ typedef struct { ...@@ -462,6 +474,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int8_t extend;
char version[TSDB_VERSION_LEN]; char version[TSDB_VERSION_LEN];
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
...@@ -514,6 +527,7 @@ typedef struct { ...@@ -514,6 +527,7 @@ typedef struct {
} SQueryTableMsg; } SQueryTableMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t code; int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle union{uint64_t qhandle; uint64_t qId;}; // query handle
} SQueryTableRsp; } SQueryTableRsp;
...@@ -521,11 +535,13 @@ typedef struct { ...@@ -521,11 +535,13 @@ typedef struct {
// todo: the show handle should be replaced with id // todo: the show handle should be replaced with id
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
int8_t extend;
union{uint64_t qhandle; uint64_t qId;}; // query handle union{uint64_t qhandle; uint64_t qId;}; // query handle
uint16_t free; uint16_t free;
} SRetrieveTableMsg; } SRetrieveTableMsg;
typedef struct SRetrieveTableRsp { typedef struct SRetrieveTableRsp {
int8_t extend;
int32_t numOfRows; int32_t numOfRows;
int8_t completed; // all results are returned to client int8_t completed; // all results are returned to client
int16_t precision; int16_t precision;
...@@ -551,6 +567,7 @@ typedef struct { ...@@ -551,6 +567,7 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t cacheBlockSize; //MB int32_t cacheBlockSize; //MB
int32_t totalBlocks; int32_t totalBlocks;
...@@ -577,6 +594,7 @@ typedef struct { ...@@ -577,6 +594,7 @@ typedef struct {
} SCreateDbMsg, SAlterDbMsg; } SCreateDbMsg, SAlterDbMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
char path[PATH_MAX]; char path[PATH_MAX];
int32_t funcType; int32_t funcType;
...@@ -588,11 +606,13 @@ typedef struct { ...@@ -588,11 +606,13 @@ typedef struct {
} SCreateFuncMsg; } SCreateFuncMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t num; int32_t num;
char name[]; char name[];
} SRetrieveFuncMsg; } SRetrieveFuncMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int32_t funcType; int32_t funcType;
int8_t resType; int8_t resType;
...@@ -603,15 +623,18 @@ typedef struct { ...@@ -603,15 +623,18 @@ typedef struct {
} SFunctionInfoMsg; } SFunctionInfoMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t num; int32_t num;
char content[]; char content[];
} SUdfFuncMsg; } SUdfFuncMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
} SDropFuncMsg; } SDropFuncMsg;
typedef struct { typedef struct {
int8_t extend;
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists; uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg, SSyncDbMsg; } SDropDbMsg, SUseDbMsg, SSyncDbMsg;
...@@ -744,12 +767,14 @@ typedef struct { ...@@ -744,12 +767,14 @@ typedef struct {
} SCreateVnodeMsg, SAlterVnodeMsg; } SCreateVnodeMsg, SAlterVnodeMsg;
typedef struct { typedef struct {
int8_t extend;
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
int16_t createFlag; int16_t createFlag;
char tags[]; char tags[];
} STableInfoMsg; } STableInfoMsg;
typedef struct { typedef struct {
int8_t extend;
uint8_t metaClone; // create local clone of the cached table meta uint8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups; int32_t numOfVgroups;
int32_t numOfTables; int32_t numOfTables;
...@@ -758,21 +783,25 @@ typedef struct { ...@@ -758,21 +783,25 @@ typedef struct {
} SMultiTableInfoMsg; } SMultiTableInfoMsg;
typedef struct SSTableVgroupMsg { typedef struct SSTableVgroupMsg {
int8_t extend;
int32_t numOfTables; int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg; } SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t vgId; int32_t vgId;
int8_t numOfEps; int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupMsg, SVgroupInfo; } SVgroupMsg, SVgroupInfo;
typedef struct { typedef struct {
int8_t extend;
int32_t numOfVgroups; int32_t numOfVgroups;
SVgroupMsg vgroups[]; SVgroupMsg vgroups[];
} SVgroupsMsg, SVgroupsInfo; } SVgroupsMsg, SVgroupsInfo;
typedef struct STableMetaMsg { typedef struct STableMetaMsg {
int8_t extend;
int32_t contLen; int32_t contLen;
char tableFname[TSDB_TABLE_FNAME_LEN]; // table id char tableFname[TSDB_TABLE_FNAME_LEN]; // table id
uint8_t numOfTags; uint8_t numOfTags;
...@@ -792,6 +821,7 @@ typedef struct STableMetaMsg { ...@@ -792,6 +821,7 @@ typedef struct STableMetaMsg {
} STableMetaMsg; } STableMetaMsg;
typedef struct SMultiTableMeta { typedef struct SMultiTableMeta {
int8_t extend;
int32_t numOfTables; int32_t numOfTables;
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t numOfUdf; int32_t numOfUdf;
...@@ -814,6 +844,7 @@ typedef struct { ...@@ -814,6 +844,7 @@ typedef struct {
* payloadLen is the length of payload * payloadLen is the length of payload
*/ */
typedef struct { typedef struct {
int8_t extend;
int8_t type; int8_t type;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
uint16_t payloadLen; uint16_t payloadLen;
...@@ -821,17 +852,20 @@ typedef struct { ...@@ -821,17 +852,20 @@ typedef struct {
} SShowMsg; } SShowMsg;
typedef struct { typedef struct {
int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t vgid[]; int32_t vgid[];
} SCompactMsg; } SCompactMsg;
typedef struct SShowRsp { typedef struct SShowRsp {
int8_t extend;
uint64_t qhandle; uint64_t qhandle;
STableMetaMsg tableMeta; STableMetaMsg tableMeta;
} SShowRsp; } SShowRsp;
typedef struct { typedef struct {
int8_t extend;
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
} SCreateDnodeMsg, SDropDnodeMsg; } SCreateDnodeMsg, SDropDnodeMsg;
...@@ -853,6 +887,7 @@ typedef struct { ...@@ -853,6 +887,7 @@ typedef struct {
} SConfigVnodeMsg; } SConfigVnodeMsg;
typedef struct { typedef struct {
int8_t extend;
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
char config[64]; char config[64];
} SCfgDnodeMsg; } SCfgDnodeMsg;
...@@ -884,6 +919,7 @@ typedef struct { ...@@ -884,6 +919,7 @@ typedef struct {
} SStreamDesc; } SStreamDesc;
typedef struct { typedef struct {
int8_t extend;
char clientVer[TSDB_VERSION_LEN]; char clientVer[TSDB_VERSION_LEN];
uint32_t connId; uint32_t connId;
int32_t pid; int32_t pid;
...@@ -894,6 +930,7 @@ typedef struct { ...@@ -894,6 +930,7 @@ typedef struct {
} SHeartBeatMsg; } SHeartBeatMsg;
typedef struct { typedef struct {
int8_t extend;
uint32_t queryId; uint32_t queryId;
uint32_t streamId; uint32_t streamId;
uint32_t totalDnodes; uint32_t totalDnodes;
...@@ -904,10 +941,12 @@ typedef struct { ...@@ -904,10 +941,12 @@ typedef struct {
} SHeartBeatRsp; } SHeartBeatRsp;
typedef struct { typedef struct {
int8_t extend;
char queryId[TSDB_KILL_MSG_LEN + 1]; char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnMsg; } SKillQueryMsg, SKillStreamMsg, SKillConnMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t vnode; int32_t vnode;
int32_t sid; int32_t sid;
uint64_t uid; uint64_t uid;
...@@ -932,6 +971,16 @@ typedef struct { ...@@ -932,6 +971,16 @@ typedef struct {
char reserved2[64]; char reserved2[64];
} SStartupStep; } SStartupStep;
typedef struct {
int16_t type;
int32_t len;
char value[];
} STLV;
enum {
TLV_TYPE_DUMMY = 1,
};
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -8145,6 +8145,32 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -8145,6 +8145,32 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
goto _cleanup; goto _cleanup;
} }
/*
//MSG EXTEND DEMO
if (pQueryMsg->extend) {
pMsg += pQueryMsg->sqlstrLen;
STLV *tlv = NULL;
while (1) {
tlv = (STLV *)pMsg;
tlv->type = ntohs(tlv->type);
tlv->len = ntohl(tlv->len);
if (tlv->len > 0) {
*(int16_t *)tlv->value = ntohs(*(int16_t *)tlv->value);
qDebug("Got TLV,type:%d,len:%d,value:%d", tlv->type, tlv->len, *(int16_t*)tlv->value);
pMsg += sizeof(*tlv) + tlv->len;
continue;
}
break;
}
}
*/
qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
......
...@@ -153,10 +153,14 @@ class TDTestCase: ...@@ -153,10 +153,14 @@ class TDTestCase:
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 2, 's1') tdSql.checkData(0, 2, 's1')
tdSql.checkData(1, 2, 's0') tdSql.checkData(1, 2, 's0')
tdSql.execute('kill stream %s ;' % tdSql.queryResult[0][0])
time.sleep(5)
tdSql.query("show streams")
tdSql.checkRows(1)
def stop(self): def stop(self):
tdSql.close() #tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
...@@ -2722,6 +2722,38 @@ int stmt_funcb_autoctb_e5(TAOS_STMT *stmt) { ...@@ -2722,6 +2722,38 @@ int stmt_funcb_autoctb_e5(TAOS_STMT *stmt) {
} }
int stmt_funcb_autoctb_e6(TAOS_STMT *stmt) {
char *sql = "insert into ? using stb1 tags(?,?,?,?,?,?,?,?,?) values(now,?,?,?,?,?,?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("case success:failed to execute taos_stmt_prepare. code:%s\n", taos_stmt_errstr(stmt));
}
return 0;
}
int stmt_funcb_autoctb_e7(TAOS_STMT *stmt) {
char *sql = "insert into ? using stb1 tags(?,?,?,?,?,?,?,?,?) values(?,true,?,?,?,?,?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("case success:failed to execute taos_stmt_prepare. code:%s\n", taos_stmt_errstr(stmt));
}
return 0;
}
int stmt_funcb_autoctb_e8(TAOS_STMT *stmt) {
char *sql = "insert into ? using stb1 tags(?,?,?,?,?,?,?,?,?) values(?,?,1,?,?,?,?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("case success:failed to execute taos_stmt_prepare. code:%s\n", taos_stmt_errstr(stmt));
}
return 0;
}
//300 tables 60 records //300 tables 60 records
int stmt_funcb1(TAOS_STMT *stmt) { int stmt_funcb1(TAOS_STMT *stmt) {
...@@ -4857,6 +4889,44 @@ void* runcase(void *par) { ...@@ -4857,6 +4889,44 @@ void* runcase(void *par) {
#endif #endif
#if 1
prepare(taos, 1, 0);
stmt = taos_stmt_init(taos);
printf("e6 start\n");
stmt_funcb_autoctb_e6(stmt);
printf("e6 end\n");
taos_stmt_close(stmt);
#endif
#if 1
prepare(taos, 1, 0);
stmt = taos_stmt_init(taos);
printf("e7 start\n");
stmt_funcb_autoctb_e7(stmt);
printf("e7 end\n");
taos_stmt_close(stmt);
#endif
#if 1
prepare(taos, 1, 0);
stmt = taos_stmt_init(taos);
printf("e8 start\n");
stmt_funcb_autoctb_e8(stmt);
printf("e8 end\n");
taos_stmt_close(stmt);
#endif
#if 1 #if 1
prepare(taos, 1, 0); prepare(taos, 1, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册