Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
月轩居士
SkyWalking
提交
4eb5def1
S
SkyWalking
项目概览
月轩居士
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
4
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
4eb5def1
编写于
5月 17, 2019
作者:
彭
彭勇升 pengys
提交者:
wu-sheng
5月 17, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Manage models in one place. (#2695)
上级
6fff2a81
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
306 addition
and
119 deletion
+306
-119
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
...apache/skywalking/oap/server/core/CoreModuleProvider.java
+1
-1
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
...ng/oap/server/core/analysis/StreamAnnotationListener.java
+1
-6
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
...p/server/core/analysis/worker/MetricsStreamProcessor.java
+24
-4
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
...ap/server/core/analysis/worker/RecordStreamProcessor.java
+7
-2
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
.../oap/server/core/analysis/worker/TopNStreamProcessor.java
+7
-2
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
...server/core/register/worker/InventoryStreamProcessor.java
+6
-2
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
...kywalking/oap/server/core/storage/model/IModelSetter.java
+5
-1
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
...pache/skywalking/oap/server/core/storage/model/Model.java
+32
-10
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
...walking/oap/server/core/storage/model/ModelInstaller.java
+5
-22
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
...ywalking/oap/server/core/storage/model/StorageModels.java
+19
-5
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
...lking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+13
-62
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
...walking/oap/server/core/storage/ttl/DayTTLCalculator.java
+31
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
...alking/oap/server/core/storage/ttl/HourTTLCalculator.java
+31
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
...king/oap/server/core/storage/ttl/MinuteTTLCalculator.java
+31
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
...lking/oap/server/core/storage/ttl/MonthTTLCalculator.java
+31
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
...king/oap/server/core/storage/ttl/SecondTTLCalculator.java
+31
-0
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
...skywalking/oap/server/core/storage/ttl/TTLCalculator.java
+29
-0
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
...server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+2
-2
未找到文件。
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
浏览文件 @
4eb5def1
...
...
@@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHan
import
org.apache.skywalking.oap.server.core.server.*
;
import
org.apache.skywalking.oap.server.core.source.*
;
import
org.apache.skywalking.oap.server.core.storage.PersistenceTimer
;
import
org.apache.skywalking.oap.server.core.storage.
annotation
.StorageModels
;
import
org.apache.skywalking.oap.server.core.storage.
model
.StorageModels
;
import
org.apache.skywalking.oap.server.core.storage.model.*
;
import
org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer
;
import
org.apache.skywalking.oap.server.core.worker.*
;
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java
浏览文件 @
4eb5def1
...
...
@@ -19,11 +19,10 @@
package
org.apache.skywalking.oap.server.core.analysis
;
import
java.lang.annotation.Annotation
;
import
org.apache.skywalking.oap.server.core.
*
;
import
org.apache.skywalking.oap.server.core.
UnexpectedException
;
import
org.apache.skywalking.oap.server.core.analysis.worker.*
;
import
org.apache.skywalking.oap.server.core.annotation.AnnotationListener
;
import
org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor
;
import
org.apache.skywalking.oap.server.core.storage.model.IModelSetter
;
import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder
;
/**
...
...
@@ -48,16 +47,12 @@ public class StreamAnnotationListener implements AnnotationListener {
if
(
stream
.
processor
().
equals
(
InventoryStreamProcessor
.
class
))
{
InventoryStreamProcessor
.
getInstance
().
create
(
moduleDefineHolder
,
stream
,
aClass
);
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
).
putIfAbsent
(
aClass
,
false
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
());
}
else
if
(
stream
.
processor
().
equals
(
RecordStreamProcessor
.
class
))
{
RecordStreamProcessor
.
getInstance
().
create
(
moduleDefineHolder
,
stream
,
aClass
);
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
).
putIfAbsent
(
aClass
,
false
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
());
}
else
if
(
stream
.
processor
().
equals
(
MetricsStreamProcessor
.
class
))
{
MetricsStreamProcessor
.
getInstance
().
create
(
moduleDefineHolder
,
stream
,
aClass
);
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
).
putIfAbsent
(
aClass
,
true
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
());
}
else
if
(
stream
.
processor
().
equals
(
TopNStreamProcessor
.
class
))
{
TopNStreamProcessor
.
getInstance
().
create
(
moduleDefineHolder
,
stream
,
aClass
);
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
).
putIfAbsent
(
aClass
,
false
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
());
}
else
{
throw
new
UnexpectedException
(
"Unknown stream processor."
);
}
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
浏览文件 @
4eb5def1
...
...
@@ -23,7 +23,9 @@ import lombok.Getter;
import
org.apache.skywalking.oap.server.core.*
;
import
org.apache.skywalking.oap.server.core.analysis.*
;
import
org.apache.skywalking.oap.server.core.analysis.metrics.Metrics
;
import
org.apache.skywalking.oap.server.core.config.DownsamplingConfigService
;
import
org.apache.skywalking.oap.server.core.storage.*
;
import
org.apache.skywalking.oap.server.core.storage.model.*
;
import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder
;
/**
...
...
@@ -60,10 +62,28 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
throw
new
UnexpectedException
(
"Create "
+
stream
.
storage
().
builder
().
getSimpleName
()
+
" metrics DAO failure."
,
e
);
}
MetricsPersistentWorker
minutePersistentWorker
=
minutePersistentWorker
(
moduleDefineHolder
,
metricsDAO
,
stream
.
name
());
MetricsPersistentWorker
hourPersistentWorker
=
worker
(
moduleDefineHolder
,
metricsDAO
,
stream
.
name
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Hour
.
getName
());
MetricsPersistentWorker
dayPersistentWorker
=
worker
(
moduleDefineHolder
,
metricsDAO
,
stream
.
name
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Day
.
getName
());
MetricsPersistentWorker
monthPersistentWorker
=
worker
(
moduleDefineHolder
,
metricsDAO
,
stream
.
name
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Month
.
getName
());
IModelSetter
modelSetter
=
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
);
DownsamplingConfigService
configService
=
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
DownsamplingConfigService
.
class
);
MetricsPersistentWorker
hourPersistentWorker
=
null
;
MetricsPersistentWorker
dayPersistentWorker
=
null
;
MetricsPersistentWorker
monthPersistentWorker
=
null
;
if
(
configService
.
shouldToHour
())
{
Model
model
=
modelSetter
.
putIfAbsent
(
metricsClass
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
(),
Downsampling
.
Hour
);
hourPersistentWorker
=
worker
(
moduleDefineHolder
,
metricsDAO
,
model
.
getName
());
}
if
(
configService
.
shouldToDay
())
{
Model
model
=
modelSetter
.
putIfAbsent
(
metricsClass
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
(),
Downsampling
.
Day
);
dayPersistentWorker
=
worker
(
moduleDefineHolder
,
metricsDAO
,
model
.
getName
());
}
if
(
configService
.
shouldToMonth
())
{
Model
model
=
modelSetter
.
putIfAbsent
(
metricsClass
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
(),
Downsampling
.
Month
);
monthPersistentWorker
=
worker
(
moduleDefineHolder
,
metricsDAO
,
model
.
getName
());
}
Model
model
=
modelSetter
.
putIfAbsent
(
metricsClass
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
(),
Downsampling
.
Minute
);
MetricsPersistentWorker
minutePersistentWorker
=
minutePersistentWorker
(
moduleDefineHolder
,
metricsDAO
,
model
.
getName
());
MetricsTransWorker
transWorker
=
new
MetricsTransWorker
(
moduleDefineHolder
,
stream
.
name
(),
minutePersistentWorker
,
hourPersistentWorker
,
dayPersistentWorker
,
monthPersistentWorker
);
MetricsRemoteWorker
remoteWorker
=
new
MetricsRemoteWorker
(
moduleDefineHolder
,
transWorker
,
stream
.
name
());
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
浏览文件 @
4eb5def1
...
...
@@ -20,10 +20,11 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import
java.util.*
;
import
lombok.Getter
;
import
org.apache.skywalking.oap.server.core.
UnexpectedException
;
import
org.apache.skywalking.oap.server.core.
*
;
import
org.apache.skywalking.oap.server.core.analysis.*
;
import
org.apache.skywalking.oap.server.core.analysis.record.Record
;
import
org.apache.skywalking.oap.server.core.storage.*
;
import
org.apache.skywalking.oap.server.core.storage.model.*
;
import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder
;
/**
...
...
@@ -48,6 +49,7 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
@Getter
private
List
<
RecordPersistentWorker
>
persistentWorkers
=
new
ArrayList
<>();
@SuppressWarnings
(
"unchecked"
)
public
void
create
(
ModuleDefineHolder
moduleDefineHolder
,
Stream
stream
,
Class
<?
extends
Record
>
recordClass
)
{
if
(
DisableRegister
.
INSTANCE
.
include
(
stream
.
name
()))
{
return
;
...
...
@@ -61,7 +63,10 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
throw
new
UnexpectedException
(
"Create "
+
stream
.
storage
().
builder
().
getSimpleName
()
+
" record DAO failure."
,
e
);
}
RecordPersistentWorker
persistentWorker
=
new
RecordPersistentWorker
(
moduleDefineHolder
,
stream
.
name
(),
1000
,
recordDAO
);
IModelSetter
modelSetter
=
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
);
Model
model
=
modelSetter
.
putIfAbsent
(
recordClass
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
(),
Downsampling
.
Second
);
RecordPersistentWorker
persistentWorker
=
new
RecordPersistentWorker
(
moduleDefineHolder
,
model
.
getName
(),
1000
,
recordDAO
);
persistentWorkers
.
add
(
persistentWorker
);
workers
.
put
(
recordClass
,
persistentWorker
);
}
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
浏览文件 @
4eb5def1
...
...
@@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import
java.util.*
;
import
lombok.Getter
;
import
org.apache.skywalking.oap.server.core.
UnexpectedException
;
import
org.apache.skywalking.oap.server.core.
*
;
import
org.apache.skywalking.oap.server.core.analysis.*
;
import
org.apache.skywalking.oap.server.core.analysis.record.Record
;
import
org.apache.skywalking.oap.server.core.analysis.topn.TopN
;
import
org.apache.skywalking.oap.server.core.storage.*
;
import
org.apache.skywalking.oap.server.core.storage.model.*
;
import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder
;
/**
...
...
@@ -44,6 +45,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
return
PROCESSOR
;
}
@SuppressWarnings
(
"unchecked"
)
public
void
create
(
ModuleDefineHolder
moduleDefineHolder
,
Stream
stream
,
Class
<?
extends
TopN
>
topNClass
)
{
if
(
DisableRegister
.
INSTANCE
.
include
(
stream
.
name
()))
{
return
;
...
...
@@ -57,7 +59,10 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
throw
new
UnexpectedException
(
"Create "
+
stream
.
storage
().
builder
().
getSimpleName
()
+
" top n record DAO failure."
,
e
);
}
TopNWorker
persistentWorker
=
new
TopNWorker
(
moduleDefineHolder
,
stream
.
name
(),
50
,
recordDAO
);
IModelSetter
modelSetter
=
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
);
Model
model
=
modelSetter
.
putIfAbsent
(
topNClass
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
());
TopNWorker
persistentWorker
=
new
TopNWorker
(
moduleDefineHolder
,
model
.
getName
(),
50
,
recordDAO
);
persistentWorkers
.
add
(
persistentWorker
);
workers
.
put
(
topNClass
,
persistentWorker
);
}
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
浏览文件 @
4eb5def1
...
...
@@ -19,10 +19,11 @@
package
org.apache.skywalking.oap.server.core.register.worker
;
import
java.util.*
;
import
org.apache.skywalking.oap.server.core.
UnexpectedException
;
import
org.apache.skywalking.oap.server.core.
*
;
import
org.apache.skywalking.oap.server.core.analysis.*
;
import
org.apache.skywalking.oap.server.core.register.RegisterSource
;
import
org.apache.skywalking.oap.server.core.storage.*
;
import
org.apache.skywalking.oap.server.core.storage.model.*
;
import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder
;
/**
...
...
@@ -42,6 +43,7 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
entryWorkers
.
get
(
registerSource
.
getClass
()).
in
(
registerSource
);
}
@SuppressWarnings
(
"unchecked"
)
public
void
create
(
ModuleDefineHolder
moduleDefineHolder
,
Stream
stream
,
Class
<?
extends
RegisterSource
>
inventoryClass
)
{
StorageDAO
storageDAO
=
moduleDefineHolder
.
find
(
StorageModule
.
NAME
).
provider
().
getService
(
StorageDAO
.
class
);
IRegisterDAO
registerDAO
;
...
...
@@ -51,7 +53,9 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
throw
new
UnexpectedException
(
"Create "
+
stream
.
storage
().
builder
().
getSimpleName
()
+
" register DAO failure."
,
e
);
}
RegisterPersistentWorker
persistentWorker
=
new
RegisterPersistentWorker
(
moduleDefineHolder
,
stream
.
name
(),
registerDAO
,
stream
.
scopeId
());
IModelSetter
modelSetter
=
moduleDefineHolder
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelSetter
.
class
);
Model
model
=
modelSetter
.
putIfAbsent
(
inventoryClass
,
stream
.
name
(),
stream
.
scopeId
(),
stream
.
storage
());
RegisterPersistentWorker
persistentWorker
=
new
RegisterPersistentWorker
(
moduleDefineHolder
,
model
.
getName
(),
registerDAO
,
stream
.
scopeId
());
RegisterRemoteWorker
remoteWorker
=
new
RegisterRemoteWorker
(
moduleDefineHolder
,
persistentWorker
);
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
浏览文件 @
4eb5def1
...
...
@@ -18,6 +18,7 @@
package
org.apache.skywalking.oap.server.core.storage.model
;
import
org.apache.skywalking.oap.server.core.storage.Downsampling
;
import
org.apache.skywalking.oap.server.core.storage.annotation.Storage
;
import
org.apache.skywalking.oap.server.library.module.Service
;
...
...
@@ -25,5 +26,8 @@ import org.apache.skywalking.oap.server.library.module.Service;
* @author peng-yongsheng
*/
public
interface
IModelSetter
extends
Service
{
void
putIfAbsent
(
Class
aClass
,
boolean
isMetrics
,
String
modelName
,
int
scopeId
,
Storage
storage
);
Model
putIfAbsent
(
Class
aClass
,
String
modelName
,
int
scopeId
,
Storage
storage
);
Model
putIfAbsent
(
Class
aClass
,
String
modelName
,
int
scopeId
,
Storage
storage
,
Downsampling
downsampling
);
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
浏览文件 @
4eb5def1
...
...
@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.storage.model;
import
java.util.List
;
import
lombok.Getter
;
import
org.apache.skywalking.oap.server.core.*
;
import
org.apache.skywalking.oap.server.core.storage.Downsampling
;
import
org.apache.skywalking.oap.server.core.storage.ttl.*
;
/**
* @author peng-yongsheng
...
...
@@ -27,21 +30,40 @@ import lombok.Getter;
@Getter
public
class
Model
{
private
final
String
name
;
private
final
boolean
isMetrics
;
private
final
boolean
deleteHistory
;
private
final
List
<
ModelColumn
>
columns
;
private
final
int
sourceScopeId
;
private
final
int
scopeId
;
private
final
TTLCalculator
ttlCalculator
;
public
Model
(
String
name
,
List
<
ModelColumn
>
columns
,
boolean
isMetrics
,
boolean
deleteHistory
,
int
sourceScopeId
)
{
this
.
name
=
name
;
public
Model
(
String
name
,
List
<
ModelColumn
>
columns
,
boolean
deleteHistory
,
int
scopeId
,
Downsampling
downsampling
)
{
this
.
columns
=
columns
;
this
.
isMetrics
=
isMetrics
;
this
.
deleteHistory
=
deleteHistory
;
this
.
sourceScopeId
=
sourceScopeId
;
}
this
.
scopeId
=
scopeId
;
public
Model
copy
(
String
name
)
{
return
new
Model
(
name
,
columns
,
isMetrics
,
deleteHistory
,
sourceScopeId
);
switch
(
downsampling
)
{
case
Minute:
this
.
name
=
name
;
this
.
ttlCalculator
=
new
MinuteTTLCalculator
();
break
;
case
Hour:
this
.
name
=
name
+
Const
.
ID_SPLIT
+
Downsampling
.
Hour
.
getName
();
this
.
ttlCalculator
=
new
HourTTLCalculator
();
break
;
case
Day:
this
.
name
=
name
+
Const
.
ID_SPLIT
+
Downsampling
.
Day
.
getName
();
this
.
ttlCalculator
=
new
DayTTLCalculator
();
break
;
case
Month:
this
.
name
=
name
+
Const
.
ID_SPLIT
+
Downsampling
.
Month
.
getName
();
this
.
ttlCalculator
=
new
MonthTTLCalculator
();
break
;
case
Second:
this
.
name
=
name
;
this
.
ttlCalculator
=
new
SecondTTLCalculator
();
break
;
default
:
throw
new
UnexpectedException
(
"Unexpected downsampling setting."
);
}
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
浏览文件 @
4eb5def1
...
...
@@ -18,10 +18,9 @@
package
org.apache.skywalking.oap.server.core.storage.model
;
import
java.util.
*
;
import
java.util.
List
;
import
org.apache.skywalking.oap.server.core.*
;
import
org.apache.skywalking.oap.server.core.config.DownsamplingConfigService
;
import
org.apache.skywalking.oap.server.core.storage.*
;
import
org.apache.skywalking.oap.server.core.storage.StorageException
;
import
org.apache.skywalking.oap.server.library.client.Client
;
import
org.apache.skywalking.oap.server.library.module.ModuleManager
;
import
org.slf4j.*
;
...
...
@@ -41,39 +40,23 @@ public abstract class ModelInstaller {
public
final
void
install
(
Client
client
)
throws
StorageException
{
IModelGetter
modelGetter
=
moduleManager
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelGetter
.
class
);
DownsamplingConfigService
downsamplingConfigService
=
moduleManager
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
DownsamplingConfigService
.
class
);
List
<
Model
>
models
=
modelGetter
.
getModels
();
List
<
Model
>
allModels
=
new
ArrayList
<>();
models
.
forEach
(
model
->
{
if
(
model
.
isMetrics
())
{
if
(
downsamplingConfigService
.
shouldToHour
())
{
allModels
.
add
(
model
.
copy
(
model
.
getName
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Hour
.
getName
()));
}
if
(
downsamplingConfigService
.
shouldToDay
())
{
allModels
.
add
(
model
.
copy
(
model
.
getName
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Day
.
getName
()));
}
if
(
downsamplingConfigService
.
shouldToMonth
())
{
allModels
.
add
(
model
.
copy
(
model
.
getName
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Month
.
getName
()));
}
}
});
allModels
.
addAll
(
models
);
boolean
debug
=
System
.
getProperty
(
"debug"
)
!=
null
;
if
(
RunningMode
.
isNoInitMode
())
{
for
(
Model
model
:
allM
odels
)
{
for
(
Model
model
:
m
odels
)
{
while
(!
isExists
(
client
,
model
))
{
try
{
logger
.
info
(
"table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later."
,
model
.
getName
());
Thread
.
sleep
(
3000L
);
}
catch
(
InterruptedException
e
)
{
logger
.
error
(
e
.
getMessage
());
}
}
}
}
else
{
for
(
Model
model
:
allM
odels
)
{
for
(
Model
model
:
m
odels
)
{
if
(!
isExists
(
client
,
model
))
{
logger
.
info
(
"table: {} does not exist"
,
model
.
getName
());
createTable
(
client
,
model
);
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/
annotation
/StorageModels.java
→
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/
model
/StorageModels.java
浏览文件 @
4eb5def1
...
...
@@ -13,16 +13,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.core.storage.
annotation
;
package
org.apache.skywalking.oap.server.core.storage.
model
;
import
java.lang.reflect.Field
;
import
java.util.*
;
import
lombok.Getter
;
import
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine
;
import
org.apache.skywalking.oap.server.core.storage.model.*
;
import
org.apache.skywalking.oap.server.core.storage.Downsampling
;
import
org.apache.skywalking.oap.server.core.storage.annotation.*
;
import
org.slf4j.*
;
/**
...
...
@@ -38,13 +38,27 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
this
.
models
=
new
LinkedList
<>();
}
@Override
public
void
putIfAbsent
(
Class
aClass
,
boolean
isMetrics
,
String
modelName
,
int
scopeId
,
Storage
storage
)
{
@Override
public
Model
putIfAbsent
(
Class
aClass
,
String
modelName
,
int
scopeId
,
Storage
storage
)
{
return
putIfAbsent
(
aClass
,
modelName
,
scopeId
,
storage
,
Downsampling
.
Minute
);
}
@Override
public
Model
putIfAbsent
(
Class
aClass
,
String
modelName
,
int
scopeId
,
Storage
storage
,
Downsampling
downsampling
)
{
// Check this scope id is valid.
DefaultScopeDefine
.
nameOf
(
scopeId
);
for
(
Model
model
:
models
)
{
if
(
model
.
getName
().
equals
(
modelName
))
{
return
model
;
}
}
List
<
ModelColumn
>
modelColumns
=
new
LinkedList
<>();
retrieval
(
aClass
,
modelName
,
modelColumns
);
models
.
add
(
new
Model
(
modelName
,
modelColumns
,
isMetrics
,
storage
.
deleteHistory
(),
scopeId
));
Model
model
=
new
Model
(
modelName
,
modelColumns
,
storage
.
deleteHistory
(),
scopeId
,
downsampling
);
models
.
add
(
model
);
return
model
;
}
private
void
retrieval
(
Class
clazz
,
String
modelName
,
List
<
ModelColumn
>
modelColumns
)
{
...
...
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
浏览文件 @
4eb5def1
...
...
@@ -20,29 +20,18 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
import
java.io.IOException
;
import
java.util.List
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.*
;
import
lombok.Setter
;
import
org.apache.skywalking.apm.util.RunnableWithExceptionProtection
;
import
org.apache.skywalking.oap.server.core.Const
;
import
org.apache.skywalking.oap.server.core.CoreModule
;
import
org.apache.skywalking.oap.server.core.DataTTL
;
import
org.apache.skywalking.oap.server.core.*
;
import
org.apache.skywalking.oap.server.core.analysis.metrics.Metrics
;
import
org.apache.skywalking.oap.server.core.analysis.record.Record
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterModule
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery
;
import
org.apache.skywalking.oap.server.core.cluster.RemoteInstance
;
import
org.apache.skywalking.oap.server.core.config.DownsamplingConfigService
;
import
org.apache.skywalking.oap.server.core.storage.Downsampling
;
import
org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO
;
import
org.apache.skywalking.oap.server.core.storage.StorageModule
;
import
org.apache.skywalking.oap.server.core.storage.model.IModelGetter
;
import
org.apache.skywalking.oap.server.core.storage.model.Model
;
import
org.apache.skywalking.oap.server.core.cluster.*
;
import
org.apache.skywalking.oap.server.core.storage.*
;
import
org.apache.skywalking.oap.server.core.storage.model.*
;
import
org.apache.skywalking.oap.server.library.module.ModuleManager
;
import
org.apache.skywalking.oap.server.library.util.CollectionUtils
;
import
org.joda.time.DateTime
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.*
;
/**
* @author peng-yongsheng
...
...
@@ -72,63 +61,25 @@ public enum DataTTLKeeperTimer {
return
;
}
TimeBuckets
timeBuckets
=
convertTimeBucket
(
new
DateTime
());
logger
.
info
(
"Beginning to remove expired metrics from the storage."
);
logger
.
info
(
"Metrics in minute dimension before {}, are going to be removed."
,
timeBuckets
.
minuteTimeBucketBefore
);
logger
.
info
(
"Metrics in hour dimension before {}, are going to be removed."
,
timeBuckets
.
hourTimeBucketBefore
);
logger
.
info
(
"Metrics in day dimension before {}, are going to be removed."
,
timeBuckets
.
dayTimeBucketBefore
);
logger
.
info
(
"Metrics in month dimension before {}, are going to be removed."
,
timeBuckets
.
monthTimeBucketBefore
);
DateTime
currentTime
=
new
DateTime
();
IModelGetter
modelGetter
=
moduleManager
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
IModelGetter
.
class
);
DownsamplingConfigService
downsamplingConfigService
=
moduleManager
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
DownsamplingConfigService
.
class
);
List
<
Model
>
models
=
modelGetter
.
getModels
();
models
.
forEach
(
model
->
{
if
(
model
.
isMetrics
())
{
execute
(
model
,
model
.
getName
(),
timeBuckets
.
minuteTimeBucketBefore
,
Metrics
.
TIME_BUCKET
);
if
(
downsamplingConfigService
.
shouldToHour
())
{
execute
(
model
,
model
.
getName
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Hour
.
getName
(),
timeBuckets
.
hourTimeBucketBefore
,
Metrics
.
TIME_BUCKET
);
}
if
(
downsamplingConfigService
.
shouldToDay
())
{
execute
(
model
,
model
.
getName
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Day
.
getName
(),
timeBuckets
.
dayTimeBucketBefore
,
Metrics
.
TIME_BUCKET
);
}
if
(
downsamplingConfigService
.
shouldToMonth
())
{
execute
(
model
,
model
.
getName
()
+
Const
.
ID_SPLIT
+
Downsampling
.
Month
.
getName
(),
timeBuckets
.
monthTimeBucketBefore
,
Metrics
.
TIME_BUCKET
);
}
}
else
{
execute
(
model
,
model
.
getName
(),
timeBuckets
.
recordDataTTL
,
Record
.
TIME_BUCKET
);
if
(
model
.
isDeleteHistory
())
{
execute
(
model
,
model
.
getTtlCalculator
().
timeBefore
(
currentTime
,
dataTTL
));
}
});
}
TimeBuckets
convertTimeBucket
(
DateTime
currentTime
)
{
TimeBuckets
timeBuckets
=
new
TimeBuckets
();
timeBuckets
.
recordDataTTL
=
Long
.
valueOf
(
currentTime
.
plusMinutes
(
0
-
dataTTL
.
getRecordDataTTL
()).
toString
(
"yyyyMMddHHmmss"
));
timeBuckets
.
minuteTimeBucketBefore
=
Long
.
valueOf
(
currentTime
.
plusMinutes
(
0
-
dataTTL
.
getMinuteMetricsDataTTL
()).
toString
(
"yyyyMMddHHmm"
));
timeBuckets
.
hourTimeBucketBefore
=
Long
.
valueOf
(
currentTime
.
plusHours
(
0
-
dataTTL
.
getHourMetricsDataTTL
()).
toString
(
"yyyyMMddHH"
));
timeBuckets
.
dayTimeBucketBefore
=
Long
.
valueOf
(
currentTime
.
plusDays
(
0
-
dataTTL
.
getDayMetricsDataTTL
()).
toString
(
"yyyyMMdd"
));
timeBuckets
.
monthTimeBucketBefore
=
Long
.
valueOf
(
currentTime
.
plusMonths
(
0
-
dataTTL
.
getMonthMetricsDataTTL
()).
toString
(
"yyyyMM"
));
return
timeBuckets
;
}
private
void
execute
(
Model
model
,
String
modelName
,
long
timeBucketBefore
,
String
timeBucketColumnName
)
{
private
void
execute
(
Model
model
,
long
timeBucketBefore
)
{
try
{
if
(
model
.
isDeleteHistory
())
{
moduleManager
.
find
(
StorageModule
.
NAME
).
provider
().
getService
(
IHistoryDeleteDAO
.
class
).
deleteHistory
(
modelName
,
timeBucketColumnName
,
timeBucketBefore
);
}
moduleManager
.
find
(
StorageModule
.
NAME
).
provider
().
getService
(
IHistoryDeleteDAO
.
class
).
deleteHistory
(
model
.
getName
(),
Metrics
.
TIME_BUCKET
,
timeBucketBefore
);
}
catch
(
IOException
e
)
{
logger
.
warn
(
"History of {} delete failure, time bucket {}"
,
model
Name
,
timeBucketBefore
);
logger
.
warn
(
"History of {} delete failure, time bucket {}"
,
model
.
getName
()
,
timeBucketBefore
);
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
class
TimeBuckets
{
private
long
recordDataTTL
;
private
long
minuteTimeBucketBefore
;
private
long
hourTimeBucketBefore
;
private
long
dayTimeBucketBefore
;
private
long
monthTimeBucketBefore
;
}
}
\ No newline at end of file
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DayTTLCalculator.java
0 → 100644
浏览文件 @
4eb5def1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.skywalking.oap.server.core.storage.ttl
;
import
org.apache.skywalking.oap.server.core.DataTTL
;
import
org.joda.time.DateTime
;
/**
* @author peng-yongsheng
*/
public
class
DayTTLCalculator
implements
TTLCalculator
{
@Override
public
long
timeBefore
(
DateTime
currentTime
,
DataTTL
dataTTL
)
{
return
Long
.
valueOf
(
currentTime
.
plusDays
(
0
-
dataTTL
.
getDayMetricsDataTTL
()).
toString
(
"yyyyMMdd"
));
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/HourTTLCalculator.java
0 → 100644
浏览文件 @
4eb5def1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.skywalking.oap.server.core.storage.ttl
;
import
org.apache.skywalking.oap.server.core.DataTTL
;
import
org.joda.time.DateTime
;
/**
* @author peng-yongsheng
*/
public
class
HourTTLCalculator
implements
TTLCalculator
{
@Override
public
long
timeBefore
(
DateTime
currentTime
,
DataTTL
dataTTL
)
{
return
Long
.
valueOf
(
currentTime
.
plusHours
(
0
-
dataTTL
.
getHourMetricsDataTTL
()).
toString
(
"yyyyMMddHH"
));
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MinuteTTLCalculator.java
0 → 100644
浏览文件 @
4eb5def1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.skywalking.oap.server.core.storage.ttl
;
import
org.apache.skywalking.oap.server.core.DataTTL
;
import
org.joda.time.DateTime
;
/**
* @author peng-yongsheng
*/
public
class
MinuteTTLCalculator
implements
TTLCalculator
{
@Override
public
long
timeBefore
(
DateTime
currentTime
,
DataTTL
dataTTL
)
{
return
Long
.
valueOf
(
currentTime
.
plusMinutes
(
0
-
dataTTL
.
getMinuteMetricsDataTTL
()).
toString
(
"yyyyMMddHHmm"
));
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/MonthTTLCalculator.java
0 → 100644
浏览文件 @
4eb5def1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.skywalking.oap.server.core.storage.ttl
;
import
org.apache.skywalking.oap.server.core.DataTTL
;
import
org.joda.time.DateTime
;
/**
* @author peng-yongsheng
*/
public
class
MonthTTLCalculator
implements
TTLCalculator
{
@Override
public
long
timeBefore
(
DateTime
currentTime
,
DataTTL
dataTTL
)
{
return
Long
.
valueOf
(
currentTime
.
plusMonths
(
0
-
dataTTL
.
getMonthMetricsDataTTL
()).
toString
(
"yyyyMM"
));
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/SecondTTLCalculator.java
0 → 100644
浏览文件 @
4eb5def1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.skywalking.oap.server.core.storage.ttl
;
import
org.apache.skywalking.oap.server.core.DataTTL
;
import
org.joda.time.DateTime
;
/**
* @author peng-yongsheng
*/
public
class
SecondTTLCalculator
implements
TTLCalculator
{
@Override
public
long
timeBefore
(
DateTime
currentTime
,
DataTTL
dataTTL
)
{
return
Long
.
valueOf
(
currentTime
.
plusMinutes
(
0
-
dataTTL
.
getRecordDataTTL
()).
toString
(
"yyyyMMddHHmmss"
));
}
}
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/TTLCalculator.java
0 → 100644
浏览文件 @
4eb5def1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.skywalking.oap.server.core.storage.ttl
;
import
org.apache.skywalking.oap.server.core.DataTTL
;
import
org.joda.time.DateTime
;
/**
* @author peng-yongsheng
*/
public
interface
TTLCalculator
{
long
timeBefore
(
DateTime
currentTime
,
DataTTL
dataTTL
);
}
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
浏览文件 @
4eb5def1
...
...
@@ -79,7 +79,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
else
if
(
Double
.
class
.
equals
(
type
)
||
double
.
class
.
equals
(
type
))
{
return
"DOUBLE"
;
}
else
if
(
String
.
class
.
equals
(
type
))
{
if
(
DefaultScopeDefine
.
SEGMENT
==
model
.
getS
ourceS
copeId
())
{
if
(
DefaultScopeDefine
.
SEGMENT
==
model
.
getScopeId
())
{
if
(
name
.
getName
().
equals
(
SegmentRecord
.
TRACE_ID
)
||
name
.
getName
().
equals
(
SegmentRecord
.
SEGMENT_ID
))
return
"VARCHAR(300)"
;
}
...
...
@@ -94,7 +94,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
protected
void
createIndexes
(
JDBCHikariCPClient
client
,
Model
model
)
throws
StorageException
{
switch
(
model
.
getS
ourceS
copeId
())
{
switch
(
model
.
getScopeId
())
{
case
SERVICE_INVENTORY:
case
SERVICE_INSTANCE_INVENTORY:
case
NETWORK_ADDRESS:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录