Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jobily
Questdb
提交
84fae1ad
Q
Questdb
项目概览
jobily
/
Questdb
大约 1 年 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Q
Questdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
84fae1ad
编写于
3月 28, 2022
作者:
A
Alex Pelagenko
提交者:
GitHub
3月 28, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(util): index rebuild, string col index recover utils (#1978)
上级
944dd03f
变更
14
显示空白变更内容
内联
并排
Showing
14 changed file
with
1605 addition
and
303 deletion
+1605
-303
core/src/main/java/io/questdb/cairo/RebuildColumnBase.java
core/src/main/java/io/questdb/cairo/RebuildColumnBase.java
+254
-0
core/src/main/java/io/questdb/cairo/RebuildIndex.java
core/src/main/java/io/questdb/cairo/RebuildIndex.java
+55
-222
core/src/main/java/io/questdb/cairo/RecoverVarIndex.java
core/src/main/java/io/questdb/cairo/RecoverVarIndex.java
+126
-0
core/src/main/java/io/questdb/log/LogFactory.java
core/src/main/java/io/questdb/log/LogFactory.java
+27
-1
core/src/main/java/io/questdb/log/SyncLogger.java
core/src/main/java/io/questdb/log/SyncLogger.java
+345
-0
core/src/test/java/io/questdb/cairo/RecoverVarIndexTest.java
core/src/test/java/io/questdb/cairo/RecoverVarIndexTest.java
+396
-0
utils/README.md
utils/README.md
+71
-1
utils/pom.xml
utils/pom.xml
+25
-17
utils/src/main/java/io/questdb/cliutil/CmdUtils.java
utils/src/main/java/io/questdb/cliutil/CmdUtils.java
+9
-62
utils/src/main/java/io/questdb/cliutil/RebuildColumnCommandArgs.java
...ain/java/io/questdb/cliutil/RebuildColumnCommandArgs.java
+75
-0
utils/src/main/java/io/questdb/cliutil/RebuildIndex.java
utils/src/main/java/io/questdb/cliutil/RebuildIndex.java
+60
-0
utils/src/main/java/io/questdb/cliutil/RecoverVarIndex.java
utils/src/main/java/io/questdb/cliutil/RecoverVarIndex.java
+46
-0
utils/src/main/java/io/questdb/cliutil/TxSerializer.java
utils/src/main/java/io/questdb/cliutil/TxSerializer.java
+2
-0
utils/src/test/java/io/questdb/cliutil/RebuildIndexTest.java
utils/src/test/java/io/questdb/cliutil/RebuildIndexTest.java
+114
-0
未找到文件。
core/src/main/java/io/questdb/cairo/RebuildColumnBase.java
0 → 100644
浏览文件 @
84fae1ad
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.cairo
;
import
io.questdb.std.FilesFacade
;
import
io.questdb.std.Misc
;
import
io.questdb.std.Mutable
;
import
io.questdb.std.datetime.DateFormat
;
import
io.questdb.std.str.Path
;
import
io.questdb.std.str.StringSink
;
import
java.io.Closeable
;
import
static
io
.
questdb
.
cairo
.
TableUtils
.
lockName
;
public
abstract
class
RebuildColumnBase
implements
Closeable
,
Mutable
{
static
final
int
ALL
=
-
1
;
final
Path
path
=
new
Path
();
private
final
StringSink
tempStringSink
=
new
StringSink
();
CairoConfiguration
configuration
;
int
rootLen
;
String
columnTypeErrorMsg
=
"Wrong column type"
;
private
long
lockFd
;
private
TableReaderMetadata
metadata
;
@Override
public
void
clear
()
{
path
.
trimTo
(
0
);
tempStringSink
.
clear
();
}
@Override
public
void
close
()
{
this
.
path
.
close
();
Misc
.
free
(
metadata
);
}
public
RebuildColumnBase
of
(
CharSequence
tablePath
,
CairoConfiguration
configuration
)
{
this
.
path
.
concat
(
tablePath
);
this
.
rootLen
=
tablePath
.
length
();
this
.
configuration
=
configuration
;
return
this
;
}
public
void
rebuildAll
()
{
rebuildPartitionColumn
(
null
,
null
);
}
public
void
rebuildColumn
(
CharSequence
columnName
)
{
rebuildPartitionColumn
(
null
,
columnName
);
}
public
void
rebuildPartition
(
CharSequence
rebuildPartitionName
)
{
rebuildPartitionColumn
(
rebuildPartitionName
,
null
);
}
public
void
rebuildPartitionColumn
(
CharSequence
rebuildPartitionName
,
CharSequence
rebuildColumn
)
{
FilesFacade
ff
=
configuration
.
getFilesFacade
();
path
.
trimTo
(
rootLen
);
path
.
concat
(
TableUtils
.
META_FILE_NAME
);
if
(
metadata
==
null
)
{
metadata
=
new
TableReaderMetadata
(
ff
);
}
metadata
.
of
(
path
.
$
(),
ColumnType
.
VERSION
);
try
{
lock
(
ff
);
// Resolve column id if the column name specified
int
rebuildColumnIndex
=
ALL
;
if
(
rebuildColumn
!=
null
)
{
rebuildColumnIndex
=
metadata
.
getColumnIndexQuiet
(
rebuildColumn
,
0
,
rebuildColumn
.
length
());
if
(
rebuildColumnIndex
<
0
)
{
throw
CairoException
.
instance
(
0
).
put
(
"Column does not exist"
);
}
}
path
.
trimTo
(
rootLen
);
int
partitionBy
=
metadata
.
getPartitionBy
();
DateFormat
partitionDirFormatMethod
=
PartitionBy
.
getPartitionDirFormatMethod
(
partitionBy
);
try
(
TxReader
txReader
=
new
TxReader
(
ff
).
ofRO
(
path
,
partitionBy
))
{
txReader
.
unsafeLoadAll
();
path
.
trimTo
(
rootLen
);
path
.
trimTo
(
rootLen
).
concat
(
TableUtils
.
COLUMN_VERSION_FILE_NAME
).
$
();
try
(
ColumnVersionReader
columnVersionReader
=
new
ColumnVersionReader
().
ofRO
(
ff
,
path
))
{
final
long
deadline
=
configuration
.
getMicrosecondClock
().
getTicks
()
+
configuration
.
getSpinLockTimeoutUs
();
columnVersionReader
.
readSafe
(
configuration
.
getMicrosecondClock
(),
deadline
);
path
.
trimTo
(
rootLen
);
if
(
PartitionBy
.
isPartitioned
(
partitionBy
))
{
// Resolve partition timestamp if partition name specified
long
rebuildPartitionTs
=
ALL
;
if
(
rebuildPartitionName
!=
null
)
{
rebuildPartitionTs
=
PartitionBy
.
parsePartitionDirName
(
rebuildPartitionName
,
partitionBy
);
}
for
(
int
partitionIndex
=
txReader
.
getPartitionCount
()
-
1
;
partitionIndex
>
-
1
;
partitionIndex
--)
{
long
partitionTimestamp
=
txReader
.
getPartitionTimestamp
(
partitionIndex
);
if
(
rebuildPartitionTs
==
ALL
||
partitionTimestamp
==
rebuildPartitionTs
)
{
long
partitionSize
=
txReader
.
getPartitionSize
(
partitionIndex
);
if
(
partitionIndex
==
txReader
.
getPartitionCount
()
-
1
)
{
partitionSize
=
txReader
.
getTransientRowCount
();
}
long
partitionNameTxn
=
txReader
.
getPartitionNameTxn
(
partitionIndex
);
rebuildColumn
(
rebuildColumnIndex
,
ff
,
metadata
,
partitionDirFormatMethod
,
tempStringSink
,
partitionTimestamp
,
partitionSize
,
partitionNameTxn
,
columnVersionReader
);
}
}
}
else
{
long
partitionSize
=
txReader
.
getTransientRowCount
();
rebuildColumn
(
rebuildColumnIndex
,
ff
,
metadata
,
partitionDirFormatMethod
,
tempStringSink
,
0L
,
partitionSize
,
-
1L
,
columnVersionReader
);
}
}
}
}
finally
{
metadata
.
close
();
path
.
trimTo
(
rootLen
);
lockName
(
path
);
releaseLock
(
ff
);
}
}
protected
boolean
checkColumnType
(
TableReaderMetadata
metadata
,
int
rebuildColumnIndex
)
{
return
true
;
}
private
void
lock
(
FilesFacade
ff
)
{
try
{
path
.
trimTo
(
rootLen
);
lockName
(
path
);
this
.
lockFd
=
TableUtils
.
lock
(
ff
,
path
);
}
finally
{
path
.
trimTo
(
rootLen
);
}
if
(
this
.
lockFd
==
-
1L
)
{
throw
CairoException
.
instance
(
ff
.
errno
()).
put
(
"Cannot lock table: "
).
put
(
path
.
$
());
}
}
private
void
rebuildColumn
(
int
rebuildColumnIndex
,
FilesFacade
ff
,
TableReaderMetadata
metadata
,
DateFormat
partitionDirFormatMethod
,
StringSink
sink
,
long
partitionTimestamp
,
long
partitionSize
,
long
partitionNameTxn
,
ColumnVersionReader
columnVersionReader
)
{
sink
.
clear
();
partitionDirFormatMethod
.
format
(
partitionTimestamp
,
null
,
null
,
sink
);
if
(
rebuildColumnIndex
==
ALL
)
{
for
(
int
columnIndex
=
metadata
.
getColumnCount
()
-
1
;
columnIndex
>
-
1
;
columnIndex
--)
{
if
(
checkColumnType
(
metadata
,
columnIndex
))
{
rebuildColumn
(
metadata
,
columnIndex
,
sink
,
partitionSize
,
ff
,
columnVersionReader
,
partitionTimestamp
,
partitionNameTxn
);
}
}
}
else
{
if
(
checkColumnType
(
metadata
,
rebuildColumnIndex
))
{
rebuildColumn
(
metadata
,
rebuildColumnIndex
,
sink
,
partitionSize
,
ff
,
columnVersionReader
,
partitionTimestamp
,
partitionNameTxn
);
}
else
{
throw
CairoException
.
instance
(
0
).
put
(
columnTypeErrorMsg
);
}
}
}
private
void
rebuildColumn
(
TableReaderMetadata
metadata
,
int
columnIndex
,
StringSink
sink
,
long
partitionSize
,
FilesFacade
ff
,
ColumnVersionReader
columnVersionReader
,
long
partitionTimestamp
,
long
partitionNameTxn
)
{
CharSequence
columnName
=
metadata
.
getColumnName
(
columnIndex
);
int
indexValueBlockCapacity
=
metadata
.
getIndexValueBlockCapacity
(
columnIndex
);
int
writerIndex
=
metadata
.
getWriterIndex
(
columnIndex
);
rebuildColumn
(
columnName
,
sink
,
indexValueBlockCapacity
,
partitionSize
,
ff
,
columnVersionReader
,
writerIndex
,
partitionTimestamp
,
partitionNameTxn
);
}
abstract
protected
void
rebuildColumn
(
CharSequence
columnName
,
CharSequence
partitionName
,
int
indexValueBlockCapacity
,
long
partitionSize
,
FilesFacade
ff
,
ColumnVersionReader
columnVersionReader
,
int
columnIndex
,
long
partitionTimestamp
,
long
partitionNameTxn
);
private
void
releaseLock
(
FilesFacade
ff
)
{
if
(
lockFd
!=
-
1L
)
{
ff
.
close
(
lockFd
);
try
{
path
.
trimTo
(
rootLen
);
lockName
(
path
);
if
(
ff
.
exists
(
path
)
&&
!
ff
.
remove
(
path
))
{
throw
CairoException
.
instance
(
ff
.
errno
()).
put
(
"Cannot remove "
).
put
(
path
);
}
}
finally
{
path
.
trimTo
(
rootLen
);
}
}
}
}
core/src/main/java/io/questdb/cairo/RebuildIndex.java
浏览文件 @
84fae1ad
...
...
@@ -32,139 +32,80 @@ import io.questdb.log.LogFactory;
import
io.questdb.std.FilesFacade
;
import
io.questdb.std.MemoryTag
;
import
io.questdb.std.Misc
;
import
io.questdb.std.Mutable
;
import
io.questdb.std.datetime.DateFormat
;
import
io.questdb.std.datetime.microtime.Timestamps
;
import
io.questdb.std.str.Path
;
import
io.questdb.std.str.StringSink
;
import
java.io.Closeable
;
import
static
io
.
questdb
.
cairo
.
TableUtils
.
lockName
;
/**
* Rebuild index independently of TableWriter
* Main purpose is for support cases when table data is corrupt and TableWriter cannot be opened
*/
public
class
RebuildIndex
implements
Closeable
,
Mutable
{
private
static
final
int
ALL
=
-
1
;
private
final
Path
path
=
new
Path
();
private
final
MemoryMAR
ddlMem
=
Vm
.
getMARInstance
();
private
int
rootLen
;
private
CairoConfiguration
configuration
;
private
long
lockFd
;
private
final
MemoryMR
indexMem
=
Vm
.
getMRInstance
();
public
class
RebuildIndex
extends
RebuildColumnBase
{
private
static
final
Log
LOG
=
LogFactory
.
getLog
(
RebuildIndex
.
class
);
private
TableReaderMetadata
metadata
;
private
final
MemoryMR
indexMem
=
Vm
.
getMRInstance
()
;
private
final
SymbolColumnIndexer
indexer
=
new
SymbolColumnIndexer
();
private
final
StringSink
tempStringSink
=
new
StringSink
();
private
final
MemoryMAR
ddlMem
=
Vm
.
getMARInstance
();
public
RebuildIndex
of
(
CharSequence
tablePath
,
CairoConfiguration
configuration
)
{
this
.
path
.
concat
(
tablePath
);
this
.
rootLen
=
tablePath
.
length
();
this
.
configuration
=
configuration
;
return
this
;
public
RebuildIndex
()
{
super
();
columnTypeErrorMsg
=
"Column is not indexed"
;
}
@Override
public
void
clear
()
{
path
.
trimTo
(
0
);
tempStringSink
.
clear
();
}
public
void
rebuildAll
()
{
rebuildPartitionColumn
(
null
,
null
);
}
public
void
rebuildColumn
(
CharSequence
columnName
)
{
rebuildPartitionColumn
(
null
,
columnName
);
}
public
void
rebuildPartition
(
CharSequence
rebuildPartitionName
)
{
rebuildPartitionColumn
(
rebuildPartitionName
,
null
);
super
.
clear
();
ddlMem
.
close
();
indexer
.
clear
();
}
public
void
rebuildPartitionColumn
(
CharSequence
rebuildPartitionName
,
CharSequence
rebuildColumn
)
{
FilesFacade
ff
=
configuration
.
getFilesFacade
();
path
.
trimTo
(
rootLen
);
path
.
concat
(
TableUtils
.
META_FILE_NAME
);
if
(
metadata
==
null
)
{
metadata
=
new
TableReaderMetadata
(
ff
);
@Override
public
void
close
()
{
super
.
close
();
Misc
.
free
(
indexer
);
}
metadata
.
of
(
path
.
$
(),
ColumnType
.
VERSION
);
try
{
lock
(
ff
);
// Resolve column id if the column name specified
int
rebuildColumnIndex
=
ALL
;
if
(
rebuildColumn
!=
null
)
{
rebuildColumnIndex
=
metadata
.
getColumnIndexQuiet
(
rebuildColumn
,
0
,
rebuildColumn
.
length
());
if
(
rebuildColumnIndex
<
0
)
{
throw
CairoException
.
instance
(
0
).
put
(
"Column does not exist"
);
}
@Override
protected
boolean
checkColumnType
(
TableReaderMetadata
metadata
,
int
rebuildColumnIndex
)
{
return
metadata
.
isColumnIndexed
(
rebuildColumnIndex
);
}
path
.
trimTo
(
rootLen
);
int
partitionBy
=
metadata
.
getPartitionBy
();
DateFormat
partitionDirFormatMethod
=
PartitionBy
.
getPartitionDirFormatMethod
(
partitionBy
);
try
(
TxReader
txReader
=
new
TxReader
(
ff
).
ofRO
(
path
,
partitionBy
))
{
txReader
.
unsafeLoadAll
();
path
.
trimTo
(
rootLen
);
protected
void
rebuildColumn
(
CharSequence
columnName
,
CharSequence
partitionName
,
int
indexValueBlockCapacity
,
long
partitionSize
,
FilesFacade
ff
,
ColumnVersionReader
columnVersionReader
,
int
columnIndex
,
long
partitionTimestamp
,
long
partitionNameTxn
)
{
path
.
trimTo
(
rootLen
).
concat
(
partitionName
);
TableUtils
.
txnPartitionConditionally
(
path
,
partitionNameTxn
);
LOG
.
info
().
$
(
"testing partition path"
).
$
(
path
).
$
();
final
int
plen
=
path
.
length
();
if
(
ff
.
exists
(
path
.
$
()))
{
try
(
final
MemoryMR
roMem
=
indexMem
)
{
long
columnNameTxn
=
columnVersionReader
.
getColumnNameTxn
(
partitionTimestamp
,
columnIndex
);
removeIndexFiles
(
columnName
,
ff
,
columnNameTxn
);
TableUtils
.
dFile
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
path
.
trimTo
(
rootLen
).
concat
(
TableUtils
.
COLUMN_VERSION_FILE_NAME
).
$
();
try
(
ColumnVersionReader
columnVersionReader
=
new
ColumnVersionReader
().
ofRO
(
ff
,
path
))
{
final
long
deadline
=
configuration
.
getMicrosecondClock
().
getTicks
()
+
configuration
.
getSpinLockTimeoutUs
();
columnVersionReader
.
readSafe
(
configuration
.
getMicrosecondClock
(),
deadline
);
path
.
trimTo
(
rootLen
);
if
(
columnVersionReader
.
getColumnTopPartitionTimestamp
(
columnIndex
)
<=
partitionTimestamp
)
{
LOG
.
info
().
$
(
"indexing [path="
).
utf8
(
path
).
I
$
();
final
long
columnTop
=
columnVersionReader
.
getColumnTop
(
partitionTimestamp
,
columnIndex
);
createIndexFiles
(
columnName
,
indexValueBlockCapacity
,
plen
,
ff
,
columnNameTxn
);
if
(
PartitionBy
.
isPartitioned
(
partitionBy
))
{
// Resolve partition timestamp if partition name specified
long
rebuildPartitionTs
=
ALL
;
if
(
rebuildPartitionName
!=
null
)
{
rebuildPartitionTs
=
PartitionBy
.
parsePartitionDirName
(
rebuildPartitionName
,
partitionBy
);
if
(
partitionSize
>
columnTop
)
{
TableUtils
.
dFile
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
final
long
columnSize
=
(
partitionSize
-
columnTop
)
<<
ColumnType
.
pow2SizeOf
(
ColumnType
.
INT
);
roMem
.
of
(
ff
,
path
,
columnSize
,
columnSize
,
MemoryTag
.
MMAP_TABLE_WRITER
);
indexer
.
configureWriter
(
configuration
,
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
,
columnTop
);
indexer
.
index
(
roMem
,
columnTop
,
partitionSize
);
indexer
.
clear
();
}
for
(
int
partitionIndex
=
txReader
.
getPartitionCount
()
-
1
;
partitionIndex
>
-
1
;
partitionIndex
--)
{
long
partitionTimestamp
=
txReader
.
getPartitionTimestamp
(
partitionIndex
);
if
(
rebuildPartitionTs
==
ALL
||
partitionTimestamp
==
rebuildPartitionTs
)
{
long
partitionSize
=
txReader
.
getPartitionSize
(
partitionIndex
);
rebuildIndex
(
rebuildColumnIndex
,
ff
,
indexer
,
metadata
,
partitionDirFormatMethod
,
tempStringSink
,
partitionTimestamp
,
partitionSize
,
columnVersionReader
);
}
}
}
else
{
long
partitionSize
=
txReader
.
getTransientRowCount
();
rebuildIndex
(
rebuildColumnIndex
,
ff
,
indexer
,
metadata
,
partitionDirFormatMethod
,
tempStringSink
,
Long
.
MIN_VALUE
,
partitionSize
,
columnVersionReader
);
}
}
}
}
finally
{
metadata
.
close
();
indexer
.
clear
();
path
.
trimTo
(
rootLen
);
lockName
(
path
);
releaseLock
(
ff
);
LOG
.
info
().
$
(
"partition does not exit "
).
$
(
path
).
$
();
}
}
...
...
@@ -202,88 +143,6 @@ public class RebuildIndex implements Closeable, Mutable {
}
}
private
void
rebuildIndex
(
SymbolColumnIndexer
indexer
,
CharSequence
columnName
,
CharSequence
partitionName
,
int
indexValueBlockCapacity
,
long
partitionSize
,
FilesFacade
ff
,
ColumnVersionReader
columnVersionReader
,
int
columnIndex
,
long
partitionTimestamp
)
{
path
.
trimTo
(
rootLen
).
concat
(
partitionName
);
final
int
plen
=
path
.
length
();
if
(
ff
.
exists
(
path
.
$
()))
{
try
(
final
MemoryMR
roMem
=
indexMem
)
{
long
columnNameTxn
=
columnVersionReader
.
getColumnNameTxn
(
partitionTimestamp
,
columnIndex
);
removeIndexFiles
(
columnName
,
ff
,
columnNameTxn
);
TableUtils
.
dFile
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
if
(
columnVersionReader
.
getColumnTopPartitionTimestamp
(
columnIndex
)
<=
partitionTimestamp
)
{
LOG
.
info
().
$
(
"indexing [path="
).
utf8
(
path
).
I
$
();
final
long
columnTop
=
columnVersionReader
.
getColumnTop
(
partitionTimestamp
,
columnIndex
);
createIndexFiles
(
columnName
,
indexValueBlockCapacity
,
plen
,
ff
,
columnNameTxn
);
if
(
partitionSize
>
columnTop
)
{
TableUtils
.
dFile
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
final
long
columnSize
=
(
partitionSize
-
columnTop
)
<<
ColumnType
.
pow2SizeOf
(
ColumnType
.
INT
);
roMem
.
of
(
ff
,
path
,
columnSize
,
columnSize
,
MemoryTag
.
MMAP_TABLE_WRITER
);
indexer
.
configureWriter
(
configuration
,
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
,
columnTop
);
indexer
.
index
(
roMem
,
columnTop
,
partitionSize
);
}
}
}
}
}
private
void
rebuildIndex
(
int
rebuildColumnIndex
,
FilesFacade
ff
,
SymbolColumnIndexer
indexer
,
TableReaderMetadata
metadata
,
DateFormat
partitionDirFormatMethod
,
StringSink
sink
,
long
partitionTimestamp
,
long
partitionSize
,
ColumnVersionReader
columnVersionReader
)
{
sink
.
clear
();
partitionDirFormatMethod
.
format
(
partitionTimestamp
,
null
,
null
,
sink
);
if
(
rebuildColumnIndex
==
ALL
)
{
for
(
int
columnIndex
=
metadata
.
getColumnCount
()
-
1
;
columnIndex
>
-
1
;
columnIndex
--)
{
if
(
metadata
.
isColumnIndexed
(
columnIndex
))
{
rebuildIndexForColumn
(
metadata
,
columnIndex
,
indexer
,
sink
,
partitionSize
,
ff
,
columnVersionReader
,
partitionTimestamp
);
}
}
}
else
{
if
(
metadata
.
isColumnIndexed
(
rebuildColumnIndex
))
{
rebuildIndexForColumn
(
metadata
,
rebuildColumnIndex
,
indexer
,
sink
,
partitionSize
,
ff
,
columnVersionReader
,
partitionTimestamp
);
}
else
{
throw
CairoException
.
instance
(
0
).
put
(
"Column is not indexed"
);
}
}
}
private
void
rebuildIndexForColumn
(
TableReaderMetadata
metadata
,
int
columnIndex
,
SymbolColumnIndexer
indexer
,
StringSink
sink
,
long
partitionSize
,
FilesFacade
ff
,
ColumnVersionReader
columnVersionReader
,
long
partitionTimestamp
)
{
CharSequence
columnName
=
metadata
.
getColumnName
(
columnIndex
);
int
indexValueBlockCapacity
=
metadata
.
getIndexValueBlockCapacity
(
columnIndex
);
int
writerIndex
=
metadata
.
getWriterIndex
(
columnIndex
);
rebuildIndex
(
indexer
,
columnName
,
sink
,
indexValueBlockCapacity
,
partitionSize
,
ff
,
columnVersionReader
,
writerIndex
,
partitionTimestamp
);
}
private
void
removeIndexFiles
(
CharSequence
columnName
,
FilesFacade
ff
,
long
columnNameTxn
)
{
final
int
plen
=
path
.
length
();
BitmapIndexUtils
.
keyFileName
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
removeFile
(
path
,
ff
);
BitmapIndexUtils
.
valueFileName
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
removeFile
(
path
,
ff
);
}
private
void
removeFile
(
Path
path
,
FilesFacade
ff
)
{
LOG
.
info
().
$
(
"deleting "
).
utf8
(
path
).
$
();
if
(!
ff
.
remove
(
this
.
path
))
{
...
...
@@ -296,38 +155,12 @@ public class RebuildIndex implements Closeable, Mutable {
}
}
private
void
lock
(
FilesFacade
ff
)
{
try
{
path
.
trimTo
(
rootLen
);
lockName
(
path
);
this
.
lockFd
=
TableUtils
.
lock
(
ff
,
path
);
}
finally
{
path
.
trimTo
(
rootLen
);
}
if
(
this
.
lockFd
==
-
1L
)
{
throw
CairoException
.
instance
(
ff
.
errno
()).
put
(
"Cannot lock table: "
).
put
(
path
.
$
());
}
}
@Override
public
void
close
()
{
this
.
path
.
close
();
Misc
.
free
(
metadata
);
}
private
void
removeIndexFiles
(
CharSequence
columnName
,
FilesFacade
ff
,
long
columnNameTxn
)
{
final
int
plen
=
path
.
length
();
BitmapIndexUtils
.
keyFileName
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
removeFile
(
path
,
ff
);
private
void
releaseLock
(
FilesFacade
ff
)
{
if
(
lockFd
!=
-
1L
)
{
ff
.
close
(
lockFd
);
try
{
path
.
trimTo
(
rootLen
);
lockName
(
path
);
if
(
ff
.
exists
(
path
)
&&
!
ff
.
remove
(
path
))
{
throw
CairoException
.
instance
(
ff
.
errno
()).
put
(
"Cannot remove "
).
put
(
path
);
}
}
finally
{
path
.
trimTo
(
rootLen
);
}
}
BitmapIndexUtils
.
valueFileName
(
path
.
trimTo
(
plen
),
columnName
,
columnNameTxn
);
removeFile
(
path
,
ff
);
}
}
core/src/main/java/io/questdb/cairo/RecoverVarIndex.java
0 → 100644
浏览文件 @
84fae1ad
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.cairo
;
import
io.questdb.cairo.vm.MemoryCMARWImpl
;
import
io.questdb.cairo.vm.MemoryCMRImpl
;
import
io.questdb.cairo.vm.api.MemoryCMARW
;
import
io.questdb.cairo.vm.api.MemoryCMR
;
import
io.questdb.log.Log
;
import
io.questdb.log.LogFactory
;
import
io.questdb.std.FilesFacade
;
import
io.questdb.std.MemoryTag
;
public
class
RecoverVarIndex
extends
RebuildColumnBase
{
private
static
final
Log
LOG
=
LogFactory
.
getLog
(
RebuildIndex
.
class
);
protected
boolean
checkColumnType
(
TableReaderMetadata
metadata
,
int
rebuildColumnIndex
)
{
return
metadata
.
getColumnType
(
rebuildColumnIndex
)
==
ColumnType
.
STRING
;
}
@Override
protected
void
rebuildColumn
(
CharSequence
columnName
,
CharSequence
partitionName
,
int
indexValueBlockCapacity
,
long
partitionSize
,
FilesFacade
ff
,
ColumnVersionReader
columnVersionReader
,
int
columnIndex
,
long
partitionTimestamp
,
long
partitionNameTxn
)
{
long
columnNameTxn
=
columnVersionReader
.
getColumnNameTxn
(
partitionTimestamp
,
columnIndex
);
long
columnAddedPartition
=
columnVersionReader
.
getColumnTopPartitionTimestamp
(
columnIndex
);
long
columnTop
=
columnVersionReader
.
getColumnTop
(
partitionTimestamp
,
columnIndex
);
if
(
columnTop
==
0
&&
partitionTimestamp
<
columnAddedPartition
)
{
LOG
.
info
().
$
(
"not rebuilding column "
).
$
(
columnName
).
$
(
" in partition "
).
$ts
(
partitionTimestamp
).
$
(
", column not added to partition"
).
$
();
return
;
}
path
.
trimTo
(
rootLen
).
concat
(
partitionName
);
TableUtils
.
txnPartitionConditionally
(
path
,
partitionNameTxn
);
path
.
concat
(
columnName
);
int
colNameLen
=
path
.
length
();
path
.
put
(
".d"
);
if
(
columnNameTxn
!=
-
1L
)
{
path
.
put
(
'.'
).
put
(
columnNameTxn
);
}
LOG
.
info
().
$
(
"reading: "
).
$
(
path
).
$
();
long
maxOffset
=
ff
.
length
(
path
.
$
());
try
(
MemoryCMR
roMem
=
new
MemoryCMRImpl
(
ff
,
path
.
$
(),
maxOffset
,
MemoryTag
.
NATIVE_DEFAULT
))
{
path
.
trimTo
(
colNameLen
).
put
(
".i"
);
if
(
columnNameTxn
!=
-
1L
)
{
path
.
put
(
'.'
).
put
(
columnNameTxn
);
}
LOG
.
info
().
$
(
"writing: "
).
$
(
path
).
$
();
try
(
MemoryCMARW
rwMem
=
new
MemoryCMARWImpl
(
ff
,
path
.
$
(),
8
*
1024
*
1024
,
0
,
MemoryTag
.
NATIVE_DEFAULT
,
0
))
{
long
expectedRowCount
=
partitionSize
-
columnTop
;
LOG
.
info
().
$
(
"data file length: "
).
$
(
maxOffset
).
$
(
", expected record count: "
).
$
(
expectedRowCount
).
$
();
// index
long
offset
=
0
;
int
rows
=
0
;
while
(
rows
<
expectedRowCount
&&
offset
+
3
<
maxOffset
)
{
int
len
=
roMem
.
getInt
(
offset
);
rwMem
.
putLong
(
offset
);
if
(
len
>
-
1
)
{
offset
+=
4
+
len
*
2L
;
}
else
{
offset
+=
4
;
}
rows
++;
}
if
(
rows
!=
expectedRowCount
)
{
throw
CairoException
.
instance
(
0
)
.
put
(
" rebuild var index file failed [path="
).
put
(
path
)
.
put
(
", expectedRows="
).
put
(
expectedRowCount
)
.
put
(
", actualRows="
).
put
(
rows
).
put
(
']'
);
}
rwMem
.
putLong
(
offset
);
LOG
.
info
().
$
(
"write complete. Index file length: "
).
$
(
rwMem
.
getAppendOffset
()).
$
();
}
}
}
}
core/src/main/java/io/questdb/log/LogFactory.java
浏览文件 @
84fae1ad
...
...
@@ -70,6 +70,7 @@ public class LogFactory implements Closeable {
private
int
queueDepth
=
DEFAULT_QUEUE_DEPTH
;
private
int
recordLength
=
DEFAULT_MSG_SIZE
;
static
boolean
envEnabled
=
true
;
static
boolean
overwriteWithSyncLogging
=
false
;
public
LogFactory
()
{
this
(
MicrosecondClockImpl
.
INSTANCE
);
...
...
@@ -79,6 +80,10 @@ public class LogFactory implements Closeable {
this
.
clock
=
clock
;
}
public
static
void
configureAsync
()
{
overwriteWithSyncLogging
=
false
;
}
public
static
void
configureFromProperties
(
LogFactory
factory
,
Properties
properties
,
WorkerPool
workerPool
,
String
logDir
)
{
factory
.
workerPool
=
workerPool
;
...
...
@@ -195,6 +200,10 @@ public class LogFactory implements Closeable {
configureFromSystemProperties
(
INSTANCE
,
workerPool
);
}
public
static
void
configureSync
()
{
overwriteWithSyncLogging
=
true
;
}
@SuppressWarnings
(
"rawtypes"
)
public
static
Log
getLog
(
Class
clazz
)
{
return
getLog
(
clazz
.
getName
());
...
...
@@ -289,6 +298,7 @@ public class LogFactory implements Closeable {
final
Holder
err
=
scopeConfiguration
.
getHolder
(
Numbers
.
msb
(
LogLevel
.
ERROR
));
final
Holder
cri
=
scopeConfiguration
.
getHolder
(
Numbers
.
msb
(
LogLevel
.
CRITICAL
));
final
Holder
adv
=
scopeConfiguration
.
getHolder
(
Numbers
.
msb
(
LogLevel
.
ADVISORY
));
if
(!
overwriteWithSyncLogging
)
{
return
new
Logger
(
clock
,
compressScope
(
key
,
sink
),
...
...
@@ -305,6 +315,22 @@ public class LogFactory implements Closeable {
);
}
return
new
SyncLogger
(
clock
,
compressScope
(
key
,
sink
),
dbg
==
null
?
null
:
dbg
.
ring
,
dbg
==
null
?
null
:
dbg
.
lSeq
,
inf
==
null
?
null
:
inf
.
ring
,
inf
==
null
?
null
:
inf
.
lSeq
,
err
==
null
?
null
:
err
.
ring
,
err
==
null
?
null
:
err
.
lSeq
,
cri
==
null
?
null
:
cri
.
ring
,
cri
==
null
?
null
:
cri
.
lSeq
,
adv
==
null
?
null
:
adv
.
ring
,
adv
==
null
?
null
:
adv
.
lSeq
);
}
public
int
getQueueDepth
()
{
return
queueDepth
;
}
...
...
core/src/main/java/io/questdb/log/SyncLogger.java
0 → 100644
浏览文件 @
84fae1ad
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.log
;
import
io.questdb.mp.RingQueue
;
import
io.questdb.mp.Sequence
;
import
io.questdb.network.Net
;
import
io.questdb.std.ThreadLocal
;
import
io.questdb.std.*
;
import
io.questdb.std.datetime.microtime.MicrosecondClock
;
import
io.questdb.std.datetime.microtime.TimestampFormatUtils
;
import
io.questdb.std.str.StringSink
;
import
java.io.File
;
public
class
SyncLogger
implements
LogRecord
,
Log
{
private
final
static
ThreadLocal
<
StringSink
>
line
=
new
ThreadLocal
<>(
StringSink:
:
new
);
private
final
CharSequence
name
;
private
final
RingQueue
<
LogRecordSink
>
debugRing
;
private
final
Sequence
debugSeq
;
private
final
RingQueue
<
LogRecordSink
>
infoRing
;
private
final
Sequence
infoSeq
;
private
final
RingQueue
<
LogRecordSink
>
errorRing
;
private
final
Sequence
errorSeq
;
private
final
RingQueue
<
LogRecordSink
>
criticalRing
;
private
final
Sequence
criticalSeq
;
private
final
RingQueue
<
LogRecordSink
>
advisoryRing
;
private
final
Sequence
advisorySeq
;
private
final
MicrosecondClock
clock
;
SyncLogger
(
MicrosecondClock
clock
,
CharSequence
name
,
RingQueue
<
LogRecordSink
>
debugRing
,
Sequence
debugSeq
,
RingQueue
<
LogRecordSink
>
infoRing
,
Sequence
infoSeq
,
RingQueue
<
LogRecordSink
>
errorRing
,
Sequence
errorSeq
,
RingQueue
<
LogRecordSink
>
criticalRing
,
Sequence
criticalSeq
,
RingQueue
<
LogRecordSink
>
advisoryRing
,
Sequence
advisorySeq
)
{
this
.
clock
=
clock
;
this
.
name
=
name
;
this
.
debugRing
=
debugRing
;
this
.
debugSeq
=
debugSeq
;
this
.
infoRing
=
infoRing
;
this
.
infoSeq
=
infoSeq
;
this
.
errorRing
=
errorRing
;
this
.
errorSeq
=
errorSeq
;
this
.
criticalRing
=
criticalRing
;
this
.
criticalSeq
=
criticalSeq
;
this
.
advisoryRing
=
advisoryRing
;
this
.
advisorySeq
=
advisorySeq
;
}
@Override
public
void
$
()
{
StringSink
sink
=
line
.
get
();
System
.
out
.
println
(
sink
);
sink
.
clear
();
}
@Override
public
LogRecord
$
(
CharSequence
sequence
)
{
if
(
sequence
==
null
)
{
sink
().
put
(
"null"
);
}
else
{
sink
().
put
(
sequence
);
}
return
this
;
}
@Override
public
LogRecord
$
(
CharSequence
sequence
,
int
lo
,
int
hi
)
{
sink
().
put
(
sequence
,
lo
,
hi
);
return
this
;
}
@Override
public
LogRecord
$utf8
(
long
lo
,
long
hi
)
{
Chars
.
utf8Decode
(
lo
,
hi
,
this
);
return
this
;
}
@Override
public
LogRecord
$
(
int
x
)
{
sink
().
put
(
x
);
return
this
;
}
@Override
public
LogRecord
$
(
double
x
)
{
sink
().
put
(
x
);
return
this
;
}
@Override
public
LogRecord
$
(
long
x
)
{
sink
().
put
(
x
);
return
this
;
}
@Override
public
LogRecord
$
(
boolean
x
)
{
sink
().
put
(
x
);
return
this
;
}
@Override
public
LogRecord
$
(
char
c
)
{
sink
().
put
(
c
);
return
this
;
}
@Override
public
LogRecord
$
(
Throwable
e
)
{
if
(
e
!=
null
)
{
sink
().
put
(
Misc
.
EOL
).
put
(
e
);
}
return
this
;
}
@Override
public
LogRecord
$
(
File
x
)
{
sink
().
put
(
x
==
null
?
"null"
:
x
.
getAbsolutePath
());
return
this
;
}
@Override
public
LogRecord
$
(
Object
x
)
{
sink
().
put
(
x
==
null
?
"null"
:
x
.
toString
());
return
this
;
}
@Override
public
LogRecord
$
(
Sinkable
x
)
{
if
(
x
==
null
)
{
sink
().
put
(
"null"
);
}
else
{
x
.
toSink
(
sink
());
}
return
this
;
}
@Override
public
LogRecord
$ip
(
long
ip
)
{
Net
.
appendIP4
(
sink
(),
ip
);
return
this
;
}
@Override
public
LogRecord
$ts
(
long
x
)
{
sink
().
putISODate
(
x
);
return
this
;
}
@Override
public
LogRecord
$
256
(
long
a
,
long
b
,
long
c
,
long
d
)
{
Numbers
.
appendLong256
(
a
,
b
,
c
,
d
,
sink
());
return
this
;
}
@Override
public
LogRecord
$hex
(
long
value
)
{
Numbers
.
appendHex
(
sink
(),
value
,
false
);
return
this
;
}
@Override
public
boolean
isEnabled
()
{
return
true
;
}
@Override
public
LogRecord
ts
()
{
sink
().
putISODate
(
clock
.
getTicks
());
return
this
;
}
@Override
public
LogRecord
microTime
(
long
x
)
{
TimestampFormatUtils
.
appendDateTimeUSec
(
sink
(),
x
);
return
this
;
}
@Override
public
LogRecord
utf8
(
CharSequence
sequence
)
{
if
(
sequence
==
null
)
{
sink
().
put
(
"null"
);
}
else
{
sink
().
encodeUtf8
(
sequence
);
}
return
this
;
}
@Override
public
LogRecord
debug
()
{
return
addTimestamp
(
xdebug
(),
LogLevel
.
DEBUG_HEADER
);
}
@Override
public
LogRecord
debugW
()
{
return
addTimestamp
(
xDebugW
(),
LogLevel
.
DEBUG_HEADER
);
}
@Override
public
LogRecord
error
()
{
return
addTimestamp
(
xerror
(),
LogLevel
.
ERROR_HEADER
);
}
@Override
public
LogRecord
errorW
()
{
return
addTimestamp
(
xErrorW
(),
LogLevel
.
ERROR_HEADER
);
}
@Override
public
LogRecord
critical
()
{
return
addTimestamp
(
xcritical
(),
LogLevel
.
CRITICAL_HEADER
);
}
@Override
public
LogRecord
criticalW
()
{
return
addTimestamp
(
xCriticalW
(),
LogLevel
.
CRITICAL_HEADER
);
}
@Override
public
LogRecord
info
()
{
return
addTimestamp
(
xinfo
(),
LogLevel
.
INFO_HEADER
);
}
@Override
public
LogRecord
infoW
()
{
return
addTimestamp
(
xInfoW
(),
LogLevel
.
INFO_HEADER
);
}
@Override
public
LogRecord
advisory
()
{
return
addTimestamp
(
xadvisory
(),
LogLevel
.
ADVISORY_HEADER
);
}
@Override
public
LogRecord
advisoryW
()
{
return
addTimestamp
(
xAdvisoryW
(),
LogLevel
.
ADVISORY_HEADER
);
}
@Override
public
boolean
isDebugEnabled
()
{
return
debugSeq
!=
null
;
}
public
LogRecord
xerror
()
{
return
next
(
errorSeq
,
errorRing
,
LogLevel
.
ERROR
);
}
public
LogRecord
xcritical
()
{
return
next
(
criticalSeq
,
criticalRing
,
LogLevel
.
CRITICAL
);
}
public
LogRecord
xinfo
()
{
return
next
(
infoSeq
,
infoRing
,
LogLevel
.
INFO
);
}
/**
* Guaranteed log delivery at INFO level. The calling thread will wait for async logger
* to become available instead of discarding log message.
*
* @return log record API
*/
public
LogRecord
xInfoW
()
{
return
next
(
infoSeq
,
infoRing
,
LogLevel
.
INFO
);
}
public
LogRecord
xdebug
()
{
return
next
(
debugSeq
,
debugRing
,
LogLevel
.
DEBUG
);
}
public
LogRecord
xDebugW
()
{
return
next
(
infoSeq
,
infoRing
,
LogLevel
.
DEBUG
);
}
@Override
public
LogRecord
xadvisory
()
{
return
next
(
advisorySeq
,
advisoryRing
,
LogLevel
.
ADVISORY
);
}
@Override
public
LogRecord
put
(
char
c
)
{
sink
().
put
(
c
);
return
this
;
}
public
LogRecord
xAdvisoryW
()
{
return
next
(
infoSeq
,
infoRing
,
LogLevel
.
ADVISORY
);
}
public
LogRecord
xCriticalW
()
{
return
next
(
infoSeq
,
infoRing
,
LogLevel
.
CRITICAL
);
}
public
LogRecord
xErrorW
()
{
return
next
(
infoSeq
,
infoRing
,
LogLevel
.
ERROR
);
}
private
LogRecord
addTimestamp
(
LogRecord
rec
,
String
level
)
{
return
rec
.
ts
().
$
(
level
).
$
(
name
);
}
private
LogRecord
next
(
Sequence
seq
,
RingQueue
<
LogRecordSink
>
ring
,
int
level
)
{
if
(
seq
==
null
)
{
return
NullLogRecord
.
INSTANCE
;
}
return
this
;
}
private
StringSink
sink
()
{
return
line
.
get
();
}
}
core/src/test/java/io/questdb/cairo/RecoverVarIndexTest.java
0 → 100644
浏览文件 @
84fae1ad
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.cairo
;
import
io.questdb.cairo.security.AllowAllCairoSecurityContext
;
import
io.questdb.cairo.sql.RecordCursor
;
import
io.questdb.cairo.sql.RecordCursorFactory
;
import
io.questdb.griffin.SqlCompiler
;
import
io.questdb.griffin.SqlException
;
import
io.questdb.griffin.SqlExecutionContextImpl
;
import
io.questdb.griffin.engine.functions.bind.BindVariableServiceImpl
;
import
io.questdb.log.LogFactory
;
import
io.questdb.std.Chars
;
import
io.questdb.std.Files
;
import
io.questdb.std.FilesFacadeImpl
;
import
io.questdb.std.datetime.microtime.Timestamps
;
import
io.questdb.std.str.LPSZ
;
import
io.questdb.std.str.Path
;
import
io.questdb.test.tools.TestUtils
;
import
org.junit.*
;
import
java.util.concurrent.atomic.AtomicInteger
;
public
class
RecoverVarIndexTest
extends
AbstractCairoTest
{
protected
static
CharSequence
root
;
private
static
SqlCompiler
compiler
;
private
static
SqlExecutionContextImpl
sqlExecutionContext
;
private
final
RecoverVarIndex
rebuildVarColumn
=
new
RecoverVarIndex
();
TableWriter
tempWriter
;
@BeforeClass
public
static
void
setUpStatic
()
{
AbstractCairoTest
.
setUpStatic
();
compiler
=
new
SqlCompiler
(
engine
);
BindVariableServiceImpl
bindVariableService
=
new
BindVariableServiceImpl
(
configuration
);
sqlExecutionContext
=
new
SqlExecutionContextImpl
(
engine
,
1
)
.
with
(
AllowAllCairoSecurityContext
.
INSTANCE
,
bindVariableService
,
null
,
-
1
,
null
);
bindVariableService
.
clear
();
LogFactory
.
configureSync
();
}
@AfterClass
public
static
void
tearDownStatic
()
{
compiler
.
close
();
LogFactory
.
configureAsync
();
}
@After
public
void
cleanup
()
{
rebuildVarColumn
.
close
();
}
@Test
public
void
testEmptyTable
()
throws
Exception
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_symbol('A', 'B', 'C') as sym1,"
+
"rnd_symbol(4,4,4,2) as sym2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(0)"
+
"), index(sym1), index(sym2) timestamp(ts) PARTITION BY DAY"
;
checkRecoverVarIndex
(
createTableSql
,
(
tablePath
)
->
{
},
RecoverVarIndex:
:
rebuildAll
);
}
@Test
public
void
testNonPartitionedWithColumnTop
()
throws
Exception
{
String
createAlterInsertSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(5000)"
+
"); "
+
"alter table xxx add column str2 string; "
+
"insert into xxx "
+
"select "
+
"rnd_str('A', 'B', 'C') as str1,"
+
"x,"
+
"timestamp_sequence(100000000L * 5000L, 100000000) ts, "
+
"rnd_str(4,4,4,2) as str2 "
+
"from long_sequence(5000)"
;
checkRecoverVarIndex
(
createAlterInsertSql
,
tablePath
->
removeFileAtPartition
(
"str2.i.1"
,
PartitionBy
.
NONE
,
tablePath
,
0
),
rebuildIndex
->
rebuildIndex
.
rebuildColumn
(
"str2"
));
}
@Test
public
void
testNonPartitionedWithColumnTopAddedLast
()
throws
Exception
{
String
createAlterInsertSql
=
"create table xxx as ("
+
"select "
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(5000)"
+
");"
+
"alter table xxx add column str1 string;"
+
"alter table xxx add column str2 string"
;
checkRecoverVarIndex
(
createAlterInsertSql
,
tablePath
->
{
},
RecoverVarIndex:
:
rebuildAll
);
engine
.
releaseAllWriters
();
compiler
.
compile
(
"insert into xxx values(500100000000L, 50001, 'D', 'I2')"
,
sqlExecutionContext
).
execute
(
null
).
await
();
int
sym1D
=
countByFullScanWhereValueD
();
Assert
.
assertEquals
(
1
,
sym1D
);
}
@Test
public
void
testNonePartitionedOneColumn
()
throws
Exception
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
")"
;
checkRecoverVarIndex
(
createTableSql
,
tablePath
->
removeFileAtPartition
(
"str1.i"
,
PartitionBy
.
NONE
,
tablePath
,
0
),
rebuildIndex
->
rebuildIndex
.
rebuildColumn
(
"str1"
));
}
@Test
public
void
testPartitionedDaily
()
throws
Exception
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
") timestamp(ts) PARTITION BY DAY"
;
checkRecoverVarIndex
(
createTableSql
,
(
tablePath
)
->
{
removeFileAtPartition
(
"str1.i"
,
PartitionBy
.
DAY
,
tablePath
,
0
);
removeFileAtPartition
(
"str2.i"
,
PartitionBy
.
DAY
,
tablePath
,
0
);
},
RecoverVarIndex:
:
rebuildAll
);
}
@Test
public
void
testPartitionedNone
()
throws
Exception
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
")"
;
checkRecoverVarIndex
(
createTableSql
,
(
tablePath
)
->
{
removeFileAtPartition
(
"str1.i"
,
PartitionBy
.
NONE
,
tablePath
,
0
);
removeFileAtPartition
(
"str2.i"
,
PartitionBy
.
NONE
,
tablePath
,
0
);
},
RecoverVarIndex:
:
rebuildAll
);
}
@Test
public
void
testPartitionedOneColumn
()
throws
Exception
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
") timestamp(ts) PARTITION BY DAY"
;
checkRecoverVarIndex
(
createTableSql
,
tablePath
->
removeFileAtPartition
(
"str1.i"
,
PartitionBy
.
DAY
,
tablePath
,
0
),
rebuildIndex
->
rebuildIndex
.
rebuildColumn
(
"str1"
));
}
@Test
public
void
testPartitionedOneColumnFirstPartition
()
throws
Exception
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
") timestamp(ts) PARTITION BY DAY"
;
checkRecoverVarIndex
(
createTableSql
,
tablePath
->
removeFileAtPartition
(
"str1.i"
,
PartitionBy
.
DAY
,
tablePath
,
0
),
rebuildIndex
->
rebuildIndex
.
rebuildPartitionColumn
(
"1970-01-01"
,
"str1"
));
}
@Test
public
void
testPartitionedWithColumnTop
()
throws
Exception
{
String
createAlterInsertSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(5000)"
+
") timestamp(ts) PARTITION BY DAY; "
+
"alter table xxx add column str2 string; "
+
"insert into xxx "
+
"select "
+
"rnd_str('A', 'B', 'C') as str1,"
+
"x,"
+
"timestamp_sequence(100000000L * 5000L, 100000000) ts, "
+
"rnd_str(4,4,4,2) as str2 "
+
"from long_sequence(5000)"
;
checkRecoverVarIndex
(
createAlterInsertSql
,
tablePath
->
removeFileAtPartition
(
"str2.i.1"
,
PartitionBy
.
DAY
,
tablePath
,
Timestamps
.
DAY_MICROS
*
11
),
rebuildIndex
->
rebuildIndex
.
rebuildColumn
(
"str2"
));
}
@Test
public
void
testRebuildColumnTableWriterLockedFails
()
throws
Exception
{
assertMemoryLeak
(()
->
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
") timestamp(ts) PARTITION BY DAY"
;
tempWriter
=
null
;
try
{
checkRecoverVarIndex
(
createTableSql
,
tablePath
->
tempWriter
=
engine
.
getWriter
(
sqlExecutionContext
.
getCairoSecurityContext
(),
"xxx"
,
"test lock"
),
rebuildIndex
->
{
try
{
rebuildIndex
.
rebuildColumn
(
"str1"
);
}
finally
{
tempWriter
.
close
();
}
});
Assert
.
fail
();
}
catch
(
CairoException
ex
)
{
TestUtils
.
assertContains
(
ex
.
getFlyweightMessage
(),
"Cannot lock table"
);
}
});
}
@Test
public
void
testRebuildFailsWriteIFile
()
throws
Exception
{
assertMemoryLeak
(()
->
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
") timestamp(ts) PARTITION BY DAY"
;
AtomicInteger
count
=
new
AtomicInteger
();
ff
=
new
FilesFacadeImpl
()
{
@Override
public
long
openRW
(
LPSZ
name
,
long
opts
)
{
if
(
Chars
.
contains
(
name
,
"str2.i"
)
&&
count
.
incrementAndGet
()
==
14
)
{
return
-
1
;
}
return
Files
.
openRW
(
name
,
opts
);
}
};
try
{
checkRecoverVarIndex
(
createTableSql
,
tablePath
->
{
},
rebuildIndex
->
rebuildIndex
.
rebuildColumn
(
"str2"
));
Assert
.
fail
();
}
catch
(
CairoException
ex
)
{
TestUtils
.
assertContains
(
ex
.
getFlyweightMessage
(),
"could not open read-write"
);
}
});
}
@Test
public
void
testRebuildWrongColumn
()
throws
Exception
{
String
createTableSql
=
"create table xxx as ("
+
"select "
+
"rnd_str('A', 'Bbb', 'Ccccc') as str1,"
+
"rnd_str('A', 'Bbb', 'Ccccc', '412312', '2212321') as str2,"
+
"x,"
+
"timestamp_sequence(0, 100000000) ts "
+
"from long_sequence(10000)"
+
") timestamp(ts) PARTITION BY DAY"
;
try
{
checkRecoverVarIndex
(
createTableSql
,
tablePath
->
{
},
rebuildIndex
->
rebuildIndex
.
rebuildColumn
(
"x"
));
Assert
.
fail
();
}
catch
(
CairoException
ex
)
{
TestUtils
.
assertContains
(
ex
.
getFlyweightMessage
(),
"Wrong column type"
);
}
}
private
void
checkRecoverVarIndex
(
String
createTableSql
,
Action
<
String
>
changeTable
,
Action
<
RecoverVarIndex
>
rebuildIndexAction
)
throws
Exception
{
assertMemoryLeak
(
ff
,
()
->
{
for
(
String
sql
:
createTableSql
.
split
(
";"
))
{
compiler
.
compile
(
sql
,
sqlExecutionContext
).
execute
(
null
).
await
();
}
compiler
.
compile
(
"create table copytbl as (select * from xxx)"
,
sqlExecutionContext
).
execute
(
null
).
await
();
engine
.
releaseAllReaders
();
engine
.
releaseAllWriters
();
String
tablePath
=
configuration
.
getRoot
().
toString
()
+
Files
.
SEPARATOR
+
"xxx"
;
changeTable
.
run
(
tablePath
);
rebuildVarColumn
.
clear
();
rebuildVarColumn
.
of
(
tablePath
,
configuration
);
rebuildIndexAction
.
run
(
rebuildVarColumn
);
TestUtils
.
assertSqlCursors
(
compiler
,
sqlExecutionContext
,
"copytbl"
,
"xxx"
,
LOG
);
});
}
private
int
countByFullScanWhereValueD
()
throws
SqlException
{
int
recordCount
=
0
;
try
(
RecordCursorFactory
factory
=
compiler
.
compile
(
"select * from xxx where str1 = 'D'"
,
sqlExecutionContext
).
getRecordCursorFactory
())
{
try
(
RecordCursor
cursor
=
factory
.
getCursor
(
sqlExecutionContext
))
{
while
(
cursor
.
hasNext
())
{
recordCount
++;
}
}
}
return
recordCount
;
}
private
void
removeFileAtPartition
(
String
fileName
,
int
partitionBy
,
String
tablePath
,
long
partitionTs
)
{
try
(
Path
path
=
new
Path
())
{
path
.
concat
(
tablePath
);
path
.
put
(
Files
.
SEPARATOR
);
PartitionBy
.
setSinkForPartition
(
path
,
partitionBy
,
partitionTs
,
false
);
path
.
concat
(
fileName
);
LOG
.
info
().
$
(
"removing "
).
utf8
(
path
).
$
();
Assert
.
assertTrue
(
Files
.
remove
(
path
.
$
()));
}
}
@FunctionalInterface
interface
Action
<
T
>
{
void
run
(
T
val
);
}
}
utils/README.md
浏览文件 @
84fae1ad
...
...
@@ -5,7 +5,7 @@
Serializes binary
`_txn`
file to / from readable JSON format. Primary usage to
investigate storage issues
Usage
####
Usage
```
io.questdb.cliutil.TxSerializer -d <json_path> | -s <json_path> <txn_path>
...
...
@@ -13,3 +13,73 @@ io.questdb.cliutil.TxSerializer -d <json_path> | -s <json_path> <txn_path>
-
`-d`
option prints contents of
`_txn`
file to std output in JSON format
-
`-s`
option transforms existing JSON file into binary
\_
txn format
#### Examples
```
bash
java
-cp
utils.jar io.questdb.cliutil.TxSerializer
-d
/questdb-root/db/trades-COINBASE/_txn
>
/questdb-root/db/trades-COINBASE/txn.json
java
-cp
utils.jar io.questdb.cliutil.TxSerializer
-s
/questdb-root/db/trades-COINBASE/txn.json /questdb-root/db/trades-COINBASE/_txnCopy
```
### Rebuild index
Rebuilds indexes for a table
#### Usage
```
io.questdb.cliutil.RebuildIndex <table_path> [-p <partition_name>] [-c <column_name>]
```
-
`<table_path>`
full path to the table
-
`-c`
column name, optional. If omitted, all indexed columns will have indexes
rebuilt
-
`-p`
optional partition name. If omitted, all partitions will be affected
#### Examples
```
bash
java
-cp
utils.jar io.questdb.cliutil.RebuildIndex /questdb-root/db/trades-COINBASE
java
-cp
utils.jar io.questdb.cliutil.RebuildIndex /questdb-root/db/trades-COINBASE
-c
symbol
java
-cp
utils.jar io.questdb.cliutil.RebuildIndex /questdb-root/db/trades-COINBASE
-p
2022-03-21
java
-cp
utils.jar io.questdb.cliutil.RebuildIndex /questdb-root/db/trades-COINBASE
-p
2022-03-21
-c
symbol
```
### Rebuild String column index `.i` file
Rebuilds indexes for a table
#### Usage
```
io.questdb.cliutil.RecoverVarIndex <table_path> [-p <partition_name>] [-c <column_name>]
```
-
`<table_path>`
full path to the table
-
`-c`
column name, optional. If omitted, all string columns will have
`.i`
file
rebuild
-
`-p`
optional partition name. If omitted, all partitions will be affected
#### Examples
```
bash
java
-cp
utils.jar io.questdb.cliutil.RecoverVarIndex /questdb-root/db/trades-COINBASE
java
-cp
utils.jar io.questdb.cliutil.RecoverVarIndex /questdb-root/db/trades-COINBASE
-c
stringColumn
java
-cp
utils.jar io.questdb.cliutil.RecoverVarIndex /questdb-root/db/trades-COINBASE
-p
2022-03-21
java
-cp
utils.jar io.questdb.cliutil.RecoverVarIndex /questdb-root/db/trades-COINBASE
-p
2022-03-21
-c
stringColumn
```
## Build Utils project
To build single jar with dependencies run
```
bash
mvn clean package
```
utils/pom.xml
浏览文件 @
84fae1ad
...
...
@@ -34,8 +34,8 @@
<properties>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<maven.compiler.source>
1
1
</maven.compiler.source>
<maven.compiler.target>
1
1
</maven.compiler.target>
<maven.compiler.source>
1
.8
</maven.compiler.source>
<maven.compiler.target>
1
.8
</maven.compiler.target>
</properties>
<dependencies>
...
...
@@ -48,7 +48,7 @@
<dependency>
<groupId>
com.google.code.gson
</groupId>
<artifactId>
gson
</artifactId>
<version>
2.
8.8
</version>
<version>
2.
9.0
</version>
</dependency>
<dependency>
<groupId>
org.questdb
</groupId>
...
...
@@ -57,6 +57,27 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-shade-plugin
</artifactId>
<version>
3.2.1
</version>
<executions>
<execution>
<phase>
package
</phase>
<goals>
<goal>
shade
</goal>
</goals>
</execution>
</executions>
<configuration>
<minimizeJar>
true
</minimizeJar>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>
java8
</id>
...
...
@@ -70,20 +91,7 @@
<activation>
<jdk>
(,1.8]
</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.8.1
</version>
<configuration>
<fork>
true
</fork>
<source>
1.8
</source>
<target>
1.8
</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>
org.jetbrains
</groupId>
...
...
utils/src/main/java/io/questdb/cliutil/
RebuildIndexMain
.java
→
utils/src/main/java/io/questdb/cliutil/
CmdUtils
.java
浏览文件 @
84fae1ad
...
...
@@ -24,30 +24,25 @@
package
io.questdb.cliutil
;
import
io.questdb.BuildInformation
;
import
io.questdb.BuildInformationHolder
;
import
io.questdb.PropServerConfiguration
;
import
io.questdb.ServerConfigurationException
;
import
io.questdb.cairo.CairoException
;
import
io.questdb.cairo.Rebuild
Index
;
import
io.questdb.cairo.Rebuild
ColumnBase
;
import
io.questdb.cutlass.json.JsonException
;
import
io.questdb.log.Log
;
import
io.questdb.log.LogFactory
;
import
io.questdb.std.Files
;
import
java.io.*
;
import
java.io.File
;
import
java.io.FileInputStream
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.Properties
;
public
class
RebuildIndexMain
{
public
static
void
main
(
String
[]
args
)
throws
IOException
,
JsonException
,
ServerConfigurationException
{
CommandLineArgs
params
=
parseCommandArgs
(
args
);
if
(
params
==
null
)
{
// Invalid params, usage already printed
return
;
}
RebuildIndex
ri
=
new
RebuildIndex
();
public
class
CmdUtils
{
static
void
runColumnRebuild
(
RebuildColumnCommandArgs
params
,
RebuildColumnBase
ri
)
throws
IOException
,
ServerConfigurationException
,
JsonException
{
String
rootDirectory
=
params
.
tablePath
+
Files
.
SEPARATOR
+
".."
+
Files
.
SEPARATOR
+
".."
;
final
Properties
properties
=
new
Properties
();
final
String
configurationFileName
=
"/server.conf"
;
...
...
@@ -56,7 +51,7 @@ public class RebuildIndexMain {
try
(
InputStream
is
=
new
FileInputStream
(
configurationFile
))
{
properties
.
load
(
is
);
}
final
Log
log
=
LogFactory
.
getLog
(
"re
build
-index"
);
final
Log
log
=
LogFactory
.
getLog
(
"re
cover-var
-index"
);
PropServerConfiguration
configuration
=
readServerConfiguration
(
rootDirectory
,
properties
,
log
,
new
BuildInformationHolder
());
ri
.
of
(
params
.
tablePath
,
configuration
.
getCairoConfiguration
());
...
...
@@ -67,49 +62,7 @@ public class RebuildIndexMain {
}
}
static
CommandLineArgs
parseCommandArgs
(
String
[]
args
)
{
if
(
args
.
length
>
5
||
args
.
length
%
2
!=
1
)
{
printUsage
();
return
null
;
}
CommandLineArgs
res
=
new
CommandLineArgs
();
res
.
tablePath
=
args
[
0
];
for
(
int
i
=
1
,
n
=
args
.
length
;
i
<
n
;
i
+=
2
)
{
if
(
"-p"
.
equals
(
args
[
i
]))
{
if
(
res
.
partition
==
null
)
{
res
.
partition
=
args
[
i
+
1
];
}
else
{
System
.
err
.
println
(
"-p parameter can be only used once"
);
printUsage
();
return
null
;
}
}
if
(
"-c"
.
equals
(
args
[
i
]))
{
if
(
res
.
column
==
null
)
{
res
.
column
=
args
[
i
+
1
];
}
else
{
System
.
err
.
println
(
"-c parameter can be only used once"
);
printUsage
();
return
null
;
}
}
}
if
(
res
.
tablePath
.
endsWith
(
String
.
valueOf
(
Files
.
SEPARATOR
)))
{
res
.
tablePath
=
res
.
tablePath
.
substring
(
res
.
tablePath
.
length
());
}
return
res
;
}
private
static
void
printUsage
()
{
System
.
out
.
println
(
"usage: "
+
RebuildIndexMain
.
class
.
getName
()
+
" <table_path> [-p <partition_name>] [-c <column_name>]"
);
}
private
static
PropServerConfiguration
readServerConfiguration
(
static
PropServerConfiguration
readServerConfiguration
(
final
String
rootDirectory
,
final
Properties
properties
,
Log
log
,
...
...
@@ -117,10 +70,4 @@ public class RebuildIndexMain {
)
throws
ServerConfigurationException
,
JsonException
{
return
new
PropServerConfiguration
(
rootDirectory
,
properties
,
System
.
getenv
(),
log
,
buildInformation
);
}
static
class
CommandLineArgs
{
String
tablePath
;
String
partition
;
String
column
;
}
}
utils/src/main/java/io/questdb/cliutil/RebuildColumnCommandArgs.java
0 → 100644
浏览文件 @
84fae1ad
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.cliutil
;
import
io.questdb.std.Files
;
public
class
RebuildColumnCommandArgs
{
String
tablePath
;
String
partition
;
String
column
;
static
RebuildColumnCommandArgs
parseCommandArgs
(
String
[]
args
,
String
command
)
{
if
(
args
.
length
>
5
||
args
.
length
%
2
!=
1
)
{
printUsage
(
command
);
return
null
;
}
RebuildColumnCommandArgs
res
=
new
RebuildColumnCommandArgs
();
res
.
tablePath
=
args
[
0
];
for
(
int
i
=
1
,
n
=
args
.
length
;
i
<
n
;
i
+=
2
)
{
if
(
"-p"
.
equals
(
args
[
i
]))
{
if
(
res
.
partition
==
null
)
{
res
.
partition
=
args
[
i
+
1
];
}
else
{
System
.
err
.
println
(
"-p parameter can be only used once"
);
printUsage
(
command
);
return
null
;
}
}
if
(
"-c"
.
equals
(
args
[
i
]))
{
if
(
res
.
column
==
null
)
{
res
.
column
=
args
[
i
+
1
];
}
else
{
System
.
err
.
println
(
"-c parameter can be only used once"
);
printUsage
(
command
);
return
null
;
}
}
}
if
(
res
.
tablePath
.
endsWith
(
String
.
valueOf
(
Files
.
SEPARATOR
)))
{
res
.
tablePath
=
""
;
}
return
res
;
}
private
static
void
printUsage
(
String
command
)
{
System
.
out
.
println
(
"usage: "
+
command
+
" <table_path> [-p <partition_name>] [-c <column_name>]"
);
}
}
utils/src/main/java/io/questdb/cliutil/RebuildIndex.java
0 → 100644
浏览文件 @
84fae1ad
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.cliutil
;
import
io.questdb.BuildInformation
;
import
io.questdb.PropServerConfiguration
;
import
io.questdb.ServerConfigurationException
;
import
io.questdb.cutlass.json.JsonException
;
import
io.questdb.log.Log
;
import
io.questdb.log.LogFactory
;
import
java.io.IOException
;
import
java.util.Properties
;
import
static
io
.
questdb
.
cliutil
.
CmdUtils
.
runColumnRebuild
;
import
static
io
.
questdb
.
cliutil
.
RebuildColumnCommandArgs
.
parseCommandArgs
;
public
class
RebuildIndex
{
public
static
void
main
(
String
[]
args
)
throws
IOException
,
JsonException
,
ServerConfigurationException
{
LogFactory
.
configureSync
();
RebuildColumnCommandArgs
params
=
parseCommandArgs
(
args
,
RebuildIndex
.
class
.
getName
());
if
(
params
==
null
)
{
// Invalid params, usage already printed
return
;
}
runColumnRebuild
(
params
,
new
io
.
questdb
.
cairo
.
RebuildIndex
());
}
private
static
PropServerConfiguration
readServerConfiguration
(
final
String
rootDirectory
,
final
Properties
properties
,
Log
log
,
final
BuildInformation
buildInformation
)
throws
ServerConfigurationException
,
JsonException
{
return
new
PropServerConfiguration
(
rootDirectory
,
properties
,
System
.
getenv
(),
log
,
buildInformation
);
}
}
utils/src/main/java/io/questdb/cliutil/RecoverVarIndex.java
0 → 100644
浏览文件 @
84fae1ad
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.cliutil
;
import
io.questdb.ServerConfigurationException
;
import
io.questdb.cutlass.json.JsonException
;
import
io.questdb.log.LogFactory
;
import
java.io.IOException
;
import
static
io
.
questdb
.
cliutil
.
CmdUtils
.
runColumnRebuild
;
import
static
io
.
questdb
.
cliutil
.
RebuildColumnCommandArgs
.
parseCommandArgs
;
public
class
RecoverVarIndex
{
public
static
void
main
(
String
[]
args
)
throws
IOException
,
JsonException
,
ServerConfigurationException
{
LogFactory
.
configureSync
();
RebuildColumnCommandArgs
params
=
parseCommandArgs
(
args
,
RecoverVarIndex
.
class
.
getName
());
if
(
params
==
null
)
{
// Invalid params, usage already printed
return
;
}
runColumnRebuild
(
params
,
new
io
.
questdb
.
cairo
.
RecoverVarIndex
());
}
}
\ No newline at end of file
utils/src/main/java/io/questdb/cliutil/TxSerializer.java
浏览文件 @
84fae1ad
...
...
@@ -31,6 +31,7 @@ import io.questdb.cairo.TableUtils;
import
io.questdb.cairo.vm.Vm
;
import
io.questdb.cairo.vm.api.MemoryCMARW
;
import
io.questdb.cairo.vm.api.MemoryMR
;
import
io.questdb.log.LogFactory
;
import
io.questdb.std.FilesFacade
;
import
io.questdb.std.FilesFacadeImpl
;
import
io.questdb.std.MemoryTag
;
...
...
@@ -64,6 +65,7 @@ public class TxSerializer {
* Command line arguments: -s <json_path> <txn_path> | -d <txn_path>
*/
public
static
void
main
(
String
[]
args
)
throws
IOException
{
LogFactory
.
configureSync
();
if
(
args
.
length
<
2
||
args
.
length
>
3
)
{
printUsage
();
return
;
...
...
utils/src/test/java/io/questdb/cliutil/RebuildIndex
Main
Test.java
→
utils/src/test/java/io/questdb/cliutil/RebuildIndexTest.java
浏览文件 @
84fae1ad
...
...
@@ -27,51 +27,59 @@ package io.questdb.cliutil;
import
org.junit.Assert
;
import
org.junit.Test
;
public
class
RebuildIndexMainTest
{
@Test
public
void
testNoArgsFails
()
{
Assert
.
assertNull
(
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{}
));
}
import
static
io
.
questdb
.
cliutil
.
RebuildColumnCommandArgs
.
parseCommandArgs
;
public
class
RebuildIndexTest
{
@Test
public
void
testNoTableFails
()
{
Assert
.
assertNull
(
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"-p"
,
"2222"
}
));
public
void
testColumnAndPartition
()
{
RebuildColumnCommandArgs
params
=
parseCommandArgs
(
new
String
[]{
"tablePath"
,
"-c"
,
"abc"
,
"-p"
,
"2020"
},
RebuildIndex
.
class
.
getName
()
);
Assert
.
assertNotNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
Assert
.
assertEquals
(
"abc"
,
params
.
column
);
Assert
.
assertEquals
(
"2020"
,
params
.
partition
);
params
=
parseCommandArgs
(
new
String
[]{
"tablePath"
,
"-p"
,
"2020"
,
"-c"
,
"abc"
},
RebuildIndex
.
class
.
getName
()
);
Assert
.
assertNotNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
Assert
.
assertEquals
(
"abc"
,
params
.
column
);
Assert
.
assertEquals
(
"2020"
,
params
.
partition
);
}
@Test
public
void
testTooManyPartitionArgsFails
()
{
Assert
.
assertNull
(
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-p"
,
"2222"
,
"-p"
,
"2223"
}
));
public
void
testColumnOnly
()
{
RebuildColumnCommandArgs
params
=
parseCommandArgs
(
new
String
[]{
"tablePath"
,
"-c"
,
"9393"
},
RebuildIndex
.
class
.
getName
()
);
Assert
.
assertNotNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
Assert
.
assertEquals
(
"9393"
,
params
.
column
);
}
@Test
public
void
testTooColumnManyArgsFails
()
{
Assert
.
assertNull
(
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-c"
,
"2222"
,
"-c"
,
"2223"
}
));
Assert
.
assertNull
(
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-p"
,
"2222"
,
"-c"
,
"dafda"
,
"-c"
,
"asb"
}
public
void
testNoArgsFails
()
{
Assert
.
assertNull
(
parseCommandArgs
(
new
String
[]{},
""
));
}
@Test
public
void
testTableNameOnly
()
{
RebuildIndexMain
.
CommandLineArgs
params
=
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
}
);
Assert
.
assertNotNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
public
void
testNoTableFails
()
{
Assert
.
assertNull
(
parseCommandArgs
(
new
String
[]{
"-p"
,
"2222"
},
""
));
}
@Test
public
void
testPartitionOnly
()
{
Rebuild
IndexMain
.
CommandLineArgs
params
=
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-p"
,
"9393"
}
Rebuild
ColumnCommandArgs
params
=
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-p"
,
"9393"
},
""
);
Assert
.
assertNotNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
...
...
@@ -79,31 +87,28 @@ public class RebuildIndexMainTest {
}
@Test
public
void
test
Column
Only
()
{
Rebuild
IndexMain
.
CommandLineArgs
params
=
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-c"
,
"9393"
}
public
void
test
TableName
Only
()
{
Rebuild
ColumnCommandArgs
params
=
parseCommandArgs
(
new
String
[]
{
"tablePath"
},
""
);
Assert
.
assertNotNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
Assert
.
assertEquals
(
"9393"
,
params
.
column
);
}
@Test
public
void
test
ColumnAndPartition
()
{
RebuildIndexMain
.
CommandLineArgs
params
=
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-c"
,
"abc"
,
"-p"
,
"2020"
}
);
Assert
.
assertN
otNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
Assert
.
assertEquals
(
"abc"
,
params
.
column
);
Assert
.
assertEquals
(
"2020"
,
params
.
partition
);
public
void
test
TooColumnManyArgsFails
()
{
Assert
.
assertNull
(
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-c"
,
"2222"
,
"-c"
,
"2223"
},
""
)
)
;
Assert
.
assertN
ull
(
parseCommandArgs
(
new
String
[]{
"tablePath"
,
"-p"
,
"2222"
,
"-c"
,
"dafda"
,
"-c"
,
"asb"
},
""
)
);
}
params
=
RebuildIndexMain
.
parseCommandArgs
(
new
String
[]
{
"tablePath"
,
"-p"
,
"2020"
,
"-c"
,
"abc"
}
);
Assert
.
assertNotNull
(
params
);
Assert
.
assertEquals
(
"tablePath"
,
params
.
tablePath
);
Assert
.
assertEquals
(
"abc"
,
params
.
column
);
Assert
.
assertEquals
(
"2020"
,
params
.
partition
);
@Test
public
void
testTooManyPartitionArgsFails
()
{
Assert
.
assertNull
(
parseCommandArgs
(
new
String
[]{
"tablePath"
,
"-p"
,
"2222"
,
"-p"
,
"2223"
},
""
));
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录