提交 ca45800e 编写于 作者: Q qiaojialin

make session module depend on service-rpc instead of jdbc

上级 07775693
......@@ -38,11 +38,11 @@ public class Config {
*/
public static final String DEFAULT_SERIES_NAME = "default";
public static final String AUTH_USER = "user";
public static final String DEFAULT_USER = "user";
public static final String AUTH_USER = "root";
public static final String DEFAULT_USER = "root";
public static final String AUTH_PASSWORD = "password";
public static final String DEFALUT_PASSWORD = "password";
public static final String AUTH_PASSWORD = "root";
public static final String DEFALUT_PASSWORD = "root";
public static final int RETRY_NUM = 3;
public static final long RETRY_INTERVAL = 1000;
......
......@@ -18,10 +18,6 @@
*/
package org.apache.iotdb.jdbc;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
......@@ -42,6 +38,8 @@ import java.time.ZoneId;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
......@@ -94,15 +92,10 @@ public class IoTDBConnection implements Connection {
// open client session
openSession();
// Wrap the client with a thread-safe proxy to serialize the RPC calls
client = newSynchronizedClient(client);
client = RpcUtils.newSynchronizedClient(client);
autoCommit = false;
}
public static TSIService.Iface newSynchronizedClient(TSIService.Iface client) {
return (TSIService.Iface) Proxy.newProxyInstance(IoTDBConnection.class.getClassLoader(),
new Class[]{TSIService.Iface.class}, new SynchronizedHandler(client));
}
@Override
public boolean isWrapperFor(Class<?> arg0) throws SQLException {
throw new SQLException("Method not supported");
......@@ -427,11 +420,11 @@ public class IoTDBConnection implements Connection {
// validate connection
try {
Utils.verifySuccess(openResp.getStatus());
} catch (IoTDBSQLException e) {
RpcUtils.verifySuccess(openResp.getStatus());
} catch (IoTDBRPCException e) {
// failed to connect, disconnect from the server
transport.close();
throw e;
throw new SQLException(e);
}
if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) {
throw new TException(String
......@@ -468,7 +461,7 @@ public class IoTDBConnection implements Connection {
client = new TSIService.Client(new TBinaryProtocol(transport));
}
openSession();
client = newSynchronizedClient(client);
client = RpcUtils.newSynchronizedClient(client);
flag = true;
break;
}
......@@ -489,14 +482,22 @@ public class IoTDBConnection implements Connection {
}
TSGetTimeZoneResp resp = client.getTimeZone();
Utils.verifySuccess(resp.getStatus());
try {
RpcUtils.verifySuccess(resp.getStatus());
} catch (IoTDBRPCException e) {
throw new IoTDBSQLException(e);
}
return resp.getTimeZone();
}
public void setTimeZone(String zoneId) throws TException, IoTDBSQLException {
TSSetTimeZoneReq req = new TSSetTimeZoneReq(zoneId);
TSSetTimeZoneResp resp = client.setTimeZone(req);
Utils.verifySuccess(resp.getStatus());
try {
RpcUtils.verifySuccess(resp.getStatus());
} catch (IoTDBRPCException e) {
throw new IoTDBSQLException(e);
}
this.zoneId = ZoneId.of(zoneId);
}
......@@ -512,32 +513,4 @@ public class IoTDBConnection implements Connection {
this.protocol = protocol;
}
public static class SynchronizedHandler implements InvocationHandler {
private final TSIService.Iface client;
public SynchronizedHandler(TSIService.Iface client) {
this.client = client;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
synchronized (client) {
return method.invoke(client, args);
}
} catch (InvocationTargetException e) {
// all IFace APIs throw TException
if (e.getTargetException() instanceof TException) {
throw e.getTargetException();
} else {
// should not happen
throw new TException("Error in calling method " + method.getName(),
e.getTargetException());
}
} catch (Exception e) {
throw new TException("Error in calling method " + method.getName(), e);
}
}
}
}
......@@ -25,6 +25,8 @@ import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
......@@ -82,9 +84,9 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
req.setColumnPath(schemaPattern);
try {
TSFetchMetadataResp resp = client.fetchMetadata(req);
Utils.verifySuccess(resp.getStatus());
RpcUtils.verifySuccess(resp.getStatus());
return new IoTDBMetadataResultSet(resp.getColumnsList(), null, null);
} catch (TException e) {
} catch (TException | IoTDBRPCException e) {
throw new TException("Conncetion error when fetching column metadata", e);
}
case Constant.CATALOG_DEVICE:
......@@ -92,19 +94,19 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
req.setColumnPath(schemaPattern);
try {
TSFetchMetadataResp resp = client.fetchMetadata(req);
Utils.verifySuccess(resp.getStatus());
RpcUtils.verifySuccess(resp.getStatus());
return new IoTDBMetadataResultSet(resp.getColumnsList(), null, null);
} catch (TException e) {
} catch (TException | IoTDBRPCException e) {
throw new TException("Conncetion error when fetching delta object metadata", e);
}
case Constant.CATALOG_STORAGE_GROUP:
req = new TSFetchMetadataReq(Constant.GLOBAL_SHOW_STORAGE_GROUP_REQ);
try {
TSFetchMetadataResp resp = client.fetchMetadata(req);
Utils.verifySuccess(resp.getStatus());
RpcUtils.verifySuccess(resp.getStatus());
Set<String> showStorageGroup = resp.getShowStorageGroups();
return new IoTDBMetadataResultSet(null, showStorageGroup, null);
} catch (TException e) {
} catch (TException | IoTDBRPCException e) {
throw new TException("Conncetion error when fetching storage group metadata", e);
}
case Constant.CATALOG_TIMESERIES:
......@@ -112,10 +114,10 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
req.setColumnPath(schemaPattern);
try {
TSFetchMetadataResp resp = client.fetchMetadata(req);
Utils.verifySuccess(resp.getStatus());
RpcUtils.verifySuccess(resp.getStatus());
List<List<String>> showTimeseriesList = resp.getShowTimeseriesList();
return new IoTDBMetadataResultSet(null, null, showTimeseriesList);
} catch (TException e) {
} catch (TException | IoTDBRPCException e) {
throw new TException("Conncetion error when fetching timeseries metadata", e);
}
default:
......@@ -1247,7 +1249,11 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
TSFetchMetadataReq req = new TSFetchMetadataReq("METADATA_IN_JSON");
TSFetchMetadataResp resp;
resp = client.fetchMetadata(req);
Utils.verifySuccess(resp.getStatus());
try {
RpcUtils.verifySuccess(resp.getStatus());
} catch (IoTDBRPCException e) {
throw new IoTDBSQLException(e);
}
return resp.getMetadataInJson();
}
}
......@@ -46,6 +46,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
......@@ -214,9 +216,9 @@ public class IoTDBQueryResultSet implements ResultSet {
if (operationHandle != null) {
TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle, queryId);
TSCloseOperationResp closeResp = client.closeOperation(closeReq);
Utils.verifySuccess(closeResp.getStatus());
RpcUtils.verifySuccess(closeResp.getStatus());
}
} catch (SQLException e) {
} catch (IoTDBRPCException e) {
throw new SQLException("Error occurs for close opeation in server side becasuse " + e);
} catch (TException e) {
throw new SQLException(
......@@ -708,7 +710,7 @@ public class IoTDBQueryResultSet implements ResultSet {
try {
TSFetchResultsResp resp = client.fetchResults(req);
Utils.verifySuccess(resp.getStatus());
RpcUtils.verifySuccess(resp.getStatus());
if (!resp.hasResultSet) {
emptyResultSet = true;
} else {
......@@ -716,7 +718,7 @@ public class IoTDBQueryResultSet implements ResultSet {
List<RowRecord> records = Utils.convertRowRecords(tsQueryDataSet);
recordItr = records.iterator();
}
} catch (TException e) {
} catch (TException | IoTDBRPCException e) {
throw new SQLException(
"Cannot fetch result from server, because of network connection: {} ", e);
}
......
......@@ -29,4 +29,8 @@ public class IoTDBSQLException extends SQLException {
super(reason);
}
public IoTDBSQLException(Throwable cause) {
super(cause);
}
}
......@@ -30,6 +30,8 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationResp;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
......@@ -131,7 +133,7 @@ public class IoTDBStatement implements Statement {
if (operationHandle != null) {
TSCancelOperationReq closeReq = new TSCancelOperationReq(operationHandle);
TSCancelOperationResp closeResp = client.cancelOperation(closeReq);
Utils.verifySuccess(closeResp.getStatus());
RpcUtils.verifySuccess(closeResp.getStatus());
}
} catch (Exception e) {
throw new SQLException("Error occurs when canceling statement.", e);
......@@ -158,7 +160,7 @@ public class IoTDBStatement implements Statement {
TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle, -1);
closeReq.setStmtId(stmtId);
TSCloseOperationResp closeResp = client.closeOperation(closeReq);
Utils.verifySuccess(closeResp.getStatus());
RpcUtils.verifySuccess(closeResp.getStatus());
}
} catch (Exception e) {
throw new SQLException("Error occurs when closing statement.", e);
......@@ -246,7 +248,11 @@ public class IoTDBStatement implements Statement {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
TSExecuteStatementResp execResp = client.executeStatement(execReq);
operationHandle = execResp.getOperationHandle();
Utils.verifySuccess(execResp.getStatus());
try {
RpcUtils.verifySuccess(execResp.getStatus());
} catch (IoTDBRPCException e) {
throw new SQLException(e);
}
if (execResp.getOperationHandle().hasResultSet) {
IoTDBQueryResultSet resSet = new IoTDBQueryResultSet(this,
execResp.getColumns(), client,
......@@ -346,7 +352,11 @@ public class IoTDBStatement implements Statement {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
TSExecuteStatementResp execResp = client.executeQueryStatement(execReq);
operationHandle = execResp.getOperationHandle();
Utils.verifySuccess(execResp.getStatus());
try {
RpcUtils.verifySuccess(execResp.getStatus());
} catch (IoTDBRPCException e) {
throw new SQLException(e);
}
IoTDBQueryResultSet resSet = new IoTDBQueryResultSet(this, execResp.getColumns(), client,
operationHandle, sql, execResp.getOperationType(), execResp.getDataTypeList(),
queryId.getAndIncrement());
......@@ -399,7 +409,11 @@ public class IoTDBStatement implements Statement {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
operationHandle = execResp.getOperationHandle();
Utils.verifySuccess(execResp.getStatus());
try {
RpcUtils.verifySuccess(execResp.getStatus());
} catch (IoTDBRPCException e) {
throw new IoTDBSQLException(e);
}
return 0;
}
......
......@@ -27,8 +27,6 @@ import java.util.regex.Pattern;
import org.apache.iotdb.service.rpc.thrift.TSDataValue;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSRowRecord;
import org.apache.iotdb.service.rpc.thrift.TS_Status;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
......@@ -75,17 +73,6 @@ public class Utils {
return params;
}
/**
* verify success.
*
* @param status -status
*/
public static void verifySuccess(TS_Status status) throws IoTDBSQLException {
if (status.getStatusCode() != TS_StatusCode.SUCCESS_STATUS) {
throw new IoTDBSQLException(status.errorMessage);
}
}
/**
* convert row records.
*
......@@ -148,64 +135,4 @@ public class Utils {
}
}
public static ByteBuffer getTimeBuffer(RowBatch rowBatch) {
ByteBuffer timeBuffer = ByteBuffer.allocate(rowBatch.getTimeBytesSize());
for (int i = 0; i < rowBatch.batchSize; i++) {
timeBuffer.putLong(rowBatch.timestamps[i]);
}
timeBuffer.flip();
return timeBuffer;
}
public static ByteBuffer getValueBuffer(RowBatch rowBatch) {
ByteBuffer valueBuffer = ByteBuffer.allocate(rowBatch.getValueBytesSize());
for (int i = 0; i < rowBatch.measurements.size(); i++) {
TSDataType dataType = rowBatch.measurements.get(i).getType();
switch (dataType) {
case INT32:
int[] intValues = (int[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putInt(intValues[index]);
}
break;
case INT64:
long[] longValues = (long[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putLong(longValues[index]);
}
break;
case FLOAT:
float[] floatValues = (float[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putFloat(floatValues[index]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putDouble(doubleValues[index]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.put(BytesUtils.boolToByte(boolValues[index]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putInt(binaryValues[index].getLength());
valueBuffer.put(binaryValues[index].getValues());
}
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
}
valueBuffer.flip();
return valueBuffer;
}
}
......@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSDataValue;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSRowRecord;
......@@ -69,13 +70,13 @@ public class UtilsTest {
@Test
public void testVerifySuccess() {
try {
Utils.verifySuccess(new TS_Status(TS_StatusCode.SUCCESS_STATUS));
RpcUtils.verifySuccess(new TS_Status(TS_StatusCode.SUCCESS_STATUS));
} catch (Exception e) {
fail();
}
try {
Utils.verifySuccess(new TS_Status(TS_StatusCode.ERROR_STATUS));
RpcUtils.verifySuccess(new TS_Status(TS_StatusCode.ERROR_STATUS));
} catch (Exception e) {
return;
}
......
package org.apache.iotdb.rpc;
public class IoTDBRPCException extends Exception{
private static final long serialVersionUID = -1268775292265203036L;
public IoTDBRPCException(String reason) {
super(reason);
}
}
package org.apache.iotdb.rpc;
import java.lang.reflect.Proxy;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TS_Status;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
public class RpcUtils {
public static TSIService.Iface newSynchronizedClient(TSIService.Iface client) {
return (TSIService.Iface) Proxy.newProxyInstance(RpcUtils.class.getClassLoader(),
new Class[]{TSIService.Iface.class}, new SynchronizedHandler(client));
}
/**
* verify success.
*
* @param status -status
*/
public static void verifySuccess(TS_Status status) throws IoTDBRPCException {
if (status.getStatusCode() != TS_StatusCode.SUCCESS_STATUS) {
throw new IoTDBRPCException(status.errorMessage);
}
}
}
package org.apache.iotdb.rpc;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.thrift.TException;
public class SynchronizedHandler implements InvocationHandler {
private final TSIService.Iface client;
public SynchronizedHandler(TSIService.Iface client) {
this.client = client;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
synchronized (client) {
return method.invoke(client, args);
}
} catch (InvocationTargetException e) {
// all IFace APIs throw TException
if (e.getTargetException() instanceof TException) {
throw e.getTargetException();
} else {
// should not happen
throw new TException("Error in calling method " + method.getName(),
e.getTargetException());
}
} catch (Exception e) {
throw new TException("Error in calling method " + method.getName(), e);
}
}
}
......@@ -54,7 +54,12 @@
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
<artifactId>service-rpc</artifactId>
<version>0.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>tsfile</artifactId>
<version>0.9.0-SNAPSHOT</version>
</dependency>
</dependencies>
......
package org.apache.iotdb.session;
public class Config {
public static final String DEFAULT_USER = "root";
public static final String DEFALUT_PASSWORD = "root";
}
......@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.exception;
package org.apache.iotdb.session;
public class IoTDBSessionException extends RuntimeException {
......
......@@ -19,11 +19,8 @@
package org.apache.iotdb.session;
import java.time.ZoneId;
import org.apache.iotdb.exception.IoTDBSessionException;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.IoTDBConnection;
import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.apache.iotdb.jdbc.Utils;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
......@@ -35,6 +32,7 @@ import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
......@@ -102,13 +100,8 @@ public class Session {
try {
TSOpenSessionResp openResp = client.openSession(openReq);
// validate connection
try {
Utils.verifySuccess(openResp.getStatus());
} catch (IoTDBSQLException e) {
transport.close();
throw e;
}
// validate connectiontry {
RpcUtils.verifySuccess(openResp.getStatus());
if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) {
throw new TException(String
......@@ -124,13 +117,14 @@ public class Session {
zoneId = ZoneId.of(getTimeZone());
}
} catch (TException | IoTDBSQLException e) {
} catch (TException | IoTDBRPCException e) {
transport.close();
throw new IoTDBSessionException(String.format("Can not open session to %s:%s with user: %s.",
host, port, username), e);
}
isClosed = false;
client = IoTDBConnection.newSynchronizedClient(client);
client = RpcUtils.newSynchronizedClient(client);
}
......@@ -158,8 +152,8 @@ public class Session {
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
}
request.setTimestamps(Utils.getTimeBuffer(rowBatch));
request.setValues(Utils.getValueBuffer(rowBatch));
request.setTimestamps(SessionUtils.getTimeBuffer(rowBatch));
request.setValues(SessionUtils.getValueBuffer(rowBatch));
request.setSize(rowBatch.batchSize);
try {
......@@ -169,20 +163,20 @@ public class Session {
}
}
public String getTimeZone() throws TException, IoTDBSQLException {
public String getTimeZone() throws TException, IoTDBRPCException {
if (zoneId != null) {
return zoneId.toString();
}
TSGetTimeZoneResp resp = client.getTimeZone();
Utils.verifySuccess(resp.getStatus());
RpcUtils.verifySuccess(resp.getStatus());
return resp.getTimeZone();
}
public void setTimeZone(String zoneId) throws TException, IoTDBSQLException {
public void setTimeZone(String zoneId) throws TException, IoTDBRPCException {
TSSetTimeZoneReq req = new TSSetTimeZoneReq(zoneId);
TSSetTimeZoneResp resp = client.setTimeZone(req);
Utils.verifySuccess(resp.getStatus());
RpcUtils.verifySuccess(resp.getStatus());
this.zoneId = ZoneId.of(zoneId);
}
......
package org.apache.iotdb.session;
import java.nio.ByteBuffer;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.write.record.RowBatch;
public class SessionUtils {
public static ByteBuffer getTimeBuffer(RowBatch rowBatch) {
ByteBuffer timeBuffer = ByteBuffer.allocate(rowBatch.getTimeBytesSize());
for (int i = 0; i < rowBatch.batchSize; i++) {
timeBuffer.putLong(rowBatch.timestamps[i]);
}
timeBuffer.flip();
return timeBuffer;
}
public static ByteBuffer getValueBuffer(RowBatch rowBatch) {
ByteBuffer valueBuffer = ByteBuffer.allocate(rowBatch.getValueBytesSize());
for (int i = 0; i < rowBatch.measurements.size(); i++) {
TSDataType dataType = rowBatch.measurements.get(i).getType();
switch (dataType) {
case INT32:
int[] intValues = (int[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putInt(intValues[index]);
}
break;
case INT64:
long[] longValues = (long[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putLong(longValues[index]);
}
break;
case FLOAT:
float[] floatValues = (float[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putFloat(floatValues[index]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putDouble(doubleValues[index]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.put(BytesUtils.boolToByte(boolValues[index]));
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) rowBatch.values[i];
for (int index = 0; index < rowBatch.batchSize; index++) {
valueBuffer.putInt(binaryValues[index].getLength());
valueBuffer.put(binaryValues[index].getValues());
}
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
}
valueBuffer.flip();
return valueBuffer;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册