未验证 提交 e792da17 编写于 作者: CWW666's avatar CWW666 提交者: GitHub

fix influxdb bug while setting influxdb connection response format as...

fix influxdb bug while setting influxdb connection response format as InfluxDB.ResponseFormat.JSON (#5946)
上级 b5367aa6
......@@ -12,6 +12,7 @@ Release Notes.
#### OAP-Backend
* Make meter receiver support MAL.
* Support influxDB connection response format option. Fix some error when use JSON as influxDB response format.
* Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters.
#### UI
......
......@@ -140,6 +140,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | duration | The time to wait at most (milliseconds). | SW_STORAGE_INFLUXDB_DURATION | 1000|
| - | - | batchEnabled | If true, write points with batch api. | SW_STORAGE_INFLUXDB_BATCH_ENABLED | true|
| - | - | fetchTaskLogMaxSize | The max number of fetch task log in a request. | SW_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE | 5000|
| - | - | connectionResponseFormat | The response format of connection to influxDB, cannot be anything but MSGPACK or JSON. | SW_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT | MSGPACK |
| agent-analyzer | default | Agent Analyzer. | SW_AGENT_ANALYZER | default |
| - | -| sampleRate|Sampling rate for receiving trace. The precision is 1/10000. 10000 means 100% sample in default.|SW_TRACE_SAMPLE_RATE|10000|
| - | - |slowDBAccessThreshold|The slow database access thresholds. Unit ms.|SW_SLOW_DB_THRESHOLD|default:200,mongodb:100|
......
......@@ -194,6 +194,7 @@ storage:
duration: ${SW_STORAGE_INFLUXDB_DURATION:1000} # the time to wait at most (milliseconds)
batchEnabled: ${SW_STORAGE_INFLUXDB_BATCH_ENABLED:true}
fetchTaskLogMaxSize: ${SW_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE:5000} # the max number of fetch task log in a request
connectionResponseFormat: ${SW_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT:MSGPACK} # the response format of connection to influxDB, cannot be anything but MSGPACK or JSON.
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
......
......@@ -72,10 +72,11 @@ public class InfluxClient implements Client, HealthCheckable {
@Override
public void connect() {
try {
InfluxDB.ResponseFormat responseFormat = InfluxDB.ResponseFormat.valueOf(config.getConnectionResponseFormat());
influx = InfluxDBFactory.connect(config.getUrl(), config.getUser(), config.getPassword(),
new OkHttpClient.Builder().readTimeout(3, TimeUnit.MINUTES)
.writeTimeout(3, TimeUnit.MINUTES),
InfluxDB.ResponseFormat.MSGPACK
responseFormat
);
influx.query(new Query("CREATE DATABASE " + database));
influx.enableGzip();
......
......@@ -35,4 +35,5 @@ public class InfluxStorageConfig extends ModuleConfig {
private boolean batchEnabled = true;
private int fetchTaskLogMaxSize = 5000;
private String connectionResponseFormat = "MSGPACK";
}
......@@ -96,7 +96,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId((String) values.get(2));
basicTrace.setStart(String.valueOf(values.get(3)));
basicTrace.setStart(String.valueOf(((Number) values.get(3)).longValue()));
basicTrace.getEndpointNames().add((String) values.get(4));
basicTrace.setDuration(((Number) values.get(5)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(((Number) values.get(6)).intValue()));
......@@ -190,11 +190,11 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
segmentRecord.setTraceId((String) values.get(2));
segmentRecord.setServiceId((String) values.get(3));
segmentRecord.setEndpointName((String) values.get(4));
segmentRecord.setStartTime((long) values.get(5));
segmentRecord.setEndTime((long) values.get(6));
segmentRecord.setLatency((int) values.get(7));
segmentRecord.setIsError((int) values.get(8));
segmentRecord.setVersion((int) values.get(10));
segmentRecord.setStartTime(((Number) values.get(5)).longValue());
segmentRecord.setEndTime(((Number) values.get(6)).longValue());
segmentRecord.setLatency(((Number) values.get(7)).intValue());
segmentRecord.setIsError(((Number) values.get(8)).intValue());
segmentRecord.setVersion(((Number) values.get(10)).intValue());
String base64 = (String) values.get(9);
if (!Strings.isNullOrEmpty(base64)) {
......
......@@ -165,7 +165,7 @@ public class TraceQuery implements ITraceQueryDAO {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId((String) values.get(2));
basicTrace.setStart(String.valueOf(values.get(3)));
basicTrace.setStart(String.valueOf(((Number) values.get(3)).longValue()));
basicTrace.getEndpointNames().add((String) values.get(4));
basicTrace.setDuration(((Number) values.get(5)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(((Number) values.get(6)).intValue()));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册