未验证 提交 d3fd2e25 编写于 作者: S Steve Yurong Su 提交者: GitHub

[IOTDB-6041] Pipe: NPE when handling partial insertion using DisruptorQueue in...

[IOTDB-6041] Pipe: NPE when handling partial insertion using DisruptorQueue in extractor stage (#10424)
上级 12a7ae37
......@@ -283,6 +283,11 @@ public class TabletInsertionDataContainer {
for (int i = 0; i < originColumnSize; i++) {
final String measurement = originMeasurementList[i];
// ignore null measurement for partial insert
if (measurement == null) {
continue;
}
// low cost check comes first
if (pattern.length() == deviceId.length() + measurement.length() + 1
// high cost check comes later
......
......@@ -34,6 +34,7 @@ import java.util.List;
import java.util.concurrent.ThreadFactory;
public class DisruptorQueue<E> {
private Disruptor<Container<E>> disruptor;
private RingBuffer<Container<E>> ringBuffer;
......@@ -55,26 +56,11 @@ public class DisruptorQueue<E> {
private WaitStrategy waitStrategy = new BlockingWaitStrategy();
private final List<EventHandler<E>> handlers = new ArrayList<>();
public Builder<E> setRingBufferSize(int ringBufferSize) {
this.ringBufferSize = ringBufferSize;
return this;
}
public Builder<E> setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
public Builder<E> setProducerType(ProducerType producerType) {
this.producerType = producerType;
return this;
}
public Builder<E> setWaitStrategy(WaitStrategy waitStrategy) {
this.waitStrategy = waitStrategy;
return this;
}
public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
this.handlers.add(eventHandler);
return this;
......@@ -90,6 +76,7 @@ public class DisruptorQueue<E> {
(container, sequence, endOfBatch) ->
handler.onEvent(container.getObj(), sequence, endOfBatch));
}
disruptorQueue.disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
disruptorQueue.disruptor.start();
disruptorQueue.ringBuffer = disruptorQueue.disruptor.getRingBuffer();
return disruptorQueue;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DisruptorQueueExceptionHandler implements ExceptionHandler<Object> {
private static final Logger LOGGER =
LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class);
@Override
public void handleEventException(final Throwable ex, final long sequence, final Object event) {
LOGGER.error("Exception processing: {} {}", sequence, event, ex);
}
@Override
public void handleOnStartException(final Throwable ex) {
LOGGER.warn("Exception during onStart()", ex);
}
@Override
public void handleOnShutdownException(final Throwable ex) {
LOGGER.warn("Exception during onShutdown()", ex);
}
}
......@@ -136,7 +136,12 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
// case 2: for example, pattern is root.a.b.c and device is root.a.b
// in this case, we need to check the full path
else {
for (String measurement : measurements) {
for (final String measurement : measurements) {
// ignore null measurement for partial insert
if (measurement == null) {
continue;
}
// for example, pattern is root.a.b.c, device is root.a.b and measurement is c
// in this case, the extractor can be matched. other cases are not matched.
// please note that there should be a . between device and measurement.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册