未验证 提交 3fae0abb 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

fix(sql): fixed column index dereference in "latest by" SQLs (#1501)

上级 f5be440f
......@@ -26,6 +26,8 @@ package io.questdb;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.mp.*;
import io.questdb.std.DirectObjectFactory;
import io.questdb.std.MemoryTag;
import io.questdb.tasks.*;
import org.jetbrains.annotations.NotNull;
......@@ -123,12 +125,23 @@ public class MessageBusImpl implements MessageBus {
latestByPubSeq.then(latestBySubSeq).then(latestByPubSeq);
// todo: move to configuration
this.tableWriterCommandQueue = new RingQueue<>(() -> new TableWriterTask(2048), 4);
this.tableWriterCommandQueue = new RingQueue<>(
TableWriterTask::new,
2048,
4,
MemoryTag.NATIVE_REPL
);
this.tableWriterCommandPubSeq = new MPSequence(this.tableWriterCommandQueue.getCycle());
this.tableWriterCommandSubSeq = new FanOut();
this.tableWriterCommandPubSeq.then(this.tableWriterCommandSubSeq).then(this.tableWriterCommandPubSeq);
this.tableWriterEventQueue = new RingQueue<>(() -> new TableWriterTask(2048), 4);
this.tableWriterEventQueue = new RingQueue<>(
TableWriterTask::new,
2048,
4,
MemoryTag.NATIVE_REPL
);
this.tableWriterEventPubSeq = new MPSequence(this.tableWriterCommandQueue.getCycle());
this.tableWriterEventSubSeq = new FanOut();
this.tableWriterEventPubSeq.then(this.tableWriterEventSubSeq).then(this.tableWriterEventPubSeq);
......
......@@ -226,6 +226,7 @@ public class TableWriter implements Closeable {
this.fileOperationRetryCount = configuration.getFileOperationRetryCount();
this.tableName = Chars.toString(tableName);
this.o3QuickSortEnabled = configuration.isO3QuickSortEnabled();
// todo: move these sequences and queue onto message bus
this.o3PartitionUpdateQueue = new RingQueue<O3PartitionUpdateTask>(O3PartitionUpdateTask.CONSTRUCTOR, configuration.getO3PartitionUpdateQueueCapacity());
this.o3PartitionUpdatePubSeq = new MPSequence(this.o3PartitionUpdateQueue.getCycle());
this.o3PartitionUpdateSubSeq = new SCSequence();
......
......@@ -81,11 +81,11 @@ class LineTcpMeasurementScheduler implements Closeable {
private final TableStructureAdapter tableStructureAdapter = new TableStructureAdapter();
private final Path path = new Path();
private final MemoryMARW ddlMem = Vm.getMARWInstance();
private final LineTcpReceiverConfiguration configuration;
private Sequence pubSeq;
private int loadCheckCycles = 0;
private int reshuffleCount = 0;
private LineTcpReceiver.SchedulerListener listener;
private final LineTcpReceiverConfiguration configuration;
LineTcpMeasurementScheduler(
LineTcpReceiverConfiguration lineConfiguration,
......@@ -117,11 +117,17 @@ class LineTcpMeasurementScheduler implements Closeable {
int maxMeasurementSize = lineConfiguration.getMaxMeasurementSize();
int queueSize = lineConfiguration.getWriterQueueCapacity();
queue = new RingQueue<>(
() -> new LineTcpMeasurementEvent(
maxMeasurementSize,
(address, addressSize) -> new LineTcpMeasurementEvent(
address,
addressSize,
lineConfiguration.getMicrosecondClock(),
lineConfiguration.getTimestampAdapter()),
queueSize);
lineConfiguration.getTimestampAdapter()
),
getEventSlotSize(maxMeasurementSize),
queueSize,
MemoryTag.NATIVE_DEFAULT
);
pubSeq = new MPSequence(queueSize);
int nWriterThreads = writerWorkerPool.getWorkerCount();
......@@ -173,14 +179,16 @@ class LineTcpMeasurementScheduler implements Closeable {
} finally {
tableUpdateDetailsLock.writeLock().unlock();
}
for (int n = 0; n < queue.getCycle(); n++) {
queue.get(n).close();
}
path.close();
ddlMem.close();
Misc.free(path);
Misc.free(ddlMem);
Misc.free(queue);
}
}
private static long getEventSlotSize(int maxMeasurementSize) {
return Numbers.ceilPow2((long) (maxMeasurementSize / 4) * (Integer.BYTES + Double.BYTES + 1));
}
@NotNull
private TableUpdateDetails assignTableToWriterThread(String tableName) {
TableUpdateDetails tableUpdateDetails;
......@@ -225,10 +233,6 @@ class LineTcpMeasurementScheduler implements Closeable {
return loadCheckCycles;
}
int getReshuffleCount() {
return reshuffleCount;
}
long getNextPublisherEventSequence() {
assert isOpen();
long seq;
......@@ -238,6 +242,10 @@ class LineTcpMeasurementScheduler implements Closeable {
return seq;
}
int getReshuffleCount() {
return reshuffleCount;
}
private boolean isOpen() {
return null != pubSeq;
}
......@@ -448,16 +456,21 @@ class LineTcpMeasurementScheduler implements Closeable {
private volatile boolean rebalanceReleasedByFromThread;
private boolean commitOnWriterClose;
private LineTcpMeasurementEvent(int maxMeasurementSize, MicrosecondClock clock, LineProtoTimestampAdapter timestampAdapter) {
bufSize = (long) (maxMeasurementSize / 4) * (Integer.BYTES + Double.BYTES + 1);
bufLo = Unsafe.malloc(bufSize, MemoryTag.NATIVE_DEFAULT);
private LineTcpMeasurementEvent(
long bufLo,
long bufSize,
MicrosecondClock clock,
LineProtoTimestampAdapter timestampAdapter
) {
this.bufLo = bufLo;
this.bufSize = bufSize;
this.clock = clock;
this.timestampAdapter = timestampAdapter;
}
@Override
public void close() {
Unsafe.free(bufLo, bufSize, MemoryTag.NATIVE_DEFAULT);
// this is concurrent writer release
tableUpdateDetails = Misc.free(tableUpdateDetails);
bufLo = 0;
}
......
......@@ -628,15 +628,18 @@ public class LogFactory implements Closeable {
private FanOut fanOut;
public Holder(int queueDepth, final int recordLength) {
this.ring = new RingQueue<>(() -> new LogRecordSink(recordLength), queueDepth);
this.ring = new RingQueue<>(
LogRecordSink::new,
Numbers.ceilPow2(recordLength),
queueDepth,
MemoryTag.NATIVE_DEFAULT
);
this.lSeq = new MPSequence(queueDepth);
}
@Override
public void close() {
for (int i = 0, n = ring.getCycle(); i < n; i++) {
Misc.free(ring.get(i));
}
Misc.free(ring);
}
}
......@@ -644,5 +647,6 @@ public class LogFactory implements Closeable {
reserved.add("scope");
reserved.add("class");
reserved.add("level");
Os.init();
}
}
......@@ -25,34 +25,25 @@
package io.questdb.log;
import io.questdb.std.Chars;
import io.questdb.std.MemoryTag;
import io.questdb.std.Numbers;
import io.questdb.std.Unsafe;
import io.questdb.std.str.AbstractCharSink;
import io.questdb.std.str.CharSink;
import java.io.Closeable;
public class LogRecordSink extends AbstractCharSink implements Closeable {
public class LogRecordSink extends AbstractCharSink {
private final long address;
private final long lim;
private long _wptr;
private int level;
LogRecordSink(int capacity) {
int c = Numbers.ceilPow2(capacity);
this.address = _wptr = Unsafe.malloc(c, MemoryTag.NATIVE_DEFAULT);
this.lim = address + c;
LogRecordSink(long address, long addressSize) {
this.address = _wptr = address;
this.lim = address + addressSize;
}
public void clear(int len) {
_wptr = address + len;
}
@Override
public void close() {
Unsafe.free(address, lim - address, MemoryTag.NATIVE_DEFAULT);
}
public long getAddress() {
return address;
......
......@@ -24,15 +24,20 @@
package io.questdb.mp;
import io.questdb.std.DirectObjectFactory;
import io.questdb.std.Misc;
import io.questdb.std.Numbers;
import io.questdb.std.ObjectFactory;
import io.questdb.std.Unsafe;
import java.io.Closeable;
public class RingQueue<T> implements Closeable {
private final int mask;
private final T[] buf;
private final long memory;
private long memorySize;
private final int memoryTag;
@SuppressWarnings("unchecked")
public RingQueue(ObjectFactory<T> factory, int cycle) {
......@@ -46,6 +51,26 @@ public class RingQueue<T> implements Closeable {
for (int i = 0; i < cycle; i++) {
buf[i] = factory.newInstance();
}
// heap based queue
this.memory = 0;
this.memorySize = 0;
this.memoryTag = 0;
}
@SuppressWarnings("unchecked")
public RingQueue(DirectObjectFactory<T> factory, long slotSize, int cycle, int memoryTag) {
this.mask = cycle - 1;
this.buf = (T[]) new Object[cycle];
this.memorySize = slotSize * cycle;
this.memoryTag = memoryTag;
this.memory = Unsafe.calloc(memorySize, memoryTag);
long p = memory;
for (int i = 0; i < cycle; i++) {
buf[i] = factory.newInstance(p, slotSize);
p += slotSize;
}
}
@Override
......@@ -53,6 +78,10 @@ public class RingQueue<T> implements Closeable {
for (int i = 0, n = buf.length; i < n; i++) {
Misc.free(buf[i]);
}
if (memorySize > 0) {
Unsafe.free(memory, memorySize, memoryTag);
this.memorySize = 0;
}
}
public T get(long cursor) {
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.std;
@FunctionalInterface
public interface DirectObjectFactory<T> {
T newInstance(long address, long addressSize);
}
......@@ -44,8 +44,8 @@ public class TableWriterTask implements Closeable {
private long sequence;
private long ip;
public TableWriterTask(long size) {
data = Unsafe.calloc(size, MemoryTag.NATIVE_REPL);
public TableWriterTask(long data, long size) {
this.data = data;
this.dataSize = size;
this.appendPtr = data;
this.appendLim = data + dataSize;
......@@ -54,7 +54,6 @@ public class TableWriterTask implements Closeable {
@Override
public void close() {
if (dataSize > 0) {
Unsafe.free(data, dataSize, MemoryTag.NATIVE_REPL);
dataSize = 0;
appendPtr = 0;
appendLim = 0;
......
......@@ -274,7 +274,12 @@ public class LogFactoryTest {
Assert.assertTrue(Files.touch(path.concat("mylog-2015-05-03.log.2").$()));
}
RingQueue<LogRecordSink> queue = new RingQueue<>(() -> new LogRecordSink(1024), 1024);
RingQueue<LogRecordSink> queue = new RingQueue<>(
LogRecordSink::new,
1024,
1024,
MemoryTag.NATIVE_DEFAULT
);
SPSequence pubSeq = new SPSequence(queue.getCycle());
SCSequence subSeq = new SCSequence();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册