提交 cbce8327 编写于 作者: V Vlad Ilyushchenko

chore(pg): property-base configuration for PostgreSQL wire protocol

上级 3cc13f17
......@@ -36,7 +36,6 @@ import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.line.*;
import io.questdb.cutlass.line.udp.LineUdpReceiverConfiguration;
import io.questdb.cutlass.pgwire.DefaultPGWireConfiguration;
import io.questdb.cutlass.pgwire.PGWireConfiguration;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
......@@ -113,12 +112,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private final int sharedWorkerCount;
private final boolean sharedWorkerHaltOnError;
private final WorkerPoolConfiguration workerPoolConfiguration = new PropWorkerPoolConfiguration();
private final PGWireConfiguration pgWireConfiguration = new DefaultPGWireConfiguration() {
@Override
public int getWorkerCount() {
return 0;
}
};
private final PGWireConfiguration pgWireConfiguration = new PropPGWireConfiguration();
private final InputFormatConfiguration inputFormatConfiguration;
private final LineProtoTimestampAdapter lineUdpTimestampAdapter;
private final String inputRoot;
......@@ -143,6 +137,8 @@ public class PropServerConfiguration implements ServerConfiguration {
private final int backupMkdirMode;
private final int floatToStrCastScale;
private final int doubleToStrCastScale;
private final PropPGWireDispatcherConfiguration propPGWireDispatcherConfiguration = new PropPGWireDispatcherConfiguration();
private final boolean pgEnabled;
private boolean httpAllowDeflateBeforeSend;
private int[] httpWorkerAffinity;
private int connectionPoolInitialCapacity;
......@@ -192,7 +188,35 @@ public class PropServerConfiguration implements ServerConfiguration {
private long maxHttpQueryResponseRowLimit;
private boolean interruptOnClosedConnection;
private int interruptorNIterationsPerCheck;
private int interruptorBufferSize;;
private int interruptorBufferSize;
private int pgNetActiveConnectionLimit;
private int pgNetBindIPv4Address;
private int pgNetBindPort;
private int pgNetEventCapacity;
private int pgNetIOQueueCapacity;
private long pgNetIdleConnectionTimeout;
private int pgNetInterestQueueCapacity;
private int pgNetListenBacklog;
private int pgNetRcvBufSize;
private int pgNetSndBufSize;
private int pgCharacterStoreCapacity;
private int pgCharacterStorePoolCapacity;
private int pgConnectionPoolInitialCapacity;
private String pgPassword;
private String pgUsername;
private int pgFactoryCacheColumnCount;
private int pgFactoryCacheRowCount;
private int pgIdleRecvCountBeforeGivingUp;
private int pgIdleSendCountBeforeGivingUp;
private int pgMaxBlobSizeOnQuery;
private int pgRecvBufferSize;
private int pgSendBufferSize;
private DateLocale pgDefaultDateLocale;
private TimestampLocale pgDefaultTimestampLocale;
private int[] pgWorkerAffinity;
private int pgWorkerCount;
private boolean pgHaltOnError;
private boolean pgDaemonPool;
public PropServerConfiguration(String root, Properties properties) throws ServerConfigurationException, JsonException {
this.sharedWorkerCount = getInt(properties, "shared.worker.count", 2);
......@@ -281,6 +305,48 @@ public class PropServerConfiguration implements ServerConfiguration {
this.mimeTypesCache = new MimeTypesCache(FilesFacadeImpl.INSTANCE, path);
}
}
this.pgEnabled = getBoolean(properties, "pg.enabled", true);
if (pgEnabled) {
pgNetActiveConnectionLimit = getInt(properties, "pg.net.active.connection.limit", 10);
parseBindTo(properties, "pg.net.bind.to", "0.0.0.0:8812", (a, p) -> {
pgNetBindIPv4Address = a;
pgNetBindPort = p;
});
this.pgNetEventCapacity = getInt(properties, "pg.net.event.capacity", 1024);
this.pgNetIOQueueCapacity = getInt(properties, "pg.net.io.queue.capacity", 1024);
this.pgNetIdleConnectionTimeout = getLong(properties, "pg.net.idle.timeout", 300_000);
this.pgNetInterestQueueCapacity = getInt(properties, "pg.net.interest.queue.capacity", 1024);
this.pgNetListenBacklog = getInt(properties, "pg.net.listen.backlog", 50_000);
this.pgNetRcvBufSize = getIntSize(properties, "pg.net.recv.buf.size", -1);
this.pgNetSndBufSize = getIntSize(properties, "pg.net.send.buf.size", -1);
this.pgCharacterStoreCapacity = getInt(properties, "pg.character.store.capacity", 4096);
this.pgCharacterStorePoolCapacity = getInt(properties, "pg.character.store.pool.capacity", 64);
this.pgConnectionPoolInitialCapacity = getInt(properties, "pg.connection.pool.capacity", 64);
this.pgPassword = getString(properties, "pg.password", "quest");
this.pgUsername = getString(properties, "pg.user", "admin");
this.pgFactoryCacheColumnCount = getInt(properties, "pg.factory.cache.column.count", 16);
this.pgFactoryCacheRowCount = getInt(properties, "pg.factory.cache.row.count", 16);
this.pgIdleRecvCountBeforeGivingUp = getInt(properties, "pg.idle.recv.count.before.giving.up", 10_000);
this.pgIdleSendCountBeforeGivingUp = getInt(properties, "pg.idle.send.count.before.giving.up", 10_000);
this.pgMaxBlobSizeOnQuery = getIntSize(properties, "pg.max.blob.size.on.query", 512 * 1024);
this.pgRecvBufferSize = getIntSize(properties, "pg.recv.buffer.size", 1024 * 1024);
this.pgSendBufferSize = getIntSize(properties, "pg.send.buffer.size", 1024 * 1024);
final String dateLocale = getString(properties, "pg.date.locale", "en");
this.pgDefaultDateLocale = DateLocaleFactory.INSTANCE.getLocale(dateLocale);
if (this.pgDefaultDateLocale == null) {
throw new ServerConfigurationException("pg.date.locale", dateLocale);
}
final String timestampLocale = getString(properties, "pg.timestamp.locale", "en");
this.pgDefaultTimestampLocale = TimestampLocaleFactory.INSTANCE.getLocale(timestampLocale);
if (this.pgDefaultTimestampLocale == null) {
throw new ServerConfigurationException("pg.timestamp.locale", dateLocale);
}
this.pgWorkerCount = getInt(properties, "pg.worker.count", 2);
this.pgWorkerAffinity = getAffinity(properties, "pg.worker.affinity", pgWorkerCount);
this.pgHaltOnError = getBoolean(properties, "pg.halt.on.error", false);
this.pgDaemonPool = getBoolean(properties, "pg.daemon.pool", true);
}
this.commitMode = getCommitMode(properties, "cairo.commit.mode");
this.createAsSelectRetryCount = getInt(properties, "cairo.create.as.select.retry.count", 5);
......@@ -1360,4 +1426,205 @@ public class PropServerConfiguration implements ServerConfiguration {
return sharedWorkerHaltOnError;
}
}
private class PropPGWireDispatcherConfiguration implements IODispatcherConfiguration {
@Override
public String getDispatcherLogName() {
return "pg-server";
}
@Override
public int getActiveConnectionLimit() {
return pgNetActiveConnectionLimit;
}
@Override
public int getBindIPv4Address() {
return pgNetBindIPv4Address;
}
@Override
public int getBindPort() {
return pgNetBindPort;
}
@Override
public MillisecondClock getClock() {
return MillisecondClockImpl.INSTANCE;
}
@Override
public EpollFacade getEpollFacade() {
return EpollFacadeImpl.INSTANCE;
}
@Override
public int getEventCapacity() {
return pgNetEventCapacity;
}
@Override
public int getIOQueueCapacity() {
return pgNetIOQueueCapacity;
}
@Override
public long getIdleConnectionTimeout() {
return pgNetIdleConnectionTimeout;
}
@Override
public int getInitialBias() {
return BIAS_READ;
}
@Override
public int getInterestQueueCapacity() {
return pgNetInterestQueueCapacity;
}
@Override
public int getListenBacklog() {
return pgNetListenBacklog;
}
@Override
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public int getRcvBufSize() {
return pgNetRcvBufSize;
}
@Override
public SelectFacade getSelectFacade() {
return SelectFacadeImpl.INSTANCE;
}
@Override
public int getSndBufSize() {
return pgNetSndBufSize;
}
}
private class PropPGWireConfiguration implements PGWireConfiguration {
@Override
public int getCharacterStoreCapacity() {
return pgCharacterStoreCapacity;
}
@Override
public int getCharacterStorePoolCapacity() {
return pgCharacterStorePoolCapacity;
}
@Override
public int getConnectionPoolInitialCapacity() {
return pgConnectionPoolInitialCapacity;
}
@Override
public String getDefaultPassword() {
return pgPassword;
}
@Override
public String getDefaultUsername() {
return pgUsername;
}
@Override
public IODispatcherConfiguration getDispatcherConfiguration() {
return propPGWireDispatcherConfiguration;
}
@Override
public boolean getDumpNetworkTraffic() {
return false;
}
@Override
public int getFactoryCacheColumnCount() {
return pgFactoryCacheColumnCount;
}
@Override
public int getFactoryCacheRowCount() {
return pgFactoryCacheRowCount;
}
@Override
public int getIdleRecvCountBeforeGivingUp() {
return pgIdleRecvCountBeforeGivingUp;
}
@Override
public int getIdleSendCountBeforeGivingUp() {
return pgIdleSendCountBeforeGivingUp;
}
@Override
public int getMaxBlobSizeOnQuery() {
return pgMaxBlobSizeOnQuery;
}
@Override
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public int getRecvBufferSize() {
return pgRecvBufferSize;
}
@Override
public int getSendBufferSize() {
return pgSendBufferSize;
}
@Override
public String getServerVersion() {
return "11.3";
}
@Override
public DateLocale getDefaultDateLocale() {
return pgDefaultDateLocale;
}
@Override
public TimestampLocale getDefaultTimestampLocale() {
return pgDefaultTimestampLocale;
}
@Override
public boolean isEnabled() {
return pgEnabled;
}
@Override
public int[] getWorkerAffinity() {
return pgWorkerAffinity;
}
@Override
public int getWorkerCount() {
return pgWorkerCount;
}
@Override
public boolean haltOnError() {
return pgHaltOnError;
}
@Override
public boolean isDaemonPool() {
return pgDaemonPool;
}
}
}
......@@ -150,13 +150,19 @@ public class ServerMain {
messageBus
);
final PGWireServer pgWireServer = PGWireServer.create(
configuration.getPGWireConfiguration(),
workerPool,
log,
cairoEngine,
messageBus
);
final PGWireServer pgWireServer;
if (configuration.getPGWireConfiguration().isEnabled()) {
pgWireServer = PGWireServer.create(
configuration.getPGWireConfiguration(),
workerPool,
log,
cairoEngine,
messageBus
);
} else {
pgWireServer = null;
}
final AbstractLineProtoReceiver lineProtocolReceiver;
......
......@@ -154,4 +154,5 @@ public class DefaultPGWireConfiguration implements PGWireConfiguration {
public TimestampLocale getDefaultTimestampLocale() {
return TimestampFormatUtils.enLocale;
}
}
......@@ -282,3 +282,33 @@
#line.udp.unicast=false
#line.udp.commit.mode
#line.udp.timestamp=n
###################### PG Wire settings #############################
#pg.enabled=true
#pg.net.active.connection.limit=10
#pg.net.bind.to=0.0.0.0:8812
#pg.net.event.capacity=1024
#pg.net.io.queue.capacity=1024)
#pg.net.idle.timeout=300000
#pg.net.interest.queue.capacity=1024
#pg.net.listen.backlog=50000
#pg.net.recv.buf.size=-1
#pg.net.send.buf.size=-1
#pg.character.store.capacity=4096
#pg.character.store.pool.capacity=64
#pg.connection.pool.capacity=64
#pg.password=quest
#pg.user=admin
#pg.factory.cache.column.count=16
#pg.factory.cache.row.count=16
#pg.idle.recv.count.before.giving.up=10000
#pg.idle.send.count.before.giving.up=10000
#pg.max.blob.size.on.query=512k
#pg.recv.buffer.size=1M
#pg.send.buffer.size=1M
#pg.date.locale=en
#pg.timestamp.locale=en
#pg.worker.count=2
#pg.worker.affinity=-1,-1;
#pg.halt.on.error=false
#pg.daemon.pool=true
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
import React from "react"
import styled from "styled-components"
import { Github } from "@styled-icons/remix-fill/Github"
import {Github} from "@styled-icons/remix-fill/Github"
import { Link } from "components"
import {Link} from "components"
const Copyright = styled.div`
display: flex;
......@@ -19,18 +43,18 @@ const Icons = styled.div`
const Footer = () => (
<>
<Copyright>
Copyright &copy; 2014-{new Date().getFullYear()} QuestDB Ltd.
</Copyright>
<Icons>
<Link
color="draculaForeground"
href="https://github.com/questdb/questdb"
rel="noreferrer"
target="_blank"
>
<Github size="18px" />
</Link>
<Copyright>
Copyright &copy; 2014-{new Date().getFullYear()} QuestDB
</Copyright>
<Icons>
<Link
color="draculaForeground"
href="https://github.com/questdb/questdb"
rel="noreferrer"
target="_blank"
>
<Github size="18px"/>
</Link>
</Icons>
</>
)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册