未验证 提交 7f849487 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

fix(core): NativeLPSZ does not work correctly with UTF8 names (#1623)

上级 235cdef6
......@@ -287,7 +287,7 @@ public class CairoEngine implements Closeable, WriterSource {
if (null == lockedReason) {
boolean locked = readerPool.lock(tableName);
if (locked) {
LOG.info().$("locked [table=`").$(tableName).$("`, thread=").$(Thread.currentThread().getId()).$(']').$();
LOG.info().$("locked [table=`").utf8(tableName).$("`, thread=").$(Thread.currentThread().getId()).$(']').$();
return null;
}
writerPool.unlock(tableName);
......@@ -446,7 +446,7 @@ public class CairoEngine implements Closeable, WriterSource {
) {
readerPool.unlock(tableName);
writerPool.unlock(tableName, writer, newTable);
LOG.info().$("unlocked [table=`").$(tableName).$("`]").$();
LOG.info().$("unlocked [table=`").utf8(tableName).$("`]").$();
}
public void unlockReaders(CharSequence tableName) {
......
......@@ -164,7 +164,7 @@ public class O3OpenColumnJob extends AbstractQueueConsumerJob<O3OpenColumnTask>
public static void openColumn(O3OpenColumnTask task, long cursor, Sequence subSeq, long tmpBuf) {
final int openColumnMode = task.getOpenColumnMode();
final CharSequence pathToTable = task.getPathToTable();
final Path pathToTable = task.getPathToTable();
final int columnType = task.getColumnType();
final CharSequence columnName = task.getColumnName();
final long srcOooLo = task.getSrcOooLo();
......@@ -251,7 +251,7 @@ public class O3OpenColumnJob extends AbstractQueueConsumerJob<O3OpenColumnTask>
public static void openColumn(
int openColumnMode,
CharSequence pathToTable,
Path pathToTable,
CharSequence columnName,
AtomicInteger columnCounter,
AtomicInteger partCounter,
......
......@@ -55,7 +55,7 @@ public class O3PartitionJob extends AbstractQueueConsumerJob<O3PartitionTask> {
}
public static void processPartition(
CharSequence pathToTable,
Path pathToTable,
int partitionBy,
ObjList<MemoryMAR> columns,
ObjList<MemoryCARW> oooColumns,
......@@ -543,7 +543,7 @@ public class O3PartitionJob extends AbstractQueueConsumerJob<O3PartitionTask> {
// find "current" partition boundary in the out of order data
// once we know the boundary we can move on to calculating another one
// srcOooHi is index inclusive of value
final CharSequence pathToTable = task.getPathToTable();
final Path pathToTable = task.getPathToTable();
final int partitionBy = task.getPartitionBy();
final ObjList<MemoryMAR> columns = task.getColumns();
final ObjList<MemoryCARW> oooColumns = task.getO3Columns();
......@@ -616,7 +616,7 @@ public class O3PartitionJob extends AbstractQueueConsumerJob<O3PartitionTask> {
private static void publishOpenColumnTaskHarmonized(
long cursor,
int openColumnMode,
CharSequence pathToTable,
Path pathToTable,
CharSequence columnName,
AtomicInteger columnCounter,
AtomicInteger partCounter,
......@@ -704,7 +704,7 @@ public class O3PartitionJob extends AbstractQueueConsumerJob<O3PartitionTask> {
long txn,
ObjList<MemoryMAR> columns,
ObjList<MemoryCARW> oooColumns,
CharSequence pathToTable,
Path pathToTable,
long srcOooLo,
long srcOooHi,
long srcOooMax,
......@@ -916,7 +916,7 @@ public class O3PartitionJob extends AbstractQueueConsumerJob<O3PartitionTask> {
long tmpBuf,
long cursor,
int openColumnMode,
CharSequence pathToTable,
Path pathToTable,
CharSequence columnName,
AtomicInteger columnCounter,
AtomicInteger partCounter,
......
......@@ -32,7 +32,6 @@ import io.questdb.mp.RingQueue;
import io.questdb.mp.Sequence;
import io.questdb.std.*;
import io.questdb.std.str.MutableCharSink;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.tasks.O3PurgeDiscoveryTask;
......@@ -44,7 +43,7 @@ public class O3PurgeDiscoveryJob extends AbstractQueueConsumerJob<O3PurgeDiscove
private final static Log LOG = LogFactory.getLog(O3PurgeDiscoveryJob.class);
private final CairoConfiguration configuration;
private final MutableCharSink[] sink;
private final NativeLPSZ[] nativeLPSZ;
private final StringSink[] fileNameSinks;
private final LongList[] txnList;
private final RingQueue<O3PurgeTask> purgeQueue;
private final Sequence purgePubSeq;
......@@ -55,11 +54,11 @@ public class O3PurgeDiscoveryJob extends AbstractQueueConsumerJob<O3PurgeDiscove
this.purgeQueue = messageBus.getO3PurgeQueue();
this.purgePubSeq = messageBus.getO3PurgePubSeq();
this.sink = new MutableCharSink[workerCount];
this.nativeLPSZ = new NativeLPSZ[workerCount];
this.fileNameSinks = new StringSink[workerCount];
this.txnList = new LongList[workerCount];
for (int i = 0; i < workerCount; i++) {
sink[i] = new StringSink();
nativeLPSZ[i] = new NativeLPSZ();
fileNameSinks[i] = new StringSink();
txnList[i] = new LongList();
}
}
......@@ -67,7 +66,7 @@ public class O3PurgeDiscoveryJob extends AbstractQueueConsumerJob<O3PurgeDiscove
public static boolean discoverPartitions(
FilesFacade ff,
MutableCharSink sink,
NativeLPSZ nativeLPSZ,
StringSink fileNameSink,
LongList txnList,
RingQueue<O3PurgeTask> purgeQueue,
@Nullable Sequence purgePubSeq,
......@@ -93,7 +92,7 @@ public class O3PurgeDiscoveryJob extends AbstractQueueConsumerJob<O3PurgeDiscove
if (p > 0) {
try {
do {
processDir(sink, nativeLPSZ, tableName, txnList, ff.findName(p), ff.findType(p));
processDir(sink, fileNameSink, tableName, txnList, ff.findName(p), ff.findType(p));
} while (ff.findNext(p) > 0);
} finally {
ff.findClose(p);
......@@ -163,24 +162,23 @@ public class O3PurgeDiscoveryJob extends AbstractQueueConsumerJob<O3PurgeDiscove
private static void processDir(
MutableCharSink sink,
NativeLPSZ nativeLPSZ,
StringSink fileNameSink,
CharSequence tableName,
LongList txnList,
long name,
long pUtf8NameZ,
int type
) {
if (type == Files.DT_DIR) {
nativeLPSZ.of(name);
if (Chars.notDots(nativeLPSZ) && Chars.startsWith(nativeLPSZ, sink)) {
if (Files.isDir(pUtf8NameZ, type, fileNameSink)) {
if (Chars.startsWith(fileNameSink, sink)) {
// extract txn from name
int index = Chars.lastIndexOf(nativeLPSZ, '.');
int index = Chars.lastIndexOf(fileNameSink, '.');
if (index < 0) {
txnList.add(-1);
} else {
try {
txnList.add(Numbers.parseLong(nativeLPSZ, index + 1, nativeLPSZ.length()));
txnList.add(Numbers.parseLong(fileNameSink, index + 1, fileNameSink.length()));
} catch (NumericException e) {
LOG.error().$("unknown directory [table=").utf8(tableName).$(", dir=").utf8(nativeLPSZ).$(']').$();
LOG.error().$("unknown directory [table=").utf8(tableName).$(", dir=").utf8(fileNameSink).$(']').$();
}
}
}
......@@ -193,7 +191,7 @@ public class O3PurgeDiscoveryJob extends AbstractQueueConsumerJob<O3PurgeDiscove
final boolean useful = discoverPartitions(
configuration.getFilesFacade(),
sink[workerId],
nativeLPSZ[workerId],
fileNameSinks[workerId],
txnList[workerId],
purgeQueue,
purgePubSeq,
......
......@@ -42,7 +42,6 @@ import io.questdb.std.*;
import io.questdb.std.datetime.DateFormat;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.str.LPSZ;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.tasks.*;
......@@ -56,7 +55,6 @@ import java.util.function.LongConsumer;
import static io.questdb.cairo.StatusCode.*;
import static io.questdb.cairo.TableUtils.*;
import static io.questdb.std.Files.isDots;
public class TableWriter implements Closeable {
public static final int TIMESTAMP_MERGE_ENTRY_BYTES = Long.BYTES * 2;
......@@ -89,7 +87,6 @@ public class TableWriter implements Closeable {
private final int rootLen;
private final MemoryMR metaMem;
private final int partitionBy;
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final LongList columnTops;
private final FilesFacade ff;
private final DateFormat partitionDirFmt;
......@@ -115,7 +112,6 @@ public class TableWriter implements Closeable {
private final PartitionBy.PartitionFloorMethod partitionFloorMethod;
private final PartitionBy.PartitionCeilMethod partitionCeilMethod;
private final int defaultCommitMode;
private final FindVisitor removePartitionDirectories = this::removePartitionDirectories0;
private final int o3ColumnMemorySize;
private final ObjList<Runnable> nullSetters;
private final ObjList<Runnable> o3NullSetters;
......@@ -130,13 +126,15 @@ public class TableWriter implements Closeable {
private final AtomicInteger o3ErrorCount = new AtomicInteger();
private final MemoryMARW todoMem = Vm.getMARWInstance();
private final TxWriter txWriter;
private final FindVisitor removePartitionDirsNotAttached = this::removePartitionDirsNotAttached;
private final LongList o3PartitionRemoveCandidates = new LongList();
private final ObjectPool<O3MutableAtomicInteger> o3ColumnCounters = new ObjectPool<>(O3MutableAtomicInteger::new, 64);
private final ObjectPool<O3Basket> o3BasketPool = new ObjectPool<>(O3Basket::new, 64);
private final TxnScoreboard txnScoreboard;
private final StringSink o3Sink = new StringSink();
private final NativeLPSZ o3NativeLPSZ = new NativeLPSZ();
private final StringSink fileNameSink = new StringSink();
private final FindVisitor removePartitionDirectories = this::removePartitionDirectories0;
private final FindVisitor removePartitionDirsNotAttached = this::removePartitionDirsNotAttached;
private final StringSink o3FileNameSink = new StringSink();
private final RingQueue<O3PartitionUpdateTask> o3PartitionUpdateQueue;
private final MPSequence o3PartitionUpdatePubSeq;
private final SCSequence o3PartitionUpdateSubSeq;
......@@ -1932,7 +1930,7 @@ public class TableWriter implements Closeable {
O3PurgeDiscoveryJob.discoverPartitions(
ff,
o3Sink,
o3NativeLPSZ,
o3FileNameSink,
rowValueIsNotNull, // reuse, this is only called from writer close
purgeQueue,
purgePubSeq,
......@@ -3901,11 +3899,10 @@ public class TableWriter implements Closeable {
private void removeColumnFiles(CharSequence columnName, int columnType, RemoveFileLambda removeLambda) {
try {
ff.iterateDir(path.$(), (file, type) -> {
nativeLPSZ.of(file);
if (type == Files.DT_DIR && IGNORED_FILES.excludes(nativeLPSZ)) {
ff.iterateDir(path.$(), (pUtf8NameZ, type) -> {
if (Files.isDir(pUtf8NameZ, type)) {
path.trimTo(rootLen);
path.concat(nativeLPSZ);
path.concat(pUtf8NameZ);
int plen = path.length();
removeLambda.remove(ff, dFile(path, columnName));
removeLambda.remove(ff, iFile(path.trimTo(plen), columnName));
......@@ -3966,11 +3963,10 @@ public class TableWriter implements Closeable {
private void removeIndexFiles(CharSequence columnName) {
try {
ff.iterateDir(path.$(), (file, type) -> {
nativeLPSZ.of(file);
if (type == Files.DT_DIR && IGNORED_FILES.excludes(nativeLPSZ)) {
ff.iterateDir(path.$(), (pUtf8NameZ, type) -> {
if (Files.isDir(pUtf8NameZ, type)) {
path.trimTo(rootLen);
path.concat(nativeLPSZ);
path.concat(pUtf8NameZ);
int plen = path.length();
removeFileAndOrLog(ff, BitmapIndexUtils.keyFileName(path.trimTo(plen), columnName));
removeFileAndOrLog(ff, BitmapIndexUtils.valueFileName(path.trimTo(plen), columnName));
......@@ -4014,33 +4010,34 @@ public class TableWriter implements Closeable {
}
}
private void removePartitionDirectories0(long name, int type) {
path.trimTo(rootLen);
path.concat(name).$();
nativeLPSZ.of(name);
int errno;
if (IGNORED_FILES.excludes(nativeLPSZ) && type == Files.DT_DIR && (errno = ff.rmdir(path)) != 0) {
LOG.info().$("could not remove [path=").$(path).$(", errno=").$(errno).$(']').$();
private void removePartitionDirectories0(long pUtf8NameZ, int type) {
if (Files.isDir(pUtf8NameZ, type)) {
path.trimTo(rootLen);
path.concat(pUtf8NameZ).$();
int errno;
if ((errno = ff.rmdir(path)) != 0) {
LOG.info().$("could not remove [path=").$(path).$(", errno=").$(errno).$(']').$();
}
}
}
private void removePartitionDirsNotAttached(long pName, int type) {
nativeLPSZ.of(pName);
if (!isDots(nativeLPSZ) && type == Files.DT_DIR) {
if (Chars.endsWith(nativeLPSZ, DETACHED_DIR_MARKER)) {
private void removePartitionDirsNotAttached(long pUtf8NameZ, int type) {
if (Files.isDir(pUtf8NameZ, type, fileNameSink)) {
if (Chars.endsWith(fileNameSink, DETACHED_DIR_MARKER)) {
// Do not remove detached partitions
// They are probably about to be attached.
return;
}
try {
long txn = 0;
int txnSep = Chars.indexOf(nativeLPSZ, '.');
int txnSep = Chars.indexOf(fileNameSink, '.');
if (txnSep < 0) {
txnSep = nativeLPSZ.length();
txnSep = fileNameSink.length();
} else {
txn = Numbers.parseLong(nativeLPSZ, txnSep + 1, nativeLPSZ.length());
txn = Numbers.parseLong(fileNameSink, txnSep + 1, fileNameSink.length());
}
long dirTimestamp = partitionDirFmt.parse(nativeLPSZ, 0, txnSep, null);
long dirTimestamp = partitionDirFmt.parse(fileNameSink, 0, txnSep, null);
if (txn <= txWriter.txn &&
(txWriter.attachedPartitionsContains(dirTimestamp) || txWriter.isActivePartition(dirTimestamp))) {
return;
......@@ -4051,7 +4048,7 @@ public class TableWriter implements Closeable {
// we rely on this behaviour to remove leftover directories created by OOO processing
}
path.trimTo(rootLen);
path.concat(pName).$();
path.concat(pUtf8NameZ).$();
int errno;
if ((errno = ff.rmdir(path)) == 0) {
LOG.info().$("removed partition dir: ").$(path).$();
......@@ -4126,13 +4123,12 @@ public class TableWriter implements Closeable {
private void renameColumnFiles(CharSequence columnName, CharSequence newName, int columnType) {
try {
ff.iterateDir(path.$(), (file, type) -> {
nativeLPSZ.of(file);
if (type == Files.DT_DIR && IGNORED_FILES.excludes(nativeLPSZ)) {
ff.iterateDir(path.$(), (pUtf8NameZ, type) -> {
if (Files.isDir(pUtf8NameZ, type)) {
path.trimTo(rootLen);
path.concat(nativeLPSZ);
path.concat(pUtf8NameZ);
other.trimTo(rootLen);
other.concat(nativeLPSZ);
other.concat(pUtf8NameZ);
int plen = path.length();
renameFileOrLog(ff, dFile(path.trimTo(plen), columnName), dFile(other.trimTo(plen), newName));
renameFileOrLog(ff, iFile(path.trimTo(plen), columnName), iFile(other.trimTo(plen), newName));
......
......@@ -24,14 +24,16 @@
package io.questdb.cairo.mig;
import io.questdb.cairo.*;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryARW;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import org.jetbrains.annotations.Nullable;
......@@ -148,66 +150,62 @@ public class EngineMigration {
copyPath.of(root);
final int rootLen = path.length();
final NativeLPSZ nativeLPSZ = new NativeLPSZ();
ff.iterateDir(path.$(), (name, type) -> {
if (type == Files.DT_DIR) {
nativeLPSZ.of(name);
if (Chars.notDots(nativeLPSZ)) {
path.trimTo(rootLen);
path.concat(nativeLPSZ);
copyPath.trimTo(rootLen);
copyPath.concat(nativeLPSZ);
final int plen = path.length();
path.concat(TableUtils.META_FILE_NAME);
if (ff.exists(path.$())) {
final long fd = openFileRWOrFail(ff, path);
try {
int currentTableVersion = TableUtils.readIntOrFail(ff, fd, META_OFFSET_VERSION, mem, path);
if (currentTableVersion < latestVersion) {
LOG.info()
.$("upgrading [path=").$(path)
.$(",fromVersion=").$(currentTableVersion)
.$(",toVersion=").$(latestVersion)
.I$();
copyPath.trimTo(plen);
backupFile(ff, path, copyPath, TableUtils.META_FILE_NAME, currentTableVersion);
path.trimTo(plen);
context.of(path, copyPath, fd);
for (int ver = currentTableVersion + 1; ver <= latestVersion; ver++) {
final MigrationAction migration = getMigrationToVersion(ver);
if (migration != null) {
try {
LOG.info().$("upgrading table [path=").$(path).$(",toVersion=").$(ver).I$();
migration.migrate(context);
path.trimTo(plen);
} catch (Throwable e) {
LOG.error().$("failed to upgrade table path=")
.$(path.trimTo(plen))
.$(", exception: ")
.$(e).$();
throw e;
}
}
ff.iterateDir(path.$(), (pUtf8NameZ, type) -> {
if (Files.isDir(pUtf8NameZ, type)) {
path.trimTo(rootLen);
path.concat(pUtf8NameZ);
copyPath.trimTo(rootLen);
copyPath.concat(pUtf8NameZ);
final int plen = path.length();
path.concat(TableUtils.META_FILE_NAME);
if (ff.exists(path.$())) {
final long fd = openFileRWOrFail(ff, path);
try {
int currentTableVersion = TableUtils.readIntOrFail(ff, fd, META_OFFSET_VERSION, mem, path);
if (currentTableVersion < latestVersion) {
LOG.info()
.$("upgrading [path=").$(path)
.$(",fromVersion=").$(currentTableVersion)
.$(",toVersion=").$(latestVersion)
.I$();
copyPath.trimTo(plen);
backupFile(ff, path, copyPath, TableUtils.META_FILE_NAME, currentTableVersion);
TableUtils.writeIntOrFail(
ff,
fd,
META_OFFSET_VERSION,
ver,
mem,
path.trimTo(plen)
);
path.trimTo(plen);
context.of(path, copyPath, fd);
for (int ver = currentTableVersion + 1; ver <= latestVersion; ver++) {
final MigrationAction migration = getMigrationToVersion(ver);
if (migration != null) {
try {
LOG.info().$("upgrading table [path=").$(path).$(",toVersion=").$(ver).I$();
migration.migrate(context);
path.trimTo(plen);
} catch (Throwable e) {
LOG.error().$("failed to upgrade table path=")
.$(path.trimTo(plen))
.$(", exception: ")
.$(e).$();
throw e;
}
}
TableUtils.writeIntOrFail(
ff,
fd,
META_OFFSET_VERSION,
ver,
mem,
path.trimTo(plen)
);
}
} finally {
ff.close(fd);
path.trimTo(plen);
copyPath.trimTo(plen);
}
} finally {
ff.close(fd);
path.trimTo(plen);
copyPath.trimTo(plen);
}
}
}
......
......@@ -269,7 +269,10 @@ public class ReaderPool extends AbstractPool implements ResourcePool<TableReader
if (r != null) {
r.goodby();
r.close();
LOG.info().$("closed '").$(r.getTableName()).$("' [at=").$(entry.index).$(':').$(index).$(", reason=").$(PoolConstants.closeReasonText(reason)).$(']').$();
LOG.info().$("closed '").utf8(r.getTableName())
.$("' [at=").$(entry.index).$(':').$(index)
.$(", reason=").$(PoolConstants.closeReasonText(reason))
.I$();
notifyListener(thread, r.getTableName(), ev, entry.index, index);
entry.readers[index] = null;
}
......
......@@ -202,7 +202,7 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
$("[compiler: ").$(compilerNanos).
$(", count: ").$(recordCountNanos).
$(", execute: ").$(nanosecondClock.getTicks() - executeStartNanos).
$(", q=`").$(query).
$(", q=`").utf8(query).
$("`]").$();
}
......
......@@ -76,6 +76,6 @@ public final class QueryCache implements Closeable {
}
private void log(CharSequence action, CharSequence sql) {
LOG.info().$(action).$(" [thread=").$(Thread.currentThread().getName()).$(", sql=").$(sql).$(']').$();
LOG.info().$(action).$(" [thread=").$(Thread.currentThread().getName()).$(", sql=").utf8(sql).$(']').$();
}
}
......@@ -46,8 +46,8 @@ import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.datetime.DateFormat;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
......@@ -2489,16 +2489,15 @@ public class SqlCompiler implements Closeable {
protected final Path srcPath = new Path();
private final CharSequenceObjHashMap<RecordToRowCopier> tableBackupRowCopiedCache = new CharSequenceObjHashMap<>();
private final ObjHashSet<CharSequence> tableNames = new ObjHashSet<>();
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final Path dstPath = new Path();
private transient String cachedTmpBackupRoot;
private transient int changeDirPrefixLen;
private transient int currDirPrefixLen;
private final StringSink fileNameSink = new StringSink();
private final FindVisitor confFilesBackupOnFind = (file, type) -> {
nativeLPSZ.of(file);
if (type == Files.DT_FILE) {
srcPath.of(configuration.getConfRoot()).concat(nativeLPSZ).$();
dstPath.trimTo(currDirPrefixLen).concat(nativeLPSZ).$();
srcPath.of(configuration.getConfRoot()).concat(file).$();
dstPath.trimTo(currDirPrefixLen).concat(file).$();
LOG.info().$("backup copying config file [from=").$(srcPath).$(",to=").$(dstPath).I$();
if (ff.copy(srcPath, dstPath) < 0) {
throw CairoException.instance(ff.errno()).put("cannot backup conf file [to=").put(dstPath).put(']');
......@@ -2506,16 +2505,15 @@ public class SqlCompiler implements Closeable {
}
};
private transient SqlExecutionContext currentExecutionContext;
private final FindVisitor sqlDatabaseBackupOnFind = (file, type) -> {
nativeLPSZ.of(file);
if (type == Files.DT_DIR && nativeLPSZ.charAt(0) != '.') {
private final FindVisitor sqlDatabaseBackupOnFind = (pUtf8NameZ, type) -> {
if (Files.isDir(pUtf8NameZ, type, fileNameSink)) {
try {
backupTable(nativeLPSZ, currentExecutionContext);
} catch (CairoException ex) {
backupTable(fileNameSink, currentExecutionContext);
} catch (CairoException e) {
LOG.error()
.$("could not backup [path=").$(nativeLPSZ)
.$(", ex=").$(ex.getFlyweightMessage())
.$(", errno=").$(ex.getErrno())
.$("could not backup [path=").$(fileNameSink)
.$(", e=").$(e.getFlyweightMessage())
.$(", errno=").$(e.getErrno())
.$(']').$();
}
}
......
......@@ -24,7 +24,10 @@
package io.questdb.griffin;
import io.questdb.cairo.*;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.TableUtils;
import io.questdb.griffin.model.*;
import io.questdb.std.*;
import org.jetbrains.annotations.NotNull;
......@@ -90,6 +93,10 @@ public final class SqlParser {
this.expressionParser = new ExpressionParser(expressionNodePool, this, characterStore);
}
public static boolean isFullSampleByPeriod(ExpressionNode n) {
return n != null && (n.type == ExpressionNode.CONSTANT || (n.type == ExpressionNode.LITERAL && isValidSampleByPeriodLetter(n.token)));
}
private static SqlException err(GenericLexer lexer, String msg) {
return SqlException.$(lexer.lastTokenPosition(), msg);
}
......@@ -98,6 +105,61 @@ public final class SqlParser {
return SqlException.unexpectedToken(lexer.lastTokenPosition(), token);
}
private static boolean isValidSampleByPeriodLetter(CharSequence token) {
if (token.length() != 1) return false;
switch (token.charAt(0)) {
case 'T':
// millis
case 's':
// seconds
case 'm':
// minutes
case 'h':
// hours
case 'd':
// days
case 'M':
// months
case 'y':
return true;
default:
return false;
}
}
static int parseGeoHashBits(int position, int start, CharSequence sizeStr) throws SqlException {
assert start >= 0;
if (sizeStr.length() - start < 2) {
throw SqlException.position(position)
.put("invalid GEOHASH size, must be number followed by 'C' or 'B' character");
}
int size;
try {
size = Numbers.parseInt(sizeStr, start, sizeStr.length() - 1);
} catch (NumericException e) {
throw SqlException.position(position)
.put("invalid GEOHASH size, must be number followed by 'C' or 'B' character");
}
switch (sizeStr.charAt(sizeStr.length() - 1)) {
case 'C':
case 'c':
size *= 5;
break;
case 'B':
case 'b':
break;
default:
throw SqlException.position(position)
.put("invalid GEOHASH size units, must be 'c', 'C' for chars, or 'b', 'B' for bits");
}
if (size < 1 || size > ColumnType.GEO_HASH_MAX_BITS_LENGTH) {
throw SqlException.position(position)
.put("invalid GEOHASH type precision range, mast be [1, 60] bits, provided=")
.put(size);
}
return size;
}
private void addConcatArgs(ObjList<ExpressionNode> args, ExpressionNode leaf) {
if (leaf.type != ExpressionNode.FUNCTION || !isConcatFunction(leaf.token)) {
args.add(leaf);
......@@ -152,48 +214,6 @@ public final class SqlParser {
throw SqlException.$((lexer.lastTokenPosition()), "'by' expected");
}
private void expectSample(GenericLexer lexer, QueryModel model) throws SqlException {
final ExpressionNode n = expr(lexer, (QueryModel) null);
if (isFullSampleByPeriod(n)) {
model.setSampleBy(n);
return;
}
// This is complex expression of sample by period. It must follow time unit interval
ExpressionNode periodUnit = expectLiteral(lexer);
if (periodUnit == null || periodUnit.type != ExpressionNode.LITERAL || !isValidSampleByPeriodLetter(periodUnit.token)) {
int lexerPosition = lexer.getUnparsed() == null ? lexer.getPosition() : lexer.lastTokenPosition();
throw SqlException.$(periodUnit != null ? periodUnit.position : lexerPosition, "one letter sample by period unit expected");
}
model.setSampleBy(n, periodUnit);
}
public static boolean isFullSampleByPeriod(ExpressionNode n) {
return n != null && (n.type == ExpressionNode.CONSTANT || (n.type == ExpressionNode.LITERAL && isValidSampleByPeriodLetter(n.token)));
}
private static boolean isValidSampleByPeriodLetter(CharSequence token) {
if (token.length() != 1) return false;
switch (token.charAt(0)) {
case 'T':
// millis
case 's':
// seconds
case 'm':
// minutes
case 'h':
// hours
case 'd':
// days
case 'M':
// months
case 'y':
return true;
default:
return false;
}
}
private ExpressionNode expectExpr(GenericLexer lexer) throws SqlException {
final ExpressionNode n = expr(lexer, (QueryModel) null);
if (n != null) {
......@@ -257,6 +277,21 @@ public final class SqlParser {
throw SqlException.$((lexer.lastTokenPosition()), "'offset' expected");
}
private void expectSample(GenericLexer lexer, QueryModel model) throws SqlException {
final ExpressionNode n = expr(lexer, (QueryModel) null);
if (isFullSampleByPeriod(n)) {
model.setSampleBy(n);
return;
}
// This is complex expression of sample by period. It must follow time unit interval
ExpressionNode periodUnit = expectLiteral(lexer);
if (periodUnit == null || periodUnit.type != ExpressionNode.LITERAL || !isValidSampleByPeriodLetter(periodUnit.token)) {
int lexerPosition = lexer.getUnparsed() == null ? lexer.getPosition() : lexer.lastTokenPosition();
throw SqlException.$(periodUnit != null ? periodUnit.position : lexerPosition, "one letter sample by period unit expected");
}
model.setSampleBy(n, periodUnit);
}
private CharSequence expectTableNameOrSubQuery(GenericLexer lexer) throws SqlException {
return tok(lexer, "table name or sub-query");
}
......@@ -1064,7 +1099,9 @@ public final class SqlParser {
throw SqlException.$(lexer.lastTokenPosition(), "'into' expected");
}
model.setTableName(expectLiteral(lexer));
tok = tok(lexer, "table name");
model.setTableName(nextLiteral(GenericLexer.assertNoDotsAndSlashes(GenericLexer.unquote(tok), lexer.lastTokenPosition()), lexer.lastTokenPosition()));
tok = tok(lexer, "'(' or 'select'");
......@@ -1686,39 +1723,6 @@ public final class SqlParser {
return type;
}
static int parseGeoHashBits(int position, int start, CharSequence sizeStr) throws SqlException {
assert start >= 0;
if (sizeStr.length() - start < 2) {
throw SqlException.position(position)
.put("invalid GEOHASH size, must be number followed by 'C' or 'B' character");
}
int size;
try {
size = Numbers.parseInt(sizeStr, start, sizeStr.length() - 1);
} catch (NumericException e) {
throw SqlException.position(position)
.put("invalid GEOHASH size, must be number followed by 'C' or 'B' character");
}
switch (sizeStr.charAt(sizeStr.length() - 1)) {
case 'C':
case 'c':
size *= 5;
break;
case 'B':
case 'b':
break;
default:
throw SqlException.position(position)
.put("invalid GEOHASH size units, must be 'c', 'C' for chars, or 'b', 'B' for bits");
}
if (size < 1 || size > ColumnType.GEO_HASH_MAX_BITS_LENGTH) {
throw SqlException.position(position)
.put("invalid GEOHASH type precision range, mast be [1, 60] bits, provided=")
.put(size);
}
return size;
}
private @NotNull CharSequence tok(GenericLexer lexer, String expectedList) throws SqlException {
final int pos = lexer.getPosition();
CharSequence tok = optTok(lexer);
......@@ -1743,12 +1747,10 @@ public final class SqlParser {
case ')':
case ',':
case '`':
// case '"':
case '\'':
throw SqlException.position(pos).put("literal expected");
default:
break;
}
}
......
......@@ -33,7 +33,6 @@ import io.questdb.griffin.engine.functions.CursorFunction;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import org.jetbrains.annotations.Nullable;
......@@ -116,7 +115,7 @@ public abstract class AbstractClassCatalogueFunctionFactory implements FunctionF
private final DelegatingRecordImpl record = new DelegatingRecordImpl();
private final DiskReadingRecord diskReadingRecord = new DiskReadingRecord();
private final StaticReadingRecord staticReadingRecord = new StaticReadingRecord();
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final StringSink sink = new StringSink();
private final int plimit;
private final int[] intValues = new int[5];
private final long tempMem;
......@@ -188,11 +187,11 @@ public abstract class AbstractClassCatalogueFunctionFactory implements FunctionF
private boolean next0() {
do {
final long pname = ff.findName(findFileStruct);
nativeLPSZ.of(pname);
if (ff.findType(findFileStruct) == Files.DT_DIR && Chars.notDots(nativeLPSZ)) {
final long pUtf8NameZ = ff.findName(findFileStruct);
final long type = ff.findType(findFileStruct);
if (Files.isDir(pUtf8NameZ, type, sink)) {
path.trimTo(plimit);
if (ff.exists(path.concat(pname).concat(TableUtils.META_FILE_NAME).$())) {
if (ff.exists(path.concat(pUtf8NameZ).concat(TableUtils.META_FILE_NAME).$())) {
// open metadata file and read id
long fd = ff.openRO(path);
if (fd > -1) {
......@@ -251,7 +250,6 @@ public abstract class AbstractClassCatalogueFunctionFactory implements FunctionF
}
private class DiskReadingRecord implements Record {
private final StringSink utf8SinkA = new StringSink();
private final StringSink utf8SinkB = new StringSink();
@Override
......@@ -267,7 +265,7 @@ public abstract class AbstractClassCatalogueFunctionFactory implements FunctionF
@Override
public CharSequence getStr(int col) {
if (col == 0) {
return getName(utf8SinkA);
return sink;
}
return null;
}
......@@ -283,17 +281,16 @@ public abstract class AbstractClassCatalogueFunctionFactory implements FunctionF
@Override
public int getStrLen(int col) {
if (col == 0) {
CharSequence cs = getStr(col);
return cs != null ? cs.length() : -1;
return sink.length();
}
return -1;
}
@Nullable
private CharSequence getName(StringSink utf8SinkA) {
utf8SinkA.clear();
if (Chars.utf8DecodeZ(ff.findName(findFileStruct), utf8SinkA)) {
return utf8SinkA;
private CharSequence getName(StringSink sink) {
sink.clear();
if (Chars.utf8DecodeZ(ff.findName(findFileStruct), sink)) {
return sink;
} else {
return null;
}
......
......@@ -32,8 +32,8 @@ import io.questdb.griffin.engine.functions.CursorFunction;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
public class AttrDefCatalogueFunctionFactory implements FunctionFactory {
......@@ -96,7 +96,6 @@ public class AttrDefCatalogueFunctionFactory implements FunctionFactory {
private final Path path;
private final FilesFacade ff;
private final AttrDefCatalogueCursor.DiskReadingRecord diskReadingRecord = new AttrDefCatalogueCursor.DiskReadingRecord();
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final int plimit;
private final long tempMem;
private int tableId = -1;
......@@ -160,12 +159,12 @@ public class AttrDefCatalogueFunctionFactory implements FunctionFactory {
do {
if (readNextFileFromDisk) {
foundMetadataFile = false;
final long pname = ff.findName(findFileStruct);
final long pUtf8NameZ = ff.findName(findFileStruct);
if (hasNextFile) {
nativeLPSZ.of(pname);
if (ff.findType(findFileStruct) == Files.DT_DIR && Chars.notDots(nativeLPSZ)) {
final long type = ff.findType(findFileStruct);
if (Files.isDir(pUtf8NameZ, type)) {
path.trimTo(plimit);
if (ff.exists(path.concat(pname).concat(TableUtils.META_FILE_NAME).$())) {
if (ff.exists(path.concat(pUtf8NameZ).concat(TableUtils.META_FILE_NAME).$())) {
long fd = ff.openRO(path);
if (fd > -1) {
if (ff.read(fd, tempMem, Integer.BYTES, TableUtils.META_OFFSET_TABLE_ID) == Integer.BYTES) {
......
......@@ -33,8 +33,8 @@ import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.CursorFunction;
import io.questdb.std.*;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import static io.questdb.cutlass.pgwire.PGOids.PG_TYPE_TO_SIZE_MAP;
......@@ -106,7 +106,6 @@ public class AttributeCatalogueFunctionFactory implements FunctionFactory {
private final Path path;
private final FilesFacade ff;
private final DiskReadingRecord diskReadingRecord = new DiskReadingRecord();
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final int plimit;
private final MemoryMR metaMem;
private long findFileStruct = 0;
......@@ -171,14 +170,12 @@ public class AttributeCatalogueFunctionFactory implements FunctionFactory {
do {
if (readNextFileFromDisk) {
foundMetadataFile = false;
final long pname = ff.findName(findFileStruct);
final long pUtf8NameZ = ff.findName(findFileStruct);
if (hasNextFile) {
nativeLPSZ.of(pname);
if (
ff.findType(findFileStruct) == Files.DT_DIR && Chars.notDots(nativeLPSZ)
) {
final long type = ff.findType(findFileStruct);
if (Files.isDir(pUtf8NameZ, type)) {
path.trimTo(plimit);
path.concat(pname);
path.concat(pUtf8NameZ);
if (ff.exists(path.concat(TableUtils.META_FILE_NAME).$())) {
foundMetadataFile = true;
metaMem.smallFile(ff, path, MemoryTag.MMAP_DEFAULT);
......@@ -277,7 +274,7 @@ public class AttributeCatalogueFunctionFactory implements FunctionFactory {
metadata.add(new TableColumnMetadata("atttypmod", 6, ColumnType.INT));
metadata.add(new TableColumnMetadata("attlen", 7, ColumnType.SHORT));
metadata.add(new TableColumnMetadata("attidentity", 8, ColumnType.CHAR));
metadata.add(new TableColumnMetadata("attisdropped", 9,ColumnType.BOOLEAN));
metadata.add(new TableColumnMetadata("attisdropped", 9, ColumnType.BOOLEAN));
METADATA = metadata;
}
}
......@@ -32,8 +32,8 @@ import io.questdb.griffin.engine.functions.CursorFunction;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import static io.questdb.cairo.TableUtils.META_FILE_NAME;
......@@ -110,7 +110,7 @@ public class TableMetadataCursorFactory implements FunctionFactory {
}
private class TableListRecordCursor implements RecordCursor {
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final StringSink sink = new StringSink();
private final TableListRecord record = new TableListRecord();
private long findPtr = 0;
private TableReaderMetadata metaReader;
......@@ -144,10 +144,8 @@ public class TableMetadataCursorFactory implements FunctionFactory {
return false;
}
}
nativeLPSZ.of(ff.findName(findPtr));
int type = ff.findType(findPtr);
if (type == Files.DT_DIR && nativeLPSZ.charAt(0) != '.') {
if (record.open(nativeLPSZ)) {
if (Files.isDir(ff.findName(findPtr), ff.findType(findPtr), sink)) {
if (record.open(sink)) {
return true;
}
}
......@@ -211,7 +209,7 @@ public class TableMetadataCursorFactory implements FunctionFactory {
@Override
public CharSequence getStr(int col) {
if (col == nameColumn) {
return nativeLPSZ;
return sink;
}
if (col == partitionByColumn) {
return PartitionBy.toString(partitionBy);
......@@ -234,7 +232,7 @@ public class TableMetadataCursorFactory implements FunctionFactory {
return getStr(col).length();
}
public boolean open(NativeLPSZ tableName) {
public boolean open(CharSequence tableName) {
int pathLen = path.length();
try {
path.chop$().concat(tableName).concat(META_FILE_NAME).$();
......
......@@ -33,8 +33,8 @@ import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.std.Files;
import io.questdb.std.FilesFacade;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
public class TableListRecordCursorFactory implements RecordCursorFactory {
private static final RecordMetadata METADATA;
......@@ -71,7 +71,7 @@ public class TableListRecordCursorFactory implements RecordCursorFactory {
}
private class TableListRecordCursor implements RecordCursor {
private final NativeLPSZ nativeLPSZ = new NativeLPSZ();
private final StringSink sink = new StringSink();
private final TableListRecord record = new TableListRecord();
private long findPtr = 0;
......@@ -101,9 +101,7 @@ public class TableListRecordCursorFactory implements RecordCursorFactory {
return false;
}
}
nativeLPSZ.of(ff.findName(findPtr));
int type = ff.findType(findPtr);
if (type == Files.DT_DIR && nativeLPSZ.charAt(0) != '.') {
if (Files.isDir(ff.findName(findPtr), ff.findType(findPtr), sink)) {
return true;
}
}
......@@ -138,7 +136,7 @@ public class TableListRecordCursorFactory implements RecordCursorFactory {
@Override
public CharSequence getStr(int col) {
if (col == 0) {
return nativeLPSZ;
return sink;
}
return null;
}
......
......@@ -485,17 +485,6 @@ public final class Chars {
return value != null && value.length() > 0;
}
public static boolean notDots(CharSequence value) {
final int len = value.length();
if (len > 2) {
return true;
}
if (value.charAt(0) != '.') {
return true;
}
return len == 2 && value.charAt(1) != '.';
}
public static CharSequence repeat(String s, int times) {
return new CharSequence() {
@Override
......
......@@ -26,6 +26,7 @@ package io.questdb.std;
import io.questdb.std.str.LPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import java.io.File;
import java.nio.charset.Charset;
......@@ -201,6 +202,41 @@ public final class Files {
}
}
public static boolean notDots(CharSequence value) {
final int len = value.length();
if (len > 2) {
return true;
}
if (value.charAt(0) != '.') {
return true;
}
return len == 2 && value.charAt(1) != '.';
}
public static boolean notDots(long pUtf8NameZ) {
final byte b0 = Unsafe.getUnsafe().getByte(pUtf8NameZ);
if (b0 != '.') {
return true;
}
final byte b1 = Unsafe.getUnsafe().getByte(pUtf8NameZ + 1);
return b1 != 0 && (b1 != '.' || Unsafe.getUnsafe().getByte(pUtf8NameZ + 2) != 0);
}
public static boolean isDir(long pUtf8NameZ, long type, StringSink nameSink) {
if (type == DT_DIR) {
nameSink.clear();
Chars.utf8DecodeZ(pUtf8NameZ, nameSink);
return notDots(nameSink);
}
return false;
}
public static boolean isDir(long pUtf8NameZ, long type) {
return type == DT_DIR && notDots(pUtf8NameZ);
}
public static long openAppend(LPSZ lpsz) {
return bumpFileCount(openAppend(lpsz.address()));
}
......
......@@ -26,5 +26,5 @@ package io.questdb.std;
@FunctionalInterface
public interface FindVisitor {
void onFind(long name, int type);
void onFind(long pUtf8NameZ, int type);
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.std.str;
import io.questdb.std.Unsafe;
/**
* Represents C LPSZ as Java' CharSequence. Bytes in native memory are
* interpreted as ASCII characters. Multi-byte characters are NOT decoded.
*/
public class NativeLPSZ extends AbstractCharSequence {
private long address;
private int len;
@Override
public int length() {
return len;
}
@Override
public char charAt(int index) {
return (char) Unsafe.getUnsafe().getByte(address + index);
}
@SuppressWarnings("StatementWithEmptyBody")
public NativeLPSZ of(long address) {
this.address = address;
long p = address;
while (Unsafe.getUnsafe().getByte(p++) != 0) ;
this.len = (int) (p - address - 1);
return this;
}
}
......@@ -70,6 +70,17 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return PATH.get().of(root);
}
/**
* Creates path from another instance of Path. The assumption is that
* the source path is already UTF8 encoded and does not require re-encoding.
*
* @param root path
* @return copy of root path
*/
public static Path getThreadLocal(Path root) {
return PATH.get().of(root);
}
public static Path getThreadLocal2(CharSequence root) {
return PATH2.get().of(root);
}
......@@ -82,11 +93,6 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return this;
}
public Path slash$() {
ensureSeparator();
return $();
}
@Override
public long address() {
return ptr;
......@@ -114,11 +120,11 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return concat(str, 0, str.length());
}
public Path concat(long lpsz) {
public Path concat(long pUtf8NameZ) {
ensureSeparator();
long p = lpsz;
long p = pUtf8NameZ;
while (true) {
if (len + OVERHEAD >= capacity) {
......@@ -183,6 +189,18 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return this;
}
@Override
public Path put(int value) {
super.put(value);
return this;
}
@Override
public Path put(long value) {
super.put(value);
return this;
}
@Override
public CharSink put(char[] chars, int start, int len) {
if (len + this.len >= capacity) {
......@@ -193,6 +211,15 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return this;
}
@Override
public void putUtf8Special(char c) {
if (c == '/' && Os.type == Os.WINDOWS) {
put('\\');
} else {
put(c);
}
}
@Override
public final int length() {
return len;
......@@ -245,12 +272,6 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return this;
}
private void checkClosed() {
if (ptr == 0) {
this.ptr = this.wptr = Unsafe.malloc(capacity + 1, MemoryTag.NATIVE_DEFAULT);
}
}
public Path of(CharSequence str, int from, int to) {
checkClosed();
this.wptr = ptr;
......@@ -258,36 +279,29 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return concat(str, from, to);
}
@Override
public Path put(long value) {
super.put(value);
return this;
}
@Override
public Path put(int value) {
super.put(value);
public Path slash() {
ensureSeparator();
return this;
}
@Override
public void putUtf8Special(char c) {
if (c == '/' && Os.type == Os.WINDOWS) {
put('\\');
} else {
put(c);
}
}
public Path slash() {
public Path slash$() {
ensureSeparator();
return this;
return $();
}
@Override
@NotNull
public String toString() {
return ptr == 0 ? "" : AbstractCharSequence.getString(this);
if (ptr != 0) {
final CharSink b = Misc.getThreadLocalBuilder();
if (Unsafe.getUnsafe().getByte(wptr - 1) == 0) {
Chars.utf8Decode(ptr, wptr - 1, b);
} else {
Chars.utf8Decode(ptr, wptr, b);
}
return b.toString();
}
return "";
}
public Path trimTo(int len) {
......@@ -296,6 +310,12 @@ public class Path extends AbstractCharSink implements Closeable, LPSZ {
return this;
}
private void checkClosed() {
if (ptr == 0) {
this.ptr = this.wptr = Unsafe.malloc(capacity + 1, MemoryTag.NATIVE_DEFAULT);
}
}
private void copy(CharSequence str, int from, int to) {
encodeUtf8(str, from, to);
}
......
......@@ -26,11 +26,12 @@ package io.questdb.tasks;
import io.questdb.cairo.BitmapIndexWriter;
import io.questdb.cairo.TableWriter;
import io.questdb.std.str.Path;
import java.util.concurrent.atomic.AtomicInteger;
public class O3OpenColumnTask {
private CharSequence pathToTable;
private Path pathToTable;
private AtomicInteger columnCounter;
private AtomicInteger partCounter;
private long txn;
......@@ -130,7 +131,7 @@ public class O3OpenColumnTask {
return partitionTimestamp;
}
public CharSequence getPathToTable() {
public Path getPathToTable() {
return pathToTable;
}
......@@ -228,7 +229,7 @@ public class O3OpenColumnTask {
public void of(
int openColumnMode,
CharSequence pathToTable,
Path pathToTable,
CharSequence columnName,
AtomicInteger columnCounter,
AtomicInteger partCounter,
......
......@@ -29,11 +29,12 @@ import io.questdb.cairo.TableWriter;
import io.questdb.cairo.vm.api.MemoryCARW;
import io.questdb.cairo.vm.api.MemoryMAR;
import io.questdb.std.ObjList;
import io.questdb.std.str.Path;
import java.util.concurrent.atomic.AtomicInteger;
public class O3PartitionTask {
private CharSequence pathToTable;
private Path pathToTable;
private int partitionBy;
private ObjList<MemoryMAR> columns;
private ObjList<MemoryCARW> o3Columns;
......@@ -81,7 +82,7 @@ public class O3PartitionTask {
return partitionTimestamp;
}
public CharSequence getPathToTable() {
public Path getPathToTable() {
return pathToTable;
}
......@@ -130,7 +131,7 @@ public class O3PartitionTask {
}
public void of(
CharSequence path,
Path path,
int partitionBy,
ObjList<MemoryMAR> columns,
ObjList<MemoryCARW> o3Columns,
......
......@@ -26,8 +26,8 @@ package io.questdb;
import io.questdb.std.*;
import io.questdb.std.datetime.millitime.DateFormatUtils;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Rule;
......@@ -191,12 +191,14 @@ public class FilesTest {
try (Path path = new Path().of(temp).$()) {
try (Path cp = new Path()) {
Assert.assertTrue(Files.touch(cp.of(temp).concat("a.txt").$()));
NativeLPSZ name = new NativeLPSZ();
StringSink nameSink = new StringSink();
long pFind = Files.findFirst(path);
Assert.assertTrue(pFind != 0);
try {
do {
names.add(name.of(Files.findName(pFind)).toString());
nameSink.clear();
Chars.utf8DecodeZ(Files.findName(pFind), nameSink);
names.add(nameSink.toString());
} while (Files.findNext(pFind) > 0);
} finally {
Files.findClose(pFind);
......
......@@ -41,9 +41,7 @@ import io.questdb.std.datetime.DateLocale;
import io.questdb.std.datetime.DateLocaleFactory;
import io.questdb.std.datetime.microtime.TimestampFormatCompiler;
import io.questdb.std.datetime.microtime.TimestampFormatUtils;
import io.questdb.std.datetime.millitime.DateFormatUtils;
import io.questdb.std.str.LPSZ;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.test.tools.TestUtils;
import org.junit.Assert;
......@@ -2974,7 +2972,7 @@ public class TableWriterTest extends AbstractCairoTest {
private int getDirCount() {
AtomicInteger count = new AtomicInteger();
try (Path path = new Path()) {
FF.iterateDir(path.of(root).concat(PRODUCT).$(), (name, type) -> {
FF.iterateDir(path.of(root).concat(PRODUCT).$(), (pUtf8NameZ, type) -> {
if (type == Files.DT_DIR) {
count.incrementAndGet();
}
......@@ -3475,17 +3473,14 @@ public class TableWriterTest extends AbstractCairoTest {
writer.removeColumn("supplier");
final NativeLPSZ lpsz = new NativeLPSZ();
try (Path path = new Path()) {
path.of(root).concat(model.getName());
final int plen = path.length();
FF.iterateDir(path.$(), (file, type) -> {
lpsz.of(file);
if (type == Files.DT_DIR && !Files.isDots(lpsz)) {
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.i").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.d").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.top").$()));
FF.iterateDir(path.$(), (pUtf8NameZ, type) -> {
if (Files.isDir(pUtf8NameZ, type)) {
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("supplier.i").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("supplier.d").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("supplier.top").$()));
}
});
}
......@@ -3556,30 +3551,28 @@ public class TableWriterTest extends AbstractCairoTest {
writer.renameColumn("supplier", "sup");
final NativeLPSZ lpsz = new NativeLPSZ();
try (Path path = new Path()) {
path.of(root).concat(model.getName());
final int plen = path.length();
if (columnTypeTag == ColumnType.SYMBOL) {
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.v").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.o").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.c").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.k").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(lpsz).concat("sup.v").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(lpsz).concat("sup.o").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(lpsz).concat("sup.c").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(lpsz).concat("sup.k").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat("supplier.v").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat("supplier.o").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat("supplier.c").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat("supplier.k").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat("sup.v").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat("sup.o").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat("sup.c").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat("sup.k").$()));
}
path.trimTo(plen);
FF.iterateDir(path.$(), (file, type) -> {
lpsz.of(file);
if (type == Files.DT_DIR && !Files.isDots(lpsz)) {
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.i").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.d").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(lpsz).concat("supplier.top").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(lpsz).concat("sup.d").$()));
FF.iterateDir(path.$(), (pUtf8NameZ, type) -> {
if (Files.isDir(pUtf8NameZ, type)) {
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("supplier.i").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("supplier.d").$()));
Assert.assertFalse(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("supplier.top").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("sup.d").$()));
if (columnTypeTag == ColumnType.BINARY || columnTypeTag == ColumnType.STRING) {
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(lpsz).concat("sup.i").$()));
Assert.assertTrue(FF.exists(path.trimTo(plen).concat(pUtf8NameZ).concat("sup.i").$()));
}
}
});
......
......@@ -291,6 +291,16 @@ public class O3Test extends AbstractO3Test {
executeWithPool(0, O3Test::testColumnTopMidOOOData0);
}
@Test
public void testColumnTopMidOOODataUtf8Contended() throws Exception {
executeWithPool(0, O3Test::testColumnTopMidOOODataUtf80);
}
@Test
public void testColumnTopMidOOODataUtf8Parallel() throws Exception {
executeWithPool(4, O3Test::testColumnTopMidOOODataUtf80);
}
@Test
public void testColumnTopMidOOODataParallel() throws Exception {
executeWithPool(4, O3Test::testColumnTopMidOOOData0);
......@@ -4935,6 +4945,157 @@ public class O3Test extends AbstractO3Test {
);
}
private static void testColumnTopMidOOODataUtf80(
CairoEngine engine,
SqlCompiler compiler,
SqlExecutionContext sqlExecutionContext
) throws SqlException, URISyntaxException {
compiler.compile(
"create table 'привет от штиблет' as (" +
"select" +
" cast(x as int) i," +
" rnd_symbol('msft','ibm', 'googl') sym," +
" round(rnd_double(0)*100, 3) amt," +
" to_timestamp('2018-01', 'yyyy-MM') + x * 720000000 timestamp," +
" rnd_boolean() b," +
" rnd_str('ABC', 'CDE', null, 'XYZ') c," +
" rnd_double(2) d," +
" rnd_float(2) e," +
" rnd_short(10,1024) f," +
" rnd_date(to_date('2015', 'yyyy'), to_date('2016', 'yyyy'), 2) g," +
" rnd_symbol(4,4,4,2) ik," +
" rnd_long() j," +
" timestamp_sequence(500000000000L,100000000L) ts," +
" rnd_byte(2,50) l," +
" rnd_bin(10, 20, 2) m," +
" rnd_str(5,16,2) n," +
" rnd_char() t" +
" from long_sequence(500)" +
"), index(sym) timestamp (ts) partition by DAY",
sqlExecutionContext
);
compiler.compile("alter table 'привет от штиблет' add column v double", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v1 float", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v2 int", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v3 byte", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v4 short", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v5 boolean", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v6 date", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v7 timestamp", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v8 symbol", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v10 char", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v11 string", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v12 binary", sqlExecutionContext);
compiler.compile("alter table 'привет от штиблет' add column v9 long", sqlExecutionContext);
compiler.compile(
"insert into 'привет от штиблет' " +
"select" +
" cast(x as int) i," +
" rnd_symbol('msft','ibm', 'googl') sym," +
" round(rnd_double(0)*100, 3) amt," +
" to_timestamp('2018-01', 'yyyy-MM') + x * 720000000 timestamp," +
" rnd_boolean() b," +
" rnd_str('ABC', 'CDE', null, 'XYZ') c," +
" rnd_double(2) d," +
" rnd_float(2) e," +
" rnd_short(10,1024) f," +
" rnd_date(to_date('2015', 'yyyy'), to_date('2016', 'yyyy'), 2) g," +
" rnd_symbol(4,4,4,2) ik," +
" rnd_long() j," +
" timestamp_sequence(549920000000L,100000000L) ts," +
" rnd_byte(2,50) l," +
" rnd_bin(10, 20, 2) m," +
" rnd_str(5,16,2) n," +
" rnd_char() t," +
// -------- new columns here ---------------
" rnd_double() v," +
" rnd_float() v1," +
" rnd_int() v2," +
" rnd_byte() v3," +
" rnd_short() v4," +
" rnd_boolean() v5," +
" rnd_date() v6," +
" rnd_timestamp(10,100000,356) v7," +
" rnd_symbol('AAA','BBB', null) v8," +
" rnd_char() v10," +
" rnd_str() v11," +
" rnd_bin() v12," +
" rnd_long() v9" +
" from long_sequence(1000)",
sqlExecutionContext
);
compiler.compile(
"create table append as (" +
"select" +
" cast(x as int) i," +
" rnd_symbol('msft','ibm', 'googl') sym," +
" round(rnd_double(0)*100, 3) amt," +
" to_timestamp('2018-01', 'yyyy-MM') + x * 720000000 timestamp," +
" rnd_boolean() b," +
" rnd_str('ABC', 'CDE', null, 'XYZ') c," +
" rnd_double(2) d," +
" rnd_float(2) e," +
" rnd_short(10,1024) f," +
" rnd_date(to_date('2015', 'yyyy'), to_date('2016', 'yyyy'), 2) g," +
" rnd_symbol(4,4,4,2) ik," +
" rnd_long() j," +
" timestamp_sequence(546600000000L,100000L) ts," +
" rnd_byte(2,50) l," +
" rnd_bin(10, 20, 2) m," +
" rnd_str(5,16,2) n," +
" rnd_char() t," +
// -------- new columns here ---------------
" rnd_double() v," +
" rnd_float() v1," +
" rnd_int() v2," +
" rnd_byte() v3," +
" rnd_short() v4," +
" rnd_boolean() v5," +
" rnd_date() v6," +
" rnd_timestamp(10,100000,356) v7," +
" rnd_symbol('AAA','BBB', null) v8," +
" rnd_char() v10," +
" rnd_str() v11," +
" rnd_bin() v12," +
" rnd_long() v9" +
" from long_sequence(100)" +
") timestamp (ts) partition by DAY",
sqlExecutionContext
);
TestUtils.printSql(compiler, sqlExecutionContext, "select count() from ('привет от штиблет' union all append)", sink2);
TestUtils.printSql(compiler, sqlExecutionContext, "select max(ts) from ('привет от штиблет' union all append)", sink);
final String expectedMaxTimestamp = Chars.toString(sink);
compiler.compile("insert into 'привет от штиблет' select * from append", sqlExecutionContext);
assertSqlResultAgainstFile(
compiler,
sqlExecutionContext,
"'привет от штиблет'",
"/o3/testColumnTopMidOOOData.txt"
);
printSqlResult(compiler, sqlExecutionContext, "select count() from 'привет от штиблет'");
TestUtils.assertEquals(sink2, sink);
try (
final TableWriter w = engine.getWriter(
sqlExecutionContext.getCairoSecurityContext(),
"привет от штиблет",
"test"
)
) {
sink.clear();
sink.put("max\n");
TimestampFormatUtils.appendDateTimeUSec(sink, w.getMaxTimestamp());
sink.put('\n');
TestUtils.assertEquals(expectedMaxTimestamp, sink);
Assert.assertEquals(0, w.getO3RowCount());
}
}
private static void testPartitionedDataAppendOOData0(
CairoEngine engine,
SqlCompiler compiler,
......
......@@ -31,8 +31,8 @@ import io.questdb.mp.SPSequence;
import io.questdb.std.*;
import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.std.datetime.microtime.TimestampFormatUtils;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Rule;
......@@ -544,20 +544,21 @@ public class LogFactoryTest {
int fileCount = 0;
try (Path path = new Path()) {
NativeLPSZ lpsz = new NativeLPSZ();
StringSink fileNameSink = new StringSink();
path.of(base).$();
long pFind = Files.findFirst(path);
try {
Assert.assertTrue(pFind != 0);
do {
lpsz.of(Files.findName(pFind));
if (Files.isDots(lpsz)) {
fileNameSink.clear();
Chars.utf8DecodeZ(Files.findName(pFind), fileNameSink);
if (Files.isDots(fileNameSink)) {
continue;
}
// don't hardcode hour, it is liable to vary
// because of different default timezones
TestUtils.assertContains(lpsz, mustContain);
Assert.assertFalse(Chars.contains(lpsz, ".1"));
TestUtils.assertContains(fileNameSink, mustContain);
Assert.assertFalse(Chars.contains(fileNameSink, ".1"));
fileCount++;
} while (Files.findNext(pFind) > 0);
......
......@@ -24,11 +24,11 @@
package io.questdb.network;
import io.questdb.std.Chars;
import io.questdb.std.MemoryTag;
import io.questdb.std.Os;
import io.questdb.std.Unsafe;
import io.questdb.std.str.CharSequenceZ;
import io.questdb.std.str.NativeLPSZ;
import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils;
import org.junit.Assert;
......@@ -211,8 +211,8 @@ public class NetTest {
@Test
public void testSeek() {
int port = 9993;
NativeLPSZ lpsz = new NativeLPSZ();
String msg = "Test ABC";
StringSink sink = new StringSink();
CharSequenceZ charSink = new CharSequenceZ(msg);
int msgLen = charSink.length() + 1;
......@@ -231,11 +231,12 @@ public class NetTest {
long serverFd = Net.accept(acceptFd);
long serverBuf = Unsafe.malloc(msgLen, MemoryTag.NATIVE_DEFAULT);
Assert.assertEquals(msgLen, Net.peek(serverFd, serverBuf, msgLen));
lpsz.of(serverBuf);
Assert.assertEquals(msg, lpsz.toString());
Chars.utf8DecodeZ(serverBuf, sink);
TestUtils.assertEquals(msg, sink);
Assert.assertEquals(msgLen, Net.recv(serverFd, serverBuf, msgLen));
lpsz.of(serverBuf);
Assert.assertEquals(msg, lpsz.toString());
sink.clear();
Chars.utf8DecodeZ(serverBuf, sink);
TestUtils.assertEquals(msg, sink);
Unsafe.free(serverBuf, msgLen, MemoryTag.NATIVE_DEFAULT);
Net.close(serverFd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册