提交 53a13292 编写于 作者: I Itami Sho 提交者: Steve Yurong Su

[IOTDB-5967] Pipe: fix convertToTablet bug and introduce PipeEmptyTabletInsertionEvent (#10044)

Co-authored-by: NSteve Yurong Su <rong@apache.org>
(cherry picked from commit 387c2102)
上级 15a4d199
......@@ -26,9 +26,8 @@ import java.io.IOException;
import java.util.function.BiConsumer;
/**
* Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)},{@link
* TabletInsertionEvent#processByIterator(BiConsumer)} or {@link
* TabletInsertionEvent#processTablet(BiConsumer)}.
* Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)},
* {@link TabletInsertionEvent#processTablet(BiConsumer)}.
*/
public interface RowCollector {
......
......@@ -33,8 +33,9 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
......@@ -117,13 +118,15 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
try {
if (tabletInsertionEvent instanceof PipeInsertNodeInsertionEvent) {
doTransfer((PipeInsertNodeInsertionEvent) tabletInsertionEvent);
} else if (tabletInsertionEvent instanceof PipeTabletInsertionEvent) {
doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
} else if (tabletInsertionEvent instanceof PipeTabletTabletInsertionEvent) {
doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent);
} else if (tabletInsertionEvent instanceof PipeEmptyTabletInsertionEvent) {
doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent);
} else {
throw new NotImplementedException(
"IoTDBThriftConnectorV1 only support PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent.");
"IoTDBThriftConnectorV1 only support PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent.");
}
} catch (TException e) {
LOGGER.error(
......@@ -136,35 +139,40 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
}
}
private void doTransfer(PipeInsertNodeInsertionEvent pipeInsertNodeInsertionEvent)
private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, TException, WALPipeException {
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferInsertNodeReq.toTPipeTransferReq(
pipeInsertNodeInsertionEvent.getInsertNode()));
pipeInsertNodeTabletInsertionEvent.getInsertNode()));
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Transfer PipeInsertNodeInsertionEvent %s error, result status %s",
pipeInsertNodeInsertionEvent, resp.status));
"Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
pipeInsertNodeTabletInsertionEvent, resp.status));
}
}
private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
private void doTransfer(PipeTabletTabletInsertionEvent pipeTabletTabletInsertionEvent)
throws PipeException, TException, IOException {
final TPipeTransferResp resp =
client.pipeTransfer(
PipeTransferTabletReq.toTPipeTransferReq(pipeTabletInsertionEvent.convertToTablet()));
PipeTransferTabletReq.toTPipeTransferReq(
pipeTabletTabletInsertionEvent.convertToTablet()));
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Transfer PipeTabletInsertionEvent %s error, result status %s",
pipeTabletInsertionEvent, resp.status));
"Transfer PipeTabletTabletInsertionEvent %s error, result status %s",
pipeTabletTabletInsertionEvent, resp.status));
}
}
private void doTransfer(PipeEmptyTabletInsertionEvent pipeEmptyTabletInsertionEvent) {
// do nothing
}
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.function.BiConsumer;
public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent {
@Override
public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
return this;
}
@Override
public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
return this;
}
}
......@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
......@@ -38,21 +39,23 @@ import org.slf4j.LoggerFactory;
import java.util.function.BiConsumer;
public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements TabletInsertionEvent {
public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
implements TabletInsertionEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeInsertNodeInsertionEvent.class);
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
private final WALEntryHandler walEntryHandler;
private final ProgressIndex progressIndex;
private TabletInsertionDataContainer dataContainer;
public PipeInsertNodeInsertionEvent(
public PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler, ProgressIndex progressIndex) {
this(walEntryHandler, progressIndex, null, null);
}
private PipeInsertNodeInsertionEvent(
private PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
ProgressIndex progressIndex,
PipeTaskMeta pipeTaskMeta,
......@@ -104,9 +107,10 @@ public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements Table
}
@Override
public PipeInsertNodeInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeInsertNodeInsertionEvent(walEntryHandler, progressIndex, pipeTaskMeta, pattern);
return new PipeInsertNodeTabletInsertionEvent(
walEntryHandler, progressIndex, pipeTaskMeta, pattern);
}
/////////////////////////// TabletInsertionEvent ///////////////////////////
......@@ -149,11 +153,24 @@ public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements Table
}
}
@TestOnly
public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) {
try {
if (dataContainer == null) {
dataContainer = new TabletInsertionDataContainer(insertNode, pattern);
}
return dataContainer.convertToTablet();
} catch (Exception e) {
LOGGER.error("Process tablet error.", e);
throw new PipeException("Process tablet error.", e);
}
}
/////////////////////////// Object ///////////////////////////
@Override
public String toString() {
return "PipeTabletInsertionEvent{"
return "PipeTabletTabletInsertionEvent{"
+ "walEntryHandler="
+ walEntryHandler
+ ", progressIndex="
......
......@@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.Objects;
import java.util.function.BiConsumer;
public class PipeTabletInsertionEvent implements TabletInsertionEvent {
public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent {
private final Tablet tablet;
private final String pattern;
private TabletInsertionDataContainer dataContainer;
public PipeTabletInsertionEvent(Tablet tablet) {
public PipeTabletTabletInsertionEvent(Tablet tablet) {
this(Objects.requireNonNull(tablet), null);
}
public PipeTabletInsertionEvent(Tablet tablet, String pattern) {
public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) {
this.tablet = Objects.requireNonNull(tablet);
this.pattern = pattern;
}
......
......@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
......@@ -36,8 +36,8 @@ public class PipeRealtimeCollectEventFactory {
public static PipeRealtimeCollectEvent createCollectEvent(
WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeInsertionEvent(
new PipeInsertNodeInsertionEvent(walEntryHandler, insertNode.getProgressIndex()),
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
new PipeInsertNodeTabletInsertionEvent(walEntryHandler, insertNode.getProgressIndex()),
insertNode,
resource);
}
......
......@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.slf4j.Logger;
......@@ -59,8 +59,8 @@ public class TsFileEpochManager {
event.getPattern());
}
public PipeRealtimeCollectEvent bindPipeInsertNodeInsertionEvent(
PipeInsertNodeInsertionEvent event, InsertNode node, TsFileResource resource) {
public PipeRealtimeCollectEvent bindPipeInsertNodeTabletInsertionEvent(
PipeInsertNodeTabletInsertionEvent event, InsertNode node, TsFileResource resource) {
return new PipeRealtimeCollectEvent(
event,
filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new),
......
......@@ -39,7 +39,7 @@ public class PipeRow implements Row {
private final MeasurementSchema[] measurementSchemaList;
private final long[] timestampColumn;
private final Object[][] valueColumns;
private final Object[] valueColumns;
private final TSDataType[] valueColumnTypes;
private final String[] columnNameStringList;
......@@ -49,7 +49,7 @@ public class PipeRow implements Row {
String deviceId,
MeasurementSchema[] measurementSchemaList,
long[] timestampColumn,
Object[][] valueColumns,
Object[] valueColumns,
TSDataType[] valueColumnTypes,
String[] columnNameStringList) {
this.rowIndex = rowIndex;
......@@ -68,42 +68,42 @@ public class PipeRow implements Row {
@Override
public int getInt(int columnIndex) {
return (int) valueColumns[columnIndex][rowIndex];
return ((int[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public long getLong(int columnIndex) {
return (long) valueColumns[columnIndex][rowIndex];
return ((long[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public float getFloat(int columnIndex) {
return (float) valueColumns[columnIndex][rowIndex];
return ((float[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public double getDouble(int columnIndex) {
return (double) valueColumns[columnIndex][rowIndex];
return ((double[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public boolean getBoolean(int columnIndex) {
return (boolean) valueColumns[columnIndex][rowIndex];
return ((boolean[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public Binary getBinary(int columnIndex) {
return Binary.valueOf((String) valueColumns[columnIndex][rowIndex]);
return ((Binary[]) valueColumns[columnIndex])[rowIndex];
}
@Override
public String getString(int columnIndex) {
return (String) valueColumns[columnIndex][rowIndex];
return ((Binary[]) valueColumns[columnIndex])[rowIndex].getStringValue();
}
@Override
public Object getObject(int columnIndex) {
return valueColumns[columnIndex][rowIndex];
return ((Object[]) valueColumns[columnIndex])[rowIndex];
}
@Override
......@@ -113,7 +113,7 @@ public class PipeRow implements Row {
@Override
public boolean isNull(int columnIndex) {
return valueColumns[columnIndex][rowIndex] == null;
return ((Object[]) valueColumns[columnIndex])[rowIndex] == null;
}
@Override
......
......@@ -19,7 +19,8 @@
package org.apache.iotdb.db.pipe.core.event.view.collector;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
......@@ -65,7 +66,12 @@ public class PipeRowCollector implements RowCollector {
}
public TabletInsertionEvent toTabletInsertionEvent() {
PipeTabletInsertionEvent tabletInsertionEvent = new PipeTabletInsertionEvent(tablet);
if (tablet == null) {
return new PipeEmptyTabletInsertionEvent();
}
PipeTabletTabletInsertionEvent tabletInsertionEvent =
new PipeTabletTabletInsertionEvent(tablet);
this.tablet = null;
return tabletInsertionEvent;
}
......
......@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
import org.apache.iotdb.pipe.api.access.Row;
......@@ -51,7 +52,7 @@ public class TabletInsertionDataContainer {
private String[] columnNameStringList;
private long[] timestampColumn;
private Object[][] valueColumns;
private Object[] valueColumns;
private TSDataType[] valueColumnTypes;
private BitMap[] nullValueColumnBitmaps;
private int rowCount;
......@@ -97,7 +98,7 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
this.valueColumns = new Object[filteredColumnSize][1];
this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
......@@ -111,7 +112,7 @@ public class TabletInsertionDataContainer {
final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i];
this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
this.valueColumns[filteredColumnIndex][0] = originValueColumns[i];
this.valueColumns[filteredColumnIndex] = originValueColumns[i];
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
}
......@@ -139,7 +140,7 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
this.valueColumns = new Object[filteredColumnSize][];
this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
......@@ -188,7 +189,7 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
this.valueColumns = new Object[filteredColumnSize][];
this.valueColumns = new Object[filteredColumnSize];
this.valueColumnTypes = new TSDataType[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
......@@ -269,51 +270,53 @@ public class TabletInsertionDataContainer {
originMeasurementList, pattern, originColumnIndex2FilteredColumnIndexMapperList);
}
private Object[] convertToColumn(Object originColumn, TSDataType dataType, BitMap bitMap) {
private Object convertToColumn(Object originColumn, TSDataType dataType, BitMap bitMap) {
switch (dataType) {
case INT32:
final int[] intValues = (int[]) originColumn;
final Integer[] integerValues = new Integer[intValues.length];
final int[] integerValues = new int[intValues.length];
for (int i = 0; i < intValues.length; i++) {
integerValues[i] = bitMap != null && bitMap.isMarked(i) ? null : intValues[i];
integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 : intValues[i];
}
return integerValues;
case INT64:
final long[] longValues = (long[]) originColumn;
final Long[] longValues2 = new Long[longValues.length];
final long[] longValues2 = new long[longValues.length];
for (int i = 0; i < longValues.length; i++) {
longValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : longValues[i];
longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : longValues[i];
}
return longValues2;
case FLOAT:
final float[] floatValues = (float[]) originColumn;
final Float[] floatValues2 = new Float[floatValues.length];
final float[] floatValues2 = new float[floatValues.length];
for (int i = 0; i < floatValues.length; i++) {
floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : floatValues[i];
floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : floatValues[i];
}
return floatValues2;
case DOUBLE:
final double[] doubleValues = (double[]) originColumn;
final Double[] doubleValues2 = new Double[doubleValues.length];
final double[] doubleValues2 = new double[doubleValues.length];
for (int i = 0; i < doubleValues.length; i++) {
doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : doubleValues[i];
doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : doubleValues[i];
}
return doubleValues2;
case BOOLEAN:
final boolean[] booleanValues = (boolean[]) originColumn;
final Boolean[] booleanValues2 = new Boolean[booleanValues.length];
final boolean[] booleanValues2 = new boolean[booleanValues.length];
for (int i = 0; i < booleanValues.length; i++) {
booleanValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : booleanValues[i];
booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) && booleanValues[i];
}
return booleanValues2;
case TEXT:
final Binary[] binaryValues = (Binary[]) originColumn;
final String[] stringValues = new String[binaryValues.length];
final Binary[] stringValues = new Binary[binaryValues.length];
for (int i = 0; i < binaryValues.length; i++) {
stringValues[i] =
bitMap != null && bitMap.isMarked(i)
? null
: (binaryValues[i] == null ? null : binaryValues[i].getStringValue());
: (binaryValues[i] == null
? null
: Binary.valueOf(binaryValues[i].getStringValue()));
}
return stringValues;
default:
......@@ -326,6 +329,10 @@ public class TabletInsertionDataContainer {
public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
final PipeRowCollector rowCollector = new PipeRowCollector();
if (valueColumns.length == 0) {
return new PipeEmptyTabletInsertionEvent();
}
for (int i = 0; i < timestampColumn.length; i++) {
consumer.accept(
new PipeRow(
......@@ -355,76 +362,17 @@ public class TabletInsertionDataContainer {
}
final int columnSize = measurementSchemaList.length;
final int rowSize = valueColumns[0].length;
final List<MeasurementSchema> measurementSchemaArrayList =
new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0, columnSize));
final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowSize);
final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowCount);
newTablet.timestamps = timestampColumn;
newTablet.bitMaps = nullValueColumnBitmaps;
newTablet.values = squashFromColumnList(valueColumns, valueColumnTypes);
newTablet.values = valueColumns;
newTablet.rowSize = rowCount;
tablet = newTablet;
return tablet;
}
private Object[] squashFromColumnList(Object[][] valueColumns, TSDataType[] valueColumnTypes) {
final Object[] values = new Object[valueColumns.length];
for (int i = 0; i < valueColumns.length; i++) {
values[i] = squashFromColumn(valueColumns[i], valueColumnTypes[i]);
}
return values;
}
private Object squashFromColumn(Object[] valueColumn, TSDataType valueColumnType) {
switch (valueColumnType) {
case INT32:
final Integer[] intValues = (Integer[]) valueColumn;
final int[] intValues2 = new int[intValues.length];
for (int i = 0; i < intValues.length; i++) {
intValues2[i] = intValues[i] == null ? 0 : intValues[i];
}
return intValues2;
case INT64:
final Long[] longValues = (Long[]) valueColumn;
final long[] longValues2 = new long[longValues.length];
for (int i = 0; i < longValues.length; i++) {
longValues2[i] = longValues[i] == null ? 0 : longValues[i];
}
return longValues2;
case FLOAT:
final Float[] floatValues = (Float[]) valueColumn;
final float[] floatValues2 = new float[floatValues.length];
for (int i = 0; i < floatValues.length; i++) {
floatValues2[i] = floatValues[i] == null ? 0 : floatValues[i];
}
return floatValues2;
case DOUBLE:
final Double[] doubleValues = (Double[]) valueColumn;
final double[] doubleValues2 = new double[doubleValues.length];
for (int i = 0; i < doubleValues.length; i++) {
doubleValues2[i] = doubleValues[i] == null ? 0 : doubleValues[i];
}
return doubleValues2;
case BOOLEAN:
final Boolean[] booleanValues = (Boolean[]) valueColumn;
final boolean[] booleanValues2 = new boolean[booleanValues.length];
for (int i = 0; i < booleanValues.length; i++) {
booleanValues2[i] = booleanValues[i] != null && booleanValues[i];
}
return booleanValues2;
case TEXT:
final String[] stringValues = (String[]) valueColumn;
final Binary[] binaryValues = new Binary[stringValues.length];
for (int i = 0; i < stringValues.length; i++) {
binaryValues[i] = stringValues[i] == null ? null : Binary.valueOf(stringValues[i]);
}
return binaryValues;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", valueColumnType));
}
}
}
......@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.core.event.view.datastructure;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
......@@ -124,7 +124,7 @@ public class TsFileInsertionDataContainer {
@Override
public TabletInsertionEvent next() {
return new PipeTabletInsertionEvent(tabletIterator.next());
return new PipeTabletTabletInsertionEvent(tabletIterator.next());
}
};
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.core.event;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
public class PipeInsertNodeTabletInsertionEventTest {
InsertRowNode insertRowNode;
InsertTabletNode insertTabletNode;
final String deviceId = "root.sg.d1";
final long[] times = new long[] {110L, 111L, 112L, 113L, 114L};
final String[] measurementIds = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"};
final TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.BOOLEAN,
TSDataType.TEXT
};
final MeasurementSchema[] schemas = new MeasurementSchema[6];
final Object[] values = new Object[6];
final String pattern = "root.sg.d1";
Tablet tabletForInsertRowNode;
Tablet tabletForInsertTabletNode;
@Before
public void setUp() throws Exception {
createMeasurementSchema();
createInsertRowNode();
createInsertTabletNode();
createTablet();
}
private void createMeasurementSchema() {
for (int i = 0; i < 6; i++) {
schemas[i] = new MeasurementSchema(measurementIds[i], dataTypes[i]);
}
}
private void createInsertRowNode() throws IllegalPathException {
insertRowNode =
new InsertRowNode(
new PlanNodeId("plan node 1"),
new PartialPath(deviceId),
false,
measurementIds,
dataTypes,
schemas,
times[0],
values,
false);
}
private void createInsertTabletNode() throws IllegalPathException {
this.insertTabletNode =
new InsertTabletNode(
new PlanNodeId("plannode 1"),
new PartialPath(deviceId),
false,
measurementIds,
dataTypes,
schemas,
times,
null,
values,
times.length);
}
private void createTablet() {
// create tablet for insertRowNode
BitMap[] bitMapsForInsertRowNode = new BitMap[6];
for (int i = 0; i < 6; i++) {
bitMapsForInsertRowNode[i] = new BitMap(1);
}
values[0] = new int[1];
values[1] = new long[1];
values[2] = new float[1];
values[3] = new double[1];
values[4] = new boolean[1];
values[5] = new Binary[1];
for (int r = 0; r < 1; r++) {
((int[]) values[0])[r] = 100;
((long[]) values[1])[r] = 10000;
((float[]) values[2])[r] = 2;
((double[]) values[3])[r] = 1.0;
((boolean[]) values[4])[r] = false;
((Binary[]) values[5])[r] = Binary.valueOf("text");
}
tabletForInsertRowNode = new Tablet(deviceId, Arrays.asList(schemas), 1);
tabletForInsertRowNode.values = values;
tabletForInsertRowNode.timestamps = new long[] {times[0]};
tabletForInsertRowNode.rowSize = 1;
tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode;
// create tablet for insertTabletNode
BitMap[] bitMapsForInsertTabletNode = new BitMap[6];
for (int i = 0; i < 6; i++) {
bitMapsForInsertTabletNode[i] = new BitMap(times.length);
}
values[0] = new int[5];
values[1] = new long[5];
values[2] = new float[5];
values[3] = new double[5];
values[4] = new boolean[5];
values[5] = new Binary[5];
for (int r = 0; r < 5; r++) {
((int[]) values[0])[r] = 100;
((long[]) values[1])[r] = 10000;
((float[]) values[2])[r] = 2;
((double[]) values[3])[r] = 1.0;
((boolean[]) values[4])[r] = false;
((Binary[]) values[5])[r] = Binary.valueOf("text");
}
tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length);
tabletForInsertTabletNode.values = values;
tabletForInsertTabletNode.timestamps = times;
tabletForInsertTabletNode.rowSize = times.length;
tabletForInsertTabletNode.bitMaps = bitMapsForInsertTabletNode;
}
@Test
public void convertToTabletForTest() {
PipeInsertNodeTabletInsertionEvent event1 = new PipeInsertNodeTabletInsertionEvent(null, null);
Tablet tablet1 = event1.convertToTabletForTest(insertRowNode, pattern);
Assert.assertEquals(tablet1, tabletForInsertRowNode);
PipeInsertNodeTabletInsertionEvent event2 = new PipeInsertNodeTabletInsertionEvent(null, null);
Tablet tablet2 = event2.convertToTabletForTest(insertTabletNode, pattern);
Assert.assertEquals(tablet2, tabletForInsertTabletNode);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册