From b1dc53c7604880f4a05866d552edfbae5cbc6089 Mon Sep 17 00:00:00 2001 From: Vlad Ilyushchenko Date: Sat, 2 May 2020 15:05:32 +0100 Subject: [PATCH] chore(griffin): text parsing refactor - trying to improve "copy" performance --- .../questdb/cutlass/text/CairoTextWriter.java | 83 ++++++++++++------ .../cutlass/text/TextDelimiterScanner.java | 7 +- .../questdb/cutlass/text/TextException.java | 5 ++ .../io/questdb/cutlass/text/TextLexer.java | 68 ++++++++------- .../io/questdb/cutlass/text/TextLoader.java | 4 +- .../java/io/questdb/griffin/SqlCompiler.java | 1 - .../questdb/cutlass/text/TextLoaderTest.java | 86 +++++++++++++++++++ 7 files changed, 193 insertions(+), 61 deletions(-) diff --git a/core/src/main/java/io/questdb/cutlass/text/CairoTextWriter.java b/core/src/main/java/io/questdb/cutlass/text/CairoTextWriter.java index 396543372..fc1be8a5c 100644 --- a/core/src/main/java/io/questdb/cutlass/text/CairoTextWriter.java +++ b/core/src/main/java/io/questdb/cutlass/text/CairoTextWriter.java @@ -37,7 +37,7 @@ import io.questdb.std.str.Path; import java.io.Closeable; -public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { +public class CairoTextWriter implements Closeable, Mutable { private static final Log LOG = LogFactory.getLog(CairoTextWriter.class); private final CairoConfiguration configuration; private final CairoEngine engine; @@ -57,7 +57,9 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { private int timestampIndex; private CharSequence timestampIndexCol; private ObjList types; + private final TextLexer.Listener nonPartitionedListener = this::onFieldsNonPartitioned; private TimestampAdapter timestampAdapter; + private final TextLexer.Listener partitionedListener = this::onFieldsPartitioned; public CairoTextWriter( CairoEngine engine, @@ -112,6 +114,10 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { return tableName; } + public TextLexer.Listener getTextListener() { + return timestampAdapter != null ? partitionedListener : nonPartitionedListener; + } + public long getWrittenLineCount() { return writer == null ? 0 : writer.size() - _size; } @@ -125,23 +131,9 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { this.timestampIndexCol = timestampIndexCol; } - @Override - public void onFields(long line, ObjList values, int valuesLength) { - long timestamp = 0L; - if (timestampAdapter != null) { - final DirectByteCharSequence dbcs = values.getQuick(timestampIndex); - try { - timestamp = timestampAdapter.getTimestamp(dbcs); - } catch (NumericException e) { - logError(line, timestampIndex, dbcs); - return; - } - } - final TableWriter.Row w = writer.newRow(timestamp); + public void onFieldsNonPartitioned(long line, ObjList values, int valuesLength) { + final TableWriter.Row w = writer.newRow(); for (int i = 0; i < valuesLength; i++) { - if (timestampAdapter != null && i == timestampIndex) { - continue; - } final DirectByteCharSequence dbcs = values.getQuick(i); if (dbcs.length() == 0) { continue; @@ -166,10 +158,37 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { w.append(); } - private void logError(long line, int i, DirectByteCharSequence dbcs) { - LogRecord logRecord = LOG.error().$("type syntax [type=").$(ColumnType.nameOf(types.getQuick(i).getType())).$("]\n\t"); - logRecord.$('[').$(line).$(':').$(i).$("] -> ").$(dbcs).$(); - columnErrorCounts.increment(i); + public void onFieldsPartitioned(long line, ObjList values, int valuesLength) { + final int timestampIndex = this.timestampIndex; + DirectByteCharSequence dbcs = values.getQuick(timestampIndex); + try { + final TableWriter.Row w = writer.newRow(timestampAdapter.getTimestamp(dbcs)); + for (int i = 0; i < valuesLength; i++) { + dbcs = values.getQuick(i); + if (i == timestampIndex || dbcs.length() == 0) { + continue; + } + try { + types.getQuick(i).write(w, i, dbcs); + } catch (Exception ignore) { + logError(line, i, dbcs); + switch (atomicity) { + case Atomicity.SKIP_ALL: + writer.rollback(); + throw CairoException.instance(0).put("bad syntax [line=").put(line).put(", col=").put(i).put(']'); + case Atomicity.SKIP_ROW: + w.cancel(); + return; + default: + // SKIP column + break; + } + } + } + w.append(); + } catch (NumericException e) { + logError(line, timestampIndex, dbcs); + } } private void createTable( @@ -186,6 +205,12 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { this.types = detectedTypes; } + private void logError(long line, int i, DirectByteCharSequence dbcs) { + LogRecord logRecord = LOG.error().$("type syntax [type=").$(ColumnType.nameOf(types.getQuick(i).getType())).$("]\n\t"); + logRecord.$('[').$(line).$(':').$(i).$("] -> ").$(dbcs).$(); + columnErrorCounts.increment(i); + } + private void logTypeError(int i) { LOG.info() .$("mis-detected [table=").$(tableName) @@ -305,13 +330,19 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { } @Override - public boolean isIndexed(int columnIndex) { return false; } + public boolean isIndexed(int columnIndex) { + return false; + } @Override - public boolean isSequential(int columnIndex) { return false; } + public boolean isSequential(int columnIndex) { + return false; + } @Override - public int getPartitionBy() { return partitionBy; } + public int getPartitionBy() { + return partitionBy; + } @Override public boolean getSymbolCacheFlag(int columnIndex) { @@ -329,7 +360,9 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { } @Override - public int getTimestampIndex() { return timestampIndex; } + public int getTimestampIndex() { + return timestampIndex; + } TableStructureAdapter of(ObjList names, ObjList types) throws TextException { this.names = names; diff --git a/core/src/main/java/io/questdb/cutlass/text/TextDelimiterScanner.java b/core/src/main/java/io/questdb/cutlass/text/TextDelimiterScanner.java index 7d07f0c9d..3a60cab59 100644 --- a/core/src/main/java/io/questdb/cutlass/text/TextDelimiterScanner.java +++ b/core/src/main/java/io/questdb/cutlass/text/TextDelimiterScanner.java @@ -235,7 +235,8 @@ public class TextDelimiterScanner implements Closeable { assert delimiter > 0; - if (lastDelimiterStdDev < maxRequiredDelimiterStdDev) { + // exclude '.' as delimiter + if (delimiter != '.' && lastDelimiterStdDev < maxRequiredDelimiterStdDev) { LOG.info() .$("scan result [table=`").$(tableName) .$("`, delimiter='").$((char) delimiter) @@ -258,6 +259,8 @@ public class TextDelimiterScanner implements Closeable { .$(", delimiterMaxStdDev=").$(maxRequiredDelimiterStdDev) .$(", lineLengthStdDev=").$(lineLengthStdDev) .$(", lineLengthMaxStdDev=").$(maxRequiredLineLengthStdDev) + .$(", lineCountLimit=").$(lineCountLimit) + .$(", lineCount=").$(lineCount) .$(']').$(); throw TextException.$("min deviation is too high [delimiterStdDev=") @@ -265,6 +268,8 @@ public class TextDelimiterScanner implements Closeable { .put(", delimiterMaxStdDev=").put(maxRequiredDelimiterStdDev) .put(", lineLengthStdDev=").put(lineLengthStdDev) .put(", lineLengthMaxStdDev=").put(maxRequiredLineLengthStdDev) + .put(", lineCountLimit=").put(lineCountLimit) + .put(", lineCount=").put(lineCount) .put(']'); } diff --git a/core/src/main/java/io/questdb/cutlass/text/TextException.java b/core/src/main/java/io/questdb/cutlass/text/TextException.java index f0306fbe8..4b3d8193f 100644 --- a/core/src/main/java/io/questdb/cutlass/text/TextException.java +++ b/core/src/main/java/io/questdb/cutlass/text/TextException.java @@ -65,6 +65,11 @@ public class TextException extends Exception implements Sinkable { return this; } + public TextException put(long c) { + message.put(c); + return this; + } + @Override public void toSink(CharSink sink) { sink.put(message); diff --git a/core/src/main/java/io/questdb/cutlass/text/TextLexer.java b/core/src/main/java/io/questdb/cutlass/text/TextLexer.java index 6780e59d1..89df812e4 100644 --- a/core/src/main/java/io/questdb/cutlass/text/TextLexer.java +++ b/core/src/main/java/io/questdb/cutlass/text/TextLexer.java @@ -99,10 +99,6 @@ public class TextLexer implements Closeable, Mutable { fieldMax = -1; } - public long getErrorCount() { - return errorCount; - } - @Override public void close() { if (lineRollBufPtr != 0) { @@ -112,10 +108,22 @@ public class TextLexer implements Closeable, Mutable { metadataDetector.close(); } + public long getErrorCount() { + return errorCount; + } + public long getLineCount() { return lineCount; } + public boolean isSkipLinesWithExtraValues() { + return skipLinesWithExtraValues; + } + + public void setSkipLinesWithExtraValues(boolean skipLinesWithExtraValues) { + this.skipLinesWithExtraValues = skipLinesWithExtraValues; + } + public void of(byte columnDelimiter) { clear(); this.columnDelimiter = columnDelimiter; @@ -184,6 +192,29 @@ public class TextLexer implements Closeable, Mutable { } } + private void extraField(int fieldIndex) { + LogRecord logRecord = LOG.error().$("extra fields [table=").$(tableName).$(", fieldIndex=").$(fieldIndex).$(", fieldMax=").$(fieldMax).$("]\n\t").$(lineCount).$(" -> "); + for (int i = 0, n = fields.size(); i < n; i++) { + if (i > 0) { + logRecord.$(','); + } + logRecord.$(fields.getQuick(i)); + } + logRecord.$(" ...").$(); + + if (skipLinesWithExtraValues) { + errorCount++; + ignoreEolOnce = true; + this.fieldIndex = 0; + } else { + // prepare for next field + if (lastQuotePos > -1) { + lastQuotePos = -1; + } + this.fieldLo = this.fieldHi; + } + } + ObjList getColumnNames() { return metadataDetector.getColumnNames(); } @@ -229,14 +260,6 @@ public class TextLexer implements Closeable, Mutable { } } - public boolean isSkipLinesWithExtraValues() { - return skipLinesWithExtraValues; - } - - public void setSkipLinesWithExtraValues(boolean skipLinesWithExtraValues) { - this.skipLinesWithExtraValues = skipLinesWithExtraValues; - } - private void ignoreEolOnce() { eol = true; fieldIndex = 0; @@ -383,26 +406,7 @@ public class TextLexer implements Closeable, Mutable { } if (fieldIndex > fieldMax) { - LogRecord logRecord = LOG.error().$("extra fields [table=").$(tableName).$(", fieldIndex=").$(fieldIndex).$(", fieldMax=").$(fieldMax).$("]\n\t").$(lineCount).$(" -> "); - for (int i = 0, n = fields.size(); i < n; i++) { - if (i > 0) { - logRecord.$(','); - } - logRecord.$(fields.getQuick(i)); - } - logRecord.$(" ...").$(); - - if (skipLinesWithExtraValues) { - errorCount++; - ignoreEolOnce = true; - this.fieldIndex = 0; - } else { - // prepare for next field - if (lastQuotePos > -1) { - lastQuotePos = -1; - } - this.fieldLo = this.fieldHi; - } + extraField(fieldIndex); return; } diff --git a/core/src/main/java/io/questdb/cutlass/text/TextLoader.java b/core/src/main/java/io/questdb/cutlass/text/TextLoader.java index 0ff9ef2dd..85a66ac66 100644 --- a/core/src/main/java/io/questdb/cutlass/text/TextLoader.java +++ b/core/src/main/java/io/questdb/cutlass/text/TextLoader.java @@ -198,7 +198,7 @@ public class TextLoader implements Closeable, Mutable { } private void parseData(long lo, long hi, CairoSecurityContext cairoSecurityContext) { - textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter); + textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter.getTextListener()); } private void parseJsonMetadata(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws TextException { @@ -224,7 +224,7 @@ public class TextLoader implements Closeable, Mutable { textMetadataParser.getColumnTypes() ); textWriter.prepareTable(cairoSecurityContext, textLexer.getColumnNames(), textLexer.getColumnTypes()); - textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter); + textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter.getTextListener()); state = LOAD_DATA; } diff --git a/core/src/main/java/io/questdb/griffin/SqlCompiler.java b/core/src/main/java/io/questdb/griffin/SqlCompiler.java index ac07826a6..cb47befd0 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCompiler.java +++ b/core/src/main/java/io/questdb/griffin/SqlCompiler.java @@ -1105,7 +1105,6 @@ public class SqlCompiler implements Closeable { } } finally { textLoader.clear(); - Unsafe.free(buf, len); } } catch (TextException e) { // we do not expect JSON exception here diff --git a/core/src/test/java/io/questdb/cutlass/text/TextLoaderTest.java b/core/src/test/java/io/questdb/cutlass/text/TextLoaderTest.java index ca0472695..aa8882783 100644 --- a/core/src/test/java/io/questdb/cutlass/text/TextLoaderTest.java +++ b/core/src/test/java/io/questdb/cutlass/text/TextLoaderTest.java @@ -984,6 +984,92 @@ public class TextLoaderTest extends AbstractGriffinTest { }); } + @Test + public void testImportOneColumn() throws Exception { + final TextConfiguration textConfiguration = new DefaultTextConfiguration() { + @Override + public double getMaxRequiredLineLengthStdDev() { + return 2.0; + } + }; + + CairoConfiguration configuration = new DefaultCairoConfiguration(root) { + @Override + public TextConfiguration getTextConfiguration() { + return textConfiguration; + } + }; + try (CairoEngine engine = new CairoEngine(configuration, null)) { + assertNoLeak( + engine, + textLoader -> { + String expected = "s\n" + + "0.5035558920000001\n" + + "0.5370835850000001\n" + + "0.518392756\n" + + "0.898078974\n" + + "0.153959029\n" + + "0.368878817\n" + + "0.7685725170000001\n" + + "0.409412157\n" + + "0.959138401\n" + + "0.49868191100000003\n" + + "0.466161354\n" + + "0.6500869570000001\n" + + "0.201807867\n" + + "0.699247724\n" + + "0.8836387710000001\n" + + "0.855572368\n" + + "0.7143630950000001\n" + + "0.14668836100000002\n" + + "0.507968298\n" + + "0.064159752\n" + + "0.19579689800000002\n" + + "0.7118503740000001\n" + + "0.24136422300000002\n"; + + + String csv = "s\n" + + "0.503555892\n" + + "0.537083585\n" + + "0.518392756\n" + + "0.898078974\n" + + "0.153959029\n" + + "0.368878817\n" + + "0.768572517\n" + + "0.409412157\n" + + "0.959138401\n" + + "0.498681911\n" + + "0.466161354\n" + + "0.650086957\n" + + "0.201807867\n" + + "0.699247724\n" + + "0.883638771\n" + + "0.855572368\n" + + "0.714363095\n" + + "0.146688361\n" + + "0.507968298\n" + + "0.064159752\n" + + "0.195796898\n" + + "0.711850374\n" + + "0.241364223\n"; + + configureLoaderDefaults(textLoader, (byte) -1); + textLoader.setForceHeaders(false); + playText( + engine, + textLoader, + csv, + 1024, + expected, + "{\"columnCount\":1,\"columns\":[{\"index\":0,\"name\":\"s\",\"type\":\"DOUBLE\"}],\"timestampIndex\":-1}", + 23, + 23 + ); + }); + } + } + @Test public void testImportTimestamp() throws Exception { final TextConfiguration textConfiguration = new DefaultTextConfiguration() { -- GitLab