提交 3c456f74 编写于 作者: V Vlad Ilyushchenko

CUTLASS: text import bugfixes

上级 94125f64
...@@ -27,7 +27,7 @@ import io.questdb.network.PeerDisconnectedException; ...@@ -27,7 +27,7 @@ import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException; import io.questdb.network.PeerIsSlowToReadException;
public interface HttpMultipartContentListener { public interface HttpMultipartContentListener {
void onChunk(HttpRequestHeader partHeader, long lo, long hi); void onChunk(HttpRequestHeader partHeader, long lo, long hi) throws PeerDisconnectedException, PeerIsSlowToReadException;
void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException; void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException;
......
...@@ -28,8 +28,8 @@ import io.questdb.cairo.ColumnType; ...@@ -28,8 +28,8 @@ import io.questdb.cairo.ColumnType;
import io.questdb.cairo.PartitionBy; import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.sql.RecordMetadata; import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cutlass.http.*; import io.questdb.cutlass.http.*;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.text.Atomicity; import io.questdb.cutlass.text.Atomicity;
import io.questdb.cutlass.text.TextException;
import io.questdb.cutlass.text.TextLoader; import io.questdb.cutlass.text.TextLoader;
import io.questdb.log.Log; import io.questdb.log.Log;
import io.questdb.log.LogFactory; import io.questdb.log.LogFactory;
...@@ -60,6 +60,12 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -60,6 +60,12 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
// processor. For different threads to lookup the same value from local value map the key, // processor. For different threads to lookup the same value from local value map the key,
// which is LV, has to be the same between processor instances // which is LV, has to be the same between processor instances
private static final LocalValue<TextImportProcessorState> LV = new LocalValue<>(); private static final LocalValue<TextImportProcessorState> LV = new LocalValue<>();
static {
atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW);
atomicityParamMap.put("strict", Atomicity.SKIP_ALL);
}
private final TextImportProcessorConfiguration configuration; private final TextImportProcessorConfiguration configuration;
private final CairoEngine engine; private final CairoEngine engine;
private HttpConnectionContext transientContext; private HttpConnectionContext transientContext;
...@@ -74,120 +80,9 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -74,120 +80,9 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
this.engine = cairoEngine; this.engine = cairoEngine;
} }
@Override
public void close() {
}
@Override
public void onChunk(HttpRequestHeader partHeader, long lo, long hi) {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext());
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (JsonException e) {
// todo: reply something sensible
e.printStackTrace();
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$();
if (Chars.equals("data", partHeader.getContentDispositionName())) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no name given");
// we have to disconnect to interrupt potentially large upload
transientDispatcher.disconnect(transientContext);
return;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
// todo: these values are incorrect, but ok for now
getAtomicity(rh.getUrlParam("atomicity"))
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equals("schema", partHeader.getContentDispositionName())) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
// todo: disconnect
transientState.messagePart = MESSAGE_UNKNOWN;
}
}
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
@Override
public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (JsonException e) {
handleJsonException(e);
}
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
transientState.clear();
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
this.transientContext = context;
this.transientDispatcher = dispatcher;
this.transientState = LV.get(context);
if (this.transientState == null) {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(engine));
}
}
@Override
public void resumeSend(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket());
}
private static void resumeJson(TextImportProcessorState state, HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException { private static void resumeJson(TextImportProcessorState state, HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = state.textLoader; final TextLoader textLoader = state.textLoader;
final RecordMetadata m = textLoader.getMetadata(); final RecordMetadata metadata = textLoader.getMetadata();
final int columnCount = m.getColumnCount();
final LongList errors = textLoader.getColumnErrorCounts(); final LongList errors = textLoader.getColumnErrorCounts();
...@@ -205,17 +100,20 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -205,17 +100,20 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
state.responseState = RESPONSE_COLUMN; state.responseState = RESPONSE_COLUMN;
// fall through // fall through
case RESPONSE_COLUMN: case RESPONSE_COLUMN:
for (; state.columnIndex < columnCount; state.columnIndex++) { if (metadata != null) {
socket.bookmark(); final int columnCount = metadata.getColumnCount();
if (state.columnIndex > 0) { for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.put(','); socket.bookmark();
if (state.columnIndex > 0) {
socket.put(',');
}
socket.put('{').
putQuoted("name").put(':').putQuoted(metadata.getColumnName(state.columnIndex)).put(',').
putQuoted("type").put(':').putQuoted(ColumnType.nameOf(metadata.getColumnType(state.columnIndex))).put(',').
putQuoted("size").put(':').put(ColumnType.sizeOf(metadata.getColumnType(state.columnIndex))).put(',').
putQuoted("errors").put(':').put(errors.getQuick(state.columnIndex));
socket.put('}');
} }
socket.put('{').
putQuoted("name").put(':').putQuoted(m.getColumnName(state.columnIndex)).put(',').
putQuoted("type").put(':').putQuoted(ColumnType.nameOf(m.getColumnType(state.columnIndex))).put(',').
putQuoted("size").put(':').put(ColumnType.sizeOf(m.getColumnType(state.columnIndex))).put(',').
putQuoted("errors").put(':').put(errors.getQuick(state.columnIndex));
socket.put('}');
} }
state.responseState = RESPONSE_SUFFIX; state.responseState = RESPONSE_SUFFIX;
// fall through // fall through
...@@ -247,6 +145,12 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -247,6 +145,12 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return b; return b;
} }
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
private static void pad(CharSink b, int w, long value) { private static void pad(CharSink b, int w, long value) {
int len = (int) Math.log10(value); int len = (int) Math.log10(value);
if (len < 0) { if (len < 0) {
...@@ -313,16 +217,18 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -313,16 +217,18 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
// fall through // fall through
case RESPONSE_COLUMN: case RESPONSE_COLUMN:
final int columnCount = metadata.getColumnCount(); if (metadata != null) {
final int columnCount = metadata.getColumnCount();
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark(); for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.put('|'); socket.bookmark();
pad(socket, TO_STRING_COL1_PAD, state.columnIndex); socket.put('|');
pad(socket, TO_STRING_COL2_PAD, metadata.getColumnName(state.columnIndex)); pad(socket, TO_STRING_COL1_PAD, state.columnIndex);
pad(socket, TO_STRING_COL3_PAD + TO_STRING_COL4_PAD + 3, ColumnType.nameOf(metadata.getColumnType(state.columnIndex))); pad(socket, TO_STRING_COL2_PAD, metadata.getColumnName(state.columnIndex));
pad(socket, TO_STRING_COL5_PAD, errors.getQuick(state.columnIndex)); pad(socket, TO_STRING_COL3_PAD + TO_STRING_COL4_PAD + 3, ColumnType.nameOf(metadata.getColumnType(state.columnIndex)));
socket.put(Misc.EOL); pad(socket, TO_STRING_COL5_PAD, errors.getQuick(state.columnIndex));
socket.put(Misc.EOL);
}
} }
state.responseState = RESPONSE_SUFFIX; state.responseState = RESPONSE_SUFFIX;
// fall through // fall through
...@@ -346,6 +252,108 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -346,6 +252,108 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return atomicity == -1 ? Atomicity.SKIP_COL : atomicity; return atomicity == -1 ? Atomicity.SKIP_COL : atomicity;
} }
@Override
public void close() {
}
@Override
public void onChunk(HttpRequestHeader partHeader, long lo, long hi) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext());
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (TextException e) {
handleTextException(e);
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$();
if (Chars.equals("data", partHeader.getContentDispositionName())) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no name given");
// we have to disconnect to interrupt potentially large upload
transientDispatcher.disconnect(transientContext);
return;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
// todo: these values are incorrect, but ok for now
getAtomicity(rh.getUrlParam("atomicity"))
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equals("schema", partHeader.getContentDispositionName())) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
// todo: disconnect
transientState.messagePart = MESSAGE_UNKNOWN;
}
}
@Override
public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (TextException e) {
handleTextException(e);
}
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
transientState.clear();
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
this.transientContext = context;
this.transientDispatcher = dispatcher;
this.transientState = LV.get(context);
if (this.transientState == null) {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(engine));
}
}
@Override
public void resumeSend(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket());
}
private void doResumeSend( private void doResumeSend(
TextImportProcessorState state, TextImportProcessorState state,
HttpChunkedResponseSocket socket HttpChunkedResponseSocket socket
...@@ -373,9 +381,9 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -373,9 +381,9 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
state.clear(); state.clear();
} }
private void handleJsonException(JsonException e) throws PeerDisconnectedException, PeerIsSlowToReadException { private void handleTextException(TextException e) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (configuration.abortBrokenUploads()) { if (configuration.abortBrokenUploads()) {
sendError(transientContext, e.getMessage(), transientState.json); sendError(transientContext, e.getMessage(), Chars.equalsNc("json", transientContext.getRequestHeader().getUrlParam("fmt")));
throw PeerDisconnectedException.INSTANCE; throw PeerDisconnectedException.INSTANCE;
} }
transientState.state = TextImportProcessorState.STATE_DATA_ERROR; transientState.state = TextImportProcessorState.STATE_DATA_ERROR;
...@@ -389,15 +397,16 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -389,15 +397,16 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
) throws PeerDisconnectedException, PeerIsSlowToReadException { ) throws PeerDisconnectedException, PeerIsSlowToReadException {
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket(); final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
if (json) { if (json) {
socket.status(200, CONTENT_TYPE_JSON); socket.status(400, CONTENT_TYPE_JSON);
socket.sendHeader();
socket.put('{').putQuoted("status").put(':').encodeUtf8AndQuote(message).put('}'); socket.put('{').putQuoted("status").put(':').encodeUtf8AndQuote(message).put('}');
} else { } else {
socket.status(400, CONTENT_TYPE_TEXT); socket.status(400, CONTENT_TYPE_TEXT);
socket.sendHeader();
socket.encodeUtf8(message); socket.encodeUtf8(message);
} }
// todo: is this needed, both of these? socket.sendChunk();
socket.done(); socket.done();
throw PeerDisconnectedException.INSTANCE;
} }
private void sendResponse(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException { private void sendResponse(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
...@@ -418,9 +427,4 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -418,9 +427,4 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
sendError(context, state.stateMessage, state.json); sendError(context, state.stateMessage, state.json);
} }
} }
static {
atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW);
atomicityParamMap.put("strict", Atomicity.SKIP_ALL);
}
} }
...@@ -23,10 +23,7 @@ ...@@ -23,10 +23,7 @@
package io.questdb.cutlass.pgwire; package io.questdb.cutlass.pgwire;
import io.questdb.cairo.CairoEngine; import io.questdb.cairo.*;
import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.sql.Record; import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor; import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory; import io.questdb.cairo.sql.RecordCursorFactory;
...@@ -40,17 +37,14 @@ import io.questdb.log.LogRecord; ...@@ -40,17 +37,14 @@ import io.questdb.log.LogRecord;
import io.questdb.network.*; import io.questdb.network.*;
import io.questdb.std.*; import io.questdb.std.*;
import io.questdb.std.microtime.DateFormatUtils; import io.questdb.std.microtime.DateFormatUtils;
import io.questdb.std.str.AbstractCharSink; import io.questdb.std.str.*;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectByteCharSequence;
import io.questdb.std.str.StdoutSink;
import io.questdb.std.time.DateLocaleFactory; import io.questdb.std.time.DateLocaleFactory;
import static io.questdb.cutlass.pgwire.PGJobContext.*; import static io.questdb.cutlass.pgwire.PGJobContext.*;
import static io.questdb.std.time.DateFormatUtils.*; import static io.questdb.std.time.DateFormatUtils.*;
public class PGConnectionContext implements IOContext, Mutable { public class PGConnectionContext implements IOContext, Mutable {
static final byte MESSAGE_TYPE_ERROR_RESPONSE = 'E'; private static final byte MESSAGE_TYPE_ERROR_RESPONSE = 'E';
private static final int INIT_SSL_REQUEST = 80877103; private static final int INIT_SSL_REQUEST = 80877103;
private static final int INIT_STARTUP_MESSAGE = 196608; private static final int INIT_STARTUP_MESSAGE = 196608;
private static final int INIT_CANCEL_REQUEST = 80877102; private static final int INIT_CANCEL_REQUEST = 80877102;
...@@ -105,6 +99,7 @@ public class PGConnectionContext implements IOContext, Mutable { ...@@ -105,6 +99,7 @@ public class PGConnectionContext implements IOContext, Mutable {
private final String serverVersion; private final String serverVersion;
private final PGAuthenticator authenticator; private final PGAuthenticator authenticator;
private final SqlExecutionContextImpl sqlExecutionContext = new SqlExecutionContextImpl(); private final SqlExecutionContextImpl sqlExecutionContext = new SqlExecutionContextImpl();
private final Path path = new Path();
private int sendCurrentCursorTail = TAIL_NONE; private int sendCurrentCursorTail = TAIL_NONE;
private long sendBufferPtr; private long sendBufferPtr;
private boolean requireInitalMessage = false; private boolean requireInitalMessage = false;
...@@ -444,7 +439,10 @@ public class PGConnectionContext implements IOContext, Mutable { ...@@ -444,7 +439,10 @@ public class PGConnectionContext implements IOContext, Mutable {
} }
break; break;
case 'd': case 'd':
System.out.println("data " + msgLen); System.out.println("data " + msgLen);
// msgLen includes 4 bytes of self
break; break;
default: default:
LOG.error().$("unknown message [type=").$(type).$(']').$(); LOG.error().$("unknown message [type=").$(type).$(']').$();
...@@ -475,6 +473,7 @@ public class PGConnectionContext implements IOContext, Mutable { ...@@ -475,6 +473,7 @@ public class PGConnectionContext implements IOContext, Mutable {
this.fd = -1; this.fd = -1;
Unsafe.free(sendBuffer, sendBufferSize); Unsafe.free(sendBuffer, sendBufferSize);
Unsafe.free(recvBuffer, recvBufferSize); Unsafe.free(recvBuffer, recvBufferSize);
Misc.free(path);
} }
@Override @Override
...@@ -954,19 +953,29 @@ public class PGConnectionContext implements IOContext, Mutable { ...@@ -954,19 +953,29 @@ public class PGConnectionContext implements IOContext, Mutable {
} }
private void sendCopyInResponse(CairoEngine engine, TextLoader textLoader) throws PeerDisconnectedException, PeerIsSlowToReadException { private void sendCopyInResponse(CairoEngine engine, TextLoader textLoader) throws PeerDisconnectedException, PeerIsSlowToReadException {
responseAsciiSink.put(MESSAGE_TYPE_COPY_IN_RESPONSE); if (TableUtils.TABLE_EXISTS == engine.getStatus(
long addr = responseAsciiSink.skip(); sqlExecutionContext.getCairoSecurityContext(),
responseAsciiSink.put((byte) 0); // TEXT (1=BINARY, which we do not support yet) path,
try (TableWriter writer = engine.getWriter(sqlExecutionContext.getCairoSecurityContext(), textLoader.getTableName())) { textLoader.getTableName()
RecordMetadata metadata = writer.getMetadata(); )) {
responseAsciiSink.putNetworkShort((short) metadata.getColumnCount()); responseAsciiSink.put(MESSAGE_TYPE_COPY_IN_RESPONSE);
for (int i = 0, n = metadata.getColumnCount(); i < n; i++) { long addr = responseAsciiSink.skip();
responseAsciiSink.putNetworkShort((short) typeOidMap.get(metadata.getColumnType(i))); responseAsciiSink.put((byte) 0); // TEXT (1=BINARY, which we do not support yet)
try (TableWriter writer = engine.getWriter(sqlExecutionContext.getCairoSecurityContext(), textLoader.getTableName())) {
RecordMetadata metadata = writer.getMetadata();
responseAsciiSink.putNetworkShort((short) metadata.getColumnCount());
for (int i = 0, n = metadata.getColumnCount(); i < n; i++) {
responseAsciiSink.putNetworkShort((short) typeOidMap.get(metadata.getColumnType(i)));
}
} }
responseAsciiSink.putLen(addr);
transientCopyBuffer = Unsafe.malloc(1024 * 1024);
send();
} else {
prepareError(SqlException.$(0, "table '").put(textLoader.getTableName()).put("' does not exist"));
prepareReadyForQuery(responseAsciiSink);
send();
} }
responseAsciiSink.putLen(addr);
transientCopyBuffer = Unsafe.malloc(1024 * 1024);
send();
} }
private void parseQuery( private void parseQuery(
......
...@@ -102,11 +102,11 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { ...@@ -102,11 +102,11 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
} }
public RecordMetadata getMetadata() { public RecordMetadata getMetadata() {
return writer.getMetadata(); return writer == null ? null : writer.getMetadata();
} }
public int getPartitionBy() { public int getPartitionBy() {
return writer.getPartitionBy(); return writer == null ? PartitionBy.NONE : writer.getPartitionBy();
} }
public CharSequence getTableName() { public CharSequence getTableName() {
...@@ -114,7 +114,7 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable { ...@@ -114,7 +114,7 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
} }
public long getWrittenLineCount() { public long getWrittenLineCount() {
return writer.size() - _size; return writer == null ? 0 : writer.size() - _size;
} }
public void of(CharSequence name, boolean overwrite, boolean durable, int atomicity) { public void of(CharSequence name, boolean overwrite, boolean durable, int atomicity) {
......
...@@ -72,7 +72,7 @@ public class TextDelimiterScanner implements Closeable { ...@@ -72,7 +72,7 @@ public class TextDelimiterScanner implements Closeable {
Unsafe.free(matrix, matrixSize); Unsafe.free(matrix, matrixSize);
} }
byte scan(long address, long hi) { byte scan(long address, long hi) throws TextException {
int lineCount = 0; int lineCount = 0;
boolean quotes = false; boolean quotes = false;
long cursor = address; long cursor = address;
...@@ -144,7 +144,7 @@ public class TextDelimiterScanner implements Closeable { ...@@ -144,7 +144,7 @@ public class TextDelimiterScanner implements Closeable {
if (lineCount < 2) { if (lineCount < 2) {
LOG.info().$("not enough lines [table=").$(tableName).$(']').$(); LOG.info().$("not enough lines [table=").$(tableName).$(']').$();
throw UnknownDelimiterException.INSTANCE; throw TextException.$("not enough lines [table=").put(tableName).put(']');
} }
double lastStdDev = Double.MAX_VALUE; double lastStdDev = Double.MAX_VALUE;
...@@ -220,7 +220,11 @@ public class TextDelimiterScanner implements Closeable { ...@@ -220,7 +220,11 @@ public class TextDelimiterScanner implements Closeable {
.$("min deviation is too high [stddev=").$(lastStdDev) .$("min deviation is too high [stddev=").$(lastStdDev)
.$(", max=").$(maxRequiredDelimiterStdDev) .$(", max=").$(maxRequiredDelimiterStdDev)
.$(']').$(); .$(']').$();
throw UnknownDelimiterException.INSTANCE;
throw TextException.$("min deviation is too high [stddev=")
.put(lastStdDev)
.put(", max=").put(maxRequiredDelimiterStdDev)
.put(']');
} }
void setTableName(CharSequence tableName) { void setTableName(CharSequence tableName) {
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.cutlass.text;
import io.questdb.std.Sinkable;
import io.questdb.std.ThreadLocal;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.StringSink;
public class TextException extends Exception implements Sinkable {
private static final ThreadLocal<TextException> tlException = new ThreadLocal<>(TextException::new);
private final StringSink message = new StringSink();
public static TextException $(CharSequence message) {
TextException te = tlException.get();
StringSink sink = te.message;
sink.clear();
sink.put(message);
return te;
}
public CharSequence getFlyweightMessage() {
return message;
}
@Override
public String getMessage() {
return message.toString();
}
public TextException put(CharSequence cs) {
message.put(cs);
return this;
}
public TextException put(char c) {
message.put(c);
return this;
}
public TextException put(double c) {
message.put(c, 6);
return this;
}
@Override
public void toSink(CharSink sink) {
sink.put(message);
}
}
...@@ -161,7 +161,7 @@ public class TextLoader implements Closeable, Mutable { ...@@ -161,7 +161,7 @@ public class TextLoader implements Closeable, Mutable {
this.forceHeaders = forceHeaders; this.forceHeaders = forceHeaders;
} }
public void parse(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws JsonException { public void parse(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws TextException {
parseMethods.getQuick(state).parse(lo, hi, cairoSecurityContext); parseMethods.getQuick(state).parse(lo, hi, cairoSecurityContext);
} }
...@@ -171,10 +171,14 @@ public class TextLoader implements Closeable, Mutable { ...@@ -171,10 +171,14 @@ public class TextLoader implements Closeable, Mutable {
jsonLexer.clear(); jsonLexer.clear();
} }
public void wrapUp() throws JsonException { public void wrapUp() throws TextException {
switch (state) { switch (state) {
case LOAD_JSON_METADATA: case LOAD_JSON_METADATA:
jsonLexer.parseLast(); try {
jsonLexer.parseLast();
} catch (JsonException e) {
throw TextException.$(e.getFlyweightMessage());
}
break; break;
case ANALYZE_STRUCTURE: case ANALYZE_STRUCTURE:
case LOAD_DATA: case LOAD_DATA:
...@@ -190,11 +194,15 @@ public class TextLoader implements Closeable, Mutable { ...@@ -190,11 +194,15 @@ public class TextLoader implements Closeable, Mutable {
textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter); textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter);
} }
private void parseJsonMetadata(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws JsonException { private void parseJsonMetadata(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws TextException {
jsonLexer.parse(lo, hi, textMetadataParser); try {
jsonLexer.parse(lo, hi, textMetadataParser);
} catch (JsonException e) {
throw TextException.$(e.getFlyweightMessage());
}
} }
private void parseStructure(long lo, long hi, CairoSecurityContext cairoSecurityContext) { private void parseStructure(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws TextException {
if (columnDelimiter > 0) { if (columnDelimiter > 0) {
textLexer.of(columnDelimiter); textLexer.of(columnDelimiter);
} else { } else {
...@@ -215,6 +223,6 @@ public class TextLoader implements Closeable, Mutable { ...@@ -215,6 +223,6 @@ public class TextLoader implements Closeable, Mutable {
@FunctionalInterface @FunctionalInterface
private interface ParserMethod { private interface ParserMethod {
void parse(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws JsonException; void parse(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws TextException;
} }
} }
...@@ -84,7 +84,7 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable { ...@@ -84,7 +84,7 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
this.typeManager = typeManager; this.typeManager = typeManager;
} }
public static void checkInputs(int position, CharSequence name, int type) throws JsonException { private static void checkInputs(int position, CharSequence name, int type) throws JsonException {
if (name == null) { if (name == null) {
throw JsonException.$(position, "Missing 'name' property"); throw JsonException.$(position, "Missing 'name' property");
} }
...@@ -217,6 +217,11 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable { ...@@ -217,6 +217,11 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
if (dateLocale == null) { if (dateLocale == null) {
throw JsonException.$(localePosition, "Invalid date locale"); throw JsonException.$(localePosition, "Invalid date locale");
} }
// date pattern is required
if (pattern == null) {
throw JsonException.$(0, "DATE format pattern is required");
}
columnTypes.add(typeManager.nextDateAdapter().of(dateFormatFactory.get(pattern), dateLocale)); columnTypes.add(typeManager.nextDateAdapter().of(dateFormatFactory.get(pattern), dateLocale));
break; break;
case ColumnType.TIMESTAMP: case ColumnType.TIMESTAMP:
...@@ -227,6 +232,11 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable { ...@@ -227,6 +232,11 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
if (timestampLocale == null) { if (timestampLocale == null) {
throw JsonException.$(localePosition, "Invalid timestamp locale"); throw JsonException.$(localePosition, "Invalid timestamp locale");
} }
// timestamp pattern is required
if (pattern == null) {
throw JsonException.$(0, "DATE format pattern is required");
}
columnTypes.add(typeManager.nextTimestampAdapter().of(timestampFormatFactory.get(pattern), timestampLocale)); columnTypes.add(typeManager.nextTimestampAdapter().of(timestampFormatFactory.get(pattern), timestampLocale));
break; break;
default: default:
......
...@@ -25,8 +25,8 @@ package io.questdb.griffin; ...@@ -25,8 +25,8 @@ package io.questdb.griffin;
import io.questdb.cairo.*; import io.questdb.cairo.*;
import io.questdb.cairo.sql.*; import io.questdb.cairo.sql.*;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.text.Atomicity; import io.questdb.cutlass.text.Atomicity;
import io.questdb.cutlass.text.TextException;
import io.questdb.cutlass.text.TextLoader; import io.questdb.cutlass.text.TextLoader;
import io.questdb.griffin.model.*; import io.questdb.griffin.model.*;
import io.questdb.log.Log; import io.questdb.log.Log;
...@@ -44,6 +44,31 @@ public class SqlCompiler implements Closeable { ...@@ -44,6 +44,31 @@ public class SqlCompiler implements Closeable {
public static final ObjList<String> sqlControlSymbols = new ObjList<>(8); public static final ObjList<String> sqlControlSymbols = new ObjList<>(8);
private final static Log LOG = LogFactory.getLog(SqlCompiler.class); private final static Log LOG = LogFactory.getLog(SqlCompiler.class);
private static final IntList castGroups = new IntList(); private static final IntList castGroups = new IntList();
static {
castGroups.extendAndSet(ColumnType.BOOLEAN, 2);
castGroups.extendAndSet(ColumnType.BYTE, 1);
castGroups.extendAndSet(ColumnType.SHORT, 1);
castGroups.extendAndSet(ColumnType.CHAR, 1);
castGroups.extendAndSet(ColumnType.INT, 1);
castGroups.extendAndSet(ColumnType.LONG, 1);
castGroups.extendAndSet(ColumnType.FLOAT, 1);
castGroups.extendAndSet(ColumnType.DOUBLE, 1);
castGroups.extendAndSet(ColumnType.DATE, 1);
castGroups.extendAndSet(ColumnType.TIMESTAMP, 1);
castGroups.extendAndSet(ColumnType.STRING, 3);
castGroups.extendAndSet(ColumnType.SYMBOL, 3);
castGroups.extendAndSet(ColumnType.BINARY, 4);
sqlControlSymbols.add("(");
sqlControlSymbols.add(";");
sqlControlSymbols.add(")");
sqlControlSymbols.add(",");
sqlControlSymbols.add("/*");
sqlControlSymbols.add("*/");
sqlControlSymbols.add("--");
}
private final SqlOptimiser optimiser; private final SqlOptimiser optimiser;
private final SqlParser parser; private final SqlParser parser;
private final ObjectPool<ExpressionNode> sqlNodePool; private final ObjectPool<ExpressionNode> sqlNodePool;
...@@ -145,35 +170,6 @@ public class SqlCompiler implements Closeable { ...@@ -145,35 +170,6 @@ public class SqlCompiler implements Closeable {
} }
} }
@Override
public void close() {
Misc.free(path);
Misc.free(textLoader);
}
public CompiledQuery compile(CharSequence query) throws SqlException {
return compile(query, DefaultSqlExecutionContext.INSTANCE);
}
public CompiledQuery compile(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException {
clear();
//
// these are quick executions that do not require building of a model
//
lexer.of(query);
final CharSequence tok = SqlUtil.fetchNext(lexer);
final KeywordBasedExecutor executor = keywordBasedExecutors.get(tok);
if (executor == null) {
return compileUsingModel(executionContext);
}
return executor.execute(executionContext);
}
public CairoEngine getEngine() {
return engine;
}
// Creates data type converter. // Creates data type converter.
// INT and LONG NaN values are cast to their representation rather than Double or Float NaN. // INT and LONG NaN values are cast to their representation rather than Double or Float NaN.
private static RecordToRowCopier assembleRecordToRowCopier(BytecodeAssembler asm, ColumnTypes from, RecordMetadata to, ColumnFilter toColumnFilter) { private static RecordToRowCopier assembleRecordToRowCopier(BytecodeAssembler asm, ColumnTypes from, RecordMetadata to, ColumnFilter toColumnFilter) {
...@@ -635,13 +631,42 @@ public class SqlCompiler implements Closeable { ...@@ -635,13 +631,42 @@ public class SqlCompiler implements Closeable {
return tok; return tok;
} }
@Override
public void close() {
Misc.free(path);
Misc.free(textLoader);
}
public CompiledQuery compile(CharSequence query) throws SqlException {
return compile(query, DefaultSqlExecutionContext.INSTANCE);
}
public CompiledQuery compile(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException {
clear();
//
// these are quick executions that do not require building of a model
//
lexer.of(query);
final CharSequence tok = SqlUtil.fetchNext(lexer);
final KeywordBasedExecutor executor = keywordBasedExecutors.get(tok);
if (executor == null) {
return compileUsingModel(executionContext);
}
return executor.execute(executionContext);
}
public CairoEngine getEngine() {
return engine;
}
private CompiledQuery alterTable(SqlExecutionContext executionContext) throws SqlException { private CompiledQuery alterTable(SqlExecutionContext executionContext) throws SqlException {
CharSequence tok; CharSequence tok;
expectKeyword(lexer, "table"); expectKeyword(lexer, "table");
final int tableNamePosition = lexer.getPosition(); final int tableNamePosition = lexer.getPosition();
tok = expectToken(lexer, "table name"); tok = GenericLexer.unquote(expectToken(lexer, "table name"));
tableExistsOrFail(tableNamePosition, tok, executionContext); tableExistsOrFail(tableNamePosition, tok, executionContext);
...@@ -654,6 +679,8 @@ public class SqlCompiler implements Closeable { ...@@ -654,6 +679,8 @@ public class SqlCompiler implements Closeable {
alterTableAddColumn(tableNamePosition, writer); alterTableAddColumn(tableNamePosition, writer);
} else if (Chars.equalsLowerCaseAscii("drop", tok)) { } else if (Chars.equalsLowerCaseAscii("drop", tok)) {
alterTableDropColumn(tableNamePosition, writer); alterTableDropColumn(tableNamePosition, writer);
} else {
throw SqlException.$(lexer.lastTokenPosition(), "unexpected token: ").put(tok);
} }
} catch (CairoException e) { } catch (CairoException e) {
LOG.info().$("failed to lock table for alter: ").$((Sinkable) e).$(); LOG.info().$("failed to lock table for alter: ").$((Sinkable) e).$();
...@@ -918,7 +945,7 @@ public class SqlCompiler implements Closeable { ...@@ -918,7 +945,7 @@ public class SqlCompiler implements Closeable {
} finally { } finally {
Unsafe.free(buf, len); Unsafe.free(buf, len);
} }
} catch (JsonException e) { } catch (TextException e) {
// we do not expect JSON exception here // we do not expect JSON exception here
} finally { } finally {
LOG.info().$("copied").$(); LOG.info().$("copied").$();
...@@ -1559,28 +1586,4 @@ public class SqlCompiler implements Closeable { ...@@ -1559,28 +1586,4 @@ public class SqlCompiler implements Closeable {
return this; return this;
} }
} }
static {
castGroups.extendAndSet(ColumnType.BOOLEAN, 2);
castGroups.extendAndSet(ColumnType.BYTE, 1);
castGroups.extendAndSet(ColumnType.SHORT, 1);
castGroups.extendAndSet(ColumnType.CHAR, 1);
castGroups.extendAndSet(ColumnType.INT, 1);
castGroups.extendAndSet(ColumnType.LONG, 1);
castGroups.extendAndSet(ColumnType.FLOAT, 1);
castGroups.extendAndSet(ColumnType.DOUBLE, 1);
castGroups.extendAndSet(ColumnType.DATE, 1);
castGroups.extendAndSet(ColumnType.TIMESTAMP, 1);
castGroups.extendAndSet(ColumnType.STRING, 3);
castGroups.extendAndSet(ColumnType.SYMBOL, 3);
castGroups.extendAndSet(ColumnType.BINARY, 4);
sqlControlSymbols.add("(");
sqlControlSymbols.add(";");
sqlControlSymbols.add(")");
sqlControlSymbols.add(",");
sqlControlSymbols.add("/*");
sqlControlSymbols.add("*/");
sqlControlSymbols.add("--");
}
} }
...@@ -37,7 +37,11 @@ import io.questdb.std.str.StringSink; ...@@ -37,7 +37,11 @@ import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils; import io.questdb.test.tools.TestUtils;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.postgresql.copy.CopyIn;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.postgresql.util.PGTimestamp; import org.postgresql.util.PGTimestamp;
import org.postgresql.util.PSQLException; import org.postgresql.util.PSQLException;
...@@ -1894,4 +1898,41 @@ public class PGJobContextTest extends AbstractGriffinTest { ...@@ -1894,4 +1898,41 @@ public class PGJobContextTest extends AbstractGriffinTest {
}); });
} }
@Test
@Ignore
public void testCopyIn() throws SQLException, BrokenBarrierException, InterruptedException {
final CountDownLatch haltLatch = new CountDownLatch(1);
final AtomicBoolean running = new AtomicBoolean(true);
try {
startBasicServer(
NetworkFacadeImpl.INSTANCE,
new DefaultPGWireConfiguration(),
haltLatch,
running
);
Properties properties = new Properties();
properties.setProperty("user", "admin");
properties.setProperty("password", "quest");
final Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:9120/nabu_app", properties);
PreparedStatement stmt = connection.prepareStatement("create table tab (a int, b int)");
stmt.execute();
CopyManager copyManager = new CopyManager((BaseConnection) connection);
CopyIn copyIn = copyManager.copyIn("copy tab from STDIN");
String text = "a,b\r\n" +
"10,20";
byte[] bytes = text.getBytes();
copyIn.writeToCopy(bytes, 0, bytes.length);
copyIn.endCopy();
} finally {
running.set(false);
haltLatch.await();
}
}
} }
\ No newline at end of file
...@@ -27,7 +27,6 @@ import io.questdb.cairo.*; ...@@ -27,7 +27,6 @@ import io.questdb.cairo.*;
import io.questdb.cairo.security.AllowAllCairoSecurityContext; import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.RecordCursor; import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory; import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer; import io.questdb.cutlass.json.JsonLexer;
import io.questdb.griffin.AbstractGriffinTest; import io.questdb.griffin.AbstractGriffinTest;
import io.questdb.griffin.SqlException; import io.questdb.griffin.SqlException;
...@@ -1211,7 +1210,8 @@ public class TextLoaderTest extends AbstractGriffinTest { ...@@ -1211,7 +1210,8 @@ public class TextLoaderTest extends AbstractGriffinTest {
configureLoaderDefaults(textLoader); configureLoaderDefaults(textLoader);
try { try {
playText0(textLoader, text, 512, ENTITY_MANIPULATOR); playText0(textLoader, text, 512, ENTITY_MANIPULATOR);
} catch (UnknownDelimiterException ignore) { } catch (TextException e) {
TestUtils.assertContains(e.getFlyweightMessage(), "min deviation is too high");
} }
}); });
} }
...@@ -1223,7 +1223,8 @@ public class TextLoaderTest extends AbstractGriffinTest { ...@@ -1223,7 +1223,8 @@ public class TextLoaderTest extends AbstractGriffinTest {
configureLoaderDefaults(textLoader); configureLoaderDefaults(textLoader);
try { try {
playText0(textLoader, text, 512, ENTITY_MANIPULATOR); playText0(textLoader, text, 512, ENTITY_MANIPULATOR);
} catch (UnknownDelimiterException ignore) { } catch (TextException e) {
TestUtils.assertContains(e.getFlyweightMessage(), "not enough lines");
} }
}); });
} }
...@@ -2251,7 +2252,7 @@ public class TextLoaderTest extends AbstractGriffinTest { ...@@ -2251,7 +2252,7 @@ public class TextLoaderTest extends AbstractGriffinTest {
configureLoaderDefaults(textLoader, columnSeparator, atomicity, false); configureLoaderDefaults(textLoader, columnSeparator, atomicity, false);
} }
private void playJson(TextLoader textLoader, String jsonStr) throws JsonException { private void playJson(TextLoader textLoader, String jsonStr) throws TextException {
byte[] json = jsonStr.getBytes(StandardCharsets.UTF_8); byte[] json = jsonStr.getBytes(StandardCharsets.UTF_8);
textLoader.setState(TextLoader.LOAD_JSON_METADATA); textLoader.setState(TextLoader.LOAD_JSON_METADATA);
...@@ -2353,7 +2354,7 @@ public class TextLoaderTest extends AbstractGriffinTest { ...@@ -2353,7 +2354,7 @@ public class TextLoaderTest extends AbstractGriffinTest {
textLoader.clear(); textLoader.clear();
} }
private void playText0(TextLoader textLoader, String text, int firstBufSize, ByteManipulator manipulator) throws JsonException { private void playText0(TextLoader textLoader, String text, int firstBufSize, ByteManipulator manipulator) throws TextException {
byte[] bytes = text.getBytes(StandardCharsets.UTF_8); byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
int len = bytes.length; int len = bytes.length;
long buf = Unsafe.malloc(len); long buf = Unsafe.malloc(len);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册