提交 29307ce3 编写于 作者: H Haojun Liao

[td-4038]<feature>: support column data batch bind.

上级 2e08de2b
......@@ -204,10 +204,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: bindColDataImp
* Signature: (J[B[BIIIIJ)J
* Signature: (J[B[B[BIIIIJ)J
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp
(JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
(JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
......
......@@ -753,7 +753,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(JNIEnv *env, jobject jobj, jlong stmt,
jbyteArray data, jbyteArray length, jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList, jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
......@@ -767,16 +767,22 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
}
// todo refactor
jsize len = (*env)->GetArrayLength(env, data);
char *colBuf = (char *)calloc(1, sizeof(char) * len);
(*env)->GetByteArrayRegion(env, data, 0, len, (jbyte *)colBuf);
jsize len = (*env)->GetArrayLength(env, colDataList);
char *colBuf = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, colDataList, 0, len, (jbyte *)colBuf);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, length);
char *lengthArray = (char*) calloc(1, sizeof(char) * len);
(*env)->GetByteArrayRegion(env, length, 0, len, (jbyte*) lengthArray);
len = (*env)->GetArrayLength(env, lengthList);
char *lengthArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
char *nullArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
if ((*env)->ExceptionCheck(env)) {
}
......@@ -785,10 +791,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
b->num = numOfRows;
b->buffer_type = dataType; // todo check data type
b->buffer_length = tDataTypes[dataType].bytes;
b->is_null = calloc(numOfRows, sizeof(int32_t));
b->buffer_length = IS_VAR_DATA_TYPE(dataType)? dataBytes:tDataTypes[dataType].bytes;
b->is_null = nullArray;
b->buffer = colBuf;
b->length = (uintptr_t*)lengthArray;
b->length = (int32_t*)lengthArray;
// set the length and is_null array
switch(dataType) {
......@@ -800,8 +806,13 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
int32_t bytes = tDataTypes[dataType].bytes;
for(int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
b->is_null[i] = isNull(colBuf + bytes * i, dataType);
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
// do nothing
}
}
......
......@@ -148,7 +148,7 @@ static int normalStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
break;
default:
tscDebug("0x%"PRIx64" param %d: type mismatch or invalid", stmt->pSql->self, i);
tscDebug("0x%"PRIx64" bind column%d: type mismatch or invalid", stmt->pSql->self, i);
return TSDB_CODE_TSC_INVALID_VALUE;
}
}
......@@ -776,7 +776,7 @@ static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MU
}
} else if (param->type == TSDB_DATA_TYPE_BINARY) {
if (bind->length[i] > (uintptr_t)param->bytes) {
tscError("invalid binary length");
tscError("binary length too long, ignore it, expect:%d, actual:%d", param->bytes, (int32_t)bind->length[i]);
return TSDB_CODE_TSC_INVALID_VALUE;
}
int16_t bsize = (short)bind->length[i];
......@@ -784,9 +784,10 @@ static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MU
} else if (param->type == TSDB_DATA_TYPE_NCHAR) {
int32_t output = 0;
if (!taosMbsToUcs4(bind->buffer + bind->buffer_length * i, bind->length[i], varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
tscError("convert failed");
tscError("convert nchar string to UCS4_LE failed:%s", (char*)(bind->buffer + bind->buffer_length * i));
return TSDB_CODE_TSC_INVALID_VALUE;
}
varDataSetLen(data + param->offset, output);
}
}
......@@ -850,7 +851,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
int code = doBindParam(pBlock, data, param, &bind[param->idx], 1);
if (code != TSDB_CODE_SUCCESS) {
tscDebug("0x%"PRIx64" param %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
tscDebug("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
return code;
}
}
......@@ -920,7 +921,7 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c
int code = doBindBatchParam(pBlock, param, &bind[param->idx], pCmd->batchSize);
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" param %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
tscError("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
return code;
}
}
......@@ -931,7 +932,7 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c
int code = doBindBatchParam(pBlock, param, bind, pCmd->batchSize);
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" param %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
tscError("0x%"PRIx64" bind column %d: type mismatch or invalid", pStmt->pSql->self, param->idx);
return code;
}
......
......@@ -306,11 +306,11 @@ public class TSDBJNIConnector {
private native int setBindTableNameImp(long stmt, String name, long conn);
public int bindColumnDataArray(long stmt, ByteBuffer colList, ByteBuffer lengthList, int type, int bytes, int numOfRows,int columnIndex) {
return bindColDataImp(stmt, colList.array(), lengthList.array(), type, bytes, numOfRows, columnIndex, this.taos);
public int bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows,int columnIndex) {
return bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
}
private native int bindColDataImp(long stmt, byte[] data, byte[] length, int type, int bytes, int numOfRows, int columnIndex, long conn);
private native int bindColDataImp(long stmt, byte[] colDataList, byte[] lengthList, byte[] isNullList, int type, int bytes, int numOfRows, int columnIndex, long conn);
public int executeBatch(long stmt) {
return executeBatchImp(stmt, this.taos);
......
......@@ -18,6 +18,7 @@ import com.taosdata.jdbc.utils.Utils;
import java.io.InputStream;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.net.URL;
import java.nio.ByteBuffer;
......@@ -40,8 +41,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private boolean isPrepared;
private ArrayList<ColumnInfo> colData;
private int type;
private String tableName;
private long nativeStmtHandle = 0;
......@@ -540,6 +539,7 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
@SuppressWarnings("rawtypes")
private ArrayList data;
private int type;
private int bytes;
private boolean typeIsSet;
public ColumnInfo() {
......@@ -564,60 +564,61 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
this.tableName = name;
}
public <T> void setValueImpl(int columnIndex, ArrayList<T> list, int type) throws SQLException {
public <T> void setValueImpl(int columnIndex, ArrayList<T> list, int type, int bytes) throws SQLException {
ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex);
if (col == null) {
ColumnInfo p = new ColumnInfo();
p.setType(type);
p.bytes = bytes;
p.data = (ArrayList<?>) list.clone();
this.colData.set(columnIndex, p);
} else {
if (col.type != type) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data type mismatch");
}
col.data.addAll(list);
}
}
public void setInt(int columnIndex, ArrayList<Integer> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_INT);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_INT, Integer.BYTES);
}
public void setFloat(int columnIndex, ArrayList<Float> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_FLOAT);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_FLOAT, Float.BYTES);
}
public void setTimestamp(int columnIndex, ArrayList<Long> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP, Long.BYTES);
}
public void setLong(int columnIndex, ArrayList<Long> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BIGINT);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BIGINT, Long.BYTES);
}
public void setDouble(int columnIndex, ArrayList<Double> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_DOUBLE);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_DOUBLE, Double.BYTES);
}
public void setBoolean(int columnIndex, ArrayList<Boolean> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BOOL);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BOOL, Byte.BYTES);
}
public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TINYINT);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_TINYINT, Byte.BYTES);
}
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_SMALLINT);
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_SMALLINT, Short.BYTES);
}
public void setString(int columnIndex, ArrayList<String> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BINARY);
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_BINARY, size);
}
public void setNString(int columnIndex, ArrayList<String> list) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_NCHAR);
// note: expand the required space for each NChar character
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException {
setValueImpl(columnIndex, list, TSDBConstants.TSDB_DATA_TYPE_NCHAR, size * Integer.BYTES);
}
public void columnDataAddBatch() {
......@@ -643,11 +644,9 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
e.printStackTrace();
}
int bytes = 0;
for (int i = 0; i < numOfCols; ++i) {
ColumnInfo col1 = this.colData.get(i);
if (!col1.isTypeSet()) {
if (col1 == null || !col1.isTypeSet()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
}
......@@ -655,51 +654,122 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical");
}
ByteBuffer bbuf = null;
ByteBuffer colDataList = ByteBuffer.allocate(rows * col1.bytes);
colDataList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer lengthBuf = ByteBuffer.allocate(rows * Integer.BYTES);
lengthBuf.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer lengthList = ByteBuffer.allocate(rows * Integer.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer isNullList = ByteBuffer.allocate(rows * Byte.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
switch (col1.type) {
case TSDBConstants.TSDB_DATA_TYPE_INT: {
bbuf = ByteBuffer.allocate(rows * Integer.BYTES);
bbuf.order(ByteOrder.LITTLE_ENDIAN);
for (int j = 0; j < rows; ++j) {
Integer val = (Integer) col1.data.get(j);
if (val == null) {
bbuf.putInt(j * Integer.BYTES, Integer.MIN_VALUE);
} else {
bbuf.putInt(j * Integer.BYTES, val);
}
lengthBuf.putInt(j * Integer.BYTES, Integer.BYTES);
colDataList.putInt(val == null? Integer.MIN_VALUE:val);
isNullList.put((byte) (val == null? 1:0));
}
bytes = Integer.BYTES;
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
bbuf = ByteBuffer.allocate(rows * Long.BYTES);
bbuf.order(ByteOrder.LITTLE_ENDIAN);
case TSDBConstants.TSDB_DATA_TYPE_TINYINT: {
for (int j = 0; j < rows; ++j) {
Byte val = (Byte) col1.data.get(j);
colDataList.put(val == null? 0:val);
isNullList.put((byte) (val == null? 1:0));
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
for (int j = 0; j < rows; ++j) {
Byte val = (Byte) col1.data.get(j);
colDataList.put(val == null? 0:val);
isNullList.put((byte) (val == null? 1:0));
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
for (int j = 0; j < rows; ++j) {
Short val = (Short) col1.data.get(j);
colDataList.putShort(val == null? 0:val);
isNullList.put((byte) (val == null? 1:0));
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
for (int j = 0; j < rows; ++j) {
Long val = (Long) col1.data.get(j);
if (val == null) {
bbuf.putLong(j * Long.BYTES, Long.MIN_VALUE);
colDataList.putLong(val == null? 0:val);
isNullList.put((byte) (val == null? 1:0));
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
for (int j = 0; j < rows; ++j) {
Float val = (Float) col1.data.get(j);
colDataList.putFloat(val == null? 0:val);
isNullList.put((byte) (val == null? 1:0));
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
for (int j = 0; j < rows; ++j) {
Double val = (Double) col1.data.get(j);
colDataList.putDouble(val == null? 0:val);
isNullList.put((byte) (val == null? 1:0));
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
String charset = TaosGlobalConfig.getCharset();
for (int j = 0; j < rows; ++j) {
String val = (String) col1.data.get(j);
if (val != null && val.length() > col1.bytes) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "string data too long");
}
colDataList.position(j * col1.bytes); // seek to the correct position
if (val != null) {
byte[] b = null;
try {
if (col1.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
b = val.getBytes();
} else {
b = val.getBytes(charset);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
colDataList.put(b);
lengthList.putInt(b.length);
isNullList.put((byte) 0);
} else {
bbuf.putLong(j * Long.BYTES, val);
lengthList.putInt(0);
isNullList.put((byte) 1);
}
lengthBuf.putInt(j * Integer.BYTES, Long.BYTES);
}
bytes = Long.BYTES;
break;
}
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT:
case TSDBConstants.TSDB_DATA_TYPE_UBIGINT: {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
}
};
connector.bindColumnDataArray(this.nativeStmtHandle, bbuf, lengthBuf, col1.type, bytes, rows, i);
connector.bindColumnDataArray(this.nativeStmtHandle, colDataList, lengthList, isNullList, col1.type, col1.bytes, rows, i);
}
connector.executeBatch(this.nativeStmtHandle);
......
......@@ -438,8 +438,8 @@ public class TSDBResultSetBlockData {
}
try {
String ss = TaosGlobalConfig.getCharset();
return new String(dest, ss);
String charset = TaosGlobalConfig.getCharset();
return new String(dest, charset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
......
......@@ -11,11 +11,11 @@ public class NullType {
public static boolean isBooleanNull(byte val) {
return val == NullType.NULL_BOOL_VAL;
}
public static boolean isTinyIntNull(byte val) {
return val == Byte.MIN_VALUE;
}
public static boolean isSmallIntNull(short val) {
return val == Short.MIN_VALUE;
}
......@@ -23,19 +23,19 @@ public class NullType {
public static boolean isIntNull(int val) {
return val == Integer.MIN_VALUE;
}
public static boolean isBigIntNull(long val) {
return val == Long.MIN_VALUE;
}
public static boolean isFloatNull(float val) {
return Float.isNaN(val);
}
public static boolean isDoubleNull(double val) {
return Double.isNaN(val);
}
public static boolean isBinaryNull(byte[] val, int length) {
if (length != Byte.BYTES) {
return false;
......@@ -43,7 +43,7 @@ public class NullType {
return val[0] == 0xFF;
}
public static boolean isNcharNull(byte[] val, int length) {
if (length != Integer.BYTES) {
return false;
......@@ -51,5 +51,41 @@ public class NullType {
return (val[0] & val[1] & val[2] & val[3]) == 0xFF;
}
public static byte getBooleanNull() {
return NullType.NULL_BOOL_VAL;
}
public static byte getTinyintNull() {
return Byte.MIN_VALUE;
}
public static int getIntNull() {
return Integer.MIN_VALUE;
}
public static short getSmallIntNull() {
return Short.MIN_VALUE;
}
public static long getBigIntNull() {
return Long.MIN_VALUE;
}
public static int getFloatNull() {
return 0x7FF00000;
}
public static long getDoubleNull() {
return 0x7FFFFF0000000000L;
}
public static byte getBinaryNull() {
return (byte) 0xFF;
}
public static byte[] getNcharNull() {
return new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
}
}
......@@ -104,8 +104,8 @@ typedef struct TAOS_MULTI_BIND {
int buffer_type;
void *buffer;
uintptr_t buffer_length;
uintptr_t *length;
int *is_null;
int32_t *length;
char *is_null;
int num;
} TAOS_MULTI_BIND;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册