提交 b1dc53c7 编写于 作者: V Vlad Ilyushchenko

chore(griffin): text parsing refactor - trying to improve "copy" performance

上级 c03416be
......@@ -37,7 +37,7 @@ import io.questdb.std.str.Path;
import java.io.Closeable;
public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
public class CairoTextWriter implements Closeable, Mutable {
private static final Log LOG = LogFactory.getLog(CairoTextWriter.class);
private final CairoConfiguration configuration;
private final CairoEngine engine;
......@@ -57,7 +57,9 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
private int timestampIndex;
private CharSequence timestampIndexCol;
private ObjList<TypeAdapter> types;
private final TextLexer.Listener nonPartitionedListener = this::onFieldsNonPartitioned;
private TimestampAdapter timestampAdapter;
private final TextLexer.Listener partitionedListener = this::onFieldsPartitioned;
public CairoTextWriter(
CairoEngine engine,
......@@ -112,6 +114,10 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
return tableName;
}
public TextLexer.Listener getTextListener() {
return timestampAdapter != null ? partitionedListener : nonPartitionedListener;
}
public long getWrittenLineCount() {
return writer == null ? 0 : writer.size() - _size;
}
......@@ -125,23 +131,9 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
this.timestampIndexCol = timestampIndexCol;
}
@Override
public void onFields(long line, ObjList<DirectByteCharSequence> values, int valuesLength) {
long timestamp = 0L;
if (timestampAdapter != null) {
final DirectByteCharSequence dbcs = values.getQuick(timestampIndex);
try {
timestamp = timestampAdapter.getTimestamp(dbcs);
} catch (NumericException e) {
logError(line, timestampIndex, dbcs);
return;
}
}
final TableWriter.Row w = writer.newRow(timestamp);
public void onFieldsNonPartitioned(long line, ObjList<DirectByteCharSequence> values, int valuesLength) {
final TableWriter.Row w = writer.newRow();
for (int i = 0; i < valuesLength; i++) {
if (timestampAdapter != null && i == timestampIndex) {
continue;
}
final DirectByteCharSequence dbcs = values.getQuick(i);
if (dbcs.length() == 0) {
continue;
......@@ -166,10 +158,37 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
w.append();
}
private void logError(long line, int i, DirectByteCharSequence dbcs) {
LogRecord logRecord = LOG.error().$("type syntax [type=").$(ColumnType.nameOf(types.getQuick(i).getType())).$("]\n\t");
logRecord.$('[').$(line).$(':').$(i).$("] -> ").$(dbcs).$();
columnErrorCounts.increment(i);
public void onFieldsPartitioned(long line, ObjList<DirectByteCharSequence> values, int valuesLength) {
final int timestampIndex = this.timestampIndex;
DirectByteCharSequence dbcs = values.getQuick(timestampIndex);
try {
final TableWriter.Row w = writer.newRow(timestampAdapter.getTimestamp(dbcs));
for (int i = 0; i < valuesLength; i++) {
dbcs = values.getQuick(i);
if (i == timestampIndex || dbcs.length() == 0) {
continue;
}
try {
types.getQuick(i).write(w, i, dbcs);
} catch (Exception ignore) {
logError(line, i, dbcs);
switch (atomicity) {
case Atomicity.SKIP_ALL:
writer.rollback();
throw CairoException.instance(0).put("bad syntax [line=").put(line).put(", col=").put(i).put(']');
case Atomicity.SKIP_ROW:
w.cancel();
return;
default:
// SKIP column
break;
}
}
}
w.append();
} catch (NumericException e) {
logError(line, timestampIndex, dbcs);
}
}
private void createTable(
......@@ -186,6 +205,12 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
this.types = detectedTypes;
}
private void logError(long line, int i, DirectByteCharSequence dbcs) {
LogRecord logRecord = LOG.error().$("type syntax [type=").$(ColumnType.nameOf(types.getQuick(i).getType())).$("]\n\t");
logRecord.$('[').$(line).$(':').$(i).$("] -> ").$(dbcs).$();
columnErrorCounts.increment(i);
}
private void logTypeError(int i) {
LOG.info()
.$("mis-detected [table=").$(tableName)
......@@ -305,13 +330,19 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
}
@Override
public boolean isIndexed(int columnIndex) { return false; }
public boolean isIndexed(int columnIndex) {
return false;
}
@Override
public boolean isSequential(int columnIndex) { return false; }
public boolean isSequential(int columnIndex) {
return false;
}
@Override
public int getPartitionBy() { return partitionBy; }
public int getPartitionBy() {
return partitionBy;
}
@Override
public boolean getSymbolCacheFlag(int columnIndex) {
......@@ -329,7 +360,9 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
}
@Override
public int getTimestampIndex() { return timestampIndex; }
public int getTimestampIndex() {
return timestampIndex;
}
TableStructureAdapter of(ObjList<CharSequence> names, ObjList<TypeAdapter> types) throws TextException {
this.names = names;
......
......@@ -235,7 +235,8 @@ public class TextDelimiterScanner implements Closeable {
assert delimiter > 0;
if (lastDelimiterStdDev < maxRequiredDelimiterStdDev) {
// exclude '.' as delimiter
if (delimiter != '.' && lastDelimiterStdDev < maxRequiredDelimiterStdDev) {
LOG.info()
.$("scan result [table=`").$(tableName)
.$("`, delimiter='").$((char) delimiter)
......@@ -258,6 +259,8 @@ public class TextDelimiterScanner implements Closeable {
.$(", delimiterMaxStdDev=").$(maxRequiredDelimiterStdDev)
.$(", lineLengthStdDev=").$(lineLengthStdDev)
.$(", lineLengthMaxStdDev=").$(maxRequiredLineLengthStdDev)
.$(", lineCountLimit=").$(lineCountLimit)
.$(", lineCount=").$(lineCount)
.$(']').$();
throw TextException.$("min deviation is too high [delimiterStdDev=")
......@@ -265,6 +268,8 @@ public class TextDelimiterScanner implements Closeable {
.put(", delimiterMaxStdDev=").put(maxRequiredDelimiterStdDev)
.put(", lineLengthStdDev=").put(lineLengthStdDev)
.put(", lineLengthMaxStdDev=").put(maxRequiredLineLengthStdDev)
.put(", lineCountLimit=").put(lineCountLimit)
.put(", lineCount=").put(lineCount)
.put(']');
}
......
......@@ -65,6 +65,11 @@ public class TextException extends Exception implements Sinkable {
return this;
}
public TextException put(long c) {
message.put(c);
return this;
}
@Override
public void toSink(CharSink sink) {
sink.put(message);
......
......@@ -99,10 +99,6 @@ public class TextLexer implements Closeable, Mutable {
fieldMax = -1;
}
public long getErrorCount() {
return errorCount;
}
@Override
public void close() {
if (lineRollBufPtr != 0) {
......@@ -112,10 +108,22 @@ public class TextLexer implements Closeable, Mutable {
metadataDetector.close();
}
public long getErrorCount() {
return errorCount;
}
public long getLineCount() {
return lineCount;
}
public boolean isSkipLinesWithExtraValues() {
return skipLinesWithExtraValues;
}
public void setSkipLinesWithExtraValues(boolean skipLinesWithExtraValues) {
this.skipLinesWithExtraValues = skipLinesWithExtraValues;
}
public void of(byte columnDelimiter) {
clear();
this.columnDelimiter = columnDelimiter;
......@@ -184,6 +192,29 @@ public class TextLexer implements Closeable, Mutable {
}
}
private void extraField(int fieldIndex) {
LogRecord logRecord = LOG.error().$("extra fields [table=").$(tableName).$(", fieldIndex=").$(fieldIndex).$(", fieldMax=").$(fieldMax).$("]\n\t").$(lineCount).$(" -> ");
for (int i = 0, n = fields.size(); i < n; i++) {
if (i > 0) {
logRecord.$(',');
}
logRecord.$(fields.getQuick(i));
}
logRecord.$(" ...").$();
if (skipLinesWithExtraValues) {
errorCount++;
ignoreEolOnce = true;
this.fieldIndex = 0;
} else {
// prepare for next field
if (lastQuotePos > -1) {
lastQuotePos = -1;
}
this.fieldLo = this.fieldHi;
}
}
ObjList<CharSequence> getColumnNames() {
return metadataDetector.getColumnNames();
}
......@@ -229,14 +260,6 @@ public class TextLexer implements Closeable, Mutable {
}
}
public boolean isSkipLinesWithExtraValues() {
return skipLinesWithExtraValues;
}
public void setSkipLinesWithExtraValues(boolean skipLinesWithExtraValues) {
this.skipLinesWithExtraValues = skipLinesWithExtraValues;
}
private void ignoreEolOnce() {
eol = true;
fieldIndex = 0;
......@@ -383,26 +406,7 @@ public class TextLexer implements Closeable, Mutable {
}
if (fieldIndex > fieldMax) {
LogRecord logRecord = LOG.error().$("extra fields [table=").$(tableName).$(", fieldIndex=").$(fieldIndex).$(", fieldMax=").$(fieldMax).$("]\n\t").$(lineCount).$(" -> ");
for (int i = 0, n = fields.size(); i < n; i++) {
if (i > 0) {
logRecord.$(',');
}
logRecord.$(fields.getQuick(i));
}
logRecord.$(" ...").$();
if (skipLinesWithExtraValues) {
errorCount++;
ignoreEolOnce = true;
this.fieldIndex = 0;
} else {
// prepare for next field
if (lastQuotePos > -1) {
lastQuotePos = -1;
}
this.fieldLo = this.fieldHi;
}
extraField(fieldIndex);
return;
}
......
......@@ -198,7 +198,7 @@ public class TextLoader implements Closeable, Mutable {
}
private void parseData(long lo, long hi, CairoSecurityContext cairoSecurityContext) {
textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter);
textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter.getTextListener());
}
private void parseJsonMetadata(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws TextException {
......@@ -224,7 +224,7 @@ public class TextLoader implements Closeable, Mutable {
textMetadataParser.getColumnTypes()
);
textWriter.prepareTable(cairoSecurityContext, textLexer.getColumnNames(), textLexer.getColumnTypes());
textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter);
textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter.getTextListener());
state = LOAD_DATA;
}
......
......@@ -1105,7 +1105,6 @@ public class SqlCompiler implements Closeable {
}
} finally {
textLoader.clear();
Unsafe.free(buf, len);
}
} catch (TextException e) {
// we do not expect JSON exception here
......
......@@ -984,6 +984,92 @@ public class TextLoaderTest extends AbstractGriffinTest {
});
}
@Test
public void testImportOneColumn() throws Exception {
final TextConfiguration textConfiguration = new DefaultTextConfiguration() {
@Override
public double getMaxRequiredLineLengthStdDev() {
return 2.0;
}
};
CairoConfiguration configuration = new DefaultCairoConfiguration(root) {
@Override
public TextConfiguration getTextConfiguration() {
return textConfiguration;
}
};
try (CairoEngine engine = new CairoEngine(configuration, null)) {
assertNoLeak(
engine,
textLoader -> {
String expected = "s\n" +
"0.5035558920000001\n" +
"0.5370835850000001\n" +
"0.518392756\n" +
"0.898078974\n" +
"0.153959029\n" +
"0.368878817\n" +
"0.7685725170000001\n" +
"0.409412157\n" +
"0.959138401\n" +
"0.49868191100000003\n" +
"0.466161354\n" +
"0.6500869570000001\n" +
"0.201807867\n" +
"0.699247724\n" +
"0.8836387710000001\n" +
"0.855572368\n" +
"0.7143630950000001\n" +
"0.14668836100000002\n" +
"0.507968298\n" +
"0.064159752\n" +
"0.19579689800000002\n" +
"0.7118503740000001\n" +
"0.24136422300000002\n";
String csv = "s\n" +
"0.503555892\n" +
"0.537083585\n" +
"0.518392756\n" +
"0.898078974\n" +
"0.153959029\n" +
"0.368878817\n" +
"0.768572517\n" +
"0.409412157\n" +
"0.959138401\n" +
"0.498681911\n" +
"0.466161354\n" +
"0.650086957\n" +
"0.201807867\n" +
"0.699247724\n" +
"0.883638771\n" +
"0.855572368\n" +
"0.714363095\n" +
"0.146688361\n" +
"0.507968298\n" +
"0.064159752\n" +
"0.195796898\n" +
"0.711850374\n" +
"0.241364223\n";
configureLoaderDefaults(textLoader, (byte) -1);
textLoader.setForceHeaders(false);
playText(
engine,
textLoader,
csv,
1024,
expected,
"{\"columnCount\":1,\"columns\":[{\"index\":0,\"name\":\"s\",\"type\":\"DOUBLE\"}],\"timestampIndex\":-1}",
23,
23
);
});
}
}
@Test
public void testImportTimestamp() throws Exception {
final TextConfiguration textConfiguration = new DefaultTextConfiguration() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册