提交 63c992a7 编写于 作者: Q qiaojialin

make session thread safe

上级 d1e305f4
...@@ -32,8 +32,6 @@ import org.apache.thrift.protocol.TBinaryProtocol; ...@@ -32,8 +32,6 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId; import java.time.ZoneId;
...@@ -44,7 +42,7 @@ public class Session { ...@@ -44,7 +42,7 @@ public class Session {
private String username; private String username;
private String password; private String password;
private final TSProtocolVersion protocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1; private final TSProtocolVersion protocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1;
public TSIService.Iface client = null; private TSIService.Iface client = null;
private TS_SessionHandle sessionHandle = null; private TS_SessionHandle sessionHandle = null;
private TSocket transport; private TSocket transport;
private boolean isClosed = true; private boolean isClosed = true;
...@@ -65,11 +63,11 @@ public class Session { ...@@ -65,11 +63,11 @@ public class Session {
this.password = password; this.password = password;
} }
public void open() throws IoTDBSessionException { public synchronized void open() throws IoTDBSessionException {
open(false, 0); open(false, 0);
} }
public void open(boolean enableRPCCompression, int connectionTimeoutInMs) public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBSessionException { throws IoTDBSessionException {
if (!isClosed) { if (!isClosed) {
return; return;
...@@ -125,7 +123,7 @@ public class Session { ...@@ -125,7 +123,7 @@ public class Session {
} }
public void close() throws IoTDBSessionException { public synchronized void close() throws IoTDBSessionException {
if (isClosed) { if (isClosed) {
return; return;
} }
...@@ -142,7 +140,7 @@ public class Session { ...@@ -142,7 +140,7 @@ public class Session {
} }
} }
public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException { public synchronized TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException {
TSBatchInsertionReq request = new TSBatchInsertionReq(); TSBatchInsertionReq request = new TSBatchInsertionReq();
request.deviceId = rowBatch.deviceId; request.deviceId = rowBatch.deviceId;
for (MeasurementSchema measurementSchema: rowBatch.measurements) { for (MeasurementSchema measurementSchema: rowBatch.measurements) {
...@@ -160,7 +158,7 @@ public class Session { ...@@ -160,7 +158,7 @@ public class Session {
} }
} }
public TSRPCResp insert(String deviceId, long time, List<String> measurements, List<String> values) public synchronized TSRPCResp insert(String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBSessionException { throws IoTDBSessionException {
TSInsertReq request = new TSInsertReq(); TSInsertReq request = new TSInsertReq();
request.setDeviceId(deviceId); request.setDeviceId(deviceId);
...@@ -175,7 +173,7 @@ public class Session { ...@@ -175,7 +173,7 @@ public class Session {
} }
} }
public TSRPCResp setStorageGroup(String storageGroupId) throws IoTDBSessionException { public synchronized TSRPCResp setStorageGroup(String storageGroupId) throws IoTDBSessionException {
TSSetStorageGroupReq request = new TSSetStorageGroupReq(); TSSetStorageGroupReq request = new TSSetStorageGroupReq();
request.setStorageGroupId(storageGroupId); request.setStorageGroupId(storageGroupId);
...@@ -186,7 +184,7 @@ public class Session { ...@@ -186,7 +184,7 @@ public class Session {
} }
} }
public TSRPCResp createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor) throws IoTDBSessionException { public synchronized TSRPCResp createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor) throws IoTDBSessionException {
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq(); TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
request.setPath(path); request.setPath(path);
request.setDataType(dataType.ordinal()); request.setDataType(dataType.ordinal());
...@@ -200,7 +198,7 @@ public class Session { ...@@ -200,7 +198,7 @@ public class Session {
} }
} }
public String getTimeZone() throws TException, IoTDBRPCException { public synchronized String getTimeZone() throws TException, IoTDBRPCException {
if (zoneId != null) { if (zoneId != null) {
return zoneId.toString(); return zoneId.toString();
} }
...@@ -210,7 +208,7 @@ public class Session { ...@@ -210,7 +208,7 @@ public class Session {
return resp.getTimeZone(); return resp.getTimeZone();
} }
public void setTimeZone(String zoneId) throws TException, IoTDBRPCException { public synchronized void setTimeZone(String zoneId) throws TException, IoTDBRPCException {
TSSetTimeZoneReq req = new TSSetTimeZoneReq(zoneId); TSSetTimeZoneReq req = new TSSetTimeZoneReq(zoneId);
TSRPCResp resp = client.setTimeZone(req); TSRPCResp resp = client.setTimeZone(req);
RpcUtils.verifySuccess(resp.getStatus()); RpcUtils.verifySuccess(resp.getStatus());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册