From 1a10e655577bb7afadb273929d8d5daf6eb37993 Mon Sep 17 00:00:00 2001 From: Vlad Ilyushchenko Date: Sun, 13 Oct 2019 09:10:19 +0100 Subject: [PATCH] GRIFFIN: drop table bugfix and tests NET: replaced constant boolean NOT logic when dealing with result of Net.bindUdp() --- .../line/udp/GenericLineProtoReceiver.java | 57 ++++++------ .../line/udp/LinuxLineProtoReceiver.java | 41 ++++----- .../java/io/questdb/griffin/SqlCompiler.java | 2 +- .../io/questdb/network/NetworkFacade.java | 1 - .../cutlass/pgwire/PGJobContextTest.java | 3 +- .../io/questdb/griffin/DropTableTest.java | 88 +++++++++++++++++++ 6 files changed, 139 insertions(+), 53 deletions(-) create mode 100644 core/src/test/java/io/questdb/griffin/DropTableTest.java diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java b/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java index 06c4a8f38..665f46294 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/GenericLineProtoReceiver.java @@ -56,9 +56,7 @@ public class GenericLineProtoReceiver implements Closeable, Job { CairoEngine engine, CairoSecurityContext cairoSecurityContext ) { - nf = receiverCfg.getNetworkFacade(); - fd = nf.socketUdp(); if (fd < 0) { int errno = nf.errno(); @@ -67,41 +65,42 @@ public class GenericLineProtoReceiver implements Closeable, Job { } try { - if (!nf.bindUdp(fd, 0, receiverCfg.getPort())) { - int errno = nf.errno(); - LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); - throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); - } - - if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + if (nf.bindUdp(fd, 0, receiverCfg.getPort())) { + if (nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + this.commitRate = receiverCfg.getCommitRate(); + + if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { + LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); + } + + this.buf = Unsafe.malloc(this.bufLen = receiverCfg.getMsgBufferSize()); + + lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); + parser = new CairoLineProtoParser(engine, cairoSecurityContext); + lexer.withParser(parser); + + LOG.info() + .$("started [fd=").$(fd) + .$(", bind=").$(receiverCfg.getBindIPv4Address()) + .$(", group=").$(receiverCfg.getGroupIPv4Address()) + .$(", port=").$(receiverCfg.getPort()) + .$(", commitRate=").$(commitRate) + .$(']').$(); + + return; + } int errno = nf.errno(); LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$(); throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']'); + } + int errno = nf.errno(); + LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); + throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); } catch (CairoException e) { close(); throw e; } - - this.commitRate = receiverCfg.getCommitRate(); - - if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { - LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); - } - - this.buf = Unsafe.malloc(this.bufLen = receiverCfg.getMsgBufferSize()); - - lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); - parser = new CairoLineProtoParser(engine, cairoSecurityContext); - lexer.withParser(parser); - - LOG.info() - .$("started [fd=").$(fd) - .$(", bind=").$(receiverCfg.getBindIPv4Address()) - .$(", group=").$(receiverCfg.getGroupIPv4Address()) - .$(", port=").$(receiverCfg.getPort()) - .$(", commitRate=").$(commitRate) - .$(']').$(); } @Override diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java b/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java index bc75fa930..f4337bc6e 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/LinuxLineProtoReceiver.java @@ -66,35 +66,36 @@ public class LinuxLineProtoReceiver implements Closeable, Job { try { // when listening for multicast packets bind address must be 0 - if (!nf.bindUdp(fd, 0, receiverCfg.getPort())) { - int errno = nf.errno(); - LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); - throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); - } + if (nf.bindUdp(fd, 0, receiverCfg.getPort())) { + if (nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + this.commitRate = receiverCfg.getCommitRate(); + this.msgCount = receiverCfg.getMsgCount(); + + if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { + LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); + } - if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) { + msgVec = nf.msgHeaders(receiverCfg.getMsgBufferSize(), msgCount); + lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); + parser = new CairoLineProtoParser(engine, cairoSecurityContext); + lexer.withParser(parser); + + LOG.info().$("started [fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(", batch=").$(msgCount).$(", commitRate=").$(commitRate).$(']').$(); + + return; + } int errno = nf.errno(); LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$(); throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']'); } + int errno = nf.errno(); + LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$(); + throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort()); + } catch (CairoException e) { close(); throw e; } - - this.commitRate = receiverCfg.getCommitRate(); - this.msgCount = receiverCfg.getMsgCount(); - - if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) { - LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$(); - } - - msgVec = nf.msgHeaders(receiverCfg.getMsgBufferSize(), msgCount); - lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize()); - parser = new CairoLineProtoParser(engine, cairoSecurityContext); - lexer.withParser(parser); - - LOG.info().$("started [fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(", batch=").$(msgCount).$(", commitRate=").$(commitRate).$(']').$(); } @Override diff --git a/core/src/main/java/io/questdb/griffin/SqlCompiler.java b/core/src/main/java/io/questdb/griffin/SqlCompiler.java index e1db67c08..701343977 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCompiler.java +++ b/core/src/main/java/io/questdb/griffin/SqlCompiler.java @@ -1050,7 +1050,7 @@ public class SqlCompiler implements Closeable { final int tableNamePosition = lexer.getPosition(); - tok = expectToken(lexer, "table name"); + tok = GenericLexer.unquote(expectToken(lexer, "table name")); tableExistsOrFail(tableNamePosition, tok, executionContext); diff --git a/core/src/main/java/io/questdb/network/NetworkFacade.java b/core/src/main/java/io/questdb/network/NetworkFacade.java index b1c3c8d48..9f774a716 100644 --- a/core/src/main/java/io/questdb/network/NetworkFacade.java +++ b/core/src/main/java/io/questdb/network/NetworkFacade.java @@ -65,7 +65,6 @@ public interface NetworkFacade { long socketUdp(); - // todo: un-tangle boolean bindUdp(long fd, int ipv4Address, int port); boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address); diff --git a/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java b/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java index 136661627..21e60c8ea 100644 --- a/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java +++ b/core/src/test/java/io/questdb/cutlass/pgwire/PGJobContextTest.java @@ -1746,8 +1746,7 @@ public class PGJobContextTest extends AbstractGriffinTest { new Thread(() -> { long fd = Net.socketTcp(true); try { - - nf.setReusePort(fd); + Assert.assertEquals(0, nf.setReusePort(fd)); nf.configureNoLinger(fd); Assert.assertTrue(nf.bindTcp(fd, 0, 9120)); diff --git a/core/src/test/java/io/questdb/griffin/DropTableTest.java b/core/src/test/java/io/questdb/griffin/DropTableTest.java new file mode 100644 index 000000000..f6f40a323 --- /dev/null +++ b/core/src/test/java/io/questdb/griffin/DropTableTest.java @@ -0,0 +1,88 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2019 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + ******************************************************************************/ + +package io.questdb.griffin; + +import io.questdb.test.tools.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +public class DropTableTest extends AbstractGriffinTest { + + @Test + public void testDropExisting() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table instrument (a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table instrument"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + + }); + } + + @Test + public void testDropUtf8() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table научный (a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table научный"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + }); + } + + @Test + public void testDropUtf8Quoted() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table 'научный руководитель'(a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table 'научный руководитель'"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + }); + } + + @Test + public void testDropQuoted() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CompiledQuery cc = compiler.compile("create table 'large table' (a int)"); + Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType()); + + cc = compiler.compile("drop table 'large table'"); + Assert.assertEquals(CompiledQuery.DROP, cc.getType()); + }); + } + + @Test + public void testDropMissingFrom() throws Exception { + TestUtils.assertMemoryLeak(() -> { + try { + compiler.compile("drop i_am_missing"); + } catch (SqlException e) { + Assert.assertEquals(5, e.getPosition()); + TestUtils.assertContains("'table' expected", e.getFlyweightMessage()); + } + }); + } +} -- GitLab