提交 13fa3030 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Add record persistence stream analysis. (#1683)

上级 7e23f495
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
......@@ -115,6 +116,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
annotationScan.registerListener(new RecordTypeListener(getManager()));
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
......
......@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.source.*;
......@@ -50,6 +51,8 @@ public class DispatcherManager {
this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
this.dispatcherMap.put(Scope.Segment, new SourceDispatcher[] {new SegmentDispatcher()});
this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
......
......@@ -23,11 +23,11 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
public class MergeDataCache<INDICATOR extends Indicator> extends Window<MergeDataCollection<INDICATOR>> implements DataCache {
public class MergeDataCache<INDICATOR extends Indicator> extends Window<INDICATOR> implements DataCache {
private MergeDataCollection<INDICATOR> lockedMergeDataCollection;
private SWCollection<INDICATOR> lockedMergeDataCollection;
@Override public MergeDataCollection<INDICATOR> collectionInstance() {
@Override public SWCollection<INDICATOR> collectionInstance() {
return new MergeDataCollection<>();
}
......
......@@ -24,13 +24,14 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
/**
* @author peng-yongsheng
*/
public class MergeDataCollection<STREAM_DATA extends StreamData> implements Collection<Map<STREAM_DATA, STREAM_DATA>> {
private Map<STREAM_DATA, STREAM_DATA> data;
public class MergeDataCollection<STREAM_DATA extends StreamData> implements SWCollection<STREAM_DATA> {
private Map<STREAM_DATA, STREAM_DATA> collection;
private volatile boolean writing;
private volatile boolean reading;
MergeDataCollection() {
this.data = new HashMap<>();
this.collection = new HashMap<>();
this.writing = false;
this.reading = false;
}
......@@ -59,27 +60,27 @@ public class MergeDataCollection<STREAM_DATA extends StreamData> implements Coll
return reading;
}
boolean containsKey(STREAM_DATA key) {
return data.containsKey(key);
@Override public boolean containsKey(STREAM_DATA key) {
return collection.containsKey(key);
}
void put(STREAM_DATA value) {
data.put(value, value);
@Override public void put(STREAM_DATA value) {
collection.put(value, value);
}
public STREAM_DATA get(STREAM_DATA key) {
return data.get(key);
@Override public STREAM_DATA get(STREAM_DATA key) {
return collection.get(key);
}
@Override public int size() {
return data.size();
return collection.size();
}
@Override public void clear() {
data.clear();
collection.clear();
}
public Map<STREAM_DATA, STREAM_DATA> collection() {
return data;
@Override public Collection<STREAM_DATA> collection() {
return collection.values();
}
}
......@@ -18,21 +18,21 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.storage.StorageData;
/**
* @author peng-yongsheng
*/
public class NonMergeDataCache<STREAM_DATA extends StreamData> extends Window<NonMergeDataCollection<STREAM_DATA>> implements DataCache {
public class NonMergeDataCache<STORAGE_DATA extends StorageData> extends Window<STORAGE_DATA> implements DataCache {
private NonMergeDataCollection<STREAM_DATA> lockedMergeDataCollection;
private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
@Override public NonMergeDataCollection<STREAM_DATA> collectionInstance() {
@Override public SWCollection<STORAGE_DATA> collectionInstance() {
return new NonMergeDataCollection<>();
}
public void add(STREAM_DATA data) {
lockedMergeDataCollection.add(data);
public void add(STORAGE_DATA data) {
lockedMergeDataCollection.put(data);
}
@Override public void writing() {
......
......@@ -19,19 +19,19 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.*;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.storage.StorageData;
/**
* @author peng-yongsheng
*/
public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements Collection<List<STREAM_DATA>> {
public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements SWCollection<STORAGE_DATA> {
private final List<STREAM_DATA> data;
private final List<STORAGE_DATA> data;
private volatile boolean writing;
private volatile boolean reading;
NonMergeDataCollection() {
this.data = new LinkedList<>();
this.data = new ArrayList<>();
this.writing = false;
this.reading = false;
}
......@@ -60,10 +60,6 @@ public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements C
return reading;
}
void add(STREAM_DATA value) {
data.add(value);
}
@Override public int size() {
return data.size();
}
......@@ -72,7 +68,19 @@ public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements C
data.clear();
}
public List<STREAM_DATA> collection() {
@Override public boolean containsKey(STORAGE_DATA key) {
throw new UnsupportedOperationException("None merge data collection not support containsKey operation.");
}
@Override public STORAGE_DATA get(STORAGE_DATA key) {
throw new UnsupportedOperationException("None merge data collection not support get operation.");
}
@Override public void put(STORAGE_DATA value) {
data.add(value);
}
@Override public Collection<STORAGE_DATA> collection() {
return data;
}
}
......@@ -18,10 +18,12 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.Collection;
/**
* @author peng-yongsheng
*/
public interface Collection<Data> {
public interface SWCollection<DATA> {
void reading();
......@@ -39,5 +41,11 @@ public interface Collection<Data> {
void finishWriting();
Data collection();
Collection<DATA> collection();
boolean containsKey(DATA key);
DATA get(DATA key);
void put(DATA value);
}
......@@ -23,22 +23,22 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
public abstract class Window<WINDOW_COLLECTION extends Collection> {
public abstract class Window<DATA> {
private AtomicInteger windowSwitch = new AtomicInteger(0);
private WINDOW_COLLECTION pointer;
private SWCollection<DATA> pointer;
private WINDOW_COLLECTION windowDataA;
private WINDOW_COLLECTION windowDataB;
private SWCollection<DATA> windowDataA;
private SWCollection<DATA> windowDataB;
protected Window() {
Window() {
this.windowDataA = collectionInstance();
this.windowDataB = collectionInstance();
this.pointer = windowDataA;
}
public abstract WINDOW_COLLECTION collectionInstance();
public abstract SWCollection<DATA> collectionInstance();
public boolean trySwitchPointer() {
return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
......@@ -57,7 +57,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
getLast().reading();
}
protected WINDOW_COLLECTION getCurrentAndWriting() {
SWCollection<DATA> getCurrentAndWriting() {
if (pointer == windowDataA) {
windowDataA.writing();
return windowDataA;
......@@ -67,7 +67,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
}
}
private WINDOW_COLLECTION getCurrent() {
private SWCollection<DATA> getCurrent() {
return pointer;
}
......@@ -75,7 +75,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
return getCurrent().size();
}
public WINDOW_COLLECTION getLast() {
public SWCollection<DATA> getLast() {
if (pointer == windowDataA) {
return windowDataB;
} else {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.source.Segment;
/**
 * Converts a {@link Segment} analysis source into a {@link SegmentRecord} storage entity
 * and hands it over to the record persistence stream.
 *
 * @author peng-yongsheng
 */
public class SegmentDispatcher implements SourceDispatcher<Segment> {

    @Override public void dispatch(Segment source) {
        SegmentRecord record = new SegmentRecord();

        // Straight one-to-one copy of every field from the analysis source to the storage record.
        record.setSegmentId(source.getSegmentId());
        record.setTraceId(source.getTraceId());
        record.setServiceId(source.getServiceId());
        record.setEndpointName(source.getEndpointName());
        record.setStartTime(source.getStartTime());
        record.setEndTime(source.getEndTime());
        record.setLatency(source.getLatency());
        record.setIsError(source.getIsError());
        record.setDataBinary(source.getDataBinary());
        record.setTimeBucket(source.getTimeBucket());

        // Push into the record stream; RecordProcess routes it to the matching persistent worker.
        RecordProcess.INSTANCE.in(record);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import java.util.*;
import lombok.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
/**
 * Storage entity holding one raw trace segment. The binary segment payload is stored
 * Base64-encoded as a string column; all other columns are plain scalar fields.
 *
 * @author peng-yongsheng
 */
@RecordType
@StorageEntity(name = SegmentRecord.INDEX_NAME, builder = SegmentRecord.Builder.class)
public class SegmentRecord extends Record {

    public static final String INDEX_NAME = "segment";
    public static final String SEGMENT_ID = "segment_id";
    public static final String TRACE_ID = "trace_id";
    public static final String SERVICE_ID = "service_id";
    public static final String ENDPOINT_NAME = "endpoint_name";
    public static final String START_TIME = "start_time";
    public static final String END_TIME = "end_time";
    public static final String LATENCY = "latency";
    public static final String IS_ERROR = "is_error";
    public static final String DATA_BINARY = "data_binary";

    @Setter @Getter @Column(columnName = SEGMENT_ID) @IDColumn private String segmentId;
    @Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
    @Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
    @Setter @Getter @Column(columnName = ENDPOINT_NAME) @IDColumn private String endpointName;
    @Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
    @Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
    @Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
    @Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
    @Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;

    /**
     * The segment id is globally unique, so it serves directly as the storage id.
     */
    @Override public String id() {
        return segmentId;
    }

    /**
     * Two-way converter between {@link SegmentRecord} and the storage-layer map representation.
     */
    public static class Builder implements StorageBuilder<SegmentRecord> {

        @Override public Map<String, Object> data2Map(SegmentRecord storageData) {
            Map<String, Object> map = new HashMap<>();
            map.put(SEGMENT_ID, storageData.getSegmentId());
            map.put(TRACE_ID, storageData.getTraceId());
            map.put(SERVICE_ID, storageData.getServiceId());
            map.put(ENDPOINT_NAME, storageData.getEndpointName());
            map.put(START_TIME, storageData.getStartTime());
            map.put(END_TIME, storageData.getEndTime());
            map.put(LATENCY, storageData.getLatency());
            map.put(IS_ERROR, storageData.getIsError());
            map.put(TIME_BUCKET, storageData.getTimeBucket());
            if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
                map.put(DATA_BINARY, Const.EMPTY_STRING);
            } else {
                // encodeToString avoids the platform-charset new String(byte[]) constructor;
                // Base64 output is pure ASCII, so the result is identical and charset-safe.
                map.put(DATA_BINARY, Base64.getEncoder().encodeToString(storageData.getDataBinary()));
            }
            return map;
        }

        @Override public SegmentRecord map2Data(Map<String, Object> dbMap) {
            SegmentRecord record = new SegmentRecord();
            record.setSegmentId((String)dbMap.get(SEGMENT_ID));
            record.setTraceId((String)dbMap.get(TRACE_ID));
            record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
            record.setEndpointName((String)dbMap.get(ENDPOINT_NAME));
            record.setStartTime(((Number)dbMap.get(START_TIME)).longValue());
            record.setEndTime(((Number)dbMap.get(END_TIME)).longValue());
            record.setLatency(((Number)dbMap.get(LATENCY)).intValue());
            record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
            record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
            if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) {
                record.setDataBinary(new byte[0]);
            } else {
                record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
            }
            return record;
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.record;
import lombok.*;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
 * Base class of every record-type stream entity. Records are stored as-is (one row per
 * event, e.g. a trace segment) rather than merged/aggregated like indicators; the only
 * shared column is the time bucket used for partitioning and history cleanup.
 *
 * @author peng-yongsheng
 */
public abstract class Record implements StorageData {

    public static final String TIME_BUCKET = "time_bucket";

    // Time bucket the record belongs to; written by dispatchers, read by storage builders.
    @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.record.annotation;
import java.lang.annotation.*;
/**
 * Class-level marker annotation. Classes carrying this annotation are picked up by the
 * annotation scan (via RecordTypeListener) and registered into the record persistence
 * stream. Retained at runtime because the scan reflects over loaded classes.
 *
 * @author peng-yongsheng
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RecordType {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.record.annotation;
import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
 * Annotation-scan listener for {@link RecordType}: every annotated class discovered during
 * the scan gets a record persistence worker created for it through {@link RecordProcess}.
 *
 * @author peng-yongsheng
 */
public class RecordTypeListener implements AnnotationListener {

    private final ModuleManager moduleManager;

    public RecordTypeListener(ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
    }

    @Override
    public Class<? extends Annotation> annotation() {
        return RecordType.class;
    }

    @Override
    public void notify(Class aClass) {
        // Wire the annotated record class into the persistence stream.
        RecordProcess.INSTANCE.create(moduleManager, aClass);
    }
}
......@@ -71,7 +71,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
}
}
mergeDataCache.getLast().collection().forEach((Indicator key, Indicator data) -> {
mergeDataCache.getLast().collection().forEach(data -> {
if (logger.isDebugEnabled()) {
logger.debug(data.toString());
}
......
......@@ -19,9 +19,9 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
......@@ -31,48 +31,28 @@ import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, MergeDataCache<Indicator>> {
private static final Logger logger = LoggerFactory.getLogger(IndicatorPersistentWorker.class);
private final String modelName;
private final MergeDataCache<Indicator> mergeDataCache;
private final IBatchDAO batchDAO;
private final IIndicatorDAO indicatorDAO;
private final int blockBatchPersistenceSize;
private final AbstractWorker<Indicator> nextWorker;
IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextWorker) {
super(workerId);
super(moduleManager, workerId, batchSize);
this.modelName = modelName;
this.blockBatchPersistenceSize = batchSize;
this.mergeDataCache = new MergeDataCache<>();
this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
}
public final Window<MergeDataCollection<Indicator>> getCache() {
@Override public MergeDataCache<Indicator> getCache() {
return mergeDataCache;
}
@Override public final void in(Indicator input) {
if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
try {
if (getCache().trySwitchPointer()) {
getCache().switchPointer();
List<?> collection = buildBatchCollection();
batchDAO.batchPersistence(collection);
}
} finally {
getCache().trySwitchPointerFinally();
}
}
cacheData(input);
}
public boolean flushAndSwitch() {
boolean isSwitch;
try {
......@@ -85,29 +65,9 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
return isSwitch;
}
public final List<?> buildBatchCollection() {
List<?> batchCollection = new LinkedList<>();
try {
while (getCache().getLast().isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("thread wake up");
}
}
if (getCache().getLast().collection() != null) {
batchCollection = prepareBatch(getCache().getLast());
}
} finally {
getCache().finishReadingLast();
}
return batchCollection;
}
private List<Object> prepareBatch(MergeDataCollection<Indicator> collection) {
@Override public List<Object> prepareBatch(MergeDataCache<Indicator> cache) {
List<Object> batchCollection = new LinkedList<>();
collection.collection().forEach((id, data) -> {
cache.getLast().collection().forEach(data -> {
Indicator dbData = null;
try {
dbData = indicatorDAO.get(modelName, data);
......@@ -131,7 +91,7 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
return batchCollection;
}
private void cacheData(Indicator input) {
@Override public void cacheData(Indicator input) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(input)) {
Indicator indicator = mergeDataCache.get(input);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.data.Window;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
 * Base class of the persistence workers. Buffers incoming stream data in a double-buffered
 * {@link Window} cache and flushes a batch to storage (through {@link IBatchDAO}) whenever
 * the current buffer reaches {@code batchSize}. Subclasses define how data is cached
 * ({@link #cacheData}) and how a cached window becomes storage requests ({@link #prepareBatch}).
 *
 * @author peng-yongsheng
 */
public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> {

    private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);

    private final int batchSize;
    private final IBatchDAO batchDAO;

    PersistenceWorker(ModuleManager moduleManager, int workerId, int batchSize) {
        super(workerId);
        this.batchSize = batchSize;
        this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
    }

    /**
     * Accepts one input: flushes the cache first if it is full, then caches the input.
     */
    @Override public final void in(INPUT input) {
        if (getCache().currentCollectionSize() >= batchSize) {
            try {
                // Only one caller wins the switch; losers simply skip the flush this round.
                if (getCache().trySwitchPointer()) {
                    getCache().switchPointer();

                    List<?> collection = buildBatchCollection();
                    batchDAO.batchPersistence(collection);
                }
            } finally {
                getCache().trySwitchPointerFinally();
            }
        }
        cacheData(input);
    }

    /** Stores one input into the subclass-specific cache. */
    public abstract void cacheData(INPUT input);

    /** Returns the subclass-specific double-buffered cache. */
    public abstract CACHE getCache();

    /**
     * Forces a buffer switch (used by the periodic flush timer).
     *
     * @return true when this caller won the switch and the last buffer is ready to read
     */
    public boolean flushAndSwitch() {
        boolean isSwitch;
        try {
            if (isSwitch = getCache().trySwitchPointer()) {
                getCache().switchPointer();
            }
        } finally {
            getCache().trySwitchPointerFinally();
        }
        return isSwitch;
    }

    /** Converts the cached window into a list of storage-specific batch requests. */
    public abstract List<Object> prepareBatch(CACHE cache);

    /**
     * Waits for in-flight writers on the last buffer to finish, then builds the batch
     * from it and marks it read.
     */
    public final List<?> buildBatchCollection() {
        List<?> batchCollection = new LinkedList<>();
        try {
            while (getCache().getLast().isWriting()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it, so the caller
                    // (or a later blocking call) can still observe the interruption.
                    Thread.currentThread().interrupt();
                    logger.warn("thread wake up");
                }
            }

            if (getCache().getLast().collection() != null) {
                batchCollection = prepareBatch(getCache());
            }
        } finally {
            getCache().finishReadingLast();
        }
        return batchCollection;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
 * Persistence worker for record-type entities. Records are never merged: every input is
 * appended to a {@link NonMergeDataCache} and each cached record becomes one batch insert.
 *
 * @author peng-yongsheng
 */
public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDataCache<Record>> {

    private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);

    private final String modelName;
    private final NonMergeDataCache<Record> nonMergeDataCache;
    private final IRecordDAO recordDAO;

    RecordPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
        IRecordDAO recordDAO) {
        super(moduleManager, workerId, batchSize);
        this.modelName = modelName;
        this.nonMergeDataCache = new NonMergeDataCache<>();
        this.recordDAO = recordDAO;
    }

    @Override public NonMergeDataCache<Record> getCache() {
        return nonMergeDataCache;
    }

    @Override public List<Object> prepareBatch(NonMergeDataCache<Record> cache) {
        List<Object> inserts = new LinkedList<>();
        for (Record record : cache.getLast().collection()) {
            try {
                inserts.add(recordDAO.prepareBatchInsert(modelName, record));
            } catch (Throwable t) {
                // Best effort: one bad record must not abort the whole batch.
                logger.error(t.getMessage(), t);
            }
        }
        return inserts;
    }

    @Override public void cacheData(Record input) {
        nonMergeDataCache.writing();
        nonMergeDataCache.add(input);
        nonMergeDataCache.finishWriting();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
 * Singleton registry of record persistence workers. {@link #create} is called once per
 * {@code @RecordType}-annotated class during annotation scan; {@link #in} routes a record
 * instance to the worker registered for its concrete class.
 *
 * @author peng-yongsheng
 */
public enum RecordProcess {
    INSTANCE;

    private final Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();

    @Getter private final List<RecordPersistentWorker> persistentWorkers = new ArrayList<>();

    public void in(Record record) {
        // Lookup is by concrete class; create() must have registered it during the scan.
        workers.get(record.getClass()).in(record);
    }

    public void create(ModuleManager moduleManager, Class<? extends Record> recordClass) {
        String modelName = StorageEntityAnnotationUtils.getModelName(recordClass);
        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(recordClass);

        StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).getService(StorageDAO.class);
        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            // Include context instead of an empty message so startup failures are diagnosable.
            throw new UnexpectedException("Create " + builderClass.getSimpleName() + " record DAO failure: " + e.getMessage());
        }

        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
            1000, moduleManager, recordDAO);

        WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);

        persistentWorkers.add(persistentWorker);
        workers.put(recordClass, persistentWorker);
    }
}
......@@ -24,5 +24,6 @@ package org.apache.skywalking.oap.server.core.source;
public enum Scope {
All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
ServiceComponent, ServiceMapping
ServiceComponent, ServiceMapping,
Segment
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.*;
import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
/**
 * Source carrying one finished trace segment; dispatched into the record
 * stream so the raw segment can be persisted.
 *
 * @author peng-yongsheng
 */
@SourceType
@Getter
@Setter
public class Segment extends Source {

    private String segmentId;
    private String traceId;
    private int serviceId;
    private String endpointName;
    private long startTime;
    private long endTime;
    private int latency;
    private int isError;
    private byte[] dataBinary;

    @Override public Scope scope() {
        return Scope.Segment;
    }

    // A segment source is uniquely identified by its segment id.
    @Override public String getEntityId() {
        return segmentId;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
/**
 * DAO for write-once record style data (e.g. trace segments).
 *
 * @param <INSERT> storage implementation specific batch insert request type,
 *                 e.g. an Elasticsearch {@code IndexRequest}.
 * @author peng-yongsheng
 */
public interface IRecordDAO<INSERT> extends DAO {

    /**
     * Builds a storage specific insert request for the given record.
     *
     * @param modelName name of the storage model (index/table) the record belongs to
     * @param record the record to persist
     * @return the prepared batch insert request, to be executed by the batch DAO
     * @throws IOException when the record cannot be serialized into the request body
     */
    INSERT prepareBatchInsert(String modelName, Record record) throws IOException;

    /**
     * Deletes records whose time bucket is earlier than the given one.
     *
     * @param modelName name of the storage model to purge
     * @param timeBucketBefore records older than this time bucket are removed
     */
    void deleteHistory(String modelName, Long timeBucketBefore);
}
......@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.storage;
import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
......@@ -65,7 +65,11 @@ public enum PersistenceTimer {
long startTime = System.currentTimeMillis();
try {
List batchAllCollection = new LinkedList();
IndicatorProcess.INSTANCE.getPersistentWorkers().forEach(worker -> {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -30,4 +31,6 @@ public interface StorageDAO extends Service {
IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator> storageBuilder);
IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder);
IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder);
}
......@@ -52,4 +52,12 @@ public class CollectionUtils {
/**
 * @return true when the given array is non-null and holds at least one element.
 */
public static <T> boolean isNotEmpty(T[] array) {
    return !(array == null || array.length == 0);
}
/**
 * @return true when the given byte array is null or has no elements.
 */
public static boolean isEmpty(byte[] array) {
    if (array == null) {
        return true;
    }
    return array.length == 0;
}
/**
 * @return true when the given byte array is non-null and holds at least one byte.
 */
public static boolean isNotEmpty(byte[] array) {
    return array != null && array.length > 0;
}
}
......@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
......@@ -60,6 +61,7 @@ public class TraceModuleProvider extends ModuleProvider {
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceComponentSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory());
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
try {
......
......@@ -78,9 +78,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
if (logger.isDebugEnabled()) {
logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
}
notifyListenerToBuild();
buildSegment(segmentCoreInfo.getSegmentId(), segmentDecorator.toByteArray());
return true;
}
} catch (Throwable e) {
......@@ -111,6 +109,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
segmentCoreInfo.setSegmentId(segmentIdBuilder.toString());
segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray());
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
......@@ -159,13 +158,6 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
return true;
}
private void buildSegment(String id, byte[] dataBinary) {
// Segment segment = new Segment();
// segment.setId(id);
// segment.setDataBinary(dataBinary);
// segment.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
if (logger.isDebugEnabled()) {
logger.debug("push to segment buffer write worker, id: {}", id);
......@@ -215,7 +207,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
private void notifyGlobalsListener(UniqueId uniqueId) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
if (listener.containsPoint(SpanListener.Point.TraceIds)) {
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
}
});
......
......@@ -18,80 +18,20 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
import lombok.*;
/**
 * Mutable holder for the core attributes of the segment currently being
 * parsed. It is populated while walking the segment and read by the span
 * listeners when they build their sources.
 *
 * <p>All accessors are generated by the class level lombok annotations; the
 * previous hand-written getters/setters were redundant duplicates of what
 * lombok already generates (including {@code isError()} / {@code setError})
 * and have been removed.
 *
 * @author peng-yongsheng
 */
@Getter
@Setter
public class SegmentCoreInfo {

    private String segmentId;
    private String traceId;
    private int applicationId;
    private int applicationInstanceId;
    private long startTime;
    private long endTime;
    private boolean isError;
    private long minuteTimeBucket;
    // Raw serialized segment bytes, persisted as the record payload.
    private byte[] dataBinary;
}
......@@ -27,6 +27,6 @@ public interface SpanListener {
boolean containsPoint(Point point);
enum Point {
Entry, Exit, Local, First, GlobalTraceIds
Entry, Exit, Local, First, TraceIds
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener, GlobalTraceIdsListener {
private static final Logger logger = LoggerFactory.getLogger(SegmentSpanListener.class);
private final SourceReceiver sourceReceiver;
private final Segment segment = new Segment();
private final EndpointInventoryCache serviceNameCacheService;
private int entryEndpointId = 0;
private int firstEndpointId = 0;
private SegmentSpanListener(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
this.serviceNameCacheService = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
}
@Override public boolean containsPoint(Point point) {
return Point.First.equals(point) || Point.Entry.equals(point) || Point.TraceIds.equals(point);
}
@Override
public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
segment.setSegmentId(segmentCoreInfo.getSegmentId());
segment.setSegmentId(segmentCoreInfo.getSegmentId());
segment.setServiceId(segmentCoreInfo.getApplicationId());
segment.setLatency((int)(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime()));
segment.setStartTime(segmentCoreInfo.getStartTime());
segment.setEndTime(segmentCoreInfo.getEndTime());
segment.setIsError(BooleanUtils.booleanToValue(segmentCoreInfo.isError()));
segment.setTimeBucket(timeBucket);
segment.setDataBinary(segmentCoreInfo.getDataBinary());
firstEndpointId = spanDecorator.getOperationNameId();
}
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
entryEndpointId = spanDecorator.getOperationNameId();
}
@Override public void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo) {
StringBuilder traceIdBuilder = new StringBuilder();
for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
if (i == 0) {
traceIdBuilder.append(uniqueId.getIdPartsList().get(i));
} else {
traceIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i));
}
}
segment.setTraceId(traceIdBuilder.toString());
}
@Override public void build() {
if (logger.isDebugEnabled()) {
logger.debug("segment duration listener build");
}
if (entryEndpointId == 0) {
segment.setEndpointName(serviceNameCacheService.get(firstEndpointId).getName());
} else {
segment.setEndpointName(serviceNameCacheService.get(entryEndpointId).getName());
}
sourceReceiver.receive(segment);
}
public static class Factory implements SpanListenerFactory {
@Override public SpanListener create(ModuleManager moduleManager) {
return new SegmentSpanListener(moduleManager);
}
}
}
......@@ -32,8 +32,8 @@
<logger name="io.netty" level="INFO"/>
<logger name="org.apache.http" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
<Root level="DEBUG">
<logger name="org.apache.skywalking.oap.server.core.remote" level="INFO"/>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -37,6 +37,8 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "keyword";
} else if (IntKeyLongValueArray.class.equals(type)) {
return "keyword";
} else if (byte[].class.equals(type)) {
return "binary";
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
*/
public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
private final StorageBuilder<Record> storageBuilder;
public RecordEsDAO(ElasticSearchClient client, StorageBuilder<Record> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
@Override public IndexRequest prepareBatchInsert(String modelName, Record record) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(record);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (String key : objectMap.keySet()) {
Object value = objectMap.get(key);
if (value instanceof StorageDataType) {
builder.field(key, ((StorageDataType)value).toStorageData());
} else {
builder.field(key, value);
}
}
builder.endObject();
return getClient().prepareInsert(modelName, record.id(), builder);
}
@Override public void deleteHistory(String modelName, Long timeBucketBefore) {
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -39,4 +40,8 @@ public class StorageEsDAO extends EsDAO implements StorageDAO {
/**
 * Creates the Elasticsearch register DAO bound to the given builder.
 */
@Override public IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder) {
    return new RegisterEsDAO(getClient(), storageBuilder);
}
/**
 * Creates the Elasticsearch record DAO bound to the given builder.
 */
@Override public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
    return new RecordEsDAO(getClient(), storageBuilder);
}
}
......@@ -19,11 +19,12 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
......@@ -80,9 +81,9 @@ public class StorageEsInstaller extends ModelInstaller {
Settings settings = createSettingBuilder();
try {
mappingBuilder = createMappingBuilder(tableDefine);
logger.info("mapping builder str: {}", mappingBuilder.prettyPrint());
logger.info("index {}'s mapping builder str: {}", tableDefine.getName(), Strings.toString(mappingBuilder.prettyPrint()));
} catch (Exception e) {
logger.error("create {} index mapping builder error", tableDefine.getName());
logger.error("create {} index mapping builder error, error message: {}", tableDefine.getName(), e.getMessage());
}
boolean isAcknowledged;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册