提交 511cdbea 编写于 作者: H Haojun Liao

[td-4038]

上级 a1fe6660
...@@ -204,10 +204,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI ...@@ -204,10 +204,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
/* /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: bindColDataImp * Method: bindColDataImp
* Signature: (J[BIIIJ)J * Signature: (J[B[BIIIIJ)J
*/ */
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp
(JNIEnv *, jobject, jlong, jbyteArray, jint, jint, jint, jlong); (JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
/* /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
......
...@@ -746,14 +746,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI ...@@ -746,14 +746,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
return JNI_TDENGINE_ERROR; return JNI_TDENGINE_ERROR;
} }
jniDebug("jobj:%p, conn:%p, set stmt bind table name", jobj, tsconn); jniDebug("jobj:%p, conn:%p, set stmt bind table name:%s", jobj, tsconn, name);
(*env)->ReleaseStringUTFChars(env, jname, name); (*env)->ReleaseStringUTFChars(env, jname, name);
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(JNIEnv *env, jobject jobj, jlong stmt, JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(JNIEnv *env, jobject jobj, jlong stmt,
jbyteArray data, jint dataType, jint numOfRows, jint colIndex, jlong con) { jbyteArray data, jbyteArray length, jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj); jniError("jobj:%p, connection already closed", jobj);
...@@ -766,15 +766,50 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J ...@@ -766,15 +766,50 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
return JNI_SQL_NULL; return JNI_SQL_NULL;
} }
#if 0 // todo refactor
TAOS_BIND* b = malloc(20); jsize len = (*env)->GetArrayLength(env, data);
b.num= jrows; char *colBuf = (char *)calloc(1, sizeof(char) * len);
int32_t code = taos_stmt_bind_param_batch(stmt, b, colInex); (*env)->GetByteArrayRegion(env, data, 0, len, (jbyte *)colBuf);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, length);
char *lengthArray = (char*) calloc(1, sizeof(char) * len);
(*env)->GetByteArrayRegion(env, length, 0, len, (jbyte*) lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
// bind multi-rows with only one invoke.
TAOS_MULTI_BIND* b = calloc(1, sizeof(TAOS_MULTI_BIND));
b->num = numOfRows;
b->buffer_type = dataType; // todo check data type
b->buffer_length = tDataTypes[dataType].bytes;
b->is_null = calloc(numOfRows, sizeof(int32_t));
b->buffer = colBuf;
b->length = (uintptr_t*)lengthArray;
// set the length and is_null array
switch(dataType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
int32_t bytes = tDataTypes[dataType].bytes;
for(int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
b->is_null[i] = isNull(colBuf + bytes * i, dataType);
}
}
}
int32_t code = taos_stmt_bind_single_param_batch(pStmt, b, colIndex);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR; return JNI_TDENGINE_ERROR;
} }
#endif
return JNI_SUCCESS; return JNI_SUCCESS;
} }
...@@ -792,6 +827,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J ...@@ -792,6 +827,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
return JNI_SQL_NULL; return JNI_SQL_NULL;
} }
taos_stmt_add_batch(pStmt);
int32_t code = taos_stmt_execute(pStmt); int32_t code = taos_stmt_execute(pStmt);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
......
...@@ -1306,10 +1306,9 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { ...@@ -1306,10 +1306,9 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
} }
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name); pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
pStmt->mtb.nameSet = true; pStmt->mtb.nameSet = true;
tscDebug("sqlstr set to %s", pSql->sqlstr); tscDebug("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pSql->cmd.parseFinished = 0; pSql->cmd.parseFinished = 0;
pSql->cmd.numOfParams = 0; pSql->cmd.numOfParams = 0;
...@@ -1350,7 +1349,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { ...@@ -1350,7 +1349,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid)); taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
tscDebug("table:%s is prepared, uid:%" PRIu64, name, pStmt->mtb.currentUid); tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
} }
return code; return code;
......
...@@ -1319,7 +1319,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) { ...@@ -1319,7 +1319,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
pBlocks->numOfRows = 0; pBlocks->numOfRows = 0;
}else { }else {
tscDebug("table %s data block is empty", pOneTableBlock->tableName.tname); tscDebug("0x%"PRIx64" table %s data block is empty", pSql->self, pOneTableBlock->tableName.tname);
} }
p = taosHashIterate(pCmd->pTableBlockHashList, p); p = taosHashIterate(pCmd->pTableBlockHashList, p);
......
...@@ -49,7 +49,7 @@ public class TSDBConnection extends AbstractConnection { ...@@ -49,7 +49,7 @@ public class TSDBConnection extends AbstractConnection {
this.databaseMetaData.setConnection(this); this.databaseMetaData.setConnection(this);
} }
public TSDBJNIConnector getConnection() { public TSDBJNIConnector getConnector() {
return this.connector; return this.connector;
} }
...@@ -58,7 +58,7 @@ public class TSDBConnection extends AbstractConnection { ...@@ -58,7 +58,7 @@ public class TSDBConnection extends AbstractConnection {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
} }
return new TSDBStatement(this, this.connector); return new TSDBStatement(this);
} }
public TSDBSubscribe subscribe(String topic, String sql, boolean restart) throws SQLException { public TSDBSubscribe subscribe(String topic, String sql, boolean restart) throws SQLException {
...@@ -74,14 +74,18 @@ public class TSDBConnection extends AbstractConnection { ...@@ -74,14 +74,18 @@ public class TSDBConnection extends AbstractConnection {
} }
public PreparedStatement prepareStatement(String sql) throws SQLException { public PreparedStatement prepareStatement(String sql) throws SQLException {
if (isClosed()) if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
return new TSDBPreparedStatement(this, this.connector, sql); }
return new TSDBPreparedStatement(this, sql);
} }
public void close() throws SQLException { public void close() throws SQLException {
if (isClosed) if (isClosed) {
return; return;
}
this.connector.closeConnection(); this.connector.closeConnection();
this.isClosed = true; this.isClosed = true;
} }
......
...@@ -30,10 +30,13 @@ public class TSDBJNIConnector { ...@@ -30,10 +30,13 @@ public class TSDBJNIConnector {
private static volatile Boolean isInitialized = false; private static volatile Boolean isInitialized = false;
private TaosInfo taosInfo = TaosInfo.getInstance(); private TaosInfo taosInfo = TaosInfo.getInstance();
// Connection pointer used in C // Connection pointer used in C
private long taos = TSDBConstants.JNI_NULL_POINTER; private long taos = TSDBConstants.JNI_NULL_POINTER;
// result set status in current connection // result set status in current connection
private boolean isResultsetClosed = true; private boolean isResultsetClosed = true;
private int affectedRows = -1; private int affectedRows = -1;
static { static {
...@@ -163,37 +166,14 @@ public class TSDBJNIConnector { ...@@ -163,37 +166,14 @@ public class TSDBJNIConnector {
private native long isUpdateQueryImp(long connection, long pSql); private native long isUpdateQueryImp(long connection, long pSql);
/** /**
* Free resultset operation from C to release resultset pointer by JNI * Free result set operation from C to release result set pointer by JNI
*/ */
public int freeResultSet(long pSql) { public int freeResultSet(long pSql) {
int res = TSDBConstants.JNI_SUCCESS; int res = this.freeResultSetImp(this.taos, pSql);
// if (result != taosResultSetPointer && taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
// throw new RuntimeException("Invalid result set pointer");
// }
// if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
res = this.freeResultSetImp(this.taos, pSql);
// taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
// }
isResultsetClosed = true; isResultsetClosed = true;
return res; return res;
} }
/**
* Close the open result set which is associated to the current connection. If the result set is already
* closed, return 0 for success.
*/
// public int freeResultSet() {
// int resCode = TSDBConstants.JNI_SUCCESS;
// if (!isResultsetClosed) {
// resCode = this.freeResultSetImp(this.taos, this.taosResultSetPointer);
// taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
// isResultsetClosed = true;
// }
// return resCode;
// }
private native int freeResultSetImp(long connection, long result); private native int freeResultSetImp(long connection, long result);
/** /**
...@@ -240,6 +220,7 @@ public class TSDBJNIConnector { ...@@ -240,6 +220,7 @@ public class TSDBJNIConnector {
*/ */
public void closeConnection() throws SQLException { public void closeConnection() throws SQLException {
int code = this.closeConnectionImp(this.taos); int code = this.closeConnectionImp(this.taos);
if (code < 0) { if (code < 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
} else if (code == 0) { } else if (code == 0) {
...@@ -247,6 +228,7 @@ public class TSDBJNIConnector { ...@@ -247,6 +228,7 @@ public class TSDBJNIConnector {
} else { } else {
throw new SQLException("Undefined error code returned by TDengine when closing a connection"); throw new SQLException("Undefined error code returned by TDengine when closing a connection");
} }
// invoke closeConnectionImpl only here // invoke closeConnectionImpl only here
taosInfo.connect_close_increment(); taosInfo.connect_close_increment();
} }
...@@ -283,7 +265,7 @@ public class TSDBJNIConnector { ...@@ -283,7 +265,7 @@ public class TSDBJNIConnector {
private native void unsubscribeImp(long subscription, boolean isKeep); private native void unsubscribeImp(long subscription, boolean isKeep);
/** /**
* Validate if a <I>create table</I> sql statement is correct without actually creating that table * Validate if a <I>create table</I> SQL statement is correct without actually creating that table
*/ */
public boolean validateCreateTableSql(String sql) { public boolean validateCreateTableSql(String sql) {
int res = validateCreateTableSqlImp(taos, sql.getBytes()); int res = validateCreateTableSqlImp(taos, sql.getBytes());
...@@ -295,7 +277,7 @@ public class TSDBJNIConnector { ...@@ -295,7 +277,7 @@ public class TSDBJNIConnector {
public long prepareStmt(String sql) throws SQLException { public long prepareStmt(String sql) throws SQLException {
Long stmt = 0L; Long stmt = 0L;
try { try {
stmt = prepareStmtImp(sql, this.taos); stmt = prepareStmtImp(sql.getBytes(), this.taos);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_ENCODING); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_ENCODING);
...@@ -316,7 +298,7 @@ public class TSDBJNIConnector { ...@@ -316,7 +298,7 @@ public class TSDBJNIConnector {
return stmt; return stmt;
} }
private native long prepareStmtImp(String sql, long con); private native long prepareStmtImp(byte[] sql, long con);
public int setBindTableName(long stmt, String tableName) { public int setBindTableName(long stmt, String tableName) {
return setBindTableNameImp(stmt, tableName, this.taos); return setBindTableNameImp(stmt, tableName, this.taos);
...@@ -324,11 +306,11 @@ public class TSDBJNIConnector { ...@@ -324,11 +306,11 @@ public class TSDBJNIConnector {
private native int setBindTableNameImp(long stmt, String name, long conn); private native int setBindTableNameImp(long stmt, String name, long conn);
public int bindColumnDataArray(long stmt, byte[] data, int type, int numOfRows, int columnIndex) { public int bindColumnDataArray(long stmt, ByteBuffer colList, ByteBuffer lengthList, int type, int bytes, int numOfRows,int columnIndex) {
return bindColDataImp(stmt, data, type, numOfRows, columnIndex, this.taos); return bindColDataImp(stmt, colList.array(), lengthList.array(), type, bytes, numOfRows, columnIndex, this.taos);
} }
private native int bindColDataImp(long stmt, byte[] data, int type, int numOfRows, int columnIndex, long conn); private native int bindColDataImp(long stmt, byte[] data, byte[] length, int type, int bytes, int numOfRows, int columnIndex, long conn);
public int executeBatch(long stmt) { public int executeBatch(long stmt) {
return executeBatchImp(stmt, this.taos); return executeBatchImp(stmt, this.taos);
......
...@@ -22,8 +22,7 @@ import java.io.Reader; ...@@ -22,8 +22,7 @@ import java.io.Reader;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.net.URL; import java.net.URL;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.DoubleBuffer; import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
...@@ -45,12 +44,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -45,12 +44,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private int type; private int type;
private String tableName; private String tableName;
private long nativeStmtPtr = 0; private long nativeStmtHandle = 0;
private volatile TSDBParameterMetaData parameterMetaData; private volatile TSDBParameterMetaData parameterMetaData;
TSDBPreparedStatement(TSDBConnection connection, TSDBJNIConnector connector, String sql) { TSDBPreparedStatement(TSDBConnection connection, String sql) {
super(connection, connector); super(connection);
init(sql); init(sql);
int parameterCnt = 0; int parameterCnt = 0;
...@@ -64,8 +63,9 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -64,8 +63,9 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
this.isPrepared = true; this.isPrepared = true;
} }
this.colData = new ArrayList<ColumnInfo>(parameterCnt); // the table name is also a parameter, so ignore it.
this.colData.addAll(Collections.nCopies(parameterCnt, null)); this.colData = new ArrayList<ColumnInfo>(parameterCnt - 1);
this.colData.addAll(Collections.nCopies(parameterCnt - 1, null));
} }
private void init(String sql) { private void init(String sql) {
...@@ -543,12 +543,15 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -543,12 +543,15 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private int type; private int type;
private boolean typeIsSet; private boolean typeIsSet;
public void ClumnInfo() { public ColumnInfo() {
this.typeIsSet = false; this.typeIsSet = false;
} }
public void setType(int type) { public void setType(int type) throws SQLException {
Assert.check(!this.typeIsSet); if (this.isTypeSet()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data type has been set");
}
this.typeIsSet = true; this.typeIsSet = true;
this.type = type; this.type = type;
} }
...@@ -562,106 +565,160 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -562,106 +565,160 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
this.tableName = name; this.tableName = name;
} }
@SuppressWarnings("unchecked") public <T> void setValueImpl(int columnIndex, ArrayList<T> list, int type) throws SQLException {
public void setInt(int columnIndex, ArrayList<Integer> list) throws SQLException {
ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex); ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex);
if (col == null) { if (col == null) {
ColumnInfo p = new ColumnInfo(); ColumnInfo p = new ColumnInfo();
p.setType(TSDBConstants.TSDB_DATA_TYPE_INT); p.setType(type);
p.data = (ArrayList) list.clone(); p.data = (ArrayList<?>) list.clone();
this.colData.set(columnIndex, p); this.colData.set(columnIndex, p);
} else { } else {
if (col.type != TSDBConstants.TSDB_DATA_TYPE_INT) { if (col.type != type) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data type mismatch");
} }
col.data.addAll(list); col.data.addAll(list);
} }
} }
@SuppressWarnings("unchecked") public void setInt(int columnIndex, ArrayList<Integer> list) throws SQLException {
public void setFloat(int columnIndex, ArrayList<Float> list) throws SQLException { setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_INT);
ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex); }
if (col == null) {
ColumnInfo p = new ColumnInfo(); public void setFloat(int columnIndex, ArrayList<Float> list) throws SQLException {
p.setType(TSDBConstants.TSDB_DATA_TYPE_INT); setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_FLOAT);
p.data = (ArrayList) list.clone(); }
this.colData.set(columnIndex, p);
} else { public void setTimestamp(int columnIndex, ArrayList<Long> list) throws SQLException {
if (col.type != TSDBConstants.TSDB_DATA_TYPE_INT) { setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP);
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); }
}
public void setLong(int columnIndex, ArrayList<Long> list) throws SQLException {
col.data.addAll(list); setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BIGINT);
} }
public void setDouble(int columnIndex, ArrayList<Double> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_DOUBLE);
}
public void setBoolean(int columnIndex, ArrayList<Boolean> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BOOL);
} }
public void addColumnDataBatch() { public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TINYINT);
}
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_SMALLINT);
}
public void setString(int columnIndex, ArrayList<String> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BINARY);
}
public void setNString(int columnIndex, ArrayList<String> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_NCHAR);
}
public void columnDataAddBatch() {
// do nothing // do nothing
} }
public void columnDataExecuteBatch() { public void columnDataExecuteBatch() throws SQLException {
int size = this.colData.size(); int numOfCols = this.colData.size();
ColumnInfo col = (ColumnInfo) this.colData.get(0); int rows = ((ColumnInfo) this.colData.get(0)).data.size();
int rows = col.data.size();
// pass the data block to native code // pass the data block to native code
TSDBJNIConnector conn = null; TSDBJNIConnector connector = null;
try { try {
conn = (TSDBJNIConnector) this.getConnection(); connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtPtr = conn.prepareStmt(rawSql); this.nativeStmtHandle = connector.prepareStmt(rawSql);
conn.setBindTableName(this.nativeStmtPtr, this.tableName);
// table name is not set yet, abort
if (this.tableName == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name not set yet");
}
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} }
for (int i = 0; i < size; ++i) { int bytes = 0;
for (int i = 0; i < numOfCols; ++i) {
ColumnInfo col1 = this.colData.get(i); ColumnInfo col1 = this.colData.get(i);
Assert.check(col.isTypeSet()); if (!col1.isTypeSet()) {
ByteBuffer ib = ByteBuffer.allocate(rows); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
}
if (rows != col1.data.size()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical");
}
ByteBuffer bbuf = null;
ByteBuffer lengthBuf = ByteBuffer.allocate(rows * Integer.BYTES);
lengthBuf.order(ByteOrder.LITTLE_ENDIAN);
switch (col1.type) { switch (col1.type) {
case TSDBConstants.TSDB_DATA_TYPE_INT: { case TSDBConstants.TSDB_DATA_TYPE_INT: {
bbuf = ByteBuffer.allocate(rows * Integer.BYTES);
bbuf.order(ByteOrder.LITTLE_ENDIAN);
for (int j = 0; j < rows; ++j) { for (int j = 0; j < rows; ++j) {
Integer val = (Integer) col.data.get(j); Integer val = (Integer) col1.data.get(j);
if (val == null) { if (val == null) {
ib.putInt(Integer.MIN_VALUE); bbuf.putInt(j * Integer.BYTES, Integer.MIN_VALUE);
} else { } else {
ib.putInt((int) col.data.get(j)); bbuf.putInt(j * Integer.BYTES, val);
} }
lengthBuf.putInt(j * Integer.BYTES, Integer.BYTES);
} }
bytes = Integer.BYTES;
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: { case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
bbuf = ByteBuffer.allocate(rows * Long.BYTES);
bbuf.order(ByteOrder.LITTLE_ENDIAN);
for (int j = 0; j < rows; ++j) { for (int j = 0; j < rows; ++j) {
ib.putLong((long) col.data.get(j)); Long val = (Long) col1.data.get(j);
if (val == null) {
bbuf.putLong(j * Long.BYTES, Long.MIN_VALUE);
} else {
bbuf.putLong(j * Long.BYTES, val);
}
lengthBuf.putInt(j * Integer.BYTES, Long.BYTES);
} }
bytes = Long.BYTES;
break; break;
} }
}; };
conn.bindColumnDataArray(this.nativeStmtPtr, ib.array(), col1.type, rows, i); connector.bindColumnDataArray(this.nativeStmtHandle, bbuf, lengthBuf, col1.type, bytes, rows, i);
} }
conn.executeBatch(this.nativeStmtPtr); connector.executeBatch(this.nativeStmtHandle);
} }
public void columnDataClearBatchClear() { public void columnDataClearBatch() {
// TODO clear data in this.colData // TODO clear data in this.colData
} }
public void close() { public void columnDataCloseBatch() {
TSDBJNIConnector conn = null; TSDBJNIConnector connector = null;
try { try {
conn = (TSDBJNIConnector) this.getConnection(); connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtPtr = conn.prepareStmt(rawSql); connector.closeBatch(this.nativeStmtHandle);
conn.setBindTableName(this.nativeStmtPtr, this.tableName); this.nativeStmtHandle = 0L;
this.tableName = null;
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} }
conn.closeBatch(this.nativeStmtPtr);
} }
} }
...@@ -19,8 +19,6 @@ import java.sql.ResultSet; ...@@ -19,8 +19,6 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
public class TSDBStatement extends AbstractStatement { public class TSDBStatement extends AbstractStatement {
private TSDBJNIConnector connector;
/** /**
* Status of current statement * Status of current statement
*/ */
...@@ -29,29 +27,26 @@ public class TSDBStatement extends AbstractStatement { ...@@ -29,29 +27,26 @@ public class TSDBStatement extends AbstractStatement {
private TSDBConnection connection; private TSDBConnection connection;
private TSDBResultSet resultSet; private TSDBResultSet resultSet;
public void setConnection(TSDBConnection connection) { TSDBStatement(TSDBConnection connection) {
this.connection = connection;
}
TSDBStatement(TSDBConnection connection, TSDBJNIConnector connector) {
this.connection = connection; this.connection = connection;
this.connector = connector;
} }
public ResultSet executeQuery(String sql) throws SQLException { public ResultSet executeQuery(String sql) throws SQLException {
// check if closed // check if closed
if (isClosed()) if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
}
//TODO: 如果在executeQuery方法中执行insert语句,那么先执行了SQL,再通过pSql来检查是否为一个insert语句,但这个insert SQL已经执行成功了 //TODO: 如果在executeQuery方法中执行insert语句,那么先执行了SQL,再通过pSql来检查是否为一个insert语句,但这个insert SQL已经执行成功了
// execute query // execute query
long pSql = this.connector.executeQuery(sql); long pSql = this.connection.getConnector().executeQuery(sql);
// if pSql is create/insert/update/delete/alter SQL // if pSql is create/insert/update/delete/alter SQL
if (this.connector.isUpdateQuery(pSql)) { if (this.connection.getConnector().isUpdateQuery(pSql)) {
this.connector.freeResultSet(pSql); this.connection.getConnector().freeResultSet(pSql);
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY);
} }
TSDBResultSet res = new TSDBResultSet(this, this.connector, pSql); TSDBResultSet res = new TSDBResultSet(this, this.connection.getConnector(), pSql);
res.setBatchFetch(this.connection.getBatchFetch()); res.setBatchFetch(this.connection.getBatchFetch());
return res; return res;
} }
...@@ -60,14 +55,14 @@ public class TSDBStatement extends AbstractStatement { ...@@ -60,14 +55,14 @@ public class TSDBStatement extends AbstractStatement {
if (isClosed()) if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
long pSql = this.connector.executeQuery(sql); long pSql = this.connection.getConnector().executeQuery(sql);
// if pSql is create/insert/update/delete/alter SQL // if pSql is create/insert/update/delete/alter SQL
if (!this.connector.isUpdateQuery(pSql)) { if (!this.connection.getConnector().isUpdateQuery(pSql)) {
this.connector.freeResultSet(pSql); this.connection.getConnector().freeResultSet(pSql);
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEUPDATE); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEUPDATE);
} }
int affectedRows = this.connector.getAffectedRows(pSql); int affectedRows = this.connection.getConnector().getAffectedRows(pSql);
this.connector.freeResultSet(pSql); this.connection.getConnector().freeResultSet(pSql);
return affectedRows; return affectedRows;
} }
...@@ -81,30 +76,29 @@ public class TSDBStatement extends AbstractStatement { ...@@ -81,30 +76,29 @@ public class TSDBStatement extends AbstractStatement {
public boolean execute(String sql) throws SQLException { public boolean execute(String sql) throws SQLException {
// check if closed // check if closed
if (isClosed()) if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
}
// execute query // execute query
long pSql = this.connector.executeQuery(sql); long pSql = this.connection.getConnector().executeQuery(sql);
// if pSql is create/insert/update/delete/alter SQL // if pSql is create/insert/update/delete/alter SQL
if (this.connector.isUpdateQuery(pSql)) { if (this.connection.getConnector().isUpdateQuery(pSql)) {
this.affectedRows = this.connector.getAffectedRows(pSql); this.affectedRows = this.connection.getConnector().getAffectedRows(pSql);
this.connector.freeResultSet(pSql); this.connection.getConnector().freeResultSet(pSql);
return false; return false;
} }
this.resultSet = new TSDBResultSet(this, this.connector, pSql); this.resultSet = new TSDBResultSet(this, this.connection.getConnector(), pSql);
this.resultSet.setBatchFetch(this.connection.getBatchFetch()); this.resultSet.setBatchFetch(this.connection.getBatchFetch());
return true; return true;
} }
public ResultSet getResultSet() throws SQLException { public ResultSet getResultSet() throws SQLException {
if (isClosed()) if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
// long resultSetPointer = connector.getResultSet(); }
// TSDBResultSet resSet = null;
// if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
// resSet = new TSDBResultSet(connector, resultSetPointer);
// }
return this.resultSet; return this.resultSet;
} }
...@@ -115,12 +109,20 @@ public class TSDBStatement extends AbstractStatement { ...@@ -115,12 +109,20 @@ public class TSDBStatement extends AbstractStatement {
} }
public Connection getConnection() throws SQLException { public Connection getConnection() throws SQLException {
if (isClosed()) if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
if (this.connector == null) }
if (this.connection.getConnector() == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
}
return this.connection; return this.connection;
} }
public void setConnection(TSDBConnection connection) {
this.connection = connection;
}
public boolean isClosed() throws SQLException { public boolean isClosed() throws SQLException {
return isClosed; return isClosed;
......
...@@ -102,11 +102,10 @@ typedef struct TAOS_BIND { ...@@ -102,11 +102,10 @@ typedef struct TAOS_BIND {
typedef struct TAOS_MULTI_BIND { typedef struct TAOS_MULTI_BIND {
int buffer_type; int buffer_type;
void * buffer; void *buffer;
uintptr_t buffer_length; // unused uintptr_t buffer_length;
uintptr_t *length; uintptr_t *length;
int * is_null; int *is_null;
int num; int num;
} TAOS_MULTI_BIND; } TAOS_MULTI_BIND;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册