Unverified commit a68f2d02, authored by wu-sheng, committed by GitHub

Remove the hard requirement of BASE64 encoding for the binary field (#8685)

* Remove the hard requirement of BASE64 encoding for the binary field, and resolve the concerns raised in https://github.com/apache/skywalking/pull/8684#issuecomment-1068660880
Parent 91a82e05
......@@ -116,6 +116,7 @@ Release Notes.
* Refactor the core Builder mechanism, new storage plugin could implement their own converter and get rid of hard
requirement of using HashMap to communicate between data object and database native structure.
* [Breaking Change] Break all existing 3rd-party storage extensions.
* Remove hard requirement of BASE64 encoding for binary field.
#### UI
......
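The release-note entry above captures the whole design change: before this commit, every record Builder hand-encoded its byte[] fields into BASE64 Strings, forcing that representation on every storage plugin; after it, Builders hand the raw byte[] to the converter, and the storage implementation chooses the encoding. A minimal before/after sketch, using the AlarmRecord field from the hunks below:

// Before: BASE64 hard-coded in the Builder, identical for every storage plugin.
if (CollectionUtils.isEmpty(storageData.getTagsRawData())) {
    converter.accept(TAGS_RAW_DATA, Const.EMPTY_STRING);
} else {
    converter.accept(TAGS_RAW_DATA, new String(Base64.getEncoder().encode(storageData.getTagsRawData())));
}

// After: the Builder passes the bytes through; the Convert2Storage implementation decides.
converter.accept(TAGS_RAW_DATA, storageData.getTagsRawData());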
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.alarm;
import java.util.Base64;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
......@@ -32,9 +31,8 @@ import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM;
......@@ -94,12 +92,8 @@ public class AlarmRecord extends Record {
record.setStartTime(((Number) converter.get(START_TIME)).longValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
record.setRuleName((String) converter.get(RULE_NAME));
if (StringUtil.isEmpty((String) converter.get(TAGS_RAW_DATA))) {
record.setTagsRawData(new byte[] {});
} else {
// Don't read the tags as they have been in the data binary already.
record.setTagsRawData(Base64.getDecoder().decode((String) converter.get(TAGS_RAW_DATA)));
}
record.setTagsRawData(converter.getWith(TAGS_RAW_DATA, HashMapConverter.ToEntity.Base64Decoder.INSTANCE));
// Don't read the TAGS as they are only for query.
return record;
}
......@@ -113,11 +107,7 @@ public class AlarmRecord extends Record {
converter.accept(START_TIME, storageData.getStartTime());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
converter.accept(RULE_NAME, storageData.getRuleName());
if (CollectionUtils.isEmpty(storageData.getTagsRawData())) {
converter.accept(TAGS_RAW_DATA, Const.EMPTY_STRING);
} else {
converter.accept(TAGS_RAW_DATA, new String(Base64.getEncoder().encode(storageData.getTagsRawData())));
}
converter.accept(TAGS_RAW_DATA, storageData.getTagsRawData());
converter.accept(TAGS, storageData.getTagsInString());
}
}
......
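The read path becomes symmetric with the write path: storage2Entity delegates decoding to a pluggable Function passed to getWith. As a sketch of what this unlocks (the decoder below is hypothetical, not part of this PR), a storage plugin whose driver returns byte[] natively could plug in an identity-style decoder and reuse the same Builder code unchanged:

import java.util.function.Function;

// Hypothetical decoder for a storage with a native binary column type.
public class RawBytesDecoder implements Function<byte[], byte[]> {
    public static final RawBytesDecoder INSTANCE = new RawBytesDecoder();

    private RawBytesDecoder() {
    }

    @Override
    public byte[] apply(final byte[] stored) {
        // Mirror Base64Decoder's contract: never return null for an absent column.
        return stored == null ? new byte[] {} : stored;
    }
}

// In that plugin's Convert2Entity, the Builder line above would resolve to:
// record.setTagsRawData(converter.getWith(TAGS_RAW_DATA, RawBytesDecoder.INSTANCE));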
......@@ -18,11 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.manual.log;
import java.util.Base64;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
......@@ -30,9 +28,8 @@ import org.apache.skywalking.oap.server.core.query.type.ContentType;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
public abstract class AbstractLogRecord extends Record {
......@@ -121,12 +118,7 @@ public abstract class AbstractLogRecord extends Record {
record.setContentType(((Number) converter.get(CONTENT_TYPE)).intValue());
record.setContent((String) converter.get(CONTENT));
record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
if (StringUtil.isEmpty((String) converter.get(TAGS_RAW_DATA))) {
record.setTagsRawData(new byte[] {});
} else {
// Don't read the tags as they have been in the data binary already.
record.setTagsRawData(Base64.getDecoder().decode((String) converter.get(TAGS_RAW_DATA)));
}
record.setTagsRawData(converter.getWith(TAGS_RAW_DATA, HashMapConverter.ToEntity.Base64Decoder.INSTANCE));
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
}
......@@ -141,11 +133,7 @@ public abstract class AbstractLogRecord extends Record {
converter.accept(CONTENT_TYPE, record.getContentType());
converter.accept(CONTENT, record.getContent());
converter.accept(TIMESTAMP, record.getTimestamp());
if (CollectionUtils.isEmpty(record.getTagsRawData())) {
converter.accept(TAGS_RAW_DATA, Const.EMPTY_STRING);
} else {
converter.accept(TAGS_RAW_DATA, new String(Base64.getEncoder().encode(record.getTagsRawData())));
}
converter.accept(TAGS_RAW_DATA, record.getTagsRawData());
converter.accept(TAGS, record.getTagsInString());
}
}
......
......@@ -18,15 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import java.util.Base64;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
......@@ -34,7 +28,10 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@SuperDataset
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
......@@ -118,11 +115,7 @@ public class SegmentRecord extends Record {
record.setLatency(((Number) converter.get(LATENCY)).intValue());
record.setIsError(((Number) converter.get(IS_ERROR)).intValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
if (StringUtil.isEmpty((String) converter.get(DATA_BINARY))) {
record.setDataBinary(new byte[] {});
} else {
record.setDataBinary(Base64.getDecoder().decode((String) converter.get(DATA_BINARY)));
}
record.setDataBinary(converter.getWith(DATA_BINARY, HashMapConverter.ToEntity.Base64Decoder.INSTANCE));
// Don't read the tags as they have been in the data binary already.
return record;
}
......@@ -138,11 +131,7 @@ public class SegmentRecord extends Record {
converter.accept(LATENCY, storageData.getLatency());
converter.accept(IS_ERROR, storageData.getIsError());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
converter.accept(DATA_BINARY, Const.EMPTY_STRING);
} else {
converter.accept(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
converter.accept(DATA_BINARY, storageData.getDataBinary());
converter.accept(TAGS, storageData.getTags());
}
}
......
......@@ -17,10 +17,8 @@
package org.apache.skywalking.oap.server.core.browser.manual.errorlog;
import java.util.Base64;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
......@@ -29,9 +27,8 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
@SuperDataset
@Stream(name = BrowserErrorLogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.BROWSER_ERROR_LOG, builder = BrowserErrorLogRecord.Builder.class, processor = RecordStreamProcessor.class)
......@@ -96,12 +93,7 @@ public class BrowserErrorLogRecord extends Record {
record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
record.setErrorCategory(((Number) converter.get(ERROR_CATEGORY)).intValue());
String dataBinary = (String) converter.get(DATA_BINARY);
if (StringUtil.isEmpty(dataBinary)) {
record.setDataBinary(new byte[] {});
} else {
record.setDataBinary(Base64.getDecoder().decode(dataBinary));
}
record.setDataBinary(converter.getWith(DATA_BINARY, HashMapConverter.ToEntity.Base64Decoder.INSTANCE));
return record;
}
......@@ -114,11 +106,7 @@ public class BrowserErrorLogRecord extends Record {
converter.accept(TIMESTAMP, storageData.getTimestamp());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
converter.accept(ERROR_CATEGORY, storageData.getErrorCategory());
if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
converter.accept(DATA_BINARY, Const.EMPTY_STRING);
} else {
converter.accept(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
converter.accept(DATA_BINARY, storageData.getDataBinary());
}
}
}
......@@ -31,7 +31,6 @@ import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedInde
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
......@@ -95,11 +94,7 @@ public class ProfileThreadSnapshotRecord extends Record {
converter.accept(DUMP_TIME, storageData.getDumpTime());
converter.accept(SEQUENCE, storageData.getSequence());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getStackBinary())) {
converter.accept(STACK_BINARY, Const.EMPTY_STRING);
} else {
converter.accept(STACK_BINARY, new String(Base64.getEncoder().encode(storageData.getStackBinary())));
}
converter.accept(STACK_BINARY, new String(Base64.getEncoder().encode(storageData.getStackBinary())));
}
}
}
......@@ -18,9 +18,20 @@
package org.apache.skywalking.oap.server.core.storage.type;
import java.util.function.Function;
/**
* A function supplier to convert raw data from the database to the entity objects defined in OAP.
*/
public interface Convert2Entity {
Object get(String fieldName);
/**
* Use the given type decoder to decode the value of the given field name.
*
* @param fieldName the name of the field to read
* @param typeDecoder the function used to decode the raw value
* @return the decoded value
*/
<T, R> R getWith(String fieldName, Function<T, R> typeDecoder);
}
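Since the decoder is a plain java.util.function.Function, any lambda qualifies; getWith is not tied to BASE64. A usage sketch against the interface above (the column name and null handling are illustrative):

byte[] raw = converter.getWith("data_binary",
    (String encoded) -> encoded == null || encoded.isEmpty()
        ? new byte[] {}
        : java.util.Base64.getDecoder().decode(encoded));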
......@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.core.storage.type;
import java.util.List;
/**
* A function supplier to accept key-value pairs and convert them to the expected database structure according to the
* storage implementation.
......@@ -25,9 +27,25 @@ package org.apache.skywalking.oap.server.core.storage.type;
* @param <R> Type of database required structure.
*/
public interface Convert2Storage<R> {
/**
* Accept a key and a general-typed value.
*/
void accept(String fieldName, Object fieldValue);
/**
* Accept a String key and a byte array value.
*/
void accept(String fieldName, byte[] fieldValue);
/**
* Accept a String key and a String list value.
*/
void accept(String fieldName, List<String> fieldValue);
Object get(String fieldName);
/**
* @return the converted data
*/
R obtain();
}
\ No newline at end of file
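The new byte[] overload is what removes the hard BASE64 requirement: overload resolution at the Builder call site routes binary fields to accept(String, byte[]), and each Convert2Storage implementation decides the persisted form. Note that the resolution is static, so casting the value to Object would silently bypass the binary path. A dispatch sketch, assuming java.util.Arrays/java.util.Map imports and the HashMapConverter shown later in this diff:

Convert2Storage<Map<String, Object>> converter = new HashMapConverter.ToStorage();
converter.accept("rule_name", "my-rule");                    // accept(String, Object)
converter.accept("tags_raw_data", new byte[] {1, 2, 3});     // accept(String, byte[])
converter.accept("tags", Arrays.asList("http.method=GET"));  // accept(String, List<String>)
Map<String, Object> row = converter.obtain();                // the database-native structure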
......@@ -18,9 +18,15 @@
package org.apache.skywalking.oap.server.core.storage.type;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
public class HashMapConverter {
/**
......@@ -34,6 +40,30 @@ public class HashMapConverter {
public Object get(final String fieldName) {
return source.get(fieldName);
}
@Override
public <T, R> R getWith(final String fieldName, final Function<T, R> typeDecoder) {
final T value = (T) source.get(fieldName);
return typeDecoder.apply(value);
}
/**
* Default Base64Decoder supplier
*/
public static class Base64Decoder implements Function<String, byte[]> {
public static final Base64Decoder INSTANCE = new Base64Decoder();
private Base64Decoder() {
}
@Override
public byte[] apply(final String encodedStr) {
if (StringUtil.isEmpty(encodedStr)) {
return new byte[] {};
}
return Base64.getDecoder().decode(encodedStr);
}
}
}
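Base64Decoder is a stateless singleton Function<String, byte[]> that maps an empty (and, via StringUtil.isEmpty, presumably null) column value to an empty array, so the empty-check branches deleted from the Builders live in exactly one place now. A behavior sketch:

byte[] none = HashMapConverter.ToEntity.Base64Decoder.INSTANCE.apply("");
// none.length == 0, no exception and no null

String encoded = java.util.Base64.getEncoder().encodeToString("hello".getBytes());
byte[] bytes = HashMapConverter.ToEntity.Base64Decoder.INSTANCE.apply(encoded);
// new String(bytes) equals "hello"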
/**
......@@ -51,6 +81,20 @@ public class HashMapConverter {
source.put(fieldName, fieldValue);
}
@Override
public void accept(final String fieldName, final byte[] fieldValue) {
if (CollectionUtils.isEmpty(fieldValue)) {
source.put(fieldName, Const.EMPTY_STRING);
} else {
source.put(fieldName, new String(Base64.getEncoder().encode(fieldValue)));
}
}
@Override
public void accept(final String fieldName, final List<String> fieldValue) {
this.accept(fieldName, (Object) fieldValue);
}
@Override
public Object get(final String fieldName) {
return source.get(fieldName);
......
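ToStorage.accept(byte[]) and ToEntity.Base64Decoder remain a matched pair: the HashMap-based converter still writes BASE64 Strings, so existing SQL text columns keep working, while other converter implementations are free to override both sides. A round-trip sketch, assuming ToEntity is constructed from the source map (as its RequiredArgsConstructor import suggests):

HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
toStorage.accept("data_binary", "payload".getBytes());  // stored as a BASE64 String
Map<String, Object> row = toStorage.obtain();

HashMapConverter.ToEntity toEntity = new HashMapConverter.ToEntity(row);
byte[] back = toEntity.getWith("data_binary", HashMapConverter.ToEntity.Base64Decoder.INSTANCE);
// new String(back) equals "payload"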
......@@ -18,25 +18,38 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
public class HashMapConverterWrapper {
/**
* Create a wrapper to exclude the "Tag" field from storage, because this field is replaced in the JDBC implementation.
*
* @param origin converter from core builder.
* @return a new wrapper
*/
public static HashMapConverter.ToStorage of(Convert2Storage origin) {
return new HashMapConverter.ToStorage() {
@Override
public void accept(final String fieldName, final Object fieldValue) {
if (fieldName.equals("tags")) {
return;
}
origin.accept(fieldName, fieldValue);
public class JDBCConverter {
public static class RecordToStorage extends HashMapConverter.ToStorage {
private Map<String, Object> notStoredSource = new HashMap<>(1);
@Override
public void accept(final String fieldName, final Object fieldValue) {
super.accept(fieldName, fieldValue);
}
/**
* Skip String-list-typed columns in SQL-style databases. The values are processed by
* AbstractSearchTagBuilder#analysisSearchTag(List, Convert2Storage) and persisted through the TAGS_RAW_DATA column instead.
*
* NOTICE: this method should only be invoked once, as there is mostly only one tag-list property for now.
*/
@Override
public void accept(final String fieldName, final List<String> fieldValue) {
notStoredSource.put(fieldName, fieldValue);
}
@Override
public Object get(final String fieldName) {
final Object ret = super.get(fieldName);
if (ret == null) {
return notStoredSource.get(fieldName);
}
return ret;
}
}
}
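In the JDBC plugin the searchable tag list is not a regular column: RecordToStorage parks List<String> values in notStoredSource, so obtain() (inherited from ToStorage) yields only the persisted columns while get() still serves the parked values to AbstractSearchTagBuilder#analysisSearchTag. A flow sketch, assuming a java.util.Arrays import:

JDBCConverter.RecordToStorage converter = new JDBCConverter.RecordToStorage();
converter.accept("data_binary", new byte[] {1});             // persisted, BASE64 via super
converter.accept("tags", Arrays.asList("http.method=GET"));  // parked, never persisted

Object tags = converter.get("tags");          // falls back to notStoredSource
Map<String, Object> row = converter.obtain(); // contains data_binary, but no tags column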
......@@ -22,7 +22,6 @@ import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.HashMapConverterWrapper;
public class H2AlarmRecordBuilder extends AbstractSearchTagBuilder<Record> {
......@@ -38,9 +37,8 @@ public class H2AlarmRecordBuilder extends AbstractSearchTagBuilder<Record> {
}
@Override
public void entity2Storage(final Record record, final Convert2Storage originConverter) {
public void entity2Storage(final Record record, final Convert2Storage converter) {
final AlarmRecord storageData = (AlarmRecord) record;
Convert2Storage converter = HashMapConverterWrapper.of(originConverter);
final AlarmRecord.Builder builder = new AlarmRecord.Builder();
builder.entity2Storage(storageData, converter);
analysisSearchTag(storageData.getTags(), converter);
......
......@@ -22,7 +22,6 @@ import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.HashMapConverterWrapper;
public class H2LogRecordBuilder extends AbstractSearchTagBuilder<Record> {
......@@ -38,9 +37,8 @@ public class H2LogRecordBuilder extends AbstractSearchTagBuilder<Record> {
}
@Override
public void entity2Storage(final Record record, final Convert2Storage originConverter) {
public void entity2Storage(final Record record, final Convert2Storage converter) {
final LogRecord storageData = (LogRecord) record;
Convert2Storage converter = HashMapConverterWrapper.of(originConverter);
final LogRecord.Builder builder = new LogRecord.Builder();
builder.entity2Storage(storageData, converter);
analysisSearchTag(storageData.getTags(), converter);
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
......@@ -50,7 +51,8 @@ public class H2ManagementDAO extends H2SQLExecutor implements IManagementDAO {
return;
}
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), storageData, storageBuilder);
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), storageData, storageBuilder,
new HashMapConverter.ToStorage());
insertExecutor.invoke(connection);
} catch (IOException | SQLException e) {
throw new IOException(e.getMessage(), e);
......
......@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
......@@ -53,7 +54,7 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO {
@Override
public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
return getInsertExecutor(model.getName(), metrics, storageBuilder);
return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage());
}
@Override
......
......@@ -24,6 +24,7 @@ import java.sql.SQLException;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
......@@ -44,7 +45,7 @@ public class H2NoneStreamDAO extends H2SQLExecutor implements INoneStreamDAO {
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
try (Connection connection = h2Client.getConnection()) {
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder);
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage());
insertExecutor.invoke(connection);
} catch (IOException | SQLException e) {
throw new IOException(e.getMessage(), e);
......
......@@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.JDBCConverter;
public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO {
private JDBCHikariCPClient h2Client;
......@@ -98,6 +99,6 @@ public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
return getInsertExecutor(model.getName(), record, storageBuilder, maxSizeOfArrayColumn);
return getInsertExecutor(model.getName(), record, storageBuilder, new JDBCConverter.RecordToStorage(), maxSizeOfArrayColumn);
}
}
......@@ -29,6 +29,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
......@@ -105,16 +106,17 @@ public class H2SQLExecutor {
}
protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder) throws IOException {
return getInsertExecutor(modelName, metrics, storageBuilder, 1);
StorageBuilder<T> storageBuilder,
Convert2Storage<Map<String, Object>> converter) throws IOException {
return getInsertExecutor(modelName, metrics, storageBuilder, converter, 1);
}
protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder,
Convert2Storage<Map<String, Object>> converter,
int maxSizeOfArrayColumn) throws IOException {
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> objectMap = toStorage.obtain();
storageBuilder.entity2Storage(metrics, converter);
Map<String, Object> objectMap = converter.obtain();
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " VALUES");
List<ModelColumn> columns = TableMetaInfo.get(modelName).getColumns();
......
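With the converter promoted to a parameter, every DAO now states its conversion policy explicitly, and a third-party storage extension can pass its own Convert2Storage here; that is the breaking change called out in the release notes. The two policies used in this commit:

// Metrics, management, and none-stream DAOs: plain HashMap conversion.
getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage());

// Record DAO: tag-aware conversion that parks the searchable tag list.
getInsertExecutor(model.getName(), record, storageBuilder, new JDBCConverter.RecordToStorage(), maxSizeOfArrayColumn);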
......@@ -23,7 +23,6 @@ import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.HashMapConverterWrapper;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TAGS;
......@@ -45,9 +44,8 @@ public class H2SegmentRecordBuilder extends AbstractSearchTagBuilder<Record> {
}
@Override
public void entity2Storage(final Record record, final Convert2Storage originConverter) {
public void entity2Storage(final Record record, final Convert2Storage converter) {
SegmentRecord storageData = (SegmentRecord) record;
Convert2Storage converter = HashMapConverterWrapper.of(originConverter);
final SegmentRecord.Builder builder = new SegmentRecord.Builder();
builder.entity2Storage(storageData, converter);
analysisSearchTag(storageData.getTagsRawData(), converter);
......
......@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
......@@ -78,7 +79,8 @@ public class H2UITemplateManagementDAO extends H2SQLExecutor implements UITempla
}
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
try (ResultSet resultSet = h2Client.executeQuery(
connection, sql.toString(), condition.toArray(new Object[0]))) {
final List<DashboardConfiguration> configs = new ArrayList<>();
final UITemplate.Builder builder = new UITemplate.Builder();
UITemplate uiTemplate = null;
......@@ -87,7 +89,8 @@ public class H2UITemplateManagementDAO extends H2SQLExecutor implements UITempla
if (uiTemplate != null) {
configs.add(new DashboardConfiguration().fromEntity(uiTemplate));
}
} while (uiTemplate != null);
}
while (uiTemplate != null);
return configs;
}
} catch (SQLException | JDBCClientException e) {
......@@ -98,13 +101,18 @@ public class H2UITemplateManagementDAO extends H2SQLExecutor implements UITempla
@Override
public TemplateChangeStatus addTemplate(final DashboardSetting setting) throws IOException {
final UITemplate uiTemplate = setting.toEntity();
final SQLExecutor insertExecutor = getInsertExecutor(UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder());
final SQLExecutor insertExecutor = getInsertExecutor(
UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage());
try (Connection connection = h2Client.getConnection()) {
insertExecutor.invoke(connection);
return TemplateChangeStatus.builder().status(true).id(setting.getId()).build();
} catch (SQLException | JDBCClientException e) {
log.error(e.getMessage(), e);
return TemplateChangeStatus.builder().status(false).id(setting.getId()).message("Can't add a new template").build();
return TemplateChangeStatus.builder()
.status(false)
.id(setting.getId())
.message("Can't add a new template")
.build();
}
}
......@@ -116,7 +124,8 @@ public class H2UITemplateManagementDAO extends H2SQLExecutor implements UITempla
@Override
public TemplateChangeStatus disableTemplate(final String id) throws IOException {
final UITemplate uiTemplate = (UITemplate) getByID(h2Client, UITemplate.INDEX_NAME, id, new UITemplate.Builder());
final UITemplate uiTemplate = (UITemplate) getByID(
h2Client, UITemplate.INDEX_NAME, id, new UITemplate.Builder());
if (uiTemplate == null) {
return TemplateChangeStatus.builder().status(false).id(id).message("Can't find the template").build();
}
......@@ -125,13 +134,18 @@ public class H2UITemplateManagementDAO extends H2SQLExecutor implements UITempla
}
private TemplateChangeStatus executeUpdate(final UITemplate uiTemplate) throws IOException {
final SQLExecutor updateExecutor = getUpdateExecutor(UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder());
final SQLExecutor updateExecutor = getUpdateExecutor(
UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder());
try (Connection connection = h2Client.getConnection()) {
updateExecutor.invoke(connection);
return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build();
} catch (SQLException | JDBCClientException e) {
log.error(e.getMessage(), e);
return TemplateChangeStatus.builder().status(false).id(uiTemplate.getTemplateId()).message("Can't add/update the template").build();
return TemplateChangeStatus.builder()
.status(false)
.id(uiTemplate.getTemplateId())
.message("Can't add/update the template")
.build();
}
}
}
......@@ -18,11 +18,9 @@
package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import java.util.Base64;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
......@@ -31,9 +29,8 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
@SuperDataset
@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
......@@ -126,11 +123,7 @@ public class ZipkinSpanRecord extends Record {
record.setLatency(((Number) converter.get(LATENCY)).intValue());
record.setIsError(((Number) converter.get(IS_ERROR)).intValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
if (StringUtil.isEmpty((String) converter.get(DATA_BINARY))) {
record.setDataBinary(new byte[] {});
} else {
record.setDataBinary(Base64.getDecoder().decode((String) converter.get(DATA_BINARY)));
}
record.setDataBinary(converter.getWith(DATA_BINARY, HashMapConverter.ToEntity.Base64Decoder.INSTANCE));
record.setEncode(((Number) converter.get(ENCODE)).intValue());
// Don't read the tags as they have been in the data binary already.
return record;
......@@ -149,11 +142,7 @@ public class ZipkinSpanRecord extends Record {
converter.accept(LATENCY, storageData.getLatency());
converter.accept(IS_ERROR, storageData.getIsError());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
converter.accept(DATA_BINARY, Const.EMPTY_STRING);
} else {
converter.accept(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
converter.accept(DATA_BINARY, storageData.getDataBinary());
converter.accept(ENCODE, storageData.getEncode());
converter.accept(TAGS, storageData.getTags());
}
......