提交 0a4171b9 编写于 作者: P peng-yongsheng

Define stream data.

上级 3f30e6dc
......@@ -51,59 +51,59 @@ public abstract class AbstractData {
this.byteColumns = byteColumns;
}
public int getDataStringsCount() {
public final int getDataStringsCount() {
return dataStrings.length;
}
public int getDataLongsCount() {
public final int getDataLongsCount() {
return dataLongs.length;
}
public int getDataDoublesCount() {
public final int getDataDoublesCount() {
return dataDoubles.length;
}
public int getDataIntegersCount() {
public final int getDataIntegersCount() {
return dataIntegers.length;
}
public int getDataBooleansCount() {
public final int getDataBooleansCount() {
return dataBooleans.length;
}
public int getDataBytesCount() {
public final int getDataBytesCount() {
return dataBytes.length;
}
public void setDataString(int position, String value) {
public final void setDataString(int position, String value) {
dataStrings[position] = value;
}
public void setDataLong(int position, Long value) {
public final void setDataLong(int position, Long value) {
dataLongs[position] = value;
}
public void setDataDouble(int position, Double value) {
public final void setDataDouble(int position, Double value) {
dataDoubles[position] = value;
}
public void setDataInteger(int position, Integer value) {
public final void setDataInteger(int position, Integer value) {
dataIntegers[position] = value;
}
public void setDataBoolean(int position, Boolean value) {
public final void setDataBoolean(int position, Boolean value) {
dataBooleans[position] = value;
}
public void setDataBytes(int position, byte[] dataBytes) {
public final void setDataBytes(int position, byte[] dataBytes) {
this.dataBytes[position] = dataBytes;
}
public String getDataString(int position) {
public final String getDataString(int position) {
return dataStrings[position];
}
public Long getDataLong(int position) {
public final Long getDataLong(int position) {
if (position + 1 > dataLongs.length) {
throw new IndexOutOfBoundsException();
} else if (dataLongs[position] == null) {
......@@ -113,7 +113,7 @@ public abstract class AbstractData {
}
}
public Double getDataDouble(int position) {
public final Double getDataDouble(int position) {
if (position + 1 > dataDoubles.length) {
throw new IndexOutOfBoundsException();
} else if (dataDoubles[position] == null) {
......@@ -123,7 +123,7 @@ public abstract class AbstractData {
}
}
public Integer getDataInteger(int position) {
public final Integer getDataInteger(int position) {
if (position + 1 > dataIntegers.length) {
throw new IndexOutOfBoundsException();
} else if (dataIntegers[position] == null) {
......@@ -133,15 +133,15 @@ public abstract class AbstractData {
}
}
public Boolean getDataBoolean(int position) {
public final Boolean getDataBoolean(int position) {
return dataBooleans[position];
}
public byte[] getDataBytes(int position) {
public final byte[] getDataBytes(int position) {
return dataBytes[position];
}
public void mergeData(AbstractData newData) {
public final void mergeData(AbstractData newData) {
for (int i = 0; i < stringColumns.length; i++) {
String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.getDataString(i));
this.dataStrings[i] = stringData;
......@@ -168,7 +168,7 @@ public abstract class AbstractData {
}
}
@Override public String toString() {
@Override public final String toString() {
StringBuilder dataStr = new StringBuilder();
dataStr.append("string: [");
for (String dataString : dataStrings) {
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.data;
/**
......@@ -25,5 +24,6 @@ package org.apache.skywalking.apm.collector.core.data;
public abstract class CommonTable {
public static final String TABLE_TYPE = "type";
public static final String COLUMN_ID = "id";
public static final String COLUMN_METRIC_ID = "metric_id";
public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
......@@ -23,7 +23,7 @@ import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
/**
* @author peng-yongsheng
*/
public abstract class StreamData implements RemoteData, QueueData {
public abstract class StreamData extends AbstractData implements RemoteData, QueueData {
private EndOfBatchContext endOfBatchContext;
......@@ -32,6 +32,23 @@ public abstract class StreamData implements RemoteData, QueueData {
}
@Override public final void setEndOfBatchContext(EndOfBatchContext context) {
this.endOfBatchContext = endOfBatchContext;
this.endOfBatchContext = context;
}
public StreamData(Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
Column[] integerColumns, Column[] booleanColumns, Column[] byteColumns) {
super(stringColumns, longColumns, doubleColumns, integerColumns, booleanColumns, byteColumns);
}
@Override public final String selectKey() {
return getMetricId();
}
public abstract String getId();
public abstract void setId(String id);
public abstract String getMetricId();
public abstract void setMetricId(String metricId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.core.data;
import org.junit.Assert;
import org.junit.Test;
/**
* @author wu-sheng
*/
public class AbstractHashMessageTest {
public class NewMessage extends AbstractHashMessage {
public NewMessage() {
super("key");
}
}
@Test
public void testHash() {
NewMessage message = new NewMessage();
Assert.assertEquals("key".hashCode(), message.getHashCode());
}
}
......@@ -19,17 +19,18 @@
package org.apache.skywalking.apm.collector.storage.table.alarm;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
/**
* @author peng-yongsheng
*/
public class ApplicationAlarm extends AbstractData implements Alarm {
public class ApplicationAlarm extends StreamData implements Alarm {
private static final Column[] STRING_COLUMNS = {
new Column(ApplicationAlarmTable.COLUMN_ID, new NonOperation()),
new Column(ApplicationAlarmTable.COLUMN_METRIC_ID, new NonOperation()),
new Column(ApplicationAlarmTable.COLUMN_ALARM_CONTENT, new CoverOperation()),
};
......@@ -49,8 +50,24 @@ public class ApplicationAlarm extends AbstractData implements Alarm {
private static final Column[] BYTE_COLUMNS = {};
public ApplicationAlarm(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
public ApplicationAlarm() {
super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
@Override public String getId() {
return getDataString(0);
}
@Override public void setId(String id) {
setDataString(0, id);
}
@Override public String getMetricId() {
return getDataString(1);
}
@Override public void setMetricId(String metricId) {
setDataString(1, metricId);
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册