提交 1a10e655 编写于 作者: V Vlad Ilyushchenko

GRIFFIN: drop table bugfix and tests

NET: replaced constant boolean NOT logic when dealing with result of Net.bindUdp()
上级 67f9a4ac
......@@ -56,9 +56,7 @@ public class GenericLineProtoReceiver implements Closeable, Job {
CairoEngine engine,
CairoSecurityContext cairoSecurityContext
) {
nf = receiverCfg.getNetworkFacade();
fd = nf.socketUdp();
if (fd < 0) {
int errno = nf.errno();
......@@ -67,41 +65,42 @@ public class GenericLineProtoReceiver implements Closeable, Job {
}
try {
if (!nf.bindUdp(fd, 0, receiverCfg.getPort())) {
int errno = nf.errno();
LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$();
throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
}
if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) {
if (nf.bindUdp(fd, 0, receiverCfg.getPort())) {
if (nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) {
this.commitRate = receiverCfg.getCommitRate();
if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) {
LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$();
}
this.buf = Unsafe.malloc(this.bufLen = receiverCfg.getMsgBufferSize());
lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize());
parser = new CairoLineProtoParser(engine, cairoSecurityContext);
lexer.withParser(parser);
LOG.info()
.$("started [fd=").$(fd)
.$(", bind=").$(receiverCfg.getBindIPv4Address())
.$(", group=").$(receiverCfg.getGroupIPv4Address())
.$(", port=").$(receiverCfg.getPort())
.$(", commitRate=").$(commitRate)
.$(']').$();
return;
}
int errno = nf.errno();
LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$();
throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']');
}
int errno = nf.errno();
LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$();
throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
} catch (CairoException e) {
close();
throw e;
}
this.commitRate = receiverCfg.getCommitRate();
if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) {
LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$();
}
this.buf = Unsafe.malloc(this.bufLen = receiverCfg.getMsgBufferSize());
lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize());
parser = new CairoLineProtoParser(engine, cairoSecurityContext);
lexer.withParser(parser);
LOG.info()
.$("started [fd=").$(fd)
.$(", bind=").$(receiverCfg.getBindIPv4Address())
.$(", group=").$(receiverCfg.getGroupIPv4Address())
.$(", port=").$(receiverCfg.getPort())
.$(", commitRate=").$(commitRate)
.$(']').$();
}
@Override
......
......@@ -66,35 +66,36 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
try {
// when listening for multicast packets bind address must be 0
if (!nf.bindUdp(fd, 0, receiverCfg.getPort())) {
int errno = nf.errno();
LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$();
throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
}
if (nf.bindUdp(fd, 0, receiverCfg.getPort())) {
if (nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) {
this.commitRate = receiverCfg.getCommitRate();
this.msgCount = receiverCfg.getMsgCount();
if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) {
LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$();
}
if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) {
msgVec = nf.msgHeaders(receiverCfg.getMsgBufferSize(), msgCount);
lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize());
parser = new CairoLineProtoParser(engine, cairoSecurityContext);
lexer.withParser(parser);
LOG.info().$("started [fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(", batch=").$(msgCount).$(", commitRate=").$(commitRate).$(']').$();
return;
}
int errno = nf.errno();
LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$();
throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']');
}
int errno = nf.errno();
LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$();
throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
} catch (CairoException e) {
close();
throw e;
}
this.commitRate = receiverCfg.getCommitRate();
this.msgCount = receiverCfg.getMsgCount();
if (receiverCfg.getReceiveBufferSize() != -1 && nf.setRcvBuf(fd, receiverCfg.getReceiveBufferSize()) != 0) {
LOG.error().$("cannot set receive buffer size [fd=").$(fd).$(", size=").$(receiverCfg.getReceiveBufferSize()).$(']').$();
}
msgVec = nf.msgHeaders(receiverCfg.getMsgBufferSize(), msgCount);
lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize());
parser = new CairoLineProtoParser(engine, cairoSecurityContext);
lexer.withParser(parser);
LOG.info().$("started [fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(", batch=").$(msgCount).$(", commitRate=").$(commitRate).$(']').$();
}
@Override
......
......@@ -1050,7 +1050,7 @@ public class SqlCompiler implements Closeable {
final int tableNamePosition = lexer.getPosition();
tok = expectToken(lexer, "table name");
tok = GenericLexer.unquote(expectToken(lexer, "table name"));
tableExistsOrFail(tableNamePosition, tok, executionContext);
......
......@@ -65,7 +65,6 @@ public interface NetworkFacade {
long socketUdp();
// todo: un-tangle
boolean bindUdp(long fd, int ipv4Address, int port);
boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address);
......
......@@ -1746,8 +1746,7 @@ public class PGJobContextTest extends AbstractGriffinTest {
new Thread(() -> {
long fd = Net.socketTcp(true);
try {
nf.setReusePort(fd);
Assert.assertEquals(0, nf.setReusePort(fd));
nf.configureNoLinger(fd);
Assert.assertTrue(nf.bindTcp(fd, 0, 9120));
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.griffin;
import io.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Test;
public class DropTableTest extends AbstractGriffinTest {
@Test
public void testDropExisting() throws Exception {
TestUtils.assertMemoryLeak(() -> {
CompiledQuery cc = compiler.compile("create table instrument (a int)");
Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType());
cc = compiler.compile("drop table instrument");
Assert.assertEquals(CompiledQuery.DROP, cc.getType());
});
}
@Test
public void testDropUtf8() throws Exception {
TestUtils.assertMemoryLeak(() -> {
CompiledQuery cc = compiler.compile("create table научный (a int)");
Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType());
cc = compiler.compile("drop table научный");
Assert.assertEquals(CompiledQuery.DROP, cc.getType());
});
}
@Test
public void testDropUtf8Quoted() throws Exception {
TestUtils.assertMemoryLeak(() -> {
CompiledQuery cc = compiler.compile("create table 'научный руководитель'(a int)");
Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType());
cc = compiler.compile("drop table 'научный руководитель'");
Assert.assertEquals(CompiledQuery.DROP, cc.getType());
});
}
@Test
public void testDropQuoted() throws Exception {
TestUtils.assertMemoryLeak(() -> {
CompiledQuery cc = compiler.compile("create table 'large table' (a int)");
Assert.assertEquals(CompiledQuery.CREATE_TABLE, cc.getType());
cc = compiler.compile("drop table 'large table'");
Assert.assertEquals(CompiledQuery.DROP, cc.getType());
});
}
@Test
public void testDropMissingFrom() throws Exception {
TestUtils.assertMemoryLeak(() -> {
try {
compiler.compile("drop i_am_missing");
} catch (SqlException e) {
Assert.assertEquals(5, e.getPosition());
TestUtils.assertContains("'table' expected", e.getFlyweightMessage());
}
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册