Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jobily
Questdb
提交
11ac9b61
Q
Questdb
项目概览
jobily
/
Questdb
12 个月 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Q
Questdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
11ac9b61
编写于
11月 04, 2018
作者:
V
Vlad Ilyushchenko
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
GRIFFIN: asof join implementation for sources without rowid
上级
c7d34051
变更
17
展开全部
隐藏空白更改
内联
并排
Showing
17 changed file
with
1087 addition
and
231 deletion
+1087
-231
core/src/main/java/com/questdb/cairo/SymbolAsIntTypes.java
core/src/main/java/com/questdb/cairo/SymbolAsIntTypes.java
+6
-5
core/src/main/java/com/questdb/cairo/VirtualMemory.java
core/src/main/java/com/questdb/cairo/VirtualMemory.java
+19
-21
core/src/main/java/com/questdb/cairo/map/CompactMapRecord.java
...src/main/java/com/questdb/cairo/map/CompactMapRecord.java
+22
-9
core/src/main/java/com/questdb/cairo/map/CompactMapValue.java
.../src/main/java/com/questdb/cairo/map/CompactMapValue.java
+10
-0
core/src/main/java/com/questdb/cairo/map/FastMap.java
core/src/main/java/com/questdb/cairo/map/FastMap.java
+1
-14
core/src/main/java/com/questdb/cairo/map/FastMapRecord.java
core/src/main/java/com/questdb/cairo/map/FastMapRecord.java
+30
-17
core/src/main/java/com/questdb/cairo/map/FastMapValue.java
core/src/main/java/com/questdb/cairo/map/FastMapValue.java
+10
-0
core/src/main/java/com/questdb/cairo/map/MapRecord.java
core/src/main/java/com/questdb/cairo/map/MapRecord.java
+3
-0
core/src/main/java/com/questdb/cairo/map/MapValue.java
core/src/main/java/com/questdb/cairo/map/MapValue.java
+2
-0
core/src/main/java/com/questdb/cairo/map/RecordValueSinkFactory.java
...in/java/com/questdb/cairo/map/RecordValueSinkFactory.java
+1
-1
core/src/main/java/com/questdb/griffin/SqlCodeGenerator.java
core/src/main/java/com/questdb/griffin/SqlCodeGenerator.java
+110
-90
core/src/main/java/com/questdb/griffin/engine/join/AsOfJoinRecordCursorFactory.java
...stdb/griffin/engine/join/AsOfJoinRecordCursorFactory.java
+214
-0
core/src/main/java/com/questdb/std/BinarySequence.java
core/src/main/java/com/questdb/std/BinarySequence.java
+1
-2
core/src/test/java/com/questdb/cairo/map/CompactMapTest.java
core/src/test/java/com/questdb/cairo/map/CompactMapTest.java
+1
-1
core/src/test/java/com/questdb/cairo/map/FastMapTest.java
core/src/test/java/com/questdb/cairo/map/FastMapTest.java
+1
-1
core/src/test/java/com/questdb/cairo/map/RecordValueSinkFactoryTest.java
...ava/com/questdb/cairo/map/RecordValueSinkFactoryTest.java
+92
-1
core/src/test/java/com/questdb/griffin/JoinTest.java
core/src/test/java/com/questdb/griffin/JoinTest.java
+564
-69
未找到文件。
core/src/main/java/com/questdb/cairo/SymbolAsIntTypes.java
浏览文件 @
11ac9b61
...
...
@@ -24,11 +24,7 @@
package
com.questdb.cairo
;
public
class
SymbolAsIntTypes
implements
ColumnTypes
{
private
final
ColumnTypes
base
;
public
SymbolAsIntTypes
(
ColumnTypes
base
)
{
this
.
base
=
base
;
}
private
ColumnTypes
base
;
@Override
public
int
getColumnCount
()
{
...
...
@@ -40,4 +36,9 @@ public class SymbolAsIntTypes implements ColumnTypes {
final
int
type
=
base
.
getColumnType
(
columnIndex
);
return
type
==
ColumnType
.
SYMBOL
?
ColumnType
.
INT
:
type
;
}
public
SymbolAsIntTypes
of
(
ColumnTypes
base
)
{
this
.
base
=
base
;
return
this
;
}
}
core/src/main/java/com/questdb/cairo/VirtualMemory.java
浏览文件 @
11ac9b61
...
...
@@ -756,16 +756,7 @@ public class VirtualMemory implements Closeable {
}
private
void
putBinSequence
(
BinarySequence
value
,
long
pos
,
long
len
)
{
long
offset
=
0L
;
while
(
true
)
{
long
copied
=
value
.
copyTo
(
appendPointer
+
offset
,
pos
,
len
);
if
(
copied
==
len
)
{
break
;
}
len
-=
copied
;
pos
+=
copied
;
offset
+=
copied
;
}
value
.
copyTo
(
appendPointer
,
pos
,
len
);
}
private
void
putBinSlit
(
long
start
,
long
len
)
{
...
...
@@ -1006,17 +997,24 @@ public class VirtualMemory implements Closeable {
}
@Override
public
long
copyTo
(
long
address
,
long
start
,
long
length
)
{
long
offset
=
this
.
offset
+
start
;
int
page
=
pageIndex
(
offset
);
long
pageSize
=
getPageSize
(
page
);
long
pageAddress
=
getPageAddress
(
page
);
long
offsetInPage
=
offsetInPage
(
offset
);
long
len
=
Math
.
min
(
length
,
pageSize
-
offsetInPage
);
assert
len
>
-
1
;
long
srcAddress
=
pageAddress
+
offsetInPage
;
Unsafe
.
getUnsafe
().
copyMemory
(
srcAddress
,
address
,
len
);
return
len
;
public
void
copyTo
(
long
address
,
long
start
,
long
length
)
{
do
{
long
offset
=
this
.
offset
+
start
;
int
page
=
pageIndex
(
offset
);
long
pageSize
=
getPageSize
(
page
);
long
pageAddress
=
getPageAddress
(
page
);
long
offsetInPage
=
offsetInPage
(
offset
);
long
len
=
Math
.
min
(
length
,
pageSize
-
offsetInPage
);
assert
len
>
-
1
;
long
srcAddress
=
pageAddress
+
offsetInPage
;
Unsafe
.
getUnsafe
().
copyMemory
(
srcAddress
,
address
,
len
);
if
(
len
==
length
)
{
break
;
}
length
-=
len
;
start
+=
len
;
address
+=
len
;
}
while
(
true
);
}
public
long
length
()
{
...
...
core/src/main/java/com/questdb/cairo/map/CompactMapRecord.java
浏览文件 @
11ac9b61
...
...
@@ -25,6 +25,7 @@ package com.questdb.cairo.map;
import
com.questdb.cairo.TableUtils
;
import
com.questdb.cairo.VirtualMemory
;
import
com.questdb.cairo.sql.RecordCursor
;
import
com.questdb.std.BinarySequence
;
import
com.questdb.std.Unsafe
;
...
...
@@ -34,11 +35,13 @@ class CompactMapRecord implements MapRecord {
private
final
long
columnOffsets
[];
private
final
CompactMapValue
value
;
private
long
offset
;
private
RecordCursor
symbolTableResolver
;
public
CompactMapRecord
(
VirtualMemory
entries
,
long
columnOffsets
[],
CompactMapValue
value
)
{
this
.
entries
=
entries
;
this
.
columnOffsets
=
columnOffsets
;
this
.
value
=
value
;
this
.
value
.
linkRecord
(
this
);
}
@SuppressWarnings
(
"MethodDoesntCallSuperMethod"
)
...
...
@@ -85,15 +88,6 @@ class CompactMapRecord implements MapRecord {
return
entries
.
getFloat
(
getColumnOffset
(
col
));
}
@Override
public
CharSequence
getStr
(
int
col
)
{
long
o
=
getLong
(
col
);
if
(
o
==
-
1L
)
{
return
null
;
}
return
entries
.
getStr
(
offset
+
o
);
}
@Override
public
int
getInt
(
int
col
)
{
return
entries
.
getInt
(
getColumnOffset
(
col
));
...
...
@@ -114,6 +108,15 @@ class CompactMapRecord implements MapRecord {
return
entries
.
getShort
(
getColumnOffset
(
col
));
}
@Override
public
CharSequence
getStr
(
int
col
)
{
long
o
=
getLong
(
col
);
if
(
o
==
-
1L
)
{
return
null
;
}
return
entries
.
getStr
(
offset
+
o
);
}
@Override
public
CharSequence
getStrB
(
int
col
)
{
long
o
=
getLong
(
col
);
...
...
@@ -132,12 +135,22 @@ class CompactMapRecord implements MapRecord {
return
entries
.
getStrLen
(
offset
+
o
);
}
@Override
public
CharSequence
getSym
(
int
col
)
{
return
symbolTableResolver
.
getSymbolTable
(
col
).
value
(
getInt
(
col
));
}
@Override
public
MapValue
getValue
()
{
value
.
of
(
offset
,
false
);
return
value
;
}
@Override
public
void
setSymbolTableResolver
(
RecordCursor
resolver
)
{
this
.
symbolTableResolver
=
resolver
;
}
private
long
getColumnOffset
(
int
columnIndex
)
{
return
offset
+
Unsafe
.
arrayGet
(
columnOffsets
,
columnIndex
);
}
...
...
core/src/main/java/com/questdb/cairo/map/CompactMapValue.java
浏览文件 @
11ac9b61
...
...
@@ -32,6 +32,7 @@ class CompactMapValue implements MapValue {
private
final
long
columnOffsets
[];
private
long
currentValueOffset
;
private
boolean
_new
;
private
CompactMapRecord
record
;
CompactMapValue
(
VirtualMemory
entries
,
long
[]
columnOffsets
)
{
this
.
entries
=
entries
;
...
...
@@ -138,11 +139,20 @@ class CompactMapValue implements MapValue {
putLong
(
columnIndex
,
value
);
}
@Override
public
void
setMapRecordHere
()
{
record
.
of
(
currentValueOffset
);
}
private
long
getValueColumnOffset
(
int
columnIndex
)
{
assert
currentValueOffset
!=
-
1
;
return
currentValueOffset
+
Unsafe
.
arrayGet
(
columnOffsets
,
columnIndex
);
}
void
linkRecord
(
CompactMapRecord
record
)
{
this
.
record
=
record
;
}
void
of
(
long
offset
,
boolean
_new
)
{
this
.
currentValueOffset
=
offset
;
this
.
_new
=
_new
;
...
...
core/src/main/java/com/questdb/cairo/map/FastMap.java
浏览文件 @
11ac9b61
...
...
@@ -386,22 +386,9 @@ public class FastMap implements Map {
}
checkSize
((
int
)
len
);
int
l
=
(
int
)
(
len
-
4
);
long
offset
=
4L
;
long
pos
=
0
;
Unsafe
.
getUnsafe
().
putInt
(
appendAddress
,
l
);
while
(
true
)
{
long
copied
=
value
.
copyTo
(
appendAddress
+
offset
,
pos
,
l
);
if
(
copied
==
l
)
{
break
;
}
l
-=
copied
;
pos
+=
copied
;
offset
+=
copied
;
}
value
.
copyTo
(
appendAddress
+
4L
,
0L
,
l
);
appendAddress
+=
len
;
writeOffset
();
}
...
...
core/src/main/java/com/questdb/cairo/map/FastMapRecord.java
浏览文件 @
11ac9b61
...
...
@@ -26,6 +26,7 @@ package com.questdb.cairo.map;
import
com.questdb.cairo.ColumnType
;
import
com.questdb.cairo.ColumnTypes
;
import
com.questdb.cairo.TableUtils
;
import
com.questdb.cairo.sql.RecordCursor
;
import
com.questdb.std.BinarySequence
;
import
com.questdb.std.Transient
;
import
com.questdb.std.Unsafe
;
...
...
@@ -44,6 +45,7 @@ final class FastMapRecord implements MapRecord {
private
long
address0
;
private
long
address1
;
private
long
address2
;
private
RecordCursor
symbolTableResolver
;
FastMapRecord
(
int
valueOffsets
[],
...
...
@@ -57,6 +59,7 @@ final class FastMapRecord implements MapRecord {
this
.
keyBlockOffset
=
keyBlockOffset
;
this
.
keyDataOffset
=
keyDataOffset
;
this
.
value
=
value
;
this
.
value
.
linkRecord
(
this
);
// provides feature to position this record at location of map value
int
n
=
keyTypes
.
getColumnCount
();
...
...
@@ -146,23 +149,6 @@ final class FastMapRecord implements MapRecord {
return
Unsafe
.
getUnsafe
().
getFloat
(
addressOfColumn
(
columnIndex
));
}
@Override
public
CharSequence
getStr
(
int
columnIndex
)
{
assert
columnIndex
<
csA
.
length
;
return
getStr0
(
columnIndex
,
Unsafe
.
arrayGet
(
csA
,
columnIndex
));
}
@Override
public
void
getStr
(
int
columnIndex
,
CharSink
sink
)
{
long
address
=
addressOfColumn
(
columnIndex
);
int
len
=
Unsafe
.
getUnsafe
().
getInt
(
address
);
address
+=
4
;
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
sink
.
put
(
Unsafe
.
getUnsafe
().
getChar
(
address
));
address
+=
2
;
}
}
@Override
public
int
getInt
(
int
columnIndex
)
{
return
Unsafe
.
getUnsafe
().
getInt
(
addressOfColumn
(
columnIndex
));
...
...
@@ -183,6 +169,23 @@ final class FastMapRecord implements MapRecord {
return
Unsafe
.
getUnsafe
().
getShort
(
addressOfColumn
(
columnIndex
));
}
@Override
public
CharSequence
getStr
(
int
columnIndex
)
{
assert
columnIndex
<
csA
.
length
;
return
getStr0
(
columnIndex
,
Unsafe
.
arrayGet
(
csA
,
columnIndex
));
}
@Override
public
void
getStr
(
int
columnIndex
,
CharSink
sink
)
{
long
address
=
addressOfColumn
(
columnIndex
);
int
len
=
Unsafe
.
getUnsafe
().
getInt
(
address
);
address
+=
4
;
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
sink
.
put
(
Unsafe
.
getUnsafe
().
getChar
(
address
));
address
+=
2
;
}
}
@Override
public
CharSequence
getStrB
(
int
columnIndex
)
{
return
getStr0
(
columnIndex
,
Unsafe
.
arrayGet
(
csB
,
columnIndex
));
...
...
@@ -193,11 +196,21 @@ final class FastMapRecord implements MapRecord {
return
Unsafe
.
getUnsafe
().
getInt
(
addressOfColumn
(
columnIndex
));
}
@Override
public
CharSequence
getSym
(
int
col
)
{
return
symbolTableResolver
.
getSymbolTable
(
col
).
value
(
getInt
(
col
));
}
@Override
public
MapValue
getValue
()
{
return
value
.
of
(
address0
,
false
);
}
@Override
public
void
setSymbolTableResolver
(
RecordCursor
resolver
)
{
this
.
symbolTableResolver
=
resolver
;
}
private
long
addressOfColumn
(
int
index
)
{
if
(
index
<
split
)
{
...
...
core/src/main/java/com/questdb/cairo/map/FastMapValue.java
浏览文件 @
11ac9b61
...
...
@@ -29,6 +29,7 @@ final class FastMapValue implements MapValue {
private
final
int
valueOffsets
[];
private
long
address
;
private
boolean
_new
;
private
FastMapRecord
record
;
// double-linked
public
FastMapValue
(
int
[]
valueOffsets
)
{
this
.
valueOffsets
=
valueOffsets
;
...
...
@@ -134,10 +135,19 @@ final class FastMapValue implements MapValue {
putLong
(
columnIndex
,
value
);
}
@Override
public
void
setMapRecordHere
()
{
this
.
record
.
of
(
address
);
}
private
long
address0
(
int
index
)
{
return
address
+
Unsafe
.
arrayGet
(
valueOffsets
,
index
);
}
void
linkRecord
(
FastMapRecord
record
)
{
this
.
record
=
record
;
}
FastMapValue
of
(
long
address
,
boolean
_new
)
{
this
.
address
=
address
;
this
.
_new
=
_new
;
...
...
core/src/main/java/com/questdb/cairo/map/MapRecord.java
浏览文件 @
11ac9b61
...
...
@@ -24,7 +24,10 @@
package
com.questdb.cairo.map
;
import
com.questdb.cairo.sql.Record
;
import
com.questdb.cairo.sql.RecordCursor
;
public
interface
MapRecord
extends
Record
{
MapValue
getValue
();
void
setSymbolTableResolver
(
RecordCursor
resolver
);
}
core/src/main/java/com/questdb/cairo/map/MapValue.java
浏览文件 @
11ac9b61
...
...
@@ -66,4 +66,6 @@ public interface MapValue extends Record {
void
putShort
(
int
index
,
short
value
);
void
putTimestamp
(
int
columnIndex
,
long
value
);
void
setMapRecordHere
();
}
core/src/main/java/com/questdb/cairo/map/RecordValueSinkFactory.java
浏览文件 @
11ac9b61
...
...
@@ -77,7 +77,7 @@ public class RecordValueSinkFactory {
int
index
=
columnFilter
.
getColumnIndex
(
i
);
asm
.
aload
(
2
);
asm
.
iconst
(
i
ndex
);
asm
.
iconst
(
i
);
asm
.
aload
(
1
);
asm
.
iconst
(
index
);
...
...
core/src/main/java/com/questdb/griffin/SqlCodeGenerator.java
浏览文件 @
11ac9b61
...
...
@@ -24,6 +24,7 @@
package
com.questdb.griffin
;
import
com.questdb.cairo.*
;
import
com.questdb.cairo.map.RecordValueSinkFactory
;
import
com.questdb.cairo.sql.*
;
import
com.questdb.griffin.engine.EmptyTableRecordCursorFactory
;
import
com.questdb.griffin.engine.functions.columns.SymbolColumn
;
...
...
@@ -52,6 +53,7 @@ public class SqlCodeGenerator {
private
final
ArrayColumnTypes
keyTypes
=
new
ArrayColumnTypes
();
private
final
ArrayColumnTypes
valueTypes
=
new
ArrayColumnTypes
();
private
final
EntityColumnFilter
entityColumnFilter
=
new
EntityColumnFilter
();
private
final
SymbolAsIntTypes
symbolAsIntTypes
=
new
SymbolAsIntTypes
();
private
boolean
fullFatJoins
=
false
;
public
SqlCodeGenerator
(
CairoEngine
engine
,
CairoConfiguration
configuration
,
FunctionParser
functionParser
)
{
...
...
@@ -73,89 +75,66 @@ public class SqlCodeGenerator {
private
RecordCursorFactory
createAsOfJoin
(
RecordMetadata
metadata
,
RecordCursorFactory
master
,
RecordSink
masterKeySink
,
RecordCursorFactory
slave
,
int
joinType
RecordSink
slaveKeySink
,
int
columnSplit
)
{
/*
* JoinContext provides the following information:
* a/bIndexes - index of model where join column is coming from
* a/bNames - name of columns in respective models, these column names are not prefixed with table aliases
* a/bNodes - the original column references, that can include table alias. Sometimes it doesn't when column name is unambiguous
*
* a/b are "inverted" in that "a" for slave and "b" for master
*
* The issue is when we use model indexes and vanilla column names they would only work on single-table
* record cursor but original names with prefixed columns will only work with JoinRecordMetadata
*/
final
RecordMetadata
masterMetadata
=
master
.
getMetadata
();
final
RecordMetadata
slaveMetadata
=
slave
.
getMetadata
();
final
RecordSink
masterKeySink
=
RecordSinkFactory
.
getInstance
(
asm
,
masterMetadata
,
listColumnFilterB
,
true
);
final
RecordSink
slaveKeySink
=
RecordSinkFactory
.
getInstance
(
asm
,
slaveMetadata
,
listColumnFilterA
,
true
);
valueTypes
.
reset
();
valueTypes
.
add
(
ColumnType
.
LONG
);
valueTypes
.
add
(
ColumnType
.
LONG
);
if
(
slave
.
isRandomAccessCursor
()
&&
!
fullFatJoins
)
{
return
new
AsOfJoinLightRecordCursorFactory
(
configuration
,
metadata
,
master
,
slave
,
keyTypes
,
valueTypes
,
masterKeySink
,
slaveKeySink
,
masterMetadata
.
getColumnCount
()
);
}
return
new
AsOfJoinLightRecordCursorFactory
(
configuration
,
metadata
,
master
,
slave
,
keyTypes
,
valueTypes
,
masterKeySink
,
slaveKeySink
,
columnSplit
);
}
entityColumnFilter
.
of
(
slaveMetadata
.
getColumnCount
());
RecordSink
slaveSink
=
RecordSinkFactory
.
getInstance
(
@NotNull
private
RecordCursorFactory
createFullFatAsOfJoin
(
RecordCursorFactory
master
,
RecordMetadata
masterMetadata
,
CharSequence
masterAlias
,
RecordCursorFactory
slave
,
RecordMetadata
slaveMetadata
,
CharSequence
slaveAlias
)
{
RecordSink
masterSink
=
RecordSinkFactory
.
getInstance
(
asm
,
slave
Metadata
,
entityColumnFilter
,
fals
e
master
Metadata
,
listColumnFilterB
,
tru
e
);
if
(
joinType
==
QueryModel
.
JOIN_INNER
)
{
return
new
HashJoinRecordCursorFactory
(
configuration
,
metadata
,
master
,
slave
,
keyTypes
,
valueTypes
,
masterKeySink
,
slaveKeySink
,
slaveSink
,
masterMetadata
.
getColumnCount
()
);
}
JoinRecordMetadata
metadata
=
createJoinMetadata
(
masterAlias
,
masterMetadata
,
slaveAlias
,
slaveMetadata
);
return
new
HashOuterJoinRecordCursorFactory
(
entityColumnFilter
.
of
(
slaveMetadata
.
getColumnCount
());
master
=
new
AsOfJoinRecordCursorFactory
(
configuration
,
metadata
,
master
,
slave
,
keyTypes
,
valueTypes
,
masterKeySink
,
slaveKeySink
,
slaveSink
,
masterMetadata
.
getColumnCount
()
slaveMetadata
,
symbolAsIntTypes
.
of
(
slaveMetadata
),
masterSink
,
RecordSinkFactory
.
getInstance
(
asm
,
slaveMetadata
,
listColumnFilterA
,
true
),
masterMetadata
.
getColumnCount
(),
RecordValueSinkFactory
.
getInstance
(
asm
,
slaveMetadata
,
entityColumnFilter
)
);
return
master
;
}
private
RecordCursorFactory
createHashJoin
(
...
...
@@ -260,6 +239,23 @@ public class SqlCodeGenerator {
);
}
@NotNull
private
JoinRecordMetadata
createJoinMetadata
(
CharSequence
masterAlias
,
RecordMetadata
masterMetadata
,
CharSequence
slaveAlias
,
RecordMetadata
slaveMetadata
)
{
JoinRecordMetadata
metadata
;
metadata
=
new
JoinRecordMetadata
(
configuration
,
masterMetadata
.
getColumnCount
()
+
slaveMetadata
.
getColumnCount
()
);
metadata
.
copyColumnMetadataFrom
(
masterAlias
,
masterMetadata
);
metadata
.
copyColumnMetadataFrom
(
slaveAlias
,
slaveMetadata
);
if
(
masterMetadata
.
getTimestampIndex
()
!=
-
1
)
{
metadata
.
setTimestampIndex
(
masterMetadata
.
getTimestampIndex
());
}
return
metadata
;
}
RecordCursorFactory
generate
(
QueryModel
model
,
SqlExecutionContext
executionContext
)
throws
SqlException
{
clearState
();
return
generateQuery
(
model
,
executionContext
,
true
);
...
...
@@ -280,7 +276,6 @@ public class SqlCodeGenerator {
IntList
ordered
=
model
.
getOrderedJoinModels
();
RecordCursorFactory
master
=
null
;
CharSequence
masterAlias
=
null
;
JoinRecordMetadata
metadata
=
null
;
try
{
int
n
=
ordered
.
size
();
...
...
@@ -317,41 +312,67 @@ public class SqlCodeGenerator {
}
}
metadata
=
new
JoinRecordMetadata
(
configuration
,
masterMetadata
.
getColumnCount
()
+
slaveMetadata
.
getColumnCount
()
);
metadata
.
copyColumnMetadataFrom
(
masterAlias
,
masterMetadata
);
metadata
.
copyColumnMetadataFrom
(
slaveModel
.
getName
(),
slaveMetadata
);
if
(
masterMetadata
.
getTimestampIndex
()
!=
-
1
)
{
metadata
.
setTimestampIndex
(
masterMetadata
.
getTimestampIndex
());
}
// todo: full-fat asof join implementation
switch
(
joinType
)
{
case
QueryModel
.
JOIN_CROSS
:
return
new
CrossJoinRecordCursorFactory
(
metadata
,
createJoinMetadata
(
masterAlias
,
masterMetadata
,
slaveModel
.
getName
(),
slaveMetadata
)
,
master
,
slave
,
masterMetadata
.
getColumnCount
()
);
case
QueryModel
.
JOIN_ASOF
:
processJoinContext
(
index
==
1
,
slaveModel
.
getContext
(),
masterMetadata
,
slaveMetadata
);
master
=
createAsOfJoin
(
metadata
,
master
,
slave
,
joinType
);
if
(
slave
.
isRandomAccessCursor
()
&&
!
fullFatJoins
)
{
master
=
createAsOfJoin
(
createJoinMetadata
(
masterAlias
,
masterMetadata
,
slaveModel
.
getName
(),
slaveMetadata
),
master
,
RecordSinkFactory
.
getInstance
(
asm
,
masterMetadata
,
listColumnFilterB
,
true
),
slave
,
RecordSinkFactory
.
getInstance
(
asm
,
slaveMetadata
,
listColumnFilterA
,
true
),
masterMetadata
.
getColumnCount
()
);
}
else
{
// todo: this is a limiting test
// map doesn't support variable length types in map value, which is ok
// when we join tables on strings - technically string is the key
// and we do not need to store it in value, but we will still reject
//
// never mind, this is a stop-gap measure until I understand the problem
// fully
for
(
int
k
=
0
,
m
=
slaveMetadata
.
getColumnCount
();
k
<
m
;
k
++)
{
int
type
=
slaveMetadata
.
getColumnType
(
k
);
if
(
type
==
ColumnType
.
STRING
||
type
==
ColumnType
.
BINARY
)
{
throw
SqlException
.
position
(
slaveModel
.
getJoinKeywordPosition
()).
put
(
"right side column '"
)
.
put
(
slaveMetadata
.
getColumnName
(
k
)).
put
(
"' is of unsupported type"
);
}
}
master
=
createFullFatAsOfJoin
(
master
,
masterMetadata
,
masterAlias
,
slave
,
slaveMetadata
,
slaveModel
.
getName
()
);
}
masterAlias
=
null
;
break
;
default
:
processJoinContext
(
index
==
1
,
slaveModel
.
getContext
(),
masterMetadata
,
slaveMetadata
);
master
=
createHashJoin
(
metadata
,
createJoinMetadata
(
masterAlias
,
masterMetadata
,
slaveModel
.
getName
(),
slaveMetadata
)
,
master
,
slave
,
joinType
...
...
@@ -378,7 +399,7 @@ public class SqlCodeGenerator {
// this would have been JoinRecordMetadata, which is new instance anyway
// we have to make sure that this metadata is safely transitioned
// to empty cursor factory
metadata
=
(
JoinRecordMetadata
)
master
.
getMetadata
();
JoinRecordMetadata
metadata
=
(
JoinRecordMetadata
)
master
.
getMetadata
();
metadata
.
incrementRefCount
();
RecordCursorFactory
factory
=
new
EmptyTableRecordCursorFactory
(
metadata
);
Misc
.
free
(
master
);
...
...
@@ -387,7 +408,6 @@ public class SqlCodeGenerator {
}
return
master
;
}
catch
(
CairoException
|
SqlException
e
)
{
Misc
.
free
(
metadata
);
Misc
.
free
(
master
);
throw
e
;
}
...
...
core/src/main/java/com/questdb/griffin/engine/join/AsOfJoinRecordCursorFactory.java
0 → 100644
浏览文件 @
11ac9b61
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package
com.questdb.griffin.engine.join
;
import
com.questdb.cairo.AbstractRecordCursorFactory
;
import
com.questdb.cairo.CairoConfiguration
;
import
com.questdb.cairo.ColumnTypes
;
import
com.questdb.cairo.RecordSink
;
import
com.questdb.cairo.map.*
;
import
com.questdb.cairo.sql.*
;
import
com.questdb.griffin.engine.functions.bind.BindVariableService
;
import
com.questdb.std.Misc
;
import
com.questdb.std.Transient
;
public
class
AsOfJoinRecordCursorFactory
extends
AbstractRecordCursorFactory
{
private
final
Map
joinKeyMap
;
private
final
RecordCursorFactory
masterFactory
;
private
final
RecordCursorFactory
slaveFactory
;
private
final
RecordSink
masterKeySink
;
private
final
RecordSink
slaveKeySink
;
private
final
AsOfJoinRecordCursor
cursor
;
public
AsOfJoinRecordCursorFactory
(
CairoConfiguration
configuration
,
RecordMetadata
metadata
,
RecordCursorFactory
masterFactory
,
RecordCursorFactory
slaveFactory
,
@Transient
ColumnTypes
joinColumnTypes
,
@Transient
ColumnTypes
slaveColumnTypes
,
@Transient
ColumnTypes
mapValueTypes
,
RecordSink
masterKeySink
,
RecordSink
slaveKeySink
,
int
columnSplit
,
RecordValueSink
slaveValueSink
)
{
super
(
metadata
);
this
.
masterFactory
=
masterFactory
;
this
.
slaveFactory
=
slaveFactory
;
joinKeyMap
=
MapFactory
.
createMap
(
configuration
,
joinColumnTypes
,
mapValueTypes
);
this
.
masterKeySink
=
masterKeySink
;
this
.
slaveKeySink
=
slaveKeySink
;
this
.
cursor
=
new
AsOfJoinRecordCursor
(
columnSplit
,
joinKeyMap
,
NullRecordFactory
.
getInstance
(
slaveColumnTypes
),
masterFactory
.
getMetadata
().
getTimestampIndex
(),
slaveFactory
.
getMetadata
().
getTimestampIndex
(),
slaveValueSink
);
}
@Override
public
void
close
()
{
joinKeyMap
.
close
();
((
JoinRecordMetadata
)
getMetadata
()).
close
();
masterFactory
.
close
();
slaveFactory
.
close
();
}
@Override
public
RecordCursor
getCursor
(
BindVariableService
bindVariableService
)
{
cursor
.
of
(
masterFactory
.
getCursor
(
bindVariableService
),
slaveFactory
.
getCursor
(
bindVariableService
)
);
return
cursor
;
}
@Override
public
boolean
isRandomAccessCursor
()
{
return
false
;
}
private
class
AsOfJoinRecordCursor
implements
NoRandomAccessRecordCursor
{
private
final
OuterJoinRecord
record
;
private
final
Map
joinKeyMap
;
private
final
int
columnSplit
;
private
final
int
masterTimestampIndex
;
private
final
int
slaveTimestampIndex
;
private
final
RecordValueSink
valueSink
;
private
RecordCursor
masterCursor
;
private
RecordCursor
slaveCursor
;
private
Record
masterRecord
;
private
Record
slaveRecord
;
private
long
slaveTimestamp
=
Long
.
MIN_VALUE
;
private
boolean
danglingSlaveRecord
=
false
;
public
AsOfJoinRecordCursor
(
int
columnSplit
,
Map
joinKeyMap
,
Record
nullRecord
,
int
masterTimestampIndex
,
int
slaveTimestampIndex
,
RecordValueSink
valueSink
)
{
this
.
record
=
new
OuterJoinRecord
(
columnSplit
,
nullRecord
);
this
.
joinKeyMap
=
joinKeyMap
;
this
.
columnSplit
=
columnSplit
;
this
.
masterTimestampIndex
=
masterTimestampIndex
;
this
.
slaveTimestampIndex
=
slaveTimestampIndex
;
this
.
valueSink
=
valueSink
;
}
@Override
public
void
close
()
{
masterCursor
=
Misc
.
free
(
masterCursor
);
slaveCursor
=
Misc
.
free
(
slaveCursor
);
}
@Override
public
Record
getRecord
()
{
return
record
;
}
@Override
public
SymbolTable
getSymbolTable
(
int
columnIndex
)
{
if
(
columnIndex
<
columnSplit
)
{
return
masterCursor
.
getSymbolTable
(
columnIndex
);
}
return
slaveCursor
.
getSymbolTable
(
columnIndex
-
columnSplit
);
}
@Override
public
boolean
hasNext
()
{
if
(
masterCursor
.
hasNext
())
{
final
long
masterTimestamp
=
masterRecord
.
getTimestamp
(
masterTimestampIndex
);
MapKey
key
;
MapValue
value
;
long
slaveTimestamp
=
this
.
slaveTimestamp
;
if
(
slaveTimestamp
<=
masterTimestamp
)
{
if
(
danglingSlaveRecord
)
{
key
=
joinKeyMap
.
withKey
();
key
.
put
(
slaveRecord
,
slaveKeySink
);
value
=
key
.
createValue
();
valueSink
.
copy
(
slaveRecord
,
value
);
danglingSlaveRecord
=
false
;
}
while
(
slaveCursor
.
hasNext
())
{
slaveTimestamp
=
slaveRecord
.
getTimestamp
(
slaveTimestampIndex
);
if
(
slaveTimestamp
<=
masterTimestamp
)
{
key
=
joinKeyMap
.
withKey
();
key
.
put
(
slaveRecord
,
slaveKeySink
);
value
=
key
.
createValue
();
valueSink
.
copy
(
slaveRecord
,
value
);
}
else
{
danglingSlaveRecord
=
true
;
break
;
}
}
this
.
slaveTimestamp
=
slaveTimestamp
;
}
key
=
joinKeyMap
.
withKey
();
key
.
put
(
masterRecord
,
masterKeySink
);
value
=
key
.
findValue
();
if
(
value
!=
null
)
{
value
.
setMapRecordHere
();
record
.
hasSlave
(
true
);
}
else
{
record
.
hasSlave
(
false
);
}
return
true
;
}
return
false
;
}
@Override
public
void
toTop
()
{
joinKeyMap
.
clear
();
slaveTimestamp
=
Long
.
MIN_VALUE
;
danglingSlaveRecord
=
false
;
masterCursor
.
toTop
();
slaveCursor
.
toTop
();
}
void
of
(
RecordCursor
masterCursor
,
RecordCursor
slaveCursor
)
{
joinKeyMap
.
clear
();
slaveTimestamp
=
Long
.
MIN_VALUE
;
danglingSlaveRecord
=
false
;
this
.
masterCursor
=
masterCursor
;
this
.
slaveCursor
=
slaveCursor
;
this
.
masterRecord
=
masterCursor
.
getRecord
();
this
.
slaveRecord
=
slaveCursor
.
getRecord
();
MapRecord
mapRecord
=
joinKeyMap
.
getRecord
();
mapRecord
.
setSymbolTableResolver
(
slaveCursor
);
record
.
of
(
masterRecord
,
mapRecord
);
}
}
}
core/src/main/java/com/questdb/std/BinarySequence.java
浏览文件 @
11ac9b61
...
...
@@ -27,13 +27,12 @@ public interface BinarySequence {
byte
byteAt
(
long
index
);
default
long
copyTo
(
long
address
,
long
start
,
long
length
)
{
default
void
copyTo
(
long
address
,
long
start
,
long
length
)
{
long
size
=
length
();
long
n
=
size
<
length
?
size
:
length
;
for
(
long
l
=
0
;
l
<
n
;
l
++)
{
Unsafe
.
getUnsafe
().
putByte
(
address
+
l
,
byteAt
(
start
+
l
));
}
return
n
;
}
long
length
();
...
...
core/src/test/java/com/questdb/cairo/map/CompactMapTest.java
浏览文件 @
11ac9b61
...
...
@@ -527,7 +527,7 @@ public class CompactMapTest extends AbstractCairoTest {
try
(
CompactMap
map
=
new
CompactMap
(
1024
*
1024
,
new
SymbolAsIntTypes
(
reader
.
getMetadata
()),
new
SymbolAsIntTypes
(
).
of
(
reader
.
getMetadata
()),
new
ArrayColumnTypes
().
reset
()
.
add
(
ColumnType
.
LONG
)
.
add
(
ColumnType
.
INT
)
...
...
core/src/test/java/com/questdb/cairo/map/FastMapTest.java
浏览文件 @
11ac9b61
...
...
@@ -612,7 +612,7 @@ public class FastMapTest extends AbstractCairoTest {
try
(
FastMap
map
=
new
FastMap
(
Numbers
.
SIZE_1MB
,
new
SymbolAsIntTypes
(
reader
.
getMetadata
()),
new
SymbolAsIntTypes
(
).
of
(
reader
.
getMetadata
()),
new
ArrayColumnTypes
().
reset
()
.
add
(
ColumnType
.
LONG
)
.
add
(
ColumnType
.
INT
)
...
...
core/src/test/java/com/questdb/cairo/map/RecordValueSinkFactoryTest.java
浏览文件 @
11ac9b61
...
...
@@ -74,7 +74,7 @@ public class RecordValueSinkFactoryTest extends AbstractCairoTest {
}
try
(
TableReader
reader
=
new
TableReader
(
configuration
,
"all"
))
{
final
SymbolAsIntTypes
valueTypes
=
new
SymbolAsIntTypes
(
reader
.
getMetadata
());
final
SymbolAsIntTypes
valueTypes
=
new
SymbolAsIntTypes
(
).
of
(
reader
.
getMetadata
());
try
(
final
Map
map
=
new
FastMap
(
Numbers
.
SIZE_1MB
,
keyTypes
,
valueTypes
,
N
,
0.5
))
{
EntityColumnFilter
columnFilter
=
new
EntityColumnFilter
();
...
...
@@ -117,4 +117,95 @@ public class RecordValueSinkFactoryTest extends AbstractCairoTest {
}
}
}
@Test
public
void
testSubset
()
{
SingleColumnType
keyTypes
=
new
SingleColumnType
(
ColumnType
.
INT
);
try
(
TableModel
model
=
new
TableModel
(
configuration
,
"all"
,
PartitionBy
.
NONE
)
.
col
(
"int"
,
ColumnType
.
INT
)
.
col
(
"short"
,
ColumnType
.
SHORT
)
.
col
(
"byte"
,
ColumnType
.
BYTE
)
.
col
(
"double"
,
ColumnType
.
DOUBLE
)
.
col
(
"float"
,
ColumnType
.
FLOAT
)
.
col
(
"long"
,
ColumnType
.
LONG
)
.
col
(
"sym"
,
ColumnType
.
SYMBOL
).
symbolCapacity
(
64
)
.
col
(
"bool"
,
ColumnType
.
BOOLEAN
)
.
col
(
"date"
,
ColumnType
.
DATE
)
.
col
(
"ts"
,
ColumnType
.
TIMESTAMP
)
)
{
CairoTestUtils
.
create
(
model
);
}
final
int
N
=
1024
;
final
Rnd
rnd
=
new
Rnd
();
try
(
TableWriter
writer
=
new
TableWriter
(
configuration
,
"all"
))
{
for
(
int
i
=
0
;
i
<
N
;
i
++)
{
TableWriter
.
Row
row
=
writer
.
newRow
(
0
);
row
.
putInt
(
0
,
rnd
.
nextInt
());
row
.
putShort
(
1
,
rnd
.
nextShort
());
row
.
putByte
(
2
,
rnd
.
nextByte
());
row
.
putDouble
(
3
,
rnd
.
nextDouble2
());
row
.
putFloat
(
4
,
rnd
.
nextFloat2
());
row
.
putLong
(
5
,
rnd
.
nextLong
());
row
.
putSym
(
6
,
rnd
.
nextChars
(
10
));
row
.
putBool
(
7
,
rnd
.
nextBoolean
());
row
.
putDate
(
8
,
rnd
.
nextLong
());
row
.
putTimestamp
(
9
,
rnd
.
nextLong
());
row
.
append
();
}
writer
.
commit
();
}
try
(
TableReader
reader
=
new
TableReader
(
configuration
,
"all"
))
{
ArrayColumnTypes
valueTypes
=
new
ArrayColumnTypes
();
valueTypes
.
add
(
ColumnType
.
BOOLEAN
);
valueTypes
.
add
(
ColumnType
.
TIMESTAMP
);
valueTypes
.
add
(
ColumnType
.
INT
);
try
(
final
Map
map
=
new
FastMap
(
Numbers
.
SIZE_1MB
,
keyTypes
,
valueTypes
,
N
,
0.5
))
{
ListColumnFilter
columnFilter
=
new
ListColumnFilter
();
columnFilter
.
add
(
7
);
columnFilter
.
add
(
9
);
columnFilter
.
add
(
6
);
RecordValueSink
sink
=
RecordValueSinkFactory
.
getInstance
(
new
BytecodeAssembler
(),
reader
.
getMetadata
(),
columnFilter
);
RecordCursor
cursor
=
reader
.
getCursor
();
final
Record
record
=
cursor
.
getRecord
();
int
index
=
0
;
while
(
cursor
.
hasNext
())
{
MapKey
key
=
map
.
withKey
();
key
.
putInt
(
index
++);
MapValue
value
=
key
.
createValue
();
sink
.
copy
(
record
,
value
);
}
Assert
.
assertEquals
(
N
,
index
);
rnd
.
reset
();
SymbolTable
symbolTable
=
reader
.
getSymbolMapReader
(
6
);
for
(
int
i
=
0
;
i
<
N
;
i
++)
{
MapKey
key
=
map
.
withKey
();
key
.
putInt
(
i
);
MapValue
value
=
key
.
findValue
();
Assert
.
assertNotNull
(
value
);
rnd
.
nextInt
();
// 0
rnd
.
nextShort
();
// 1
rnd
.
nextByte
();
// 2
rnd
.
nextDouble2
();
// 3
rnd
.
nextFloat2
();
// 4
rnd
.
nextLong
();
// 5
Assert
.
assertEquals
(
symbolTable
.
getQuick
(
rnd
.
nextChars
(
10
)),
value
.
getInt
(
2
));
// 6
Assert
.
assertEquals
(
rnd
.
nextBoolean
(),
value
.
getBool
(
0
));
// 7
rnd
.
nextLong
();
// 8
Assert
.
assertEquals
(
rnd
.
nextLong
(),
value
.
getTimestamp
(
1
));
// 9
}
}
}
}
}
\ No newline at end of file
core/src/test/java/com/questdb/griffin/JoinTest.java
浏览文件 @
11ac9b61
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录