Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
b79270fd
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b79270fd
编写于
12月 26, 2019
作者:
N
Nikita Mikhaylov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
remove callbacks in update
上级
3af3ce2c
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
220 addition
and
193 deletion
+220
-193
dbms/src/Dictionaries/CacheDictionary.cpp
dbms/src/Dictionaries/CacheDictionary.cpp
+141
-22
dbms/src/Dictionaries/CacheDictionary.h
dbms/src/Dictionaries/CacheDictionary.h
+7
-10
dbms/src/Dictionaries/CacheDictionary.inc.h
dbms/src/Dictionaries/CacheDictionary.inc.h
+72
-161
未找到文件。
dbms/src/Dictionaries/CacheDictionary.cpp
浏览文件 @
b79270fd
...
...
@@ -361,10 +361,7 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
std
::
back_inserter
(
required_expired_ids
),
[](
auto
&
pair
)
{
return
pair
.
first
;
});
/// Callbacks are empty because we don't want to receive them after an unknown period of time.
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_expired_ids
,
[
&
](
const
auto
,
const
auto
)
{},
[
&
](
const
auto
,
const
auto
)
{}
);
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_expired_ids
);
tryPushToUpdateQueueOrThrow
(
update_unit_ptr
);
/// Update is async - no need to wait.
...
...
@@ -385,26 +382,27 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
std
::
begin
(
cache_expired_ids
),
std
::
end
(
cache_expired_ids
),
std
::
back_inserter
(
required_ids
),
[](
auto
&
pair
)
{
return
pair
.
first
;
});
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
std
::
move
(
required_ids
),
[
&
](
const
Key
id
,
const
size_t
)
{
for
(
const
auto
row
:
cache_not_foun
d_ids
[
id
])
out
[
row
]
=
true
;
for
(
const
auto
row
:
cache_expired_ids
[
id
])
out
[
row
]
=
true
;
},
[
&
](
const
Key
id
,
const
size_t
)
{
for
(
const
auto
row
:
cache_not_found_ids
[
id
])
out
[
row
]
=
false
;
for
(
const
auto
row
:
cache_expired_ids
[
id
])
out
[
row
]
=
true
;
}
);
auto
on_cell_updated
=
[
&
]
(
const
Key
id
,
const
size_t
)
{
for
(
const
auto
row
:
cache_not_found_ids
[
id
]
)
out
[
row
]
=
true
;
for
(
const
auto
row
:
cache_expire
d_ids
[
id
])
out
[
row
]
=
true
;
};
auto
on_id_not_found
=
[
&
]
(
const
Key
id
,
const
size_t
)
{
for
(
const
auto
row
:
cache_not_found_ids
[
id
])
out
[
row
]
=
false
;
for
(
const
auto
row
:
cache_expired_ids
[
id
])
out
[
row
]
=
true
;
}
;
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_ids
);
tryPushToUpdateQueueOrThrow
(
update_unit_ptr
);
waitForCurrentUpdateFinish
(
update_unit_ptr
);
prepareAnswer
(
update_unit_ptr
,
on_cell_updated
,
on_id_not_found
);
}
...
...
@@ -714,7 +712,15 @@ void CacheDictionary::updateThreadFunction()
update_queue
.
pop
(
unit_ptr
);
try
{
update
(
unit_ptr
->
requested_ids
,
unit_ptr
->
on_cell_updated
,
unit_ptr
->
on_id_not_found
);
auto
found_ids_mask_ptr
=
std
::
make_shared
<
std
::
unordered_map
<
Key
,
UInt8
>>
(
unit_ptr
->
requested_ids
.
size
());
/// Copy shared_ptr to let this map be alive until other thread finish his stuff
unit_ptr
->
found_ids_mask_ptr
=
found_ids_mask_ptr
;
for
(
const
auto
id
:
unit_ptr
->
requested_ids
)
found_ids_mask_ptr
->
insert
({
id
,
0
});
update
(
unit_ptr
->
requested_ids
,
*
found_ids_mask_ptr
);
std
::
unique_lock
<
std
::
mutex
>
lock
(
update_mutex
);
unit_ptr
->
is_done
=
true
;
is_update_finished
.
notify_all
();
...
...
@@ -745,4 +751,117 @@ void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr)
std
::
to_string
(
update_queue
.
size
()),
ErrorCodes
::
CACHE_DICTIONARY_UPDATE_FAIL
);
}
/// Loads `requested_ids` from the dictionary source and stores the returned rows in the cache cells.
///
/// Every id that comes back from the source is flagged with 1 in `remaining_ids`. Entries that
/// are still 0 after the call were not returned by the source (the update thread pre-fills the
/// map with the requested ids mapped to 0 before calling this method).
///
/// The source is only queried when the current time is past `backoff_end_time`. Any exception
/// raised while querying or filling the cache is not rethrown: it is remembered in
/// `last_exception`, `error_count` is bumped, a new `backoff_end_time` is computed and the
/// failure is logged.
void CacheDictionary::update(const std::vector<Key> & requested_ids, std::unordered_map<Key, UInt8> & remaining_ids) const
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());

    const auto now = std::chrono::system_clock::now();

    if (now > backoff_end_time)
    {
        try
        {
            /// A previous attempt failed: replace the source with a clone so that
            /// connections it may still hold are reset.
            if (error_count)
                source_ptr = source_ptr->clone();

            Stopwatch watch;

            /// The request to the external storage may block for a long time.
            auto stream = source_ptr->loadIds(requested_ids);

            /// The write lock is taken after the request has been issued and is held while cells are filled.
            const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

            stream->readPrefix();

            while (const auto block = stream->read())
            {
                /// The first column of every block must hold the ids as UInt64.
                const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
                if (!id_column)
                    throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};

                const auto & ids = id_column->getData();

                /// Attribute columns follow the id column; remember raw pointers to them once per block.
                const auto column_ptrs = ext::map<std::vector>(
                    ext::range(0, attributes.size()),
                    [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });

                for (size_t row = 0; row < ids.size(); ++row)
                {
                    const auto id = ids[row];

                    const auto find_result = findCellIdx(id, now);
                    const auto cell_idx = find_result.cell_idx;
                    auto & cell = cells[cell_idx];

                    /// Copy the value of every attribute for this row into the chosen cell.
                    for (size_t attribute_idx = 0; attribute_idx < attributes.size(); ++attribute_idx)
                        setAttributeValue(attributes[attribute_idx], cell_idx, (*column_ptrs[attribute_idx])[row]);

                    /// A cell with id zero that is not the dedicated zero cell was unused until now.
                    if (cell.id == 0 && cell_idx != zero_cell_idx)
                        element_count.fetch_add(1, std::memory_order_relaxed);

                    cell.id = id;

                    if (dict_lifetime.min_sec == 0 || dict_lifetime.max_sec == 0)
                    {
                        /// One of the lifetime bounds is zero: the cell gets the maximum representable expiration time.
                        cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
                    }
                    else
                    {
                        /// Draw the expiration uniformly from [min_sec, max_sec] seconds from now.
                        std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
                        cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
                    }

                    /// Report this id as found.
                    remaining_ids[id] = 1;
                }
            }

            stream->readSuffix();

            /// Success: clear the error state and the backoff deadline.
            error_count = 0;
            last_exception = std::exception_ptr{};
            backoff_end_time = std::chrono::system_clock::time_point{};

            ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
        }
        catch (...)
        {
            /// Remember the failure and schedule the next attempt after a backoff interval.
            ++error_count;
            last_exception = std::current_exception();
            backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));

            tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() +
                "', next update is scheduled at " + ext::to_string(backoff_end_time));
        }
    }

    size_t not_found_num = 0;
    size_t found_num = 0;

    /// TODO: avoid the O(n) pass over the whole map.
    /// Count how many ids were found and how many were not, for the profile events below.
    for (const auto & id_found_pair : remaining_ids)
    {
        if (id_found_pair.second)
            ++found_num;
        else
            ++not_found_num;
    }

    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
}
dbms/src/Dictionaries/CacheDictionary.h
浏览文件 @
b79270fd
...
...
@@ -254,8 +254,7 @@ private:
template
<
typename
DefaultGetter
>
void
getItemsString
(
Attribute
&
attribute
,
const
PaddedPODArray
<
Key
>
&
ids
,
ColumnString
*
out
,
DefaultGetter
&&
get_default
)
const
;
template
<
typename
PresentIdHandler
,
typename
AbsentIdHandler
>
void
update
(
const
std
::
vector
<
Key
>
&
requested_ids
,
PresentIdHandler
&&
on_cell_updated
,
AbsentIdHandler
&&
on_id_not_found
)
const
;
void
update
(
const
std
::
vector
<
Key
>
&
requested_ids
,
std
::
unordered_map
<
Key
,
UInt8
>
&
found_ids_mask_ptr
)
const
;
PaddedPODArray
<
Key
>
getCachedIds
()
const
;
...
...
@@ -325,18 +324,13 @@ private:
struct
UpdateUnit
{
UpdateUnit
(
std
::
vector
<
Key
>
requested_ids_
,
std
::
function
<
void
(
const
Key
,
const
size_t
)
>
on_cell_updated_
,
std
::
function
<
void
(
const
Key
,
const
size_t
)
>
on_id_not_found_
)
:
requested_ids
(
std
::
move
(
requested_ids_
)),
on_cell_updated
(
std
::
move
(
on_cell_updated_
)),
on_id_not_found
(
std
::
move
(
on_id_not_found_
))
{}
std
::
vector
<
Key
>
requested_ids_
)
:
requested_ids
(
std
::
move
(
requested_ids_
))
{}
std
::
shared_ptr
<
std
::
unordered_map
<
Key
,
UInt8
>>
found_ids_mask_ptr
{
nullptr
};
std
::
atomic
<
bool
>
is_done
{
false
};
std
::
exception_ptr
current_exception
{
nullptr
};
std
::
vector
<
Key
>
requested_ids
;
std
::
function
<
void
(
const
Key
,
const
size_t
)
>
on_cell_updated
;
std
::
function
<
void
(
const
Key
,
const
size_t
)
>
on_id_not_found
;
};
using
UpdateUnitPtr
=
std
::
shared_ptr
<
UpdateUnit
>
;
...
...
@@ -353,6 +347,9 @@ private:
mutable
std
::
condition_variable
is_update_finished
;
std
::
atomic
<
bool
>
finished
{
false
};
template
<
typename
PresentIdHandler
,
typename
AbsentIdHandler
>
void
prepareAnswer
(
UpdateUnitPtr
,
PresentIdHandler
&&
,
AbsentIdHandler
&&
)
const
;
};
}
dbms/src/Dictionaries/CacheDictionary.inc.h
浏览文件 @
b79270fd
...
...
@@ -107,7 +107,7 @@ void CacheDictionary::getItemsNumberImpl(
if
(
cache_expired_ids
.
empty
())
return
;
/// Update async only if allow_read_expired_keys_is_enabled
/// Update async only if allow_read_expired_keys_is_enabled
add condvar usage and better code
if
(
allow_read_expired_keys
)
{
std
::
vector
<
Key
>
required_expired_ids
;
...
...
@@ -116,10 +116,7 @@ void CacheDictionary::getItemsNumberImpl(
[](
auto
&
pair
)
{
return
pair
.
first
;
});
/// request new values
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_expired_ids
,
[
&
](
const
auto
,
const
auto
)
{},
[
&
](
const
auto
,
const
auto
)
{});
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_expired_ids
);
tryPushToUpdateQueueOrThrow
(
update_unit_ptr
);
...
...
@@ -142,34 +139,37 @@ void CacheDictionary::getItemsNumberImpl(
std
::
back_inserter
(
required_ids
),
[](
auto
&
pair
)
{
return
pair
.
first
;
});
/// Request new values
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_ids
,
[
&
](
const
auto
id
,
const
auto
cell_idx
)
{
const
auto
attribute_value
=
attribute_array
[
cell_idx
];
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_ids
);
for
(
const
size_t
row
:
cache_not_found_ids
[
id
])
out
[
row
]
=
static_cast
<
OutputType
>
(
attribute_value
);
auto
on_cell_updated
=
[
&
]
(
const
auto
id
,
const
auto
cell_idx
)
{
const
auto
attribute_value
=
attribute_array
[
cell_idx
];
for
(
const
size_t
row
:
cache_expired_ids
[
id
])
out
[
row
]
=
static_cast
<
OutputType
>
(
attribute_value
);
},
[
&
](
const
auto
id
,
const
auto
)
{
for
(
const
size_t
row
:
cache_not_found_ids
[
id
])
out
[
row
]
=
get_default
(
row
);
for
(
const
size_t
row
:
cache_not_found_ids
[
id
])
out
[
row
]
=
static_cast
<
OutputType
>
(
attribute_value
);
for
(
const
size_t
row
:
cache_expired_ids
[
id
])
out
[
row
]
=
get_default
(
row
);
});
for
(
const
size_t
row
:
cache_expired_ids
[
id
])
out
[
row
]
=
static_cast
<
OutputType
>
(
attribute_value
);
};
auto
on_id_not_found
=
[
&
]
(
const
auto
id
,
const
auto
)
{
for
(
const
size_t
row
:
cache_not_found_ids
[
id
])
out
[
row
]
=
get_default
(
row
);
for
(
const
size_t
row
:
cache_expired_ids
[
id
])
out
[
row
]
=
get_default
(
row
);
};
tryPushToUpdateQueueOrThrow
(
update_unit_ptr
);
waitForCurrentUpdateFinish
(
update_unit_ptr
);
prepareAnswer
(
update_unit_ptr
,
on_cell_updated
,
on_id_not_found
);
}
template
<
typename
DefaultGetter
>
void
CacheDictionary
::
getItemsString
(
Attribute
&
attribute
,
const
PaddedPODArray
<
Key
>
&
ids
,
ColumnString
*
out
,
DefaultGetter
&&
get_default
)
const
{
Attribute
&
attribute
,
const
PaddedPODArray
<
Key
>
&
ids
,
ColumnString
*
out
,
DefaultGetter
&&
get_default
)
const
{
const
auto
rows
=
ext
::
size
(
ids
);
/// save on some allocations
...
...
@@ -186,14 +186,18 @@ void CacheDictionary::getItemsString(
const
auto
now
=
std
::
chrono
::
system_clock
::
now
();
/// fetch up-to-date values, discard on fail
for
(
const
auto
row
:
ext
::
range
(
0
,
rows
))
{
for
(
const
auto
row
:
ext
::
range
(
0
,
rows
))
{
const
auto
id
=
ids
[
row
];
const
auto
find_result
=
findCellIdx
(
id
,
now
);
if
(
!
find_result
.
valid
)
{
if
(
!
find_result
.
valid
)
{
found_outdated_values
=
true
;
break
;
}
else
{
}
else
{
const
auto
&
cell_idx
=
find_result
.
cell_idx
;
const
auto
&
cell
=
cells
[
cell_idx
];
const
auto
string_ref
=
cell
.
isDefault
()
?
get_default
(
row
)
:
attribute_array
[
cell_idx
];
...
...
@@ -203,7 +207,8 @@ void CacheDictionary::getItemsString(
}
/// optimistic code completed successfully
if
(
!
found_outdated_values
)
{
if
(
!
found_outdated_values
)
{
query_count
.
fetch_add
(
rows
,
std
::
memory_order_relaxed
);
hit_count
.
fetch_add
(
rows
,
std
::
memory_order_release
);
return
;
...
...
@@ -225,13 +230,15 @@ void CacheDictionary::getItemsString(
const
ProfilingScopedReadRWLock
read_lock
{
rw_lock
,
ProfileEvents
::
DictCacheLockReadNs
};
const
auto
now
=
std
::
chrono
::
system_clock
::
now
();
for
(
const
auto
row
:
ext
::
range
(
0
,
ids
.
size
()))
{
for
(
const
auto
row
:
ext
::
range
(
0
,
ids
.
size
()))
{
const
auto
id
=
ids
[
row
];
const
auto
find_result
=
findCellIdx
(
id
,
now
);
auto
insert_value_routine
=
[
&
]()
{
auto
insert_value_routine
=
[
&
]()
{
const
auto
&
cell_idx
=
find_result
.
cell_idx
;
const
auto
&
cell
=
cells
[
cell_idx
];
const
auto
string_ref
=
cell
.
isDefault
()
?
get_default
(
row
)
:
attribute_array
[
cell_idx
];
...
...
@@ -242,16 +249,18 @@ void CacheDictionary::getItemsString(
total_length
+=
string_ref
.
size
+
1
;
};
if
(
!
find_result
.
valid
)
{
if
(
find_result
.
outdated
)
{
if
(
!
find_result
.
valid
)
{
if
(
find_result
.
outdated
)
{
cache_expired_ids
[
id
].
push_back
(
row
);
if
(
allow_read_expired_keys
)
insert_value_routine
();
}
else
{
}
else
cache_not_found_ids
[
id
].
push_back
(
row
);
}
}
else
{
}
else
{
++
cache_hit
;
insert_value_routine
();
}
...
...
@@ -275,8 +284,7 @@ void CacheDictionary::getItemsString(
std
::
transform
(
std
::
begin
(
cache_expired_ids
),
std
::
end
(
cache_expired_ids
),
std
::
back_inserter
(
required_expired_ids
),
[](
auto
&
pair
)
{
return
pair
.
first
;
});
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_expired_ids
,
[
&
](
const
auto
,
const
auto
)
{},
[
&
](
const
auto
,
const
auto
)
{});
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_expired_ids
);
tryPushToUpdateQueueOrThrow
(
update_unit_ptr
);
...
...
@@ -297,23 +305,25 @@ void CacheDictionary::getItemsString(
std
::
begin
(
cache_expired_ids
),
std
::
end
(
cache_expired_ids
),
std
::
back_inserter
(
required_ids
),
[](
auto
&
pair
)
{
return
pair
.
first
;
});
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_ids
,
[
&
](
const
auto
id
,
const
auto
cell_idx
)
{
const
auto
attribute_value
=
attribute_array
[
cell_idx
];
auto
on_cell_updated
=
[
&
]
(
const
auto
id
,
const
auto
cell_idx
)
{
const
auto
attribute_value
=
attribute_array
[
cell_idx
];
map
[
id
]
=
String
{
attribute_value
};
total_length
+=
(
attribute_value
.
size
+
1
)
*
cache_not_found_ids
[
id
].
size
();
},
[
&
](
const
auto
id
,
const
auto
)
{
for
(
const
auto
row
:
cache_not_found_ids
[
id
])
total_length
+=
get_default
(
row
).
size
+
1
;
});
map
[
id
]
=
String
{
attribute_value
};
total_length
+=
(
attribute_value
.
size
+
1
)
*
cache_not_found_ids
[
id
].
size
();
};
auto
on_id_not_found
=
[
&
]
(
const
auto
id
,
const
auto
)
{
for
(
const
auto
row
:
cache_not_found_ids
[
id
])
total_length
+=
get_default
(
row
).
size
+
1
;
};
auto
update_unit_ptr
=
std
::
make_shared
<
UpdateUnit
>
(
required_ids
);
tryPushToUpdateQueueOrThrow
(
update_unit_ptr
);
waitForCurrentUpdateFinish
(
update_unit_ptr
);
prepareAnswer
(
update_unit_ptr
,
on_cell_updated
,
on_id_not_found
);
}
out
->
getChars
().
reserve
(
total_length
);
...
...
@@ -329,122 +339,27 @@ void CacheDictionary::getItemsString(
}
template
<
typename
PresentIdHandler
,
typename
AbsentIdHandler
>
void
CacheDictionary
::
update
(
const
std
::
vector
<
Key
>
&
requested_ids
,
PresentIdHandler
&&
on_cell_updated
,
AbsentIdHandler
&&
on_id_not_found
)
const
void
CacheDictionary
::
prepareAnswer
(
UpdateUnitPtr
update_unit_ptr
,
PresentIdHandler
&&
on_cell_updated
,
AbsentIdHandler
&&
on_id_not_found
)
const
{
CurrentMetrics
::
Increment
metric_increment
{
CurrentMetrics
::
DictCacheRequests
};
ProfileEvents
::
increment
(
ProfileEvents
::
DictCacheKeysRequested
,
requested_ids
.
size
());
std
::
unordered_map
<
Key
,
UInt8
>
remaining_ids
{
requested_ids
.
size
()};
for
(
const
auto
id
:
requested_ids
)
remaining_ids
.
insert
({
id
,
0
});
/// Prepare answer
const
ProfilingScopedReadRWLock
read_lock
{
rw_lock
,
ProfileEvents
::
DictCacheLockReadNs
};
const
auto
now
=
std
::
chrono
::
system_clock
::
now
();
if
(
now
>
backoff_end_time
)
for
(
const
auto
&
id
:
update_unit_ptr
->
requested_ids
)
{
try
{
if
(
error_count
)
{
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr
=
source_ptr
->
clone
();
}
Stopwatch
watch
;
/// Go to external storage. Might be very slow and blocking.
auto
stream
=
source_ptr
->
loadIds
(
requested_ids
);
const
ProfilingScopedWriteRWLock
write_lock
{
rw_lock
,
ProfileEvents
::
DictCacheLockWriteNs
};
stream
->
readPrefix
();
while
(
const
auto
block
=
stream
->
read
())
{
const
auto
id_column
=
typeid_cast
<
const
ColumnUInt64
*>
(
block
.
safeGetByPosition
(
0
).
column
.
get
());
if
(
!
id_column
)
throw
Exception
{
name
+
": id column has type different from UInt64."
,
ErrorCodes
::
TYPE_MISMATCH
};
const
auto
&
ids
=
id_column
->
getData
();
/// cache column pointers
const
auto
column_ptrs
=
ext
::
map
<
std
::
vector
>
(
ext
::
range
(
0
,
attributes
.
size
()),
[
&
block
](
size_t
i
)
{
return
block
.
safeGetByPosition
(
i
+
1
).
column
.
get
();
});
for
(
const
auto
i
:
ext
::
range
(
0
,
ids
.
size
()))
{
const
auto
id
=
ids
[
i
];
const
auto
find_result
=
findCellIdx
(
id
,
now
);
const
auto
&
cell_idx
=
find_result
.
cell_idx
;
auto
&
cell
=
cells
[
cell_idx
];
for
(
const
auto
attribute_idx
:
ext
::
range
(
0
,
attributes
.
size
()))
{
const
auto
&
attribute_column
=
*
column_ptrs
[
attribute_idx
];
auto
&
attribute
=
attributes
[
attribute_idx
];
setAttributeValue
(
attribute
,
cell_idx
,
attribute_column
[
i
]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if
(
cell
.
id
==
0
&&
cell_idx
!=
zero_cell_idx
)
element_count
.
fetch_add
(
1
,
std
::
memory_order_relaxed
);
cell
.
id
=
id
;
if
(
dict_lifetime
.
min_sec
!=
0
&&
dict_lifetime
.
max_sec
!=
0
)
{
std
::
uniform_int_distribution
<
UInt64
>
distribution
{
dict_lifetime
.
min_sec
,
dict_lifetime
.
max_sec
};
cell
.
setExpiresAt
(
now
+
std
::
chrono
::
seconds
{
distribution
(
rnd_engine
)});
}
else
cell
.
setExpiresAt
(
std
::
chrono
::
time_point
<
std
::
chrono
::
system_clock
>::
max
());
/// inform caller
on_cell_updated
(
id
,
cell_idx
);
/// mark corresponding id as found
remaining_ids
[
id
]
=
1
;
}
}
stream
->
readSuffix
();
error_count
=
0
;
last_exception
=
std
::
exception_ptr
{};
backoff_end_time
=
std
::
chrono
::
system_clock
::
time_point
{};
ProfileEvents
::
increment
(
ProfileEvents
::
DictCacheRequestTimeNs
,
watch
.
elapsed
());
}
catch
(...)
{
++
error_count
;
last_exception
=
std
::
current_exception
();
backoff_end_time
=
now
+
std
::
chrono
::
seconds
(
calculateDurationWithBackoff
(
rnd_engine
,
error_count
));
tryLogException
(
last_exception
,
log
,
"Could not update cache dictionary '"
+
getName
()
+
"', next update is scheduled at "
+
ext
::
to_string
(
backoff_end_time
));
}
}
size_t
not_found_num
=
0
,
found_num
=
0
;
const
auto
find_result
=
findCellIdx
(
id
,
now
);
const
auto
&
cell_idx
=
find_result
.
cell_idx
;
auto
&
cell
=
cells
[
cell_idx
];
const
auto
was_id_updated
=
update_unit_ptr
->
found_ids_mask_ptr
->
at
(
id
);
/// Check which ids have not been found and require setting null_value
for
(
const
auto
&
id_found_pair
:
remaining_ids
)
{
if
(
id_found_pair
.
second
)
if
(
was_id_updated
)
{
++
found_num
;
on_cell_updated
(
id
,
find_result
.
cell_idx
)
;
continue
;
}
++
not_found_num
;
const
auto
id
=
id_found_pair
.
first
;
const
auto
find_result
=
findCellIdx
(
id
,
now
);
const
auto
&
cell_idx
=
find_result
.
cell_idx
;
auto
&
cell
=
cells
[
cell_idx
];
if
(
error_count
)
{
...
...
@@ -487,10 +402,6 @@ void CacheDictionary::update(
/// inform caller that the cell has not been found
on_id_not_found
(
id
,
cell_idx
);
}
ProfileEvents
::
increment
(
ProfileEvents
::
DictCacheKeysRequestedMiss
,
not_found_num
);
ProfileEvents
::
increment
(
ProfileEvents
::
DictCacheKeysRequestedFound
,
found_num
);
ProfileEvents
::
increment
(
ProfileEvents
::
DictCacheRequests
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录