提交 bc02e06e 编写于 作者: V Vlad Ilyushchenko

feat(lineproto): Line protocol receiver supports all Influx timestamp formats via configuration

上级 989a940a
......@@ -35,8 +35,7 @@ import io.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import io.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.line.LineProtoNanosTimestampAdapter;
import io.questdb.cutlass.line.LineProtoTimestampAdapter;
import io.questdb.cutlass.line.*;
import io.questdb.cutlass.line.udp.LineUdpReceiverConfiguration;
import io.questdb.cutlass.pgwire.DefaultPGWireConfiguration;
import io.questdb.cutlass.pgwire.PGWireConfiguration;
......@@ -124,6 +123,7 @@ public class PropServerConfiguration implements ServerConfiguration {
}
};
private final InputFormatConfiguration inputFormatConfiguration;
private final LineProtoTimestampAdapter lineUdpTimestampAdapter;
private boolean httpAllowDeflateBeforeSend;
private int[] httpWorkerAffinity;
private int connectionPoolInitialCapacity;
......@@ -336,10 +336,31 @@ public class PropServerConfiguration implements ServerConfiguration {
this.lineUdpMsgCount = getInt(properties, "line.udp.msg.count", 10_000);
this.lineUdpReceiveBufferSize = getIntSize(properties, "line.udp.receive.buffer.size", 1024 * 1024);
this.lineUdpEnabled = getBoolean(properties, "line.udp.enabled", true);
// this.lineUdpWorkerCount = getInt(properties, "line.udp.worker.count", 0);
this.lineUdpOwnThreadAffinity = getInt(properties, "line.udp.own.thread.affinity", -1);
this.lineUdpOwnThread = getBoolean(properties, "line.udp.own.thread", false);
this.lineUdpUnicast = getBoolean(properties, "line.udp.unicast", false);
final String lineUdpTimestampSwitch = getString(properties, "line.udp.timestamp", "n");
switch (lineUdpTimestampSwitch) {
case "u":
lineUdpTimestampAdapter = LineProtoMicroTimestampAdapter.INSTANCE;
break;
case "ms":
lineUdpTimestampAdapter = LineProtoMilliTimestampAdapter.INSTANCE;
break;
case "s":
lineUdpTimestampAdapter = LineProtoSecondTimestampAdapter.INSTANCE;
break;
case "m":
lineUdpTimestampAdapter = LineProtoMinuteTimestampAdapter.INSTANCE;
break;
case "h":
lineUdpTimestampAdapter = LineProtoHourTimestampAdapter.INSTANCE;
break;
default:
lineUdpTimestampAdapter = LineProtoNanoTimestampAdapter.INSTANCE;
break;
}
}
@Override
......@@ -1114,7 +1135,7 @@ public class PropServerConfiguration implements ServerConfiguration {
@Override
public LineProtoTimestampAdapter getTimestampAdapter() {
return LineProtoNanosTimestampAdapter.INSTANCE;
return lineUdpTimestampAdapter;
}
}
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
import io.questdb.std.microtime.Timestamps;
public class LineProtoHourTimestampAdapter implements LineProtoTimestampAdapter {
public static final LineProtoHourTimestampAdapter INSTANCE = new LineProtoHourTimestampAdapter();
@Override
public long getMicros(CharSequence value) throws NumericException {
return Numbers.parseLong(value) * Timestamps.HOUR_MICROS;
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
public class LineProtoMicroTimestampAdapter implements LineProtoTimestampAdapter {
public static final LineProtoMicroTimestampAdapter INSTANCE = new LineProtoMicroTimestampAdapter();
@Override
public long getMicros(CharSequence value) throws NumericException {
return Numbers.parseLong(value);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
public class LineProtoMilliTimestampAdapter implements LineProtoTimestampAdapter {
public static final LineProtoMilliTimestampAdapter INSTANCE = new LineProtoMilliTimestampAdapter();
@Override
public long getMicros(CharSequence value) throws NumericException {
return Numbers.parseLong(value) * 1000L;
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
import io.questdb.std.microtime.Timestamps;
public class LineProtoMinuteTimestampAdapter implements LineProtoTimestampAdapter {
public static final LineProtoMinuteTimestampAdapter INSTANCE = new LineProtoMinuteTimestampAdapter();
@Override
public long getMicros(CharSequence value) throws NumericException {
return Numbers.parseLong(value) * Timestamps.MINUTE_MICROS;
}
}
......@@ -27,8 +27,8 @@ package io.questdb.cutlass.line;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
public class LineProtoNanosTimestampAdapter implements LineProtoTimestampAdapter {
public static final LineProtoNanosTimestampAdapter INSTANCE = new LineProtoNanosTimestampAdapter();
public class LineProtoNanoTimestampAdapter implements LineProtoTimestampAdapter {
public static final LineProtoNanoTimestampAdapter INSTANCE = new LineProtoNanoTimestampAdapter();
@Override
public long getMicros(CharSequence value) throws NumericException {
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
import io.questdb.std.microtime.Timestamps;
public class LineProtoSecondTimestampAdapter implements LineProtoTimestampAdapter {
public static final LineProtoSecondTimestampAdapter INSTANCE = new LineProtoSecondTimestampAdapter();
@Override
public long getMicros(CharSequence value) throws NumericException {
return Numbers.parseLong(value) * Timestamps.SECOND_MICROS;
}
}
......@@ -27,6 +27,7 @@ package io.questdb;
import io.questdb.cairo.CommitMode;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.line.*;
import io.questdb.network.EpollFacadeImpl;
import io.questdb.network.IOOperation;
import io.questdb.network.NetworkFacadeImpl;
......@@ -226,6 +227,39 @@ public class PropServerConfigurationTest {
new PropServerConfiguration("root", properties);
}
@Test
public void testLineUdpTimestamp() throws ServerConfigurationException, JsonException {
Properties properties = new Properties();
properties.setProperty("http.enabled", "false");
properties.setProperty("line.udp.timestamp", "");
PropServerConfiguration configuration = new PropServerConfiguration("root", properties);
Assert.assertSame(LineProtoNanoTimestampAdapter.INSTANCE, configuration.getLineUdpReceiverConfiguration().getTimestampAdapter());
properties.setProperty("line.udp.timestamp", "n");
configuration = new PropServerConfiguration("root", properties);
Assert.assertSame(LineProtoNanoTimestampAdapter.INSTANCE, configuration.getLineUdpReceiverConfiguration().getTimestampAdapter());
properties.setProperty("line.udp.timestamp", "u");
configuration = new PropServerConfiguration("root", properties);
Assert.assertSame(LineProtoMicroTimestampAdapter.INSTANCE, configuration.getLineUdpReceiverConfiguration().getTimestampAdapter());
properties.setProperty("line.udp.timestamp", "ms");
configuration = new PropServerConfiguration("root", properties);
Assert.assertSame(LineProtoMilliTimestampAdapter.INSTANCE, configuration.getLineUdpReceiverConfiguration().getTimestampAdapter());
properties.setProperty("line.udp.timestamp", "s");
configuration = new PropServerConfiguration("root", properties);
Assert.assertSame(LineProtoSecondTimestampAdapter.INSTANCE, configuration.getLineUdpReceiverConfiguration().getTimestampAdapter());
properties.setProperty("line.udp.timestamp", "m");
configuration = new PropServerConfiguration("root", properties);
Assert.assertSame(LineProtoMinuteTimestampAdapter.INSTANCE, configuration.getLineUdpReceiverConfiguration().getTimestampAdapter());
properties.setProperty("line.udp.timestamp", "h");
configuration = new PropServerConfiguration("root", properties);
Assert.assertSame(LineProtoHourTimestampAdapter.INSTANCE, configuration.getLineUdpReceiverConfiguration().getTimestampAdapter());
}
@Test(expected = ServerConfigurationException.class)
public void testInvalidBindToPort() throws ServerConfigurationException, JsonException {
Properties properties = new Properties();
......
......@@ -535,7 +535,7 @@ public class CairoLineProtoParserTest extends AbstractCairoTest {
private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration) throws Exception {
TestUtils.assertMemoryLeak(() -> {
try (CairoEngine engine = new CairoEngine(configuration, null)) {
try (CairoLineProtoParser parser = new CairoLineProtoParser(engine, AllowAllCairoSecurityContext.INSTANCE, LineProtoNanosTimestampAdapter.INSTANCE)) {
try (CairoLineProtoParser parser = new CairoLineProtoParser(engine, AllowAllCairoSecurityContext.INSTANCE, LineProtoNanoTimestampAdapter.INSTANCE)) {
byte[] bytes = lines.getBytes(StandardCharsets.UTF_8);
int len = bytes.length;
long mem = Unsafe.malloc(len);
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.NumericException;
import org.junit.Assert;
import org.junit.Test;
public class LineProtoHourTimestampAdapterTest {
@Test
public void testRounding() throws NumericException {
Assert.assertEquals(20444400000000L, LineProtoHourTimestampAdapter.INSTANCE.getMicros("5679"));
}
}
\ No newline at end of file
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.NumericException;
import org.junit.Assert;
import org.junit.Test;
public class LineProtoMicroTimestampAdapterTest {
@Test
public void testRounding() throws NumericException {
Assert.assertEquals(5679L, LineProtoMicroTimestampAdapter.INSTANCE.getMicros("5679"));
}
}
\ No newline at end of file
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.NumericException;
import org.junit.Assert;
import org.junit.Test;
public class LineProtoMilliTimestampAdapterTest {
@Test
public void testRounding() throws NumericException {
Assert.assertEquals(5679000L, LineProtoMilliTimestampAdapter.INSTANCE.getMicros("5679"));
}
}
\ No newline at end of file
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.NumericException;
import org.junit.Assert;
import org.junit.Test;
public class LineProtoMinuteTimestampAdapterTest {
@Test
public void testRounding() throws NumericException {
Assert.assertEquals(340740000000L, LineProtoMinuteTimestampAdapter.INSTANCE.getMicros("5679"));
}
}
\ No newline at end of file
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.NumericException;
import org.junit.Assert;
import org.junit.Test;
public class LineProtoNanoTimestampAdapterTest {
@Test
public void testRounding() throws NumericException {
Assert.assertEquals(56799L, LineProtoNanoTimestampAdapter.INSTANCE.getMicros("56799000"));
}
}
\ No newline at end of file
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cutlass.line;
import io.questdb.std.NumericException;
import org.junit.Assert;
import org.junit.Test;
public class LineProtoSecondTimestampAdapterTest {
@Test
public void testRounding() throws NumericException {
Assert.assertEquals(5679000000L, LineProtoSecondTimestampAdapter.INSTANCE.getMicros("5679"));
}
}
\ No newline at end of file
......@@ -27,7 +27,7 @@ package io.questdb.cutlass.line.udp;
import io.questdb.WorkerPoolAwareConfiguration;
import io.questdb.cairo.*;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cutlass.line.LineProtoNanosTimestampAdapter;
import io.questdb.cutlass.line.LineProtoNanoTimestampAdapter;
import io.questdb.cutlass.line.LineProtoTimestampAdapter;
import io.questdb.network.Net;
import io.questdb.network.NetworkFacade;
......@@ -333,7 +333,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
@Override
public LineProtoTimestampAdapter getTimestampAdapter() {
return LineProtoNanosTimestampAdapter.INSTANCE;
return LineProtoNanoTimestampAdapter.INSTANCE;
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册