提交 89d403fa 编写于 作者: V Vlad Ilyushchenko

CUTLASS: text loader unit tests. Refactored text loader interface for somewhat better usability.

上级 5ab0b1ed
......@@ -39,16 +39,16 @@ import java.io.Closeable;
import static com.questdb.std.Chars.utf8DecodeMultiByte;
public class CairoTextWriter implements TextLexerListener, Closeable, Mutable {
public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
private static final Log LOG = LogFactory.getLog(CairoTextWriter.class);
private final CairoConfiguration configuration;
private final CairoEngine engine;
private final LongList errors = new LongList();
private final LongList columnErrorCounts = new LongList();
private final DirectCharSink utf8Sink;
private final AppendMemory appendMemory = new AppendMemory();
private final Path path;
private final TableStructureAdapter tableStructureAdapter = new TableStructureAdapter();
private String tableName;
private CharSequence tableName;
private ObjList<TextMetadata> textMetadata;
private TableWriter writer;
private long _size;
......@@ -93,7 +93,7 @@ public class CairoTextWriter implements TextLexerListener, Closeable, Mutable {
@Override
public void clear() {
writer = Misc.free(writer);
errors.clear();
columnErrorCounts.clear();
_size = 0;
}
......@@ -111,23 +111,22 @@ public class CairoTextWriter implements TextLexerListener, Closeable, Mutable {
} else {
writer.commit();
}
writer = Misc.free(writer);
}
}
public LongList getErrors() {
return errors;
public LongList getColumnErrorCounts() {
return columnErrorCounts;
}
public long getImportedRowCount() {
return writer.size() - _size;
public RecordMetadata getMetadata() {
return writer.getMetadata();
}
public RecordMetadata getTextMetadata() {
return writer.getMetadata();
public long getWrittenLineCount() {
return writer.size() - _size;
}
public CairoTextWriter of(String name, boolean overwrite, boolean durable, int atomicity) {
public CairoTextWriter of(CharSequence name, boolean overwrite, boolean durable, int atomicity) {
this.tableName = name;
this.overwrite = overwrite;
this.durable = durable;
......@@ -182,7 +181,7 @@ public class CairoTextWriter implements TextLexerListener, Closeable, Mutable {
} catch (NumericException | Utf8Exception ignore) {
LogRecord logRecord = LOG.error().$("type syntax [type=").$(ColumnType.nameOf(textMetadata.getQuick(i).type)).$("]\n\t");
logRecord.$('[').$(line).$(':').$(i).$("] -> ").$(values.getQuick(i)).$();
errors.increment(i);
columnErrorCounts.increment(i);
switch (atomicity) {
case Atomicity.SKIP_ALL:
writer.rollback();
......@@ -226,9 +225,13 @@ public class CairoTextWriter implements TextLexerListener, Closeable, Mutable {
// now, compare column count.
// Cannot continue if different
if (metadata.getColumnCount() != this.textMetadata.size()) {
if (metadata.getColumnCount() < this.textMetadata.size()) {
writer.close();
throw CairoException.instance(0).put("column count mismatch [text=").put(textMetadata.size()).put(", table=").put(metadata.getColumnCount()).put(']');
throw CairoException.instance(0)
.put("column count mismatch [textColumnCount=").put(textMetadata.size())
.put(", tableColumnCount=").put(metadata.getColumnCount())
.put(", table=").put(tableName)
.put(']');
}
......@@ -247,6 +250,10 @@ public class CairoTextWriter implements TextLexerListener, Closeable, Mutable {
void prepareTable(ObjList<TextMetadata> metadata) {
assert writer == null;
if (metadata.size() == 0) {
throw CairoException.instance(0).put("cannot determine text structure");
}
this.textMetadata = metadata;
switch (engine.getStatus(path, tableName)) {
case TableUtils.TABLE_DOES_NOT_EXIST:
......@@ -263,10 +270,10 @@ public class CairoTextWriter implements TextLexerListener, Closeable, Mutable {
}
break;
default:
throw CairoException.instance(0).put("name is reserved");
throw CairoException.instance(0).put("name is reserved [table=").put(tableName).put(']');
}
_size = writer.size();
errors.seed(writer.getMetadata().getColumnCount(), 0);
columnErrorCounts.seed(writer.getMetadata().getColumnCount(), 0);
}
private class TableStructureAdapter implements TableStructure {
......
......@@ -58,4 +58,14 @@ public class DefaultTextConfiguration implements TextConfiguration {
public int getUtf8SinkCapacity() {
return 4096;
}
/**
 * Returns the line limit applied while analysing text input. The value is read by
 * {@code TextDelimiterScanner} as its line count limit and by {@code TextLoader}
 * as the line limit passed to structure analysis.
 *
 * @return the fixed default of 1000 lines
 */
@Override
public int getTextAnalysisMaxLines() {
return 1000;
}
/**
 * Returns the threshold that the standard deviation of a delimiter candidate's
 * per-line occurrence count is compared against. {@code TextDelimiterScanner}
 * accepts the best candidate only when its deviation is strictly below this value.
 *
 * @return the fixed default of 0.1222
 */
@Override
public double getMaxRequiredDelimiterStdDev() {
return 0.1222d;
}
}
......@@ -37,4 +37,8 @@ public interface TextConfiguration {
long getRollBufferSize();
int getUtf8SinkCapacity();
/**
 * Threshold for delimiter detection: the scanner accepts a detected delimiter only
 * when the standard deviation of its per-line occurrence count is below this value.
 */
double getMaxRequiredDelimiterStdDev();
/**
 * Maximum number of lines examined when detecting the delimiter and analysing
 * the structure of text input.
 */
int getTextAnalysisMaxLines();
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.text;
import com.questdb.std.IntList;
import com.questdb.std.IntLongPriorityQueue;
import com.questdb.std.ObjectFactory;
import com.questdb.std.Unsafe;
import static com.questdb.cutlass.text.TextDelimiter.*;
/**
 * Counts commas, pipes, tabs and semicolons in a block of native memory, line by line,
 * and derives a delimiter guess together with the standard deviation of that
 * delimiter's per-line occurrence count.
 * <p>
 * Bytes between double quotes are ignored. Only {@code '\n'} terminates a line and
 * consecutive line feeds are collapsed into one. At most {@code maxLines} lines are
 * examined. The delimiter itself is whatever {@code heap.peekBottom()} yields after the
 * four totals have been added to the queue.
 */
public class TextDelimiterDetector {
    public static final ObjectFactory<TextDelimiterDetector> FACTORY = TextDelimiterDetector::new;
    private static final int maxLines = 10000;
    private final IntList commas = new IntList(maxLines);
    private final IntList pipes = new IntList(maxLines);
    private final IntList tabs = new IntList(maxLines);
    private final IntList semicolons = new IntList(maxLines);
    private final IntLongPriorityQueue heap = new IntLongPriorityQueue();
    private double stdDev;
    private int avgRecLen;
    private char delimiter;

    private TextDelimiterDetector() {
    }

    /**
     * @return delimiter chosen by the last {@link #of(long, int)} call, or 0 when none was chosen
     */
    public char getDelimiter() {
        return delimiter;
    }

    /**
     * @return standard deviation of the chosen delimiter's per-line count;
     * {@code Double.POSITIVE_INFINITY} when the last scan recorded no line
     */
    public double getStdDev() {
        return stdDev;
    }

    /**
     * Scans {@code len} bytes starting at {@code address} and refreshes the delimiter,
     * standard deviation and average record length held by this instance.
     *
     * @param address start address of the text
     * @param len     number of bytes to examine
     * @throws IllegalArgumentException when the queue yields a value that is not one of
     *                                  the four supported delimiters
     */
    public void of(long address, int len) {
        // reset results of any previous scan
        commas.clear();
        pipes.clear();
        tabs.clear();
        semicolons.clear();
        this.avgRecLen = 0;
        this.stdDev = Double.POSITIVE_INFINITY;
        this.delimiter = 0;

        final long limit = address + len;
        long ptr = address;
        boolean inQuotes = false;
        boolean atLineEnd = false;
        int lineCount = 0;

        // totals over the whole scanned text
        int totalCommas = 0;
        int totalPipes = 0;
        int totalTabs = 0;
        int totalSemicolons = 0;

        // counts accumulated since the last recorded line
        int lineCommas = 0;
        int linePipes = 0;
        int lineTabs = 0;
        int lineSemicolons = 0;

        while (ptr < limit && lineCount < maxLines) {
            final char c = (char) Unsafe.getUnsafe().getByte(ptr++);

            // everything inside quotes is skipped until the closing quote
            if (inQuotes && c != '"') {
                continue;
            }

            if (c == '\n') {
                // a run of line feeds counts as a single line end
                if (!atLineEnd) {
                    lineCount++;
                    commas.add(lineCommas);
                    pipes.add(linePipes);
                    tabs.add(lineTabs);
                    semicolons.add(lineSemicolons);
                    lineCommas = 0;
                    linePipes = 0;
                    lineTabs = 0;
                    lineSemicolons = 0;
                    atLineEnd = true;
                }
                continue;
            }

            // any other byte means a new line has started
            atLineEnd = false;
            switch (c) {
                case ',':
                    totalCommas++;
                    lineCommas++;
                    break;
                case '|':
                    totalPipes++;
                    linePipes++;
                    break;
                case '\t':
                    totalTabs++;
                    lineTabs++;
                    break;
                case ';':
                    totalSemicolons++;
                    lineSemicolons++;
                    break;
                case '"':
                    inQuotes = !inQuotes;
                    break;
                default:
                    break;
            }
        }

        if (lineCount == 0) {
            return;
        }

        this.avgRecLen = len / lineCount;

        heap.clear();
        heap.add(CSV, totalCommas);
        heap.add(PIPE, totalPipes);
        heap.add(TAB, totalTabs);
        heap.add(';', totalSemicolons);

        this.delimiter = (char) heap.peekBottom();
        final IntList counts = countsFor(this.delimiter);

        final int n = counts.size();
        if (n == 0) {
            delimiter = 0;
            return;
        }

        // mean of per-line counts
        double total = 0;
        for (int i = 0; i < n; i++) {
            total += counts.getQuick(i);
        }
        final double mean = total / n;

        // sum of squared distances from the mean
        double squares = 0;
        for (int i = 0; i < n; i++) {
            final int v = counts.getQuick(i);
            squares += (mean - v) * (mean - v);
        }
        this.stdDev = Math.sqrt(squares / n);
    }

    /**
     * @return average record length computed by the last scan as {@code len / lines}
     */
    int getAvgRecLen() {
        return avgRecLen;
    }

    // maps a delimiter to the list holding its per-line counts
    private IntList countsFor(char candidate) {
        switch (candidate) {
            case CSV:
                return commas;
            case PIPE:
                return pipes;
            case TAB:
                return tabs;
            case ';':
                return semicolons;
            default:
                throw new IllegalArgumentException("Unsupported delimiter: " + candidate);
        }
    }
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.text;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.std.Unsafe;
import java.io.Closeable;
/**
 * Detects the column delimiter of a text buffer held in native memory.
 * <p>
 * The scanner keeps a native matrix with one row per analysed line and one {@code int}
 * counter per 7-bit byte value. Every counted byte increments the counter of its line.
 * After the pass, the byte whose per-line count has the lowest standard deviation is
 * reported as the delimiter, provided the deviation is below the configured maximum.
 * <p>
 * The matrix is allocated in the constructor and released by {@link #close()}.
 */
public class TextDelimiterScanner implements Closeable {
    private static final Log LOG = LogFactory.getLog(TextDelimiterScanner.class);
    private static final byte[] priorities = new byte[Byte.MAX_VALUE + 1];
    private static final double DOUBLE_TOLERANCE = 0.00000001d;
    private final int matrixSize;
    private final int matrixRowSize;
    private final int lineCountLimit;
    private final double maxRequiredDelimiterStdDev;
    // native address of the counting matrix; 0 once it has been released
    private long matrix;
    private CharSequence tableName;

    /**
     * Allocates the counting matrix sized for the configured number of analysed lines.
     *
     * @param configuration source of the line limit and of the maximum accepted deviation
     * @throws ArithmeticException when the matrix size does not fit into an {@code int}
     */
    public TextDelimiterScanner(TextConfiguration configuration) {
        this.lineCountLimit = configuration.getTextAnalysisMaxLines();
        this.matrixRowSize = (Byte.MAX_VALUE + 1) * Integer.BYTES;
        // fail loudly instead of allocating with a wrapped-around size
        this.matrixSize = Math.multiplyExact(matrixRowSize, lineCountLimit);
        this.matrix = Unsafe.malloc(this.matrixSize);
        this.maxRequiredDelimiterStdDev = configuration.getMaxRequiredDelimiterStdDev();
    }

    /**
     * Releases the native matrix. Calling this method more than once has no effect,
     * as required by {@link Closeable#close()}.
     */
    @Override
    public void close() {
        if (matrix != 0) {
            Unsafe.free(matrix, matrixSize);
            matrix = 0;
        }
    }

    private static void configurePriority(byte value, byte priority) {
        assert value > -1;
        priorities[value] = priority;
    }

    private static byte getPriority(byte value) {
        return priorities[value];
    }

    /**
     * Scans {@code len} bytes starting at {@code address} and returns the detected delimiter.
     * <p>
     * Quoted text is ignored, with {@code ""} treated as an escaped quote. Only bytes in
     * the range 1..127 that are not ASCII digits or letters are counted. A line is counted
     * only when it ends with {@code '\n'} or {@code '\r'} and contained at least one
     * counted byte.
     *
     * @param address start address of the text
     * @param len     number of bytes available at {@code address}
     * @return delimiter byte
     * @throws UnknownDelimiterException when fewer than two lines were counted or when the
     *                                   lowest deviation is not below the configured maximum
     */
    byte scan(long address, int len) {
        final long hi = address + len;

        int lineCount = 0;
        boolean quotes = false;
        long cursor = address;
        boolean delayedClosingQuote = false;

        // bit field that has bit set for bytes
        // that occurred in this text
        long byteBitFieldLo = 0;
        long byteBitFieldHi = 0;

        boolean lineHasContent = false;

        Unsafe.getUnsafe().setMemory(matrix, matrixSize, (byte) 0);
        while (cursor < hi && lineCount < lineCountLimit) {
            byte b = Unsafe.getUnsafe().getByte(cursor++);

            if (delayedClosingQuote) {
                delayedClosingQuote = false;
                // this is double quote '""' situation
                if (b == '"') {
                    continue;
                } else {
                    // last quote was genuine closing one
                    quotes = false;
                }
            }

            // ignore everything that is in quotes
            if (quotes) {
                if (b == '"') {
                    delayedClosingQuote = true;
                }
                continue;
            }

            switch (b) {
                case '\n':
                case '\r':
                    // if line doesn't have content we just ignore
                    // line end, thus skipping empty lines as well
                    if (lineHasContent) {
                        lineCount++;
                        lineHasContent = false;
                    }
                    continue;
                case '"':
                    quotes = true;
                    continue;
                default:
                    // count only 7-bit bytes that are neither digits nor letters
                    if ((b > 0 && b < '0') || (b > '9' && b < 'A') || (b > 'Z' && b < 'a') || (b > 'z')) {
                        break;
                    }
                    continue;
            }

            lineHasContent = true;

            long pos = matrix + (lineCount * matrixRowSize + b * Integer.BYTES);
            Unsafe.getUnsafe().putInt(pos, Unsafe.getUnsafe().getInt(pos) + 1);

            if (b < 64) {
                byteBitFieldLo = byteBitFieldLo | (1L << b);
            } else {
                byteBitFieldHi = byteBitFieldHi | (1L << (Byte.MAX_VALUE - b));
            }
        }

        // calculate standard deviation for every byte in the matrix
        byte delimiter = Byte.MIN_VALUE;

        if (lineCount < 2) {
            LOG.info().$("not enough lines [table=").$(tableName).$(']').$();
            throw UnknownDelimiterException.INSTANCE;
        }

        double lastStdDev = Double.MAX_VALUE;
        byte lastDelimiterPriority = Byte.MIN_VALUE;
        double lastDelimiterMean = 0;

        for (int i = 0, n = Byte.MAX_VALUE + 1; i < n; i++) {
            boolean set;

            if (i < 64) {
                set = (byteBitFieldLo & (1L << i)) != 0;
            } else {
                set = (byteBitFieldHi & (1L << (Byte.MAX_VALUE - i))) != 0;
            }

            if (set) {
                long offset = i * Integer.BYTES;

                // calculate mean
                long sum = 0;
                for (int l = 0; l < lineCount; l++) {
                    sum += Unsafe.getUnsafe().getInt(matrix + offset);
                    offset += matrixRowSize;
                }

                offset = i * Integer.BYTES;
                final double mean = (double) sum / lineCount;
                // a zero mean means the byte was seen only on a trailing, uncounted line
                if (mean > 0) {
                    double squareSum = 0.0;
                    for (int l = 0; l < lineCount; l++) {
                        double x = Unsafe.getUnsafe().getInt(matrix + offset) - mean;
                        squareSum += x * x;
                        offset += matrixRowSize;
                    }

                    double stdDev = Math.sqrt(squareSum / lineCount);
                    final byte thisPriority = getPriority((byte) i);

                    // when stddev of this is less than last - use this
                    // when stddev of this is the same as last then
                    //    choose on priority (higher is better)
                    //    when priority is the same the candidate with the lower mean wins
                    // NOTE(review): the earlier comment here said "higher is better" for the
                    // mean, which does not match the comparison below; behaviour is kept as is,
                    // confirm the intent.
                    if (stdDev < lastStdDev
                            || (
                            (Math.abs(stdDev - lastStdDev) < DOUBLE_TOLERANCE)
                                    &&
                                    (lastDelimiterPriority < thisPriority || lastDelimiterPriority == thisPriority && lastDelimiterMean > mean)
                    )) {
                        lastStdDev = stdDev;
                        lastDelimiterPriority = thisPriority;
                        lastDelimiterMean = mean;
                        delimiter = (byte) i;
                    }
                }
            }
        }

        assert delimiter > 0;

        if (lastStdDev < maxRequiredDelimiterStdDev) {
            LOG.info()
                    .$("determined [table=").$(tableName)
                    .$(", delimiter='").$((char) delimiter)
                    .$("', priority=").$(lastDelimiterPriority)
                    .$(", mean=").$(lastDelimiterMean)
                    .$(", stddev=").$(lastStdDev)
                    .$(']').$();

            return delimiter;
        }

        // table name included for consistency with the other messages of this class
        LOG.info()
                .$("min deviation is too high [table=").$(tableName)
                .$(", stddev=").$(lastStdDev)
                .$(", max=").$(maxRequiredDelimiterStdDev)
                .$(']').$();

        throw UnknownDelimiterException.INSTANCE;
    }

    /**
     * Sets the table name that is included in log messages of this scanner.
     */
    void setTableName(CharSequence tableName) {
        this.tableName = tableName;
    }

    static {
        // tie-break priorities for candidates with equal deviation
        configurePriority((byte) ',', (byte) 10);
        configurePriority((byte) '\t', (byte) 10);
        configurePriority((byte) '|', (byte) 10);
        configurePriority((byte) ':', (byte) 9);
        configurePriority((byte) ' ', (byte) 8);
        configurePriority((byte) ';', (byte) 8);
    }
}
......@@ -39,11 +39,10 @@ public class TextLexer implements Closeable, Mutable {
private final static Log LOG = LogFactory.getLog(TextLexer.class);
private final ObjList<DirectByteCharSequence> fields = new ObjList<>();
private final ObjectPool<DirectByteCharSequence> csPool = new ObjectPool<>(DirectByteCharSequence.FACTORY, 16);
private final ObjectPool<TextMetadata> mPool = new ObjectPool<>(TextMetadata::new, 256);
private final TextMetadataDetector metadataDetector;
private final long lineRollBufLimit;
private boolean ignoreEolOnce;
private char separator;
private byte columnDelimiter;
private boolean inQuote;
private boolean delayedOutQuote;
private boolean eol;
......@@ -54,7 +53,7 @@ public class TextLexer implements Closeable, Mutable {
private long lineCount;
private boolean useLineRollBuf = false;
private long lineRollBufCur;
private TextLexerListener textLexerListener;
private Listener textLexerListener;
private long lastLineStart;
private long lineRollBufLen;
private long lineRollBufPtr;
......@@ -62,9 +61,10 @@ public class TextLexer implements Closeable, Mutable {
private long lastQuotePos = -1;
private long errorCount = 0;
private boolean rollBufferUnusable = false;
private CharSequence tableName;
public TextLexer(TextConfiguration textConfiguration, TypeProbeCollection typeProbeCollection, long rollBufferSize, long rollBufferLimit) {
this.metadataDetector = new TextMetadataDetector(mPool, typeProbeCollection, textConfiguration);
this.metadataDetector = new TextMetadataDetector(typeProbeCollection, textConfiguration);
this.lineRollBufLen = rollBufferSize;
this.lineRollBufLimit = rollBufferLimit;
this.lineRollBufPtr = Unsafe.malloc(lineRollBufLen);
......@@ -88,7 +88,6 @@ public class TextLexer implements Closeable, Mutable {
restart(false);
this.fields.clear();
this.csPool.clear();
this.mPool.clear();
this.metadataDetector.clear();
errorCount = 0;
fieldMax = -1;
......@@ -107,13 +106,13 @@ public class TextLexer implements Closeable, Mutable {
return lineCount;
}
public TextLexer of(char separator) {
public TextLexer of(byte columnDelimiter) {
clear();
this.separator = separator;
this.columnDelimiter = columnDelimiter;
return this;
}
public void parse(long lo, long len, int lineCountLimit, TextLexerListener textLexerListener) {
public void parse(long lo, long len, int lineCountLimit, Listener textLexerListener) {
this.textLexerListener = textLexerListener;
this.fieldHi = useLineRollBuf ? lineRollBufCur : (this.fieldLo = lo);
parse(lo, len, lineCountLimit);
......@@ -123,7 +122,7 @@ public class TextLexer implements Closeable, Mutable {
if (useLineRollBuf) {
if (inQuote && lastQuotePos < fieldHi) {
errorCount++;
LOG.info().$("quote is missing").$();
LOG.info().$("quote is missing [table=").$(tableName).$(']').$();
} else {
this.fieldHi++;
stashField(fieldIndex);
......@@ -161,14 +160,14 @@ public class TextLexer implements Closeable, Mutable {
private boolean growRollBuf(long requiredLength) {
if (requiredLength > lineRollBufLimit) {
// todo: log content of roll buffer
LOG.info().$("too long [line=").$(lineCount).$(']').$();
LOG.info().$("too long [table=").$(tableName).$(", line=").$(lineCount).$(']').$();
errorCount++;
rollBufferUnusable = true;
return false;
}
final long len = Math.min(lineRollBufLimit, requiredLength << 1);
LOG.info().$("Resizing line roll buffer: ").$(lineRollBufLen).$(" -> ").$(len).$();
LOG.info().$("resizing ").$(lineRollBufLen).$(" -> ").$(len).$(" [table=").$(tableName).$(']').$();
long p = Unsafe.malloc(len);
long l = lineRollBufCur - lineRollBufPtr;
if (l > 0) {
......@@ -223,7 +222,7 @@ public class TextLexer implements Closeable, Mutable {
inQuote = delayedOutQuote = false;
}
if (c == separator) {
if (c == columnDelimiter) {
if (eol) {
uneol(lo);
}
......@@ -304,7 +303,7 @@ public class TextLexer implements Closeable, Mutable {
}
private void reportExtraFields() {
LogRecord logRecord = LOG.error().$("extra fields [job=]\n\t").$(lineCount).$(" -> ");
LogRecord logRecord = LOG.error().$("extra fields [table=").$(tableName).$("]\n\t").$(lineCount).$(" -> ");
for (int i = 0, n = fields.size(); i < n; i++) {
if (i > 0) {
logRecord.$(',');
......@@ -328,6 +327,11 @@ public class TextLexer implements Closeable, Mutable {
}
}
/**
 * Remembers the table name used in this lexer's log messages and passes the
 * same name on to the metadata detector.
 */
void setTableName(CharSequence tableName) {
this.tableName = tableName;
this.metadataDetector.setTableName(tableName);
}
private void shift(long d) {
for (int i = 0; i < fieldIndex; i++) {
fields.getQuick(i).lshift(d);
......@@ -381,4 +385,9 @@ public class TextLexer implements Closeable, Mutable {
eol = false;
this.lastLineStart = this.fieldLo - lo;
}
/**
 * Callback that receives the fields of a line; this is the listener type
 * accepted by the lexer's {@code parse} method.
 */
@FunctionalInterface
public interface Listener {
/**
 * @param line   line number the fields belong to
 * @param fields field values of the line
 * @param hi     presumably the exclusive upper bound of the usable entries in
 *               {@code fields}; implementations in this package iterate
 *               {@code i < hi} - confirm against the lexer
 */
void onFields(long line, ObjList<DirectByteCharSequence> fields, int hi);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.text;
import com.questdb.std.ObjList;
import com.questdb.std.str.DirectByteCharSequence;
/**
 * Callback that receives the fields of a parsed text line.
 */
@FunctionalInterface
public interface TextLexerListener {
/**
 * @param line   line number the fields belong to
 * @param fields field values of the line
 * @param hi     presumably the exclusive upper bound of the usable entries in
 *               {@code fields} - confirm against the lexer
 */
void onFields(long line, ObjList<DirectByteCharSequence> fields, int hi);
}
......@@ -25,9 +25,13 @@ package com.questdb.cutlass.text;
import com.questdb.cairo.CairoConfiguration;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.cairo.sql.RecordMetadata;
import com.questdb.cutlass.json.JsonException;
import com.questdb.cutlass.json.JsonLexer;
import com.questdb.cutlass.text.typeprobe.TypeProbeCollection;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.std.LongList;
import com.questdb.std.Mutable;
import com.questdb.std.str.Path;
import com.questdb.std.time.DateFormatFactory;
......@@ -39,14 +43,17 @@ public class TextLoader implements Closeable, Mutable {
public static final int LOAD_JSON_METADATA = 0;
public static final int ANALYZE_STRUCTURE = 1;
public static final int LOAD_DATA = 2;
private static final Log LOG = LogFactory.getLog(TextLoader.class);
private final CairoTextWriter textWriter;
private final TextMetadataParser textMetadataParser;
private final TextLexer textLexer;
private final JsonLexer jsonLexer;
private final Path path = new Path();
private final int textAnalysisMaxLines;
private final TextDelimiterScanner textDelimiterScanner;
private int state;
private boolean forceHeaders = false;
private int statsLineCountLimit = Integer.MAX_VALUE;
private byte columnDelimiter = -1;
public TextLoader(
CairoConfiguration configuration,
......@@ -68,6 +75,8 @@ public class TextLoader implements Closeable, Mutable {
);
textWriter = new CairoTextWriter(configuration, engine, path, textConfiguration);
textMetadataParser = new TextMetadataParser(textConfiguration, dateLocaleFactory, dateFormatFactory);
textAnalysisMaxLines = textConfiguration.getTextAnalysisMaxLines();
textDelimiterScanner = new TextDelimiterScanner(textConfiguration);
}
@Override
......@@ -77,6 +86,7 @@ public class TextLoader implements Closeable, Mutable {
textMetadataParser.clear();
jsonLexer.clear();
forceHeaders = false;
columnDelimiter = -1;
}
@Override
......@@ -86,20 +96,48 @@ public class TextLoader implements Closeable, Mutable {
textMetadataParser.close();
jsonLexer.close();
path.close();
textDelimiterScanner.close();
}
/**
 * Sets the column delimiter explicitly. When a positive delimiter is configured,
 * {@code parse} hands it to the lexer instead of running delimiter detection.
 *
 * @param columnDelimiter delimiter byte, must be greater than zero
 */
public void configureColumnDelimiter(byte columnDelimiter) {
    // validate first so that a failed assertion leaves the previous setting untouched
    assert columnDelimiter > 0;
    this.columnDelimiter = columnDelimiter;
}
/**
 * Configures the destination table of the load. The settings are handed to the
 * text writer, the table name is passed on to the delimiter scanner, the metadata
 * parser and the lexer, and the configuration is logged.
 *
 * @param tableName name of the destination table
 * @param overwrite passed through to the text writer
 * @param durable   passed through to the text writer
 * @param atomicity passed through to the text writer
 */
public void configureDestination(String tableName, boolean overwrite, boolean durable, int atomicity) {
textWriter.of(tableName, overwrite, durable, atomicity);
textDelimiterScanner.setTableName(tableName);
textMetadataParser.setTableName(tableName);
textLexer.setTableName(tableName);
LOG.info()
.$("configured [table=").$(tableName)
.$(", overwrite=").$(overwrite)
.$(", durable=").$(durable)
.$(", atomicity=").$(atomicity)
.$(']').$();
}
/**
 * Returns the explicitly configured column delimiter. The field starts at -1 and is
 * reset to -1 by {@code clear()}; a delimiter found by the scanner during
 * {@code parse} is not stored in it.
 */
public byte getColumnDelimiter() {
return columnDelimiter;
}
/**
 * Returns the per-column error counters kept by the text writer.
 */
public LongList getColumnErrorCounts() {
return textWriter.getColumnErrorCounts();
}
public void configureSeparator(char columnSeparator) {
textLexer.of(columnSeparator);
public RecordMetadata getMetadata() {
return textWriter.getMetadata();
}
public long getLineCount() {
public long getParsedLineCount() {
return textLexer.getLineCount();
}
/**
 * Returns the written line count reported by the text writer.
 */
public long getWrittenLineCount() {
return textWriter.getWrittenLineCount();
}
/**
 * Returns the force-headers flag. The flag is passed to the lexer's structure
 * analysis in {@code parse} and is reset to {@code false} by {@code clear()}.
 */
public boolean isForceHeaders() {
return forceHeaders;
}
......@@ -109,12 +147,18 @@ public class TextLoader implements Closeable, Mutable {
}
public void parse(long address, int len) throws JsonException {
switch (state) {
case LOAD_JSON_METADATA:
jsonLexer.parse(address, len, textMetadataParser);
break;
case ANALYZE_STRUCTURE:
textLexer.analyseStructure(address, len, statsLineCountLimit, forceHeaders, textMetadataParser.getTextMetadata());
if (columnDelimiter > 0) {
textLexer.of(columnDelimiter);
} else {
textLexer.of(textDelimiterScanner.scan(address, len));
}
textLexer.analyseStructure(address, len, textAnalysisMaxLines, forceHeaders, textMetadataParser.getTextMetadata());
textWriter.prepareTable(textLexer.getDetectedMetadata());
textLexer.parse(address, len, Integer.MAX_VALUE, textWriter);
break;
......@@ -130,10 +174,6 @@ public class TextLoader implements Closeable, Mutable {
this.state = state;
}
public void setStatsLineCountLimit(int statsLineCountLimit) {
this.statsLineCountLimit = statsLineCountLimit;
}
public void wrapUp() throws JsonException {
switch (state) {
case LOAD_JSON_METADATA:
......
......@@ -31,7 +31,6 @@ import com.questdb.std.time.DateLocale;
public class TextMetadata implements Mutable {
public int type;
public CharSequence pattern;
public DateFormat dateFormat;
public DateLocale dateLocale;
public CharSequence name = "";
......@@ -44,9 +43,6 @@ public class TextMetadata implements Mutable {
public void copyTo(TextMetadata _m) {
_m.type = this.type;
if (this.type == ColumnType.DATE) {
if (this.pattern != null) {
_m.pattern = this.pattern;
}
if (this.dateFormat != null) {
_m.dateFormat = this.dateFormat;
......@@ -57,7 +53,6 @@ public class TextMetadata implements Mutable {
}
} else {
_m.pattern = this.pattern;
_m.dateFormat = this.dateFormat;
_m.dateLocale = this.dateLocale;
}
......@@ -67,7 +62,6 @@ public class TextMetadata implements Mutable {
public String toString() {
return "TextMetadata{" +
"type=" + ColumnType.nameOf(type) +
", pattern=" + pattern +
", dateLocale=" + (dateLocale == null ? null : dateLocale.getId()) +
", name=" + name +
'}';
......
......@@ -38,7 +38,7 @@ import java.io.Closeable;
import static com.questdb.std.Chars.utf8DecodeMultiByte;
public class TextMetadataDetector implements TextLexerListener, Mutable, Closeable {
public class TextMetadataDetector implements TextLexer.Listener, Mutable, Closeable {
private static final Log LOG = LogFactory.getLog(TextMetadataDetector.class);
private final StringSink tempSink = new StringSink();
private final ObjList<TextMetadata> _metadata = new ObjList<>();
......@@ -46,19 +46,18 @@ public class TextMetadataDetector implements TextLexerListener, Mutable, Closeab
private final IntList _blanks = new IntList();
private final IntList _histogram = new IntList();
private final CharSequenceObjHashMap<TextMetadata> schemaColumns = new CharSequenceObjHashMap<>();
private final ObjectPool<TextMetadata> mPool;
private final ObjectPool<TextMetadata> mPool = new ObjectPool<>(TextMetadata::new, 256);
private final TypeProbeCollection typeProbeCollection;
private final DirectCharSink utf8Sink;
private int fieldCount;
private boolean header = false;
private boolean forceHeader = false;
private CharSequence tableName;
public TextMetadataDetector(
ObjectPool<TextMetadata> mPool,
TypeProbeCollection typeProbeCollection,
TextConfiguration textConfiguration
) {
this.mPool = mPool;
this.typeProbeCollection = typeProbeCollection;
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkCapacity());
}
......@@ -93,6 +92,7 @@ public class TextMetadataDetector implements TextLexerListener, Mutable, Closeab
_metadata.clear();
schemaColumns.clear();
forceHeader = false;
mPool.clear();
}
@Override
......@@ -111,7 +111,12 @@ public class TextMetadataDetector implements TextLexerListener, Mutable, Closeab
}
header = true;
} else {
LOG.info().$("no header [lineCount=").$(lineCount).$(", errorCount=").$(errorCount).$(", forceHeader=").$(forceHeader).$(']').$();
LOG.info()
.$("no header [table=").$(tableName)
.$(", lineCount=").$(lineCount)
.$(", errorCount=").$(errorCount)
.$(", forceHeader=").$(forceHeader)
.$(']').$();
}
// make up field names if there is no header
......@@ -203,7 +208,6 @@ public class TextMetadataDetector implements TextLexerListener, Mutable, Closeab
unprobed = false;
TypeProbe probe = typeProbeCollection.getProbe(k);
m.type = probe.getType();
m.pattern = probe.getFormat();
m.dateFormat = probe.getDateFormat();
m.dateLocale = probe.getDateLocale();
if (allStrings) {
......@@ -274,6 +278,10 @@ public class TextMetadataDetector implements TextLexerListener, Mutable, Closeab
this._headers.setAll(count, "");
}
/**
 * Sets the table name that this detector includes in its log messages.
 */
void setTableName(CharSequence tableName) {
this.tableName = tableName;
}
private void stashPossibleHeader(ObjList<DirectByteCharSequence> values, int hi) {
for (int i = 0; i < hi; i++) {
DirectByteCharSequence value = values.getQuick(i);
......@@ -281,7 +289,7 @@ public class TextMetadataDetector implements TextLexerListener, Mutable, Closeab
if (utf8Decode(value.getLo(), value.getHi(), utf8Sink)) {
_headers.setQuick(i, normalise(utf8Sink));
} else {
LOG.info().$("utf8 error [line=0, col=").$(i).$(']').$();
LOG.info().$("utf8 error [table=").$(tableName).$(", line=0, col=").$(i).$(']').$();
}
}
}
......
......@@ -64,38 +64,7 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
private long buf;
private long bufCapacity = 0;
private int bufSize = 0;
public TextMetadataParser(
TextConfiguration textConfiguration,
DateLocaleFactory dateLocaleFactory,
DateFormatFactory dateFormatFactory
) {
this.textMetadataPool = new ObjectPool<>(TextMetadata::new, textConfiguration.getMetadataPoolSize());
this.textMetadata = new ObjList<>();
this.csPool = new ObjectPool<>(FloatingCharSequence::new, textConfiguration.getMetadataStringPoolSize());
this.dateLocaleFactory = dateLocaleFactory;
this.dateFormatFactory = dateFormatFactory;
}
@Override
public void clear() {
bufSize = 0;
state = S_NEED_ARRAY;
textMetadata.clear();
csPool.clear();
clearStage();
textMetadataPool.clear();
textMetadata.clear();
}
@Override
public void close() {
clear();
if (bufCapacity > 0) {
Unsafe.free(buf, bufCapacity);
bufCapacity = 0;
}
}
private CharSequence tableName;
@Override
public void onEvent(int code, CharSequence tag, int position) throws JsonException {
......@@ -115,7 +84,7 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
case JsonLexer.EVT_NAME:
this.propertyIndex = propertyNameMap.get(tag);
if (this.propertyIndex == -1) {
LOG.info().$("unknown [tag=").$(tag).$(']').$();
LOG.info().$("unknown [table=").$(tableName).$(", tag=").$(tag).$(']').$();
}
break;
case JsonLexer.EVT_VALUE:
......@@ -140,7 +109,7 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
}
break;
default:
LOG.info().$("ignoring [value=").$(tag).$(']').$();
LOG.info().$("ignoring [table=").$(tableName).$(", value=").$(tag).$(']').$();
break;
}
break;
......@@ -155,6 +124,42 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
}
}
/**
 * Creates a parser with object pools sized from the text configuration.
 *
 * @param textConfiguration supplies the metadata pool size and the metadata string pool size
 * @param dateLocaleFactory stored for later use by the parser
 * @param dateFormatFactory stored for later use by the parser
 */
public TextMetadataParser(
TextConfiguration textConfiguration,
DateLocaleFactory dateLocaleFactory,
DateFormatFactory dateFormatFactory
) {
this.textMetadataPool = new ObjectPool<>(TextMetadata::new, textConfiguration.getMetadataPoolSize());
this.textMetadata = new ObjList<>();
this.csPool = new ObjectPool<>(FloatingCharSequence::new, textConfiguration.getMetadataStringPoolSize());
this.dateLocaleFactory = dateLocaleFactory;
this.dateFormatFactory = dateFormatFactory;
}
/**
 * Resets the parser for reuse: the buffered size is zeroed, the state returns to
 * {@code S_NEED_ARRAY}, the staged values and both object pools are cleared and
 * the collected metadata list is emptied.
 */
@Override
public void clear() {
    bufSize = 0;
    state = S_NEED_ARRAY;
    csPool.clear();
    clearStage();
    textMetadataPool.clear();
    // the list used to be cleared twice; once, after the pools, is sufficient
    textMetadata.clear();
}
/**
 * Clears the parser and releases the native buffer if one is currently held.
 * The capacity is zeroed after the release.
 */
@Override
public void close() {
    clear();
    if (bufCapacity <= 0) {
        // nothing allocated, nothing to release
        return;
    }
    Unsafe.free(buf, bufCapacity);
    bufCapacity = 0;
}
/**
 * Sets the table name that this parser includes in its log messages.
 */
void setTableName(CharSequence tableName) {
this.tableName = tableName;
}
private void clearStage() {
name = null;
pattern = null;
......@@ -194,7 +199,6 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
TextMetadata m = textMetadataPool.next();
m.name = name;
m.type = type;
m.pattern = pattern;
m.dateFormat = dateFormat;
m.dateLocale = dateLocale == null && type == ColumnType.DATE ? dateLocaleFactory.getDefaultDateLocale() : dateLocale;
textMetadata.add(m);
......
......@@ -23,8 +23,6 @@
package com.questdb.cutlass.text;
public final class TextDelimiter {
public static final char CSV = ',';
public static final char TAB = '\t';
public static final char PIPE = '|';
/**
 * Unchecked exception that is exposed through a single shared {@link #INSTANCE}.
 * <p>
 * NOTE(review): presumably raised when the text delimiter cannot be determined;
 * confirm against the throw sites.
 */
public class UnknownDelimiterException extends RuntimeException {
// Created once when the class is initialized. NOTE(review): a shared Throwable
// carries the stack trace captured at construction, not at the throw site.
public static final UnknownDelimiterException INSTANCE = new UnknownDelimiterException();
}
......@@ -40,11 +40,6 @@ public class BooleanProbe implements TypeProbe {
return null;
}
@Override
public String getFormat() {
return null;
}
@Override
public int getType() {
return ColumnType.BOOLEAN;
......
......@@ -30,13 +30,11 @@ import com.questdb.std.time.DateLocale;
import com.questdb.store.ColumnType;
public class DateProbe implements TypeProbe {
private final String pattern;
private final DateLocale dateLocale;
private final DateFormat format;
public DateProbe(DateFormatFactory dateFormatFactory, DateLocale dateLocale, String pattern) {
public DateProbe(DateFormatFactory dateFormatFactory, DateLocale dateLocale, CharSequence pattern) {
this.dateLocale = dateLocale;
this.pattern = pattern;
this.format = dateFormatFactory.get(pattern);
}
......@@ -50,11 +48,6 @@ public class DateProbe implements TypeProbe {
return dateLocale;
}
@Override
public String getFormat() {
return pattern;
}
@Override
public int getType() {
return ColumnType.DATE;
......
......@@ -41,11 +41,6 @@ public class DoubleProbe implements TypeProbe {
return null;
}
@Override
public String getFormat() {
return null;
}
@Override
public int getType() {
return ColumnType.DOUBLE;
......
......@@ -25,25 +25,9 @@ package com.questdb.cutlass.text.typeprobe;
import com.questdb.std.Numbers;
import com.questdb.std.NumericException;
import com.questdb.std.time.DateFormat;
import com.questdb.std.time.DateLocale;
import com.questdb.store.ColumnType;
public class IntProbe implements TypeProbe {
@Override
public DateFormat getDateFormat() {
return null;
}
@Override
public DateLocale getDateLocale() {
return null;
}
@Override
public String getFormat() {
return null;
}
@Override
public int getType() {
......
......@@ -40,11 +40,6 @@ public class LongProbe implements TypeProbe {
return null;
}
@Override
public String getFormat() {
return null;
}
@Override
public int getType() {
return ColumnType.LONG;
......
......@@ -27,11 +27,13 @@ import com.questdb.std.time.DateFormat;
import com.questdb.std.time.DateLocale;
public interface TypeProbe {
DateFormat getDateFormat();
default DateFormat getDateFormat() {
return null;
}
DateLocale getDateLocale();
String getFormat();
default DateLocale getDateLocale() {
return null;
}
int getType();
......
......@@ -63,7 +63,7 @@ public class ObjectPool<T extends Mutable> implements Mutable {
/**
 * Grows the pool: calls {@code fill()}, doubles {@code size}, and logs the
 * factory class name together with the new size.
 */
private void expand() {
    fill();
    size <<= 1;
    // a single resize message, with the "[...]" group properly closed
    LOG.info().$("pool resize [class=").$(factory.getClass().getName()).$(", size=").$(size).$(']').$();
}
private void fill() {
......
......@@ -86,8 +86,8 @@ public class TextMetadataParserTest {
ObjList<TextMetadata> metadata = parseMetadata(in);
Assert.assertEquals(2, metadata.size());
Assert.assertEquals("TextMetadata{type=INT, pattern=xyz, dateLocale=en-US, name=x}", metadata.get(0).toString());
Assert.assertEquals("TextMetadata{type=DATE, pattern=xyz, dateLocale=" + defaultLocaleId + ", name=y}", metadata.get(1).toString());
Assert.assertEquals("TextMetadata{type=INT, dateLocale=en-US, name=x}", metadata.get(0).toString());
Assert.assertEquals("TextMetadata{type=DATE, dateLocale=" + defaultLocaleId + ", name=y}", metadata.get(1).toString());
}
@Test
......
......@@ -54,13 +54,10 @@ public class TypeProbeCollectionTest {
DateLocale defaultLocale = DateLocaleFactory.INSTANCE.getDefaultDateLocale();
Assert.assertEquals("dd/MM/y", typeProbeCollection.getProbe(4).getFormat());
Assert.assertEquals(defaultLocale.getId(), typeProbeCollection.getProbe(4).getDateLocale().getId());
Assert.assertEquals("yyyy-MM-dd HH:mm:ss", typeProbeCollection.getProbe(5).getFormat());
Assert.assertEquals("es-PA", typeProbeCollection.getProbe(5).getDateLocale().getId());
Assert.assertEquals("MM/dd/y", typeProbeCollection.getProbe(6).getFormat());
Assert.assertEquals(defaultLocale.getId(), typeProbeCollection.getProbe(6).getDateLocale().getId());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册