diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java b/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java index 06c4a8f38be3299b6a0df111bac61e8afe1e6cda..665f46294dd248b6fe822c600c7b4ebdecdb7a9d 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java @@ -56,9 +56,7 @@ public class GenericLineProtoReceiver implements Closeable, Job { CairoEngine engine, CairoSecurityContext cairoSecurityContext ) { - nf = receiverCfg.getNetworkFacade(); - fd = nf.socketUdp(); if (fd < 0) { int errno = nf.errno(); @@ -67,41 +65,42 @@ public class GenericLineProtoReceiver implements Closeable, Job { } try { - if (!nf.bindUdp(fd, 0, receiverCfg.getPort())) { - int errno = nf.errno(); - LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); - throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); - } - - if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + if (nf.bindUdp(fd, 0, receiverCfg.getPort())) { + if (nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + this.commitRate = receiverCfg.getCommitRate(); + + if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { + LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); + } + + this.buf = Unsafe.malloc(this.bufLen = receiverCfg.getMsgBufferSize()); + + lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); + parser = new CairoLineProtoParser(engine, cairoSecurityContext); + lexer.withParser(parser); + + LOG.info() + .$("started [fd=").$(fd) + .$(", bind=").$(receiverCfg.getBindIPv4Address()) + .$(", group=").$(receiverCfg.getGroupIPv4Address()) + .$(", port=").$(receiverCfg.getPort()) + .$(", commitRate=").$(commitRate) + .$(']').$(); + + return; + } int errno = nf.errno(); LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$(); throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']'); + } + int errno = nf.errno(); + LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); + throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); } catch (CairoException e) { close(); throw e; } - - this.commitRate = receiverCfg.getCommitRate(); - - if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { - LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); - } - - this.buf = Unsafe.malloc(this.bufLen = receiverCfg.getMsgBufferSize()); - - lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); - parser = new CairoLineProtoParser(engine, cairoSecurityContext); - lexer.withParser(parser); - - LOG.info() - .$("started [fd=").$(fd) - .$(", bind=").$(receiverCfg.getBindIPv4Address()) - .$(", group=").$(receiverCfg.getGroupIPv4Address()) - .$(", port=").$(receiverCfg.getPort()) - .$(", commitRate=").$(commitRate) - .$(']').$(); } @Override diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java b/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java index bc75fa9307d8e7e6aadbebbd461408821e3dbe57..f4337bc6ec83c9cc7bad5af88c5e4f68aee3d050 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java @@ -66,35 +66,36 @@ public class LinuxLineProtoReceiver implements Closeable, Job { try { // when listening for multicast packets bind address must be 0 - if (!nf.bindUdp(fd, 0, receiverCfg.getPort())) { - int errno = nf.errno(); - LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); - throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); - } + if (nf.bindUdp(fd, 0, receiverCfg.getPort())) { + if (nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + this.commitRate = receiverCfg.getCommitRate(); + this.msgCount = receiverCfg.getMsgCount(); + + if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { + LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); + } - if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + msgVec = nf.msgHeaders(receiverCfg.getMsgBufferSize(), msgCount); + lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); + parser = new CairoLineProtoParser(engine, cairoSecurityContext); + lexer.withParser(parser); + + LOG.info().$("started [fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(", batch=").$(msgCount).$(", commitRate=").$(commitRate).$(']').$(); + + return; + } int errno = nf.errno(); LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$(); throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']'); } + int errno = nf.errno(); + LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); + throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); + } catch (CairoException e) { close(); throw e; } - - this.commitRate = receiverCfg.getCommitRate(); - this.msgCount = receiverCfg.getMsgCount(); - - if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { - LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); - } - - msgVec = nf.msgHeaders(receiverCfg.getMsgBufferSize(), msgCount); - lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); - parser = new CairoLineProtoParser(engine, cairoSecurityContext); - lexer.withParser(parser); - - LOG.info().$("started [fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(", batch=").$(msgCount).$(", commitRate=").$(commitRate).$(']').$(); } @Override diff --git a/core/src/main/java/io/questdb/griffin/SqlCompiler.java b/core/src/main/java/io/questdb/griffin/SqlCompiler.java index e1db67c082ce3f9eb5a8a423dc058163935cefe8..7013439775216c5bb90f7e4653c83a4d6e9f7d0f 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCompiler.java +++ b/core/src/main/java/io/questdb/griffin/SqlCompiler.java @@ -1050,7 +1050,7 @@ public class SqlCompiler implements Closeable { final int tableNamePosition = lexer.getPosition(); - tok = expectToken(lexer, "table name"); + tok = GenericLexer.unquote(expectToken(lexer, "table name")); tableExistsOrFail(tableNamePosition, tok, executionContext); diff --git a/core/src/main/java/io/questdb/network/NetworkFacade.java b/core/src/main/java/io/questdb/network/NetworkFacade.java index b1c3c8d48fc0bda5d746264adff26bbb16fd5213..9f774a71696dcd46a51cc3940b91e612bae2511f 100644 --- a/core/src/main/java/io/questdb/network/NetworkFacade.java +++ b/core/src/main/java/io/questdb/network/NetworkFacade.java @@ -65,7 +65,6 @@ public interface NetworkFacade { long socketUdp(); - // todo: un-tangle boolean bindUdp(long fd, int ipv4Address, int port); boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address); diff --git a/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java b/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java index 1366616278998bf8ffe6896abfe1dddda96bde4c..21e60c8ea256b90c612240ed5b63e4b5669be1f5 100644 --- a/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java +++ b/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java @@ -1746,8 +1746,7 @@ public class PGJobContextTest extends AbstractGriffinTest { new Thread(() -> { long fd = Net.socketTcp(true); try { - - nf.setReusePort(fd); + Assert.assertEquals(0, nf.setReusePort(fd)); nf.configureNoLinger(fd); Assert.assertTrue(nf.bindTcp(fd, 0, 9120)); diff --git a/core/src/test/java/io/questdb/griffin/DropTableTest.java b/core/src/test/java/io/questdb/griffin/DropTableTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f6f40a323dfe99c3a9b96bef27c20cc087f78e59 --- /dev/null +++ b/core/src/test/java/io/questdb/griffin/DropTableTest.java @@ -0,0 +1,88 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2019 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + ******************************************************************************/ + +package io.questdb.griffin; + +import io.questdb.test.tools.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +public class DropTableTest extends AbstractGriffinTest { + + @Test + public void testDropExisting() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table instrument (a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table instrument"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + + }); + } + + @Test + public void testDropUtf8() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table научный (a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table научный"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + }); + } + + @Test + public void testDropUtf8Quoted() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table 'научный руководитель'(a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table 'научный руководитель'"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + }); + } + + @Test + public void testDropQuoted() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table 'large table' (a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table 'large table'"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + }); + } + + @Test + public void testDropMissingFrom() throws Exception { + TestUtils.assertMemoryLeak(() -> { + try { + compiler.compile("drop i_am_missing"); + } catch (SqlException e) { + Assert.assertEquals(5, e.getPosition()); + TestUtils.assertContains("'table' expected", e.getFlyweightMessage()); + } + }); + } +}