未验证 提交 a0669042 编写于 作者: B Bolek Ziobrowski 提交者: GitHub

fix(pg): handle sql statement blocks in pg wire to make asyncpg work (#1788)

上级 ba063409
......@@ -88,8 +88,10 @@ stages:
condition: failed()
inputs:
workingDirectory: $(CRASH_LOG_LOCATION)
targetType: 'inline'
script: find . -type f -name 'hs_*.log' -exec sh -c 'x="{}"; mv "$x" "$ARCHIVED_GRASH_LOG"' \;
targetType: "inline"
script:
find . -type f -name 'hs_*.log' -exec sh -c 'x="{}"; mv "$x"
"$ARCHIVED_GRASH_LOG"' \;
env:
ARCHIVED_GRASH_LOG: $(ARCHIVED_GRASH_LOG)
- task: PublishBuildArtifacts@1
......
......@@ -25,9 +25,7 @@ steps:
inputs:
mavenPomFile: "pom.xml"
mavenOptions: "$(MAVEN_OPTS)"
options:
"compile -DskipTests -P build-web-console
-Dhttp.keepAlive=false"
options: "compile -DskipTests -P build-web-console -Dhttp.keepAlive=false"
jdkVersionOption: $(jdk)
condition: eq(variables['SOURCE_CODE_CHANGED'], 'false')
- task: Maven@3
......@@ -83,7 +81,8 @@ steps:
goals: "clean test"
options:
"--batch-mode --quiet -P build-web-console
-Dout=$(Build.SourcesDirectory)/ci/qlog.conf -Dhttp.keepAlive=false -Dtest.exclude=**/griffin/** -DfailIfNoTests=false"
-Dout=$(Build.SourcesDirectory)/ci/qlog.conf -Dhttp.keepAlive=false
-Dtest.exclude=**/griffin/** -DfailIfNoTests=false"
jdkVersionOption: $(jdk)
timeoutInMinutes: 55
condition: |
......@@ -99,7 +98,8 @@ steps:
goals: "clean test"
options:
"--batch-mode --quiet -P build-web-console
-Dout=$(Build.SourcesDirectory)/ci/qlog.conf -Dhttp.keepAlive=false -Dtest=griffin.** -DfailIfNoTests=false"
-Dout=$(Build.SourcesDirectory)/ci/qlog.conf -Dhttp.keepAlive=false
-Dtest=griffin.** -DfailIfNoTests=false"
jdkVersionOption: $(jdk)
timeoutInMinutes: 55
condition: |
......
......@@ -35,7 +35,7 @@
<nodejs.version>v16.13.1</nodejs.version>
<runtime.assembly>none</runtime.assembly>
<javac.target>11</javac.target>
<argLine>-Dfile.encoding=UTF-8</argLine>
<argLine>-ea -Dfile.encoding=UTF-8</argLine>
<test.exclude>None</test.exclude>
</properties>
......
......@@ -68,7 +68,10 @@ public class CairoException extends RuntimeException implements Sinkable, Flywei
@Override
public StackTraceElement[] getStackTrace() {
return EMPTY_STACK_TRACE;
StackTraceElement[] result = EMPTY_STACK_TRACE;
// This is to have correct stack trace reported in CI
assert (result = super.getStackTrace()) != null;
return result;
}
public boolean isCacheable() {
......
......@@ -99,6 +99,9 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
this.queryExecutors.extendAndSet(CompiledQuery.ALTER, this::executeAlterTable);
this.queryExecutors.extendAndSet(CompiledQuery.REPAIR, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.SET, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.BEGIN, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.COMMIT, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.ROLLBACK, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.DROP, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.RENAME_TABLE, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.COPY_LOCAL, sendConfirmation);
......@@ -106,6 +109,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
this.queryExecutors.extendAndSet(CompiledQuery.INSERT_AS_SELECT, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.COPY_REMOTE, JsonQueryProcessor::cannotCopyRemote);
this.queryExecutors.extendAndSet(CompiledQuery.BACKUP_TABLE, sendConfirmation);
this.queryExecutors.extendAndSet(CompiledQuery.CREATE_TABLE_AS_SELECT, sendConfirmation);
this.sqlExecutionContext = sqlExecutionContext;
this.nanosecondClock = engine.getConfiguration().getNanosecondClock();
this.circuitBreaker = new NetworkSqlExecutionCircuitBreaker(configuration.getCircuitBreakerConfiguration());
......
......@@ -48,7 +48,15 @@ import static io.questdb.cutlass.pgwire.PGOids.*;
import static io.questdb.std.datetime.millitime.DateFormatUtils.PG_DATE_MILLI_TIME_Z_FORMAT;
import static io.questdb.std.datetime.millitime.DateFormatUtils.PG_DATE_Z_FORMAT;
/**
* Useful PostgreSQL documentation links:<br>
* <a href="https://www.postgresql.org/docs/current/protocol-flow.html">Wire protocol</a><br>
* <a href="https://www.postgresql.org/docs/current/protocol-message-formats.html">Message formats</a>
*/
public class PGConnectionContext implements IOContext, Mutable, WriterSource {
private final static Log LOG = LogFactory.getLog(PGConnectionContext.class);
public static final String TAG_SET = "SET";
public static final String TAG_BEGIN = "BEGIN";
public static final String TAG_COMMIT = "COMMIT";
......@@ -57,25 +65,31 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
public static final String TAG_OK = "OK";
public static final String TAG_COPY = "COPY";
public static final String TAG_INSERT = "INSERT";
public static final char STATUS_IN_TRANSACTION = 'T';
public static final char STATUS_IN_ERROR = 'E';
public static final char STATUS_IDLE = 'I';
private static final int INT_BYTES_X = Numbers.bswap(Integer.BYTES);
private static final int INT_NULL_X = Numbers.bswap(-1);
private static final int SYNC_PARSE = 1;
private static final int SYNC_DESCRIBE = 2;
private static final int SYNC_BIND = 3;
private static final int SYNC_DESCRIBE_PORTAL = 4;
private static final byte MESSAGE_TYPE_ERROR_RESPONSE = 'E';
private static final int INIT_SSL_REQUEST = 80877103;
private static final int INIT_STARTUP_MESSAGE = 196608;
private static final int INIT_CANCEL_REQUEST = 80877102;
private static final int PREFIXED_MESSAGE_HEADER_LEN = 5;
private static final byte MESSAGE_TYPE_ERROR_RESPONSE = 'E';
private static final byte MESSAGE_TYPE_COMMAND_COMPLETE = 'C';
private static final byte MESSAGE_TYPE_EMPTY_QUERY = 'I';
private static final byte MESSAGE_TYPE_DATA_ROW = 'D';
private static final byte MESSAGE_TYPE_READY_FOR_QUERY = 'Z';
private final static Log LOG = LogFactory.getLog(PGConnectionContext.class);
private static final int PREFIXED_MESSAGE_HEADER_LEN = 5;
private static final byte MESSAGE_TYPE_LOGIN_RESPONSE = 'R';
private static final byte MESSAGE_TYPE_PARAMETER_STATUS = 'S';
private static final byte MESSAGE_TYPE_ROW_DESCRIPTION = 'T';
......@@ -86,13 +100,16 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
private static final byte MESSAGE_TYPE_NO_DATA = 'n';
private static final byte MESSAGE_TYPE_COPY_IN_RESPONSE = 'G';
private static final byte MESSAGE_TYPE_PORTAL_SUSPENDED = 's';
private static final int NO_TRANSACTION = 0;
private static final int IN_TRANSACTION = 1;
private static final int COMMIT_TRANSACTION = 2;
private static final int ERROR_TRANSACTION = 3;
private static final int ROLLING_BACK_TRANSACTION = 4;
private static final String WRITER_LOCK_REASON = "pgConnection";
private static final int PROTOCOL_TAIL_COMMAND_LENGTH = 64;
private final long recvBuffer;
private final long sendBuffer;
private final int recvBufferSize;
......@@ -126,7 +143,15 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
private final ObjectPool<DirectBinarySequence> binarySequenceParamsPool;
private final NetworkSqlExecutionCircuitBreaker circuitBreaker;
private final SCSequence tempSequence = new SCSequence();
//list of pair: column types (with format flag stored in first bit) AND additional type flag
private IntList activeSelectColumnTypes;
//stores result format codes (0=Text,1=Binary) from latest bind message
//we need it in case cursor gets invalidated and bind used non-default binary format for some column(s)
//pg clients (like asyncpg) fail when format sent by server is not the same as requested in bind message
private final IntList bindSelectColumnFormats;
private int parsePhaseBindVariableCount;
private long sendBufferPtr;
private boolean requireInitialMessage = false;
......@@ -146,6 +171,9 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
private TypesAndInsert typesAndInsert = null;
private long fd;
private CharSequence queryText;
//command tag used when returning row count to client,
//see CommandComplete (B) at https://www.postgresql.org/docs/current/protocol-message-formats.html
private CharSequence queryTag;
private CharSequence username;
private boolean authenticationRequired = true;
......@@ -167,6 +195,7 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
private long maxRows;
private final PGResumeProcessor resumeCursorExecuteRef = this::resumeCursorExecute;
private final PGResumeProcessor resumeCursorQueryRef = this::resumeCursorQuery;
private final BatchCallback batchCallback;
public PGConnectionContext(CairoEngine engine, PGWireConfiguration configuration, SqlExecutionContextImpl sqlExecutionContext) {
this.engine = engine;
......@@ -203,6 +232,43 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
final int blockCount = enableInsertCache ? configuration.getInsertCacheBlockCount() : 1; // 8
final int rowCount = enableInsertCache ? configuration.getInsertCacheRowCount() : 1; // 8
this.typesAndInsertCache = new AssociativeCache<>(blockCount, rowCount);
this.batchCallback = new PGConnectionBatchCallback();
this.bindSelectColumnFormats = new IntList();
}
class PGConnectionBatchCallback implements BatchCallback {
@Override
public void preCompile(SqlCompiler compiler) throws SqlException {
prepareForNewBatchQuery();
PGConnectionContext.this.typesAndInsert = null;
PGConnectionContext.this.typesAndSelect = null;
}
@Override
public void postCompile(SqlCompiler compiler, CompiledQuery cq, CharSequence text)
throws SqlException, PeerIsSlowToReadException, PeerDisconnectedException {
PGConnectionContext.this.queryText = text;
LOG.info().$("parse [fd=").$(fd).$(", q=").utf8(text).I$();
processCompiledQuery(compiler, cq);
if (typesAndSelect != null) {
activeSelectColumnTypes = selectColumnTypes;
buildSelectColumnTypes();
assert queryText != null;
queryTag = TAG_SELECT;
setupFactoryAndCursor(compiler);
prepareRowDescription();
sendCursor(0, resumeCursorQueryRef, resumeQueryCompleteRef);
} else if (typesAndInsert != null) {
executeInsert();
} else if (cq.getType() == CompiledQuery.INSERT_AS_SELECT ||
cq.getType() == CompiledQuery.CREATE_TABLE_AS_SELECT) {
prepareCommandComplete(true);
} else {
executeTag();
prepareCommandComplete(false);
}
}
}
public static int getInt(long address, long msgLimit, CharSequence errorMessage) throws BadProtocolException {
......@@ -1043,43 +1109,9 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
}
// not cached - compile to see what it is
final CompiledQuery cc = compiler.compile(queryText, sqlExecutionContext);
sqlExecutionContext.storeTelemetry(cc.getType(), Telemetry.ORIGIN_POSTGRES);
switch (cc.getType()) {
case CompiledQuery.SELECT:
typesAndSelect = typesAndSelectPool.pop();
typesAndSelect.of(cc.getRecordCursorFactory(), bindVariableService);
queryTag = TAG_SELECT;
LOG.debug().$("cache select [sql=").$(queryText).$(", thread=").$(Thread.currentThread().getId()).$(']').$();
break;
case CompiledQuery.INSERT:
queryTag = TAG_INSERT;
typesAndInsert = typesAndInsertPool.pop();
typesAndInsert.of(cc.getInsertStatement(), bindVariableService);
if (bindVariableService.getIndexedVariableCount() > 0) {
LOG.debug().$("cache insert [sql=").$(queryText).$(", thread=").$(Thread.currentThread().getId()).$(']').$();
// we can add insert to cache right away because it is local to the connection
typesAndInsertCache.put(queryText, typesAndInsert);
}
break;
case CompiledQuery.COPY_LOCAL:
// uncached
queryTag = TAG_COPY;
sendCopyInResponse(compiler.getEngine(), cc.getTextLoader());
break;
case CompiledQuery.SET:
configureContextForSet();
break;
case CompiledQuery.ALTER:
try (QueryFuture cf = cc.execute(tempSequence)) {
cf.await();
}
default:
// DDL SQL
queryTag = TAG_OK;
break;
}
final CompiledQuery cc = compiler.compile(queryText, sqlExecutionContext); //here
processCompiledQuery(compiler, cc);
} else {
isEmptyQuery = true;
}
......@@ -1087,20 +1119,65 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
return true;
}
private void configureContextForSet() {
if (SqlKeywords.isBegin(queryText)) {
queryTag = TAG_BEGIN;
transactionState = IN_TRANSACTION;
} else if (SqlKeywords.isCommit(queryText)) {
queryTag = TAG_COMMIT;
if (transactionState != ERROR_TRANSACTION) {
transactionState = COMMIT_TRANSACTION;
}
} else if (SqlKeywords.isRollback(queryText)) {
queryTag = TAG_ROLLBACK;
transactionState = ROLLING_BACK_TRANSACTION;
} else {
queryTag = TAG_SET;
private void processCompiledQuery(SqlCompiler compiler, CompiledQuery cq)
throws PeerDisconnectedException, PeerIsSlowToReadException, SqlException {
sqlExecutionContext.storeTelemetry(cq.getType(), Telemetry.ORIGIN_POSTGRES);
switch (cq.getType()) {
case CompiledQuery.CREATE_TABLE_AS_SELECT:
queryTag = TAG_SELECT;
rowCount = cq.getInsertCount();
break;
case CompiledQuery.SELECT:
typesAndSelect = typesAndSelectPool.pop();
typesAndSelect.of(cq.getRecordCursorFactory(), bindVariableService);
queryTag = TAG_SELECT;
LOG.debug().$("cache select [sql=").$(queryText).$(", thread=").$(Thread.currentThread().getId()).$(']').$();
break;
case CompiledQuery.INSERT:
queryTag = TAG_INSERT;
typesAndInsert = typesAndInsertPool.pop();
typesAndInsert.of(cq.getInsertStatement(), bindVariableService);
if (bindVariableService.getIndexedVariableCount() > 0) {
LOG.debug().$("cache insert [sql=").$(queryText).$(", thread=").$(Thread.currentThread().getId()).$(']').$();
// we can add insert to cache right away because it is local to the connection
typesAndInsertCache.put(queryText, typesAndInsert);
}
break;
case CompiledQuery.INSERT_AS_SELECT:
queryTag = TAG_INSERT;
rowCount = cq.getInsertCount();
break;
case CompiledQuery.COPY_LOCAL:
// uncached
queryTag = TAG_COPY;
sendCopyInResponse(compiler.getEngine(), cq.getTextLoader());
break;
case CompiledQuery.SET:
queryTag = TAG_SET;
break;
case CompiledQuery.BEGIN:
queryTag = TAG_BEGIN;
transactionState = IN_TRANSACTION;
break;
case CompiledQuery.COMMIT:
queryTag = TAG_COMMIT;
if (transactionState != ERROR_TRANSACTION) {
transactionState = COMMIT_TRANSACTION;
}
break;
case CompiledQuery.ROLLBACK:
queryTag = TAG_ROLLBACK;
transactionState = ROLLING_BACK_TRANSACTION;
break;
case CompiledQuery.ALTER:
try (QueryFuture cf = cq.execute(tempSequence)) {
cf.await();
}
default:
// DDL SQL
queryTag = TAG_OK;
break;
}
}
......@@ -1337,7 +1414,7 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
/**
* returns address of where parsing stopped. If there are remaining bytes left
* int the buffer they need to be passed again in parse function along with
* in the buffer they need to be passed again in parse function along with
* any additional bytes received
*/
private void parse(long address, int len, @Transient SqlCompiler compiler)
......@@ -1540,10 +1617,17 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
}
private void prepareForNewQuery() {
prepareForNewBatchQuery();
if(completed) {
characterStore.clear();
}
}
//clears whole state except for characterStore because top-level batch text is using it
private void prepareForNewBatchQuery(){
if (completed) {
LOG.debug().$("prepare for new query").$();
isEmptyQuery = false;
characterStore.clear();
bindVariableService.clear();
currentCursor = Misc.free(currentCursor);
typesAndInsert = null;
......@@ -1708,6 +1792,8 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
}
if (typesAndSelect != null) {
bindSelectColumnFormats.clear();
short columnFormatCodeCount = getShort(lo, msgLimit, "could not read result set column format codes");
if (columnFormatCodeCount > 0) {
......@@ -1718,12 +1804,15 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
final long spaceNeeded = lo + (columnFormatCodeCount + 1) * Short.BYTES;
if (spaceNeeded <= msgLimit) {
bindSelectColumnFormats.setPos(columnCount);
if (columnFormatCodeCount == columnCount) {
// good to go
for (int i = 0; i < columnCount; i++) {
lo += Short.BYTES;
final short code = getShortUnsafe(lo);
activeSelectColumnTypes.setQuick(2 * i, toColumnBinaryType(code, m.getColumnType(i)));
bindSelectColumnFormats.setQuick(i, code);
activeSelectColumnTypes.setQuick(2 * i + 1, 0);
}
} else if (columnFormatCodeCount == 1) {
......@@ -1731,6 +1820,7 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
final short code = getShortUnsafe(lo);
for (int i = 0; i < columnCount; i++) {
activeSelectColumnTypes.setQuick(2 * i, toColumnBinaryType(code, m.getColumnType(i)));
bindSelectColumnFormats.setQuick(i, code);
activeSelectColumnTypes.setQuick(2 * i + 1, 0);
}
} else {
......@@ -1747,7 +1837,18 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
.$(']').$();
throw BadProtocolException.INSTANCE;
}
} else if (columnFormatCodeCount == 0) {
//if count == 0 then we've to use default and clear binary flags that might come from cached statements
final RecordMetadata m = typesAndSelect.getFactory().getMetadata();
final int columnCount = m.getColumnCount();
bindSelectColumnFormats.setPos(columnCount);
for (int i = 0; i < columnCount; i++) {
activeSelectColumnTypes.setQuick(2 * i, toColumnBinaryType((short) 0, m.getColumnType(i)));
bindSelectColumnFormats.setQuick(i, 0);
}
}
}
syncActions.add(SYNC_BIND);
......@@ -1995,27 +2096,21 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
syncActions.add(SYNC_PARSE);
}
//process one or more queries (batch/script) . "Simple Query" in PostgreSQL docs.
private void processQuery(long lo, long limit, @Transient SqlCompiler compiler)
throws BadProtocolException, SqlException, PeerDisconnectedException, PeerIsSlowToReadException {
// simple query, typically a script, which we don't yet support
prepareForNewQuery();
parseQueryText(lo, limit - 1, compiler);
if (typesAndSelect != null) {
activeSelectColumnTypes = selectColumnTypes;
buildSelectColumnTypes();
assert queryText != null;
queryTag = TAG_SELECT;
setupFactoryAndCursor(compiler);
prepareRowDescription();
sendCursor(0, resumeCursorQueryRef, resumeQueryCompleteRef);
} else if (typesAndInsert != null) {
executeInsert();
prepareForNewQuery();
CharacterStoreEntry e = characterStore.newEntry();
if (Chars.utf8Decode(lo, limit - 1, e)) {
queryText = characterStore.toImmutable();
compiler.compileBatch(queryText, sqlExecutionContext, batchCallback );
} else {
executeTag();
prepareCommandComplete(false);
LOG.error().$("invalid UTF8 bytes in parse query").$();
throw BadProtocolException.INSTANCE;
}
sendReadyForNewQuery();
}
......@@ -2241,6 +2336,7 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
currentFactory = Misc.free(currentFactory);
compileQuery(compiler);
buildSelectColumnTypes();
applyLatestBindColumnFormats();
} catch (Throwable e) {
currentFactory = Misc.free(currentFactory);
throw e;
......@@ -2263,6 +2359,15 @@ public class PGConnectionContext implements IOContext, Mutable, WriterSource {
}
}
//replace column formats in activeSelectColumnTypes with those from latest bind call
private void applyLatestBindColumnFormats() {
for (int i = 0; i < bindSelectColumnFormats.size(); i++) {
int newValue = toColumnBinaryType((short) bindSelectColumnFormats.get(i),
toColumnType(activeSelectColumnTypes.getQuick(2 * i)));
activeSelectColumnTypes.setQuick(2 * i, newValue);
}
}
private void shiftReceiveBuffer(long readOffsetBeforeParse) {
final long len = recvBufferWriteOffset - readOffsetBeforeParse;
LOG.debug()
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
/**
* Interface used to add steps before and/or after query compilation, e.g. cache checks and query result sending to jdbc client .
*/
public interface BatchCallback {
void preCompile(SqlCompiler compiler) throws SqlException;
void postCompile(SqlCompiler compiler, CompiledQuery cq, CharSequence queryText)
throws PeerIsSlowToReadException, SqlException, PeerDisconnectedException;
}
......@@ -31,6 +31,7 @@ import io.questdb.griffin.update.UpdateStatement;
import io.questdb.mp.SCSequence;
public interface CompiledQuery {
//these values should be covered in both JsonQueryProcessor and PGConnectionContext
short SELECT = 1;
short INSERT = 2;
short TRUNCATE = 3;
......@@ -48,6 +49,10 @@ public interface CompiledQuery {
short LOCK = 14;
short UNLOCK = 14;
short VACUUM = 15;
short BEGIN = 16;
short COMMIT = 17;
short ROLLBACK = 18;
short CREATE_TABLE_AS_SELECT = 19;
RecordCursorFactory getRecordCursorFactory();
......@@ -71,4 +76,9 @@ public interface CompiledQuery {
* @throws SqlException - throws exception if command execution fails
*/
QueryFuture execute(SCSequence eventSubSeq) throws SqlException;
/**
* Returns number of rows inserted by this command . Used e.g. in pg wire protocol .
*/
long getInsertCount();
}
......@@ -56,6 +56,8 @@ public class CompiledQueryImpl implements CompiledQuery {
private AlterStatement alterStatement;
private short type;
private SqlExecutionContext sqlExecutionContext;
//count of rows affected by this statement ; currently works only for insert as select/create table as insert
private long insertCount;
public CompiledQueryImpl(CairoEngine engine) {
this.engine = engine;
......@@ -128,6 +130,11 @@ public class CompiledQueryImpl implements CompiledQuery {
return QueryFuture.DONE;
}
@Override
public long getInsertCount() {
return this.insertCount;
}
public CompiledQuery of(short type) {
return of(type, null);
}
......@@ -161,6 +168,7 @@ public class CompiledQueryImpl implements CompiledQuery {
private CompiledQuery of(short type, RecordCursorFactory factory) {
this.type = type;
this.recordCursorFactory = factory;
this.insertCount = -1;
return this;
}
......@@ -187,6 +195,12 @@ public class CompiledQueryImpl implements CompiledQuery {
return of(CREATE_TABLE);
}
CompiledQuery ofCreateTableAsSelect(long insertCount) {
of(CREATE_TABLE_AS_SELECT);
this.insertCount = insertCount;
return this;
}
CompiledQuery ofDrop() {
return of(DROP);
}
......@@ -196,8 +210,10 @@ public class CompiledQueryImpl implements CompiledQuery {
return of(INSERT);
}
CompiledQuery ofInsertAsSelect() {
return of(INSERT_AS_SELECT);
CompiledQuery ofInsertAsSelect(long insertCount) {
of(INSERT_AS_SELECT);
this.insertCount = insertCount;
return this;
}
CompiledQuery ofRenameTable() {
......@@ -212,6 +228,18 @@ public class CompiledQueryImpl implements CompiledQuery {
return of(SET);
}
CompiledQuery ofBegin() {
return of(BEGIN);
}
CompiledQuery ofCommit() {
return of(COMMIT);
}
CompiledQuery ofRollback() {
return of(ROLLBACK);
}
CompiledQuery ofTruncate() {
return of(TRUNCATE);
}
......
......@@ -107,7 +107,7 @@ public class InsertStatementImpl implements InsertStatement {
InsertRowImpl row = insertRows.get(i);
row.append(writer);
}
return 1;
return insertRows.size();
}
@Override
......
......@@ -797,7 +797,7 @@ public final class SqlParser {
}
CharSequence tok = optTok(lexer);
if (tok == null || setOperations.excludes(tok)) {
if (tok == null || Chars.equals(tok, ';') || setOperations.excludes(tok)) {
lexer.unparse();
return model;
}
......@@ -860,7 +860,7 @@ public final class SqlParser {
tok = null;
}
if (tok == null) {
if (tok == null || Chars.equals(tok, ';')) { //token can also be ';' on query boundary
QueryModel nestedModel = queryModelPool.next();
nestedModel.setModelPosition(modelPosition);
ExpressionNode func = expressionNodePool.next().of(ExpressionNode.FUNCTION, "long_sequence", 0, lexer.lastTokenPosition());
......@@ -1560,7 +1560,7 @@ public final class SqlParser {
if (tok != null && Chars.equals(tok, ';')) {
alias = createColumnAlias(expr, model);
tok = optTok(lexer);
//tok = optTok(lexer);
} else if (tok != null && columnAliasStop.excludes(tok)) {
assertNotDot(lexer, tok);
......@@ -1577,7 +1577,7 @@ public final class SqlParser {
col.setAlias(alias);
model.addBottomUpColumn(col);
if (tok == null) {
if (tok == null || Chars.equals(tok, ';') ) {
lexer.unparse();
break;
}
......
......@@ -33,6 +33,12 @@ public class SqlUtil {
static final CharSequenceHashSet disallowedAliases = new CharSequenceHashSet();
/**
* Fetches next non-whitespace token that's not part of single or multiline comment.
*
* @param lexer input lexer
* @return with next valid token or null if end of input is reached .
*/
public static CharSequence fetchNext(GenericLexer lexer) {
int blockCount = 0;
boolean lineComment = false;
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.engine.functions.lock;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.Function;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.constants.StrConstant;
import io.questdb.std.IntList;
import io.questdb.std.ObjList;
/**
* No-op handler for pg_advisory_unlock_all() calls.
*/
public class AdvisoryUnlockAll implements FunctionFactory {
@Override
public String getSignature() {
return "pg_advisory_unlock_all()";
}
@Override
public boolean isGroupBy() {
return false;
}
@Override
public boolean isCursor() {
return false;
}
@Override
public boolean isBoolean() {
return false;
}
@Override
public boolean isRuntimeConstant() {
return true;
}
@Override
public Function newInstance(int position, ObjList<Function> args, IntList argPositions, CairoConfiguration configuration, SqlExecutionContext sqlExecutionContext) throws SqlException {
return StrConstant.NULL;
}
}
......@@ -671,7 +671,10 @@ public final class Chars {
}
}
}
/* Decodes bytes between lo,hi addresses into sink.
* Note: operation might fail in the middle and leave sink in inconsistent state .
* @return true if input is proper utf8 and false otherwise . */
public static boolean utf8Decode(long lo, long hi, CharSinkBase sink) {
long p = lo;
while (p < hi) {
......
......@@ -556,6 +556,9 @@ open module io.questdb {
io.questdb.griffin.engine.functions.catalogue.TableMetadataCursorFactory,
io.questdb.griffin.engine.functions.catalogue.DumpMemoryUsageFunctionFactory,
io.questdb.griffin.engine.functions.catalogue.DumpThreadStacksFunctionFactory,
// PostgreSQL advisory locks functions
io.questdb.griffin.engine.functions.lock.AdvisoryUnlockAll,
// concat()
io.questdb.griffin.engine.functions.str.ConcatFunctionFactory,
// replace()
......
......@@ -532,6 +532,9 @@ io.questdb.griffin.engine.functions.catalogue.TableMetadataCursorFactory
io.questdb.griffin.engine.functions.catalogue.DumpMemoryUsageFunctionFactory
io.questdb.griffin.engine.functions.catalogue.DumpThreadStacksFunctionFactory
# PostgreSQL advisory locks functions
io.questdb.griffin.engine.functions.lock.AdvisoryUnlockAll
# concat()
io.questdb.griffin.engine.functions.str.ConcatFunctionFactory
......@@ -590,4 +593,4 @@ io.questdb.griffin.engine.functions.geohash.GeoHashFromCoordinatesFunctionFactor
# bin functions
io.questdb.griffin.engine.functions.bin.Base64FunctionFactory
io.questdb.griffin.engine.functions.catalogue.TypeOfFunctionFactory
\ No newline at end of file
io.questdb.griffin.engine.functions.catalogue.TypeOfFunctionFactory
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.pgwire;
import io.questdb.griffin.AbstractGriffinTest;
import io.questdb.griffin.DefaultSqlExecutionCircuitBreakerConfiguration;
import io.questdb.griffin.SqlExecutionCircuitBreakerConfiguration;
import io.questdb.network.DefaultIODispatcherConfiguration;
import io.questdb.network.IODispatcherConfiguration;
import io.questdb.network.NetworkFacade;
import io.questdb.network.NetworkFacadeImpl;
import io.questdb.std.Rnd;
import org.jetbrains.annotations.NotNull;
import java.sql.*;
import java.util.Arrays;
import java.util.Properties;
import java.util.TimeZone;
public class BasePGTest extends AbstractGriffinTest {
protected PGWireServer createPGServer(PGWireConfiguration configuration) {
return PGWireServer.create(
configuration,
null,
LOG,
engine,
compiler.getFunctionFactoryCache(),
metrics
);
}
protected PGWireServer createPGServer(int workerCount) {
return createPGServer(workerCount, Long.MAX_VALUE);
}
protected PGWireServer createPGServer(int workerCount, long maxQueryTime) {
final int[] affinity = new int[workerCount];
Arrays.fill(affinity, -1);
final SqlExecutionCircuitBreakerConfiguration circuitBreakerConfiguration = new DefaultSqlExecutionCircuitBreakerConfiguration() {
@Override
public long getMaxTime() {
return maxQueryTime;
}
};
final PGWireConfiguration conf = new DefaultPGWireConfiguration() {
@Override
public SqlExecutionCircuitBreakerConfiguration getCircuitBreakerConfiguration() {
return circuitBreakerConfiguration;
}
@Override
public int[] getWorkerAffinity() {
return affinity;
}
@Override
public int getWorkerCount() {
return workerCount;
}
@Override
public Rnd getRandom() {
return new Rnd();
}
};
return createPGServer(conf);
}
protected void execSelectWithParam(PreparedStatement select, int value) throws SQLException {
sink.clear();
select.setInt(1, value);
try (ResultSet resultSet = select.executeQuery()) {
sink.clear();
while (resultSet.next()) {
sink.put(resultSet.getInt(1));
sink.put('\n');
}
}
}
enum Mode {
Simple("simple"), Extended("extended"), ExtendedForPrepared("extendedForPrepared"), ExtendedCacheEverything("extendedCacheEverything");
private final String value;
Mode(String value) {
this.value = value;
}
}
protected Connection getConnection(boolean simple, boolean binary) throws SQLException {
if (simple) {
return getConnection(Mode.Simple, binary, -2);
} else {
return getConnection(Mode.Extended, binary, -2);
}
}
protected Connection getConnection(Mode mode, boolean binary, int prepareThreshold) throws SQLException {
Properties properties = new Properties();
properties.setProperty("user", "admin");
properties.setProperty("password", "quest");
properties.setProperty("sslmode", "disable");
properties.setProperty("binaryTransfer", Boolean.toString(binary));
properties.setProperty("preferQueryMode", mode.value);
if (prepareThreshold > -2) {//-1 has special meaning in pg jdbc ...
properties.setProperty("prepareThreshold", String.valueOf(prepareThreshold));
}
TimeZone.setDefault(TimeZone.getTimeZone("EDT"));
//use this line to switch to local postgres
//return DriverManager.getConnection("jdbc:postgresql://127.0.0.1:5432/qdb", properties);
return DriverManager.getConnection("jdbc:postgresql://127.0.0.1:8812/qdb", properties);
}
@NotNull
protected NetworkFacade getFragmentedSendFacade() {
return new NetworkFacadeImpl() {
@Override
public int send(long fd, long buffer, int bufferLen) {
int total = 0;
for (int i = 0; i < bufferLen; i++) {
int n = super.send(fd, buffer + i, 1);
if (n < 0) {
return n;
}
total += n;
}
return total;
}
};
}
@NotNull
protected DefaultPGWireConfiguration getHexPgWireConfig() {
return new DefaultPGWireConfiguration() {
@Override
public String getDefaultPassword() {
return "oh";
}
@Override
public String getDefaultUsername() {
return "xyz";
}
@Override
public IODispatcherConfiguration getDispatcherConfiguration() {
return new DefaultIODispatcherConfiguration() {
@Override
public int getBindPort() {
return 8812;
}
@Override
public boolean getPeerNoLinger() {
return false;
}
};
}
};
}
}
......@@ -82,7 +82,7 @@ import static io.questdb.test.tools.TestUtils.drainEngineCmdQueue;
import static org.junit.Assert.*;
@SuppressWarnings("SqlNoDataSourceInspection")
public class PGJobContextTest extends AbstractGriffinTest {
public class PGJobContextTest extends BasePGTest {
private static final Log LOG = LogFactory.getLog(PGJobContextTest.class);
private static final long DAY_MICROS = Timestamps.HOUR_MICROS * 24L;
......@@ -4424,126 +4424,6 @@ create table tab as (
};
}
private PGWireServer createPGServer(PGWireConfiguration configuration) {
return PGWireServer.create(
configuration,
null,
LOG,
engine,
compiler.getFunctionFactoryCache(),
metrics
);
}
private PGWireServer createPGServer(int workerCount) {
return createPGServer(workerCount, Long.MAX_VALUE);
}
private PGWireServer createPGServer(int workerCount, long maxQueryTime) {
final int[] affinity = new int[workerCount];
Arrays.fill(affinity, -1);
final SqlExecutionCircuitBreakerConfiguration circuitBreakerConfiguration = new DefaultSqlExecutionCircuitBreakerConfiguration() {
@Override
public long getMaxTime() {
return maxQueryTime;
}
};
final PGWireConfiguration conf = new DefaultPGWireConfiguration() {
@Override
public SqlExecutionCircuitBreakerConfiguration getCircuitBreakerConfiguration() {
return circuitBreakerConfiguration;
}
@Override
public int[] getWorkerAffinity() {
return affinity;
}
@Override
public int getWorkerCount() {
return workerCount;
}
@Override
public Rnd getRandom() {
return new Rnd();
}
};
return createPGServer(conf);
}
private void execSelectWithParam(PreparedStatement select, int value) throws SQLException {
sink.clear();
select.setInt(1, value);
try (ResultSet resultSet = select.executeQuery()) {
sink.clear();
while (resultSet.next()) {
sink.put(resultSet.getInt(1));
sink.put('\n');
}
}
}
private Connection getConnection(boolean simple, boolean binary) throws SQLException {
Properties properties = new Properties();
properties.setProperty("user", "admin");
properties.setProperty("password", "quest");
properties.setProperty("sslmode", "disable");
properties.setProperty("binaryTransfer", Boolean.toString(binary));
if (simple) {
properties.setProperty("preferQueryMode", "simple");
}
TimeZone.setDefault(TimeZone.getTimeZone("EDT"));
return DriverManager.getConnection("jdbc:postgresql://127.0.0.1:8812/qdb", properties);
}
@NotNull
private NetworkFacade getFragmentedSendFacade() {
return new NetworkFacadeImpl() {
@Override
public int send(long fd, long buffer, int bufferLen) {
int total = 0;
for (int i = 0; i < bufferLen; i++) {
int n = super.send(fd, buffer + i, 1);
if (n < 0) {
return n;
}
total += n;
}
return total;
}
};
}
@NotNull
private DefaultPGWireConfiguration getHexPgWireConfig() {
return new DefaultPGWireConfiguration() {
@Override
public String getDefaultPassword() {
return "oh";
}
@Override
public String getDefaultUsername() {
return "xyz";
}
@Override
public IODispatcherConfiguration getDispatcherConfiguration() {
return new DefaultIODispatcherConfiguration() {
@Override
public int getBindPort() {
return 8812;
}
};
}
};
}
private void insertAllGeoHashTypes(boolean binary) throws Exception {
assertMemoryLeak(() -> {
......
......@@ -1656,6 +1656,36 @@ public class SqlCompilerTest extends AbstractGriffinTest {
assertMemoryLeak(() -> Assert.assertEquals(SET, compiler.compile(query, sqlExecutionContext).getType()));
}
//close command is a no-op in qdb
@Test
public void testCompileCloseDoesNothing() throws Exception {
String query = "CLOSE ALL;";
assertMemoryLeak(() -> Assert.assertEquals(SET, compiler.compile(query, sqlExecutionContext).getType()));
}
//reset command is a no-op in qdb
@Test
public void testCompileResetDoesNothing() throws Exception {
String query = "RESET ALL;";
assertMemoryLeak(() -> Assert.assertEquals(SET, compiler.compile(query, sqlExecutionContext).getType()));
}
//unlisten command is a no-op in qdb (it's a pg-specific notification mechanism)
@Test
public void testCompileUnlistenDoesNothing() throws Exception {
String query = "UNLISTEN *;";
assertMemoryLeak(() -> Assert.assertEquals(SET, compiler.compile(query, sqlExecutionContext).getType()));
}
@Test
public void testCompileStatementsBatch() throws Exception {
String query = "SELECT pg_advisory_unlock_all(); CLOSE ALL;";
assertMemoryLeak(()-> {
compiler.compileBatch(query, sqlExecutionContext, null);
});
}
@Test
public void testCreateAsSelect() throws SqlException {
String expectedData = "a1\ta\tb\tc\td\te\tf\tf1\tg\th\ti\tj\tj1\tk\tl\tm\n" +
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.engine.functions.lock;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.engine.AbstractFunctionFactoryTest;
import io.questdb.test.tools.TestUtils;
import org.junit.Test;
public class PgAdvisoryLocksTest extends AbstractFunctionFactoryTest {
@Test
public void testCallPgAdvisoryUnlockAllDirectlyReturnsNull() throws SqlException {
call().andAssert(null);
}
@Test
public void testCallPgAdvisoryUnlockAllReturnsSingleNullRow() throws Exception {
assertMemoryLeak(() -> TestUtils.assertSql(
compiler,
sqlExecutionContext,
"select pg_advisory_unlock_all();",
sink,
"pg_advisory_unlock_all\n\n"
));
}
@Override
protected FunctionFactory getFunctionFactory() {
return new AdvisoryUnlockAll();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册