Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doodoocoder
prometheus
提交
c5d87301
P
prometheus
项目概览
doodoocoder
/
prometheus
与 Fork 源项目一致
从无法访问的项目Fork
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
prometheus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
c5d87301
编写于
7月 16, 2015
作者:
B
Björn Rabenstein
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #880 from prometheus/beorn7/fix
Fix the storage corruption bug.
上级
82e2fd85
699946bf
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
94 addition
and
23 deletion
+94
-23
storage/local/chunk.go
storage/local/chunk.go
+1
-0
storage/local/crashrecovery.go
storage/local/crashrecovery.go
+3
-4
storage/local/index/index.go
storage/local/index/index.go
+1
-0
storage/local/instrumentation.go
storage/local/instrumentation.go
+2
-0
storage/local/persistence.go
storage/local/persistence.go
+7
-11
storage/local/persistence_test.go
storage/local/persistence_test.go
+2
-2
storage/local/series.go
storage/local/series.go
+1
-1
storage/local/storage.go
storage/local/storage.go
+4
-4
storage/local/storage_test.go
storage/local/storage_test.go
+73
-1
未找到文件。
storage/local/chunk.go
浏览文件 @
c5d87301
...
...
@@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
// The DefaultChunkEncoding can be changed via a flag.
var
DefaultChunkEncoding
=
doubleDelta
type
chunkEncoding
byte
...
...
storage/local/crashrecovery.go
浏览文件 @
c5d87301
...
...
@@ -254,7 +254,7 @@ func (p *persistence) sanitizeSeries(
// disk. Treat this series as a freshly unarchived one
// by loading the chunkDescs and setting all parameters
// based on the loaded chunkDescs.
cds
,
err
:=
p
.
loadChunkDescs
(
fp
,
clientmodel
.
Latest
)
cds
,
err
:=
p
.
loadChunkDescs
(
fp
,
0
)
if
err
!=
nil
{
log
.
Errorf
(
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s"
,
...
...
@@ -286,8 +286,7 @@ func (p *persistence) sanitizeSeries(
// First, throw away the chunkDescs without chunks.
s
.
chunkDescs
=
s
.
chunkDescs
[
s
.
persistWatermark
:
]
numMemChunkDescs
.
Sub
(
float64
(
s
.
persistWatermark
))
// Load all the chunk descs.
cds
,
err
:=
p
.
loadChunkDescs
(
fp
,
clientmodel
.
Latest
)
cds
,
err
:=
p
.
loadChunkDescs
(
fp
,
0
)
if
err
!=
nil
{
log
.
Errorf
(
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s"
,
...
...
@@ -407,7 +406,7 @@ func (p *persistence) cleanUpArchiveIndexes(
if
_
,
err
:=
p
.
archivedFingerprintToMetrics
.
Delete
(
fp
);
err
!=
nil
{
return
err
}
cds
,
err
:=
p
.
loadChunkDescs
(
clientmodel
.
Fingerprint
(
fp
),
clientmodel
.
Latest
)
cds
,
err
:=
p
.
loadChunkDescs
(
clientmodel
.
Fingerprint
(
fp
),
0
)
if
err
!=
nil
{
return
err
}
...
...
storage/local/index/index.go
浏览文件 @
c5d87301
...
...
@@ -33,6 +33,7 @@ const (
labelPairToFingerprintsDir
=
"labelpair_to_fingerprints"
)
// LevelDB cache sizes, changeable via flags.
var
(
FingerprintMetricCacheSize
=
10
*
1024
*
1024
FingerprintTimeRangeCacheSize
=
5
*
1024
*
1024
...
...
storage/local/instrumentation.go
浏览文件 @
c5d87301
...
...
@@ -89,6 +89,8 @@ func init() {
var
(
// Global counter, also used internally, so not implemented as
// metrics. Collected in memorySeriesStorage.Collect.
// TODO(beorn7): As it is used internally, it is actually very bad style
// to have it as a global variable.
numMemChunks
int64
// Metric descriptors for the above.
...
...
storage/local/persistence.go
浏览文件 @
c5d87301
...
...
@@ -444,10 +444,11 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde
return
chunks
,
nil
}
// loadChunkDescs loads chunkDescs for a series up until a given time. It is
// the caller's responsibility to not persist or drop anything for the same
// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is
// the number of chunkDescs to skip from the end of the series file. It is the
// caller's responsibility to not persist or drop anything for the same
// fingerprint concurrently.
func
(
p
*
persistence
)
loadChunkDescs
(
fp
clientmodel
.
Fingerprint
,
beforeTime
clientmodel
.
Timestamp
)
([]
*
chunkDesc
,
error
)
{
func
(
p
*
persistence
)
loadChunkDescs
(
fp
clientmodel
.
Fingerprint
,
offsetFromEnd
int
)
([]
*
chunkDesc
,
error
)
{
f
,
err
:=
p
.
openChunkFileForReading
(
fp
)
if
os
.
IsNotExist
(
err
)
{
return
nil
,
nil
...
...
@@ -469,8 +470,8 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
)
}
numChunks
:=
int
(
fi
.
Size
())
/
chunkLenWithHeader
cds
:=
make
([]
*
chunkDesc
,
0
,
numChunks
)
numChunks
:=
int
(
fi
.
Size
())
/
chunkLenWithHeader
-
offsetFromEnd
cds
:=
make
([]
*
chunkDesc
,
numChunks
)
chunkTimesBuf
:=
make
([]
byte
,
16
)
for
i
:=
0
;
i
<
numChunks
;
i
++
{
_
,
err
:=
f
.
Seek
(
offsetForChunkIndex
(
i
)
+
chunkHeaderFirstTimeOffset
,
os
.
SEEK_SET
)
...
...
@@ -482,15 +483,10 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
if
err
!=
nil
{
return
nil
,
err
}
cd
:
=
&
chunkDesc
{
cd
s
[
i
]
=
&
chunkDesc
{
chunkFirstTime
:
clientmodel
.
Timestamp
(
binary
.
LittleEndian
.
Uint64
(
chunkTimesBuf
)),
chunkLastTime
:
clientmodel
.
Timestamp
(
binary
.
LittleEndian
.
Uint64
(
chunkTimesBuf
[
8
:
])),
}
if
!
cd
.
chunkLastTime
.
Before
(
beforeTime
)
{
// From here on, we have chunkDescs in memory already.
break
}
cds
=
append
(
cds
,
cd
)
}
chunkDescOps
.
WithLabelValues
(
load
)
.
Add
(
float64
(
len
(
cds
)))
numMemChunkDescs
.
Add
(
float64
(
len
(
cds
)))
...
...
storage/local/persistence_test.go
浏览文件 @
c5d87301
...
...
@@ -122,7 +122,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
}
}
// Load all chunk descs.
actualChunkDescs
,
err
:=
p
.
loadChunkDescs
(
fp
,
1
0
)
actualChunkDescs
,
err
:=
p
.
loadChunkDescs
(
fp
,
0
)
if
len
(
actualChunkDescs
)
!=
10
{
t
.
Errorf
(
"Got %d chunkDescs, want %d."
,
len
(
actualChunkDescs
),
10
)
}
...
...
@@ -974,7 +974,7 @@ func BenchmarkLoadChunkDescs(b *testing.B) {
for
i
:=
0
;
i
<
b
.
N
;
i
++
{
for
_
,
s
:=
range
fpStrings
{
fp
.
LoadFromString
(
s
)
cds
,
err
:=
p
.
loadChunkDescs
(
fp
,
clientmodel
.
Latest
)
cds
,
err
:=
p
.
loadChunkDescs
(
fp
,
0
)
if
err
!=
nil
{
b
.
Error
(
err
)
}
...
...
storage/local/series.go
浏览文件 @
c5d87301
...
...
@@ -384,7 +384,7 @@ func (s *memorySeries) preloadChunksForRange(
firstChunkDescTime
=
s
.
chunkDescs
[
0
]
.
firstTime
()
}
if
s
.
chunkDescsOffset
!=
0
&&
from
.
Before
(
firstChunkDescTime
)
{
cds
,
err
:=
mss
.
loadChunkDescs
(
fp
,
firstChunkDescTime
)
cds
,
err
:=
mss
.
loadChunkDescs
(
fp
,
s
.
persistWatermark
)
if
err
!=
nil
{
return
nil
,
err
}
...
...
storage/local/storage.go
浏览文件 @
c5d87301
...
...
@@ -589,7 +589,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
// end up with a series without any chunkDescs for a
// while (which is confusing as it makes the series
// appear as archived or purged).
cds
,
err
=
s
.
loadChunkDescs
(
fp
,
clientmodel
.
Latest
)
cds
,
err
=
s
.
loadChunkDescs
(
fp
,
0
)
if
err
!=
nil
{
log
.
Errorf
(
"Error loading chunk descs for fingerprint %v (metric %v): %v"
,
fp
,
m
,
err
)
}
...
...
@@ -979,7 +979,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(
return
}
// If we are here, the series is not archived, so check for chunkDesc
// eviction next
// eviction next
.
series
.
evictChunkDescs
(
iOldestNotEvicted
)
return
series
.
dirty
&&
!
seriesWasDirty
...
...
@@ -1107,8 +1107,8 @@ func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []i
}
// See persistence.loadChunkDescs for detailed explanation.
func
(
s
*
memorySeriesStorage
)
loadChunkDescs
(
fp
clientmodel
.
Fingerprint
,
beforeTime
clientmodel
.
Timestamp
)
([]
*
chunkDesc
,
error
)
{
return
s
.
persistence
.
loadChunkDescs
(
fp
,
beforeTime
)
func
(
s
*
memorySeriesStorage
)
loadChunkDescs
(
fp
clientmodel
.
Fingerprint
,
offsetFromEnd
int
)
([]
*
chunkDesc
,
error
)
{
return
s
.
persistence
.
loadChunkDescs
(
fp
,
offsetFromEnd
)
}
// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
...
...
storage/local/storage_test.go
浏览文件 @
c5d87301
...
...
@@ -1116,6 +1116,78 @@ func TestEvictAndPurgeSeriesChunkType1(t *testing.T) {
testEvictAndPurgeSeries
(
t
,
1
)
}
func
testEvictAndLoadChunkDescs
(
t
*
testing
.
T
,
encoding
chunkEncoding
)
{
samples
:=
make
(
clientmodel
.
Samples
,
10000
)
for
i
:=
range
samples
{
samples
[
i
]
=
&
clientmodel
.
Sample
{
Timestamp
:
clientmodel
.
Timestamp
(
2
*
i
),
Value
:
clientmodel
.
SampleValue
(
float64
(
i
*
i
)),
}
}
// Give last sample a timestamp of now so that the head chunk will not
// be closed (which would then archive the time series later as
// everything will get evicted).
samples
[
len
(
samples
)
-
1
]
=
&
clientmodel
.
Sample
{
Timestamp
:
clientmodel
.
Now
(),
Value
:
clientmodel
.
SampleValue
(
3.14
),
}
s
,
closer
:=
NewTestStorage
(
t
,
encoding
)
defer
closer
.
Close
()
// Adjust memory chunks to lower value to see evictions.
s
.
maxMemoryChunks
=
1
for
_
,
sample
:=
range
samples
{
s
.
Append
(
sample
)
}
s
.
WaitForIndexing
()
fp
:=
clientmodel
.
Metric
{}
.
FastFingerprint
()
series
,
ok
:=
s
.
fpToSeries
.
get
(
fp
)
if
!
ok
{
t
.
Fatal
(
"could not find series"
)
}
oldLen
:=
len
(
series
.
chunkDescs
)
// Maintain series without any dropped chunks.
s
.
maintainMemorySeries
(
fp
,
0
)
// Give the evict goroutine an opportunity to run.
time
.
Sleep
(
10
*
time
.
Millisecond
)
// Maintain series again to trigger chunkDesc eviction
s
.
maintainMemorySeries
(
fp
,
0
)
if
oldLen
<=
len
(
series
.
chunkDescs
)
{
t
.
Errorf
(
"Expected number of chunkDescs to decrease, old number %d, current number %d."
,
oldLen
,
len
(
series
.
chunkDescs
))
}
// Load everything back.
p
:=
s
.
NewPreloader
()
p
.
PreloadRange
(
fp
,
0
,
100000
,
time
.
Hour
)
if
oldLen
!=
len
(
series
.
chunkDescs
)
{
t
.
Errorf
(
"Expected number of chunkDescs to have reached old value again, old number %d, current number %d."
,
oldLen
,
len
(
series
.
chunkDescs
))
}
p
.
Close
()
// Now maintain series with drops to make sure nothing crazy happens.
s
.
maintainMemorySeries
(
fp
,
100000
)
if
len
(
series
.
chunkDescs
)
!=
1
{
t
.
Errorf
(
"Expected exactly one chunkDesc left, got %d."
,
len
(
series
.
chunkDescs
))
}
}
func
TestEvictAndLoadChunkDescsType0
(
t
*
testing
.
T
)
{
testEvictAndLoadChunkDescs
(
t
,
0
)
}
func
TestEvictAndLoadChunkDescsType1
(
t
*
testing
.
T
)
{
testEvictAndLoadChunkDescs
(
t
,
1
)
}
func
benchmarkAppend
(
b
*
testing
.
B
,
encoding
chunkEncoding
)
{
samples
:=
make
(
clientmodel
.
Samples
,
b
.
N
)
for
i
:=
range
samples
{
...
...
@@ -1437,7 +1509,7 @@ func TestAppendOutOfOrder(t *testing.T) {
err
=
pl
.
PreloadRange
(
fp
,
0
,
2
,
5
*
time
.
Minute
)
if
err
!=
nil
{
t
.
Fatalf
(
"
e
rror preloading chunks: %s"
,
err
)
t
.
Fatalf
(
"
E
rror preloading chunks: %s"
,
err
)
}
it
:=
s
.
NewIterator
(
fp
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录