Unverified commit bf03882c, authored by Xiangwei Wei, committed by GitHub

[IOTDB-23] providing some statistics info about the writing operations (#1725)

Parent 891d8b6a
......@@ -101,9 +101,9 @@ You can go to jira to pick up the existing issue or create your own issue and ge
Recommended: use IntelliJ IDEA. Run ```mvn clean package -DskipTests```, then mark ```antlr/target/generated-sources/antlr4``` and ```thrift/target/generated-sources/thrift``` as ```Source Root```.
* Server main function:```server/src/main/java/org/apache/iotdb/db/service/IoTDB```Can be started in debug mode
* Client:```cli/src/main/java/org/apache/iotdb/cli/```,Use Client for linux and WinClint for windows, you can start directly, need the parameter "-h 127.0.0.1 -p 6667 -u root -pw root"
* Server rpc implementation (mainly used for client and server communication, generally start interruption point here):```server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl```
* Server main function:```server/src/main/java/org/apache/iotdb/db/service/IoTDB```, can be started in debug mode
* Cli:```cli/src/main/java/org/apache/iotdb/cli/```,Use Cli for linux and WinCli for windows, you can start directly with the parameter "-h 127.0.0.1 -p 6667 -u root -pw root"
* Server rpc implementation (mainly used for cli and server communication; breakpoints are generally set here):```server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl```
* all jdbc statements:executeStatement(TSExecuteStatementReq req)
* jdbc query:executeQueryStatement(TSExecuteStatementReq req)
* native Write interface:insertRecord(TSInsertRecordReq req)
......
......@@ -426,15 +426,6 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| false |
|Effective|After restart system|
* back\_loop\_period_in_second
|Name| back\_loop\_period\_in\_second |
|:---:|:---|
|Description| The frequency at which the system statistics module triggers (in seconds). |
|Type|Int32|
|Default| 5 |
|Effective|After restart system|
* concurrent\_flush\_thread
|Name| concurrent\_flush\_thread |
......@@ -444,24 +435,6 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| 0 |
|Effective|After restart system|
* stat\_monitor\_detect\_freq\_in\_second
|Name| stat\_monitor\_detect\_freq\_in\_second |
|:---:|:---|
|Description| The interval, in seconds, at which the system checks whether the currently recorded statistics exceed stat_monitor_retain_interval, and performs regular cleaning.|
|Type| Int32 |
|Default|600 |
|Effective|After restart system|
* stat\_monitor\_retain\_interval\_in\_second
|Name| stat\_monitor\_retain\_interval\_in\_second |
|:---:|:---|
|Description| The retention time of system statistics data(in seconds). Statistics data over the retention time range will be cleaned regularly.|
|Type| Int32 |
|Default|600 |
|Effective|After restart system|
* tsfile\_storage\_fs
|Name| tsfile\_storage\_fs |
......
......@@ -31,332 +31,113 @@ After starting JConsole tool and connecting to IoTDB server, a basic look at IoT
#### JMX MBean Monitoring
By using JConsole tool and connecting with JMX you are provided with some system statistics and parameters.
This section describes how to use the JConsole ```Mbean``` tab to monitor the number of files opened by the IoTDB service process, the size of the data file, and so on. Once connected to JMX, you can find the ```MBean``` named ```org.apache.iotdb.service``` through the ```MBeans``` tab, as shown in the following Figure.
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/20263106/53316064-54aec080-3901-11e9-9a49-76563ac09192.png">
This section describes how to use the ```MBean``` tab of JConsole to monitor some system configurations of IoTDB, the statistics of writing, and so on. After connecting to JMX, you can find the ```MBean``` named ```org.apache.iotdb.service```, as shown in the figure below.
There are several attributes under Monitor, including the numbers of files opened in different folders, the data file size statistics, and the values of some system parameters. Double-clicking the value of an attribute also displays a line chart of that attribute. Currently, the opened file count statistics are only supported on ```MacOS``` and most ```Linux``` distributions except ```CentOS```. On unsupported operating systems these statistics return ```-2```. See the following section for a specific introduction to the Monitor attributes.
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/34242296/92922876-16d4a700-f469-11ea-874d-dcf58d5bb1b3.png"> <br>
##### MBean Monitor Attributes List
* DataSizeInByte
There are several attributes under Monitor, including the data file directory, the statistics of writing, and the values of some system parameters. Double-clicking the value of an attribute also displays a line chart of that attribute. For a detailed description of the Monitor attributes, see the following sections.
|Name| DataSizeInByte |
|:---:|:---|
|Description| The total size of data file.|
|Unit| Byte |
|Type| Long |
* FileNodeNum
|Name| FileNodeNum |
|:---:|:---|
|Description| The count number of FileNode. (Currently not supported)|
|Type| Long |
##### MBean Monitor Attributes List
* OverflowCacheSize
* SystemDirectory
|Name| OverflowCacheSize |
|Name| SystemDirectory |
|:---:|:---|
|Description| The size of out-of-order data cache. (Currently not supported)|
|Unit| Byte |
|Type| Long |
|Description| The absolute directory of data system file. |
|Type| String |
* BufferWriteCacheSize
* DataSizeInByte
|Name| BufferWriteCacheSize |
|Name| DataSizeInByte |
|:---:|:---|
|Description| The size of BufferWriter cache. (Currently not supported)|
|Description| The total size of data file.|
|Unit| Byte |
|Type| Long |
* BaseDirectory
* EnableStatMonitor
|Name| BaseDirectory |
|Name| EnableStatMonitor |
|:---:|:---|
|Description| The absolute directory of data file. |
|Type| String |
* WriteAheadLogStatus
|Name| WriteAheadLogStatus |
|:---:|:---|
|Description| The status of write-ahead-log (WAL). ```True``` means WAL is enabled. |
|Description| Whether the monitor module is enabled |
|Type| Boolean |
* TotalOpenFileNum
### Data Monitoring
|Name| TotalOpenFileNum |
|:---:|:---|
|Description| All the opened file number of IoTDB server process. |
|Type| Int |
This module provides some statistics info about the writing operations:
* DeltaOpenFileNum
- the data size (in bytes) in IoTDB, the number of data points in IoTDB;
- how many operations were executed successfully or failed.
|Name| DeltaOpenFileNum |
|:---:|:---|
|Description| The opened TsFile file number of IoTDB server process. |
|Default Directory| /data/data/settled |
|Type| Int |
#### Enable/disable the module
* WalOpenFileNum
Users can choose to enable or disable the feature of data statistics monitoring (set the `enable_stat_monitor` item in the configuration file).
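For example, a minimal fragment of the statistics section of the configuration file could look as follows (only the two documented switches are shown; the surrounding file content is omitted):

```properties
# enable the statistics monitor that collects writing statistics
enable_stat_monitor=true
# optionally also persist the statistics as monitor time series on disk
enable_monitor_series_write=true
```

With both switches on, the statistics become queryable with `select` like any other time series.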
|Name| WalOpenFileNum |
|:---:|:---|
|Description| The opened write-ahead-log file number of IoTDB server process. |
|Default Directory| /data/wal |
|Type| Int |
* MetadataOpenFileNum
|Name| MetadataOpenFileNum |
|:---:|:---|
|Description| The opened meta-data file number of IoTDB server process. |
|Default Directory| /data/system/schema |
|Type| Int |
* DigestOpenFileNum
|Name| DigestOpenFileNum |
|:---:|:---|
|Description| The opened info file number of IoTDB server process. |
|Default Directory| /data/system/info |
|Type| Int |
* SocketOpenFileNum
|Name| SocketOpenFileNum |
|:---:|:---|
|Description| The number of socket connections (TCP or UDP) of the operating system. |
|Type| Int |
* MergePeriodInSecond
|Name| MergePeriodInSecond |
|:---:|:---|
|Description| The interval at which the IoTDB service process periodically triggers the merge process. |
|Unit| Second |
|Type| Long |
* ClosePeriodInSecond
|Name| ClosePeriodInSecond |
|:---:|:---|
|Description| The interval at which the IoTDB service process periodically flushes memory data to disk. |
|Unit| Second |
|Type| Long |
#### Statistics Data Storing
### Data Status Monitoring
By default, the statistics data is only saved in memory and can be accessed using JConsole.
This module is the statistics monitoring mechanism IoTDB provides for recording data information; the statistics are recorded in the system and stored in the database. IoTDB 0.8.0 provides statistics for written data.
The data can also be written as time series on disk. To enable this, set `enable_monitor_series_write=true` in the configuration file. These time series can then be queried with a `select` statement in IoTDB-cli.
The user can choose to enable or disable the data statistics monitoring function (set the `enable_stat_monitor` item in the configuration file).
> Note:
> if `enable_monitor_series_write=true`, when IoTDB is restarted, the previous statistics data will be recovered into memory.
> if `enable_monitor_series_write=false`, IoTDB will forget all statistics data after the instance is restarted.
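As an illustration only, the per-request accounting behind these counters can be sketched as a tiny accumulator (`WritingStats` is a hypothetical class, not IoTDB's `StatMonitor` API); the successful point count of a request is the number of measurements minus the failed ones:

```java
import java.util.concurrent.atomic.AtomicLong;

public class WritingStats {
    // Global counters, analogous to TOTAL_POINTS / TOTAL_REQ_SUCCESS / TOTAL_REQ_FAIL.
    final AtomicLong totalPoints = new AtomicLong();
    final AtomicLong reqSuccess = new AtomicLong();
    final AtomicLong reqFail = new AtomicLong();

    /** Records one insert request with the given measurement counts. */
    void record(int measurements, int failedMeasurements, boolean requestOk) {
        totalPoints.addAndGet(measurements - failedMeasurements);
        if (requestOk) {
            reqSuccess.incrementAndGet();
        } else {
            reqFail.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        WritingStats stats = new WritingStats();
        stats.record(5, 1, true);  // 4 points written successfully
        stats.record(3, 3, false); // failed request, no points
        System.out.println(stats.totalPoints.get()); // 4
        System.out.println(stats.reqSuccess.get() + " " + stats.reqFail.get()); // 1 1
    }
}
```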
#### Writing Data Monitor
The current statistics of writing data by the system can be divided into two major modules: **Global Writing Data Statistics** and **Storage Group Writing Data Statistics**. **Global Writing Data Statistics** records the point number written by the user and the number of requests. **Storage Group Writing Data Statistics** records data of a certain storage group.
At present, the monitor system is divided into two modules: global writing statistics and storage group writing statistics. The global statistics record the total number of points and requests, and the storage group statistics count the written data of each storage group.
The system collects data every 5 seconds by default, writes the statistics into IoTDB, and stores them in a system-specified location. (To change the collection frequency, set the `back_loop_period_in_second` entry in the configuration file; see Section [Engine Layer](../Server/Single%20Node%20Setup.md) for details.) After the system is refreshed or restarted, IoTDB does not recover the statistics, and they restart from zero.
The collection granularity of the monitoring module is to **update the statistics once a data file is flushed to disk**, so the reported values may differ from the actual situation. To obtain accurate statistics, **please call `flush` before querying**.
To avoid excessive accumulation of statistics, a mechanism periodically clears invalid statistics data. The system checks every `stat_monitor_detect_freq_in_second` (default 600s, see section [Engine Layer](../Server/Single%20Node%20Setup.md) for details) whether the recorded statistics exceed the retention time `stat_monitor_retain_interval_in_second` (default 600s, see section [Engine Layer](../Server/Single%20Node%20Setup.md) for details), and deletes statistics older than the retention time. To keep the system stable, statistics may not be deleted too frequently; therefore, if a configured value is smaller than the default (600s), the system ignores it and uses the default.
Here are the writing data statistics (the range supported is shown in brackets):
You can use a `select` clause to query the writing statistics in the same way as other time series.
Here are the writing data statistics:
* TOTAL_POINTS (GLOABAL)
* TOTAL_POINTS (GLOBAL)
|Name| TOTAL\_POINTS |
|:---:|:---|
|Description| Calculate the global writing points number.|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.global.TOTAL\_POINTS |
|Reset After Restarting System| yes |
|Example| select TOTAL_POINTS from root.stats.write.global|
|Description| Calculate the total number of global writing points. |
|Timeseries Name| root.stats.{"global" \|"storageGroupName"}.TOTAL\_POINTS |
* TOTAL\_REQ\_SUCCESS (GLOABAL)
* TOTAL\_REQ\_SUCCESS (GLOBAL)
|Name| TOTAL\_REQ\_SUCCESS |
|:---:|:---|
|Description| Calculate the global successful requests number.|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.global.TOTAL\_REQ\_SUCCESS |
|Reset After Restarting System| yes |
|Example| select TOTAL\_REQ\_SUCCESS from root.stats.write.global|
|Description| Calculate the number of global successful requests. |
|Timeseries Name| root.stats."global".TOTAL\_REQ\_SUCCESS |
* TOTAL\_REQ\_FAIL (GLOABAL)
* TOTAL\_REQ\_FAIL (GLOBAL)
|Name| TOTAL\_REQ\_FAIL |
|:---:|:---|
|Description| Calculate the global failed requests number.|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.global.TOTAL\_REQ\_FAIL |
|Reset After Restarting System| yes |
|Example| select TOTAL\_REQ\_FAIL from root.stats.write.global|
* TOTAL\_POINTS\_FAIL (GLOABAL)
|Name| TOTAL\_POINTS\_FAIL |
|:---:|:---|
|Description| Calculate the global failed writing points number.|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.global.TOTAL\_POINTS\_FAIL |
|Reset After Restarting System| yes |
|Example| select TOTAL\_POINTS\_FAIL from root.stats.write.global|
|Description| Calculate the number of global failed requests. |
|Timeseries Name| root.stats."global".TOTAL\_REQ\_FAIL |
The above attributes can also be visualized in JConsole. To avoid a cluttered display when there are many storage groups, per-storage-group statistics are not listed directly; instead, you can enter a storage group name in the operation method under the Monitor MBean to query its statistics.
* TOTAL\_POINTS\_SUCCESS (GLOABAL)
|Name| TOTAL\_POINTS\_SUCCESS |
|:---:|:---|
|Description| Calculate the global successful writing points number.|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.global.TOTAL\_POINTS\_SUCCESS |
|Reset After Restarting System| yes |
|Example| select TOTAL\_POINTS\_SUCCESS from root.stats.write.global|
* TOTAL\_REQ\_SUCCESS (STORAGE GROUP)
|Name| TOTAL\_REQ\_SUCCESS |
|:---:|:---|
|Description| Calculate the successful requests number for specific storage group|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.\<storage\_group\_name\>.TOTAL\_REQ\_SUCCESS |
|Reset After Restarting System| yes |
|Example| select TOTAL\_REQ\_SUCCESS from root.stats.write.\<storage\_group\_name\>|
* TOTAL\_REQ\_FAIL (STORAGE GROUP)
|Name| TOTAL\_REQ\_FAIL |
|:---:|:---|
|Description| Calculate the fail requests number for specific storage group|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.\<storage\_group\_name\>.TOTAL\_REQ\_FAIL |
|Reset After Restarting System| yes |
|Example| select TOTAL\_REQ\_FAIL from root.stats.write.\<storage\_group\_name\>|
* TOTAL\_POINTS\_SUCCESS (STORAGE GROUP)
|Name| TOTAL\_POINTS\_SUCCESS |
|:---:|:---|
|Description| Calculate the successful writing points number for specific storage group.|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.\<storage\_group\_name\>.TOTAL\_POINTS\_SUCCESS |
|Reset After Restarting System| yes |
|Example| select TOTAL\_POINTS\_SUCCESS from root.stats.write.\<storage\_group\_name\>|
* TOTAL\_POINTS\_FAIL (STORAGE GROUP)
|Name| TOTAL\_POINTS\_FAIL |
|:---:|:---|
|Description| Calculate the fail writing points number for specific storage group.|
|Type| Writing data statistics |
|Timeseries Name| root.stats.write.\<storage\_group\_name\>.TOTAL\_POINTS\_FAIL |
|Reset After Restarting System| yes |
|Example| select TOTAL\_POINTS\_FAIL from root.stats.write.\<storage\_group\_name\>|
> Note:
>
> \<storage\_group\_name\> should be replaced by the real storage group name, and the '.' in the storage group name needs to be replaced by '_'. For example, for the storage group 'root.a.b', the name used in the statistics becomes 'root\_a\_b'.
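The renaming rule from the note can be expressed as a one-line transformation (`toStatName` is a hypothetical helper for illustration, not part of IoTDB):

```java
public class StatSeriesName {
    /** Maps a storage group name such as "root.a.b" to its statistics form "root_a_b". */
    static String toStatName(String storageGroup) {
        return storageGroup.replace('.', '_');
    }

    public static void main(String[] args) {
        System.out.println(toStatName("root.a.b")); // root_a_b
        System.out.println(toStatName("root.ln"));  // root_ln
    }
}
```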
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/34242296/92922942-34a20c00-f469-11ea-8dc2-8229d454583c.png">
##### Example
Here we give some example of using writing data statistics.
Here we give some examples of using writing data statistics.
To know the global successful writing points number, use a `select` clause to query its value. The query statement is:
To know the total number of global writing points, use a `select` clause to query its value. The query statement is:
```
select TOTAL_POINTS_SUCCESS from root.stats.write.global
```sql
select TOTAL_POINTS from root.stats."global"
```
To know the successful writing points number of root.ln (storage group), the query statement is:
To know the total number of global writing points of root.ln (storage group), the query statement is:
```
select TOTAL_POINTS_SUCCESS from root.stats.write.root_ln
```sql
select TOTAL_POINTS from root.stats."root.ln"
```
To know the current number of time series points in the system, use the `MAX_VALUE` function to query. Here is the query statement:
To know the latest statistics of the current system, you can use a last query. Here is the query statement:
```sql
flush
select last TOTAL_POINTS from root.stats."global"
```
select MAX_VALUE(TOTAL_POINTS_SUCCESS) from root.stats.write.root_ln
```
#### File Size Monitor
Sometimes we are concerned about how the data file size of IoTDB changes, maybe to help calculate how much disk space is left or the data ingestion speed. The File Size Monitor provides several statistics to show how different types of file-sizes change.
By default, the file size monitor collects file size data every 5 seconds, using the same shared parameter ```back_loop_period_in_second```.
Unlike the Writing Data Monitor, the File Size Monitor currently does not delete statistics at regular intervals.
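As a sketch of what the DATA statistic below measures, the recursive byte sum of a directory can be computed like this (`DirSize` is a hypothetical illustration, not IoTDB's implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class DirSize {
    /** Recursively sums the sizes, in bytes, of all regular files under dir. */
    static long totalSizeInBytes(Path dir) throws IOException {
        try (Stream<Path> files = Files.walk(dir)) {
            return files.filter(Files::isRegularFile)
                        .mapToLong(p -> p.toFile().length())
                        .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo on a throwaway directory with two small files.
        Path tmp = Files.createTempDirectory("iotdb-data-demo");
        Files.write(tmp.resolve("a.bin"), new byte[100]);
        Files.write(tmp.resolve("b.bin"), new byte[28]);
        System.out.println(totalSizeInBytes(tmp)); // 128
    }
}
```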
You can also use `select` clause to get the file size statistics like other time series.
Here are the file size statistics:
* DATA
|Name| DATA |
|:---:|:---|
|Description| Calculate the sum of all file sizes under the data directory (```data/data``` by default) in bytes.|
|Type| File size statistics |
|Timeseries Name| root.stats.file\_size.DATA |
|Reset After Restarting System| No |
|Example| select DATA from root.stats.file\_size|
* SETTLED
|Name| SETTLED |
|:---:|:---|
|Description| Calculate the sum of all the ```TsFile``` size (under ```data/data/settled``` by default) in byte. If there are multiple ```TsFile``` directories like ```{data/data/settled1, data/data/settled2}```, this statistic is the sum of their size.|
|Type| File size statistics |
|Timeseries Name| root.stats.file\_size.SETTLED |
|Reset After Restarting System| No |
|Example| select SETTLED from root.stats.file\_size|
* OVERFLOW
|Name| OVERFLOW |
|:---:|:---|
|Description| Calculate the sum of all the ```out-of-order data file``` size (under ```data/data/unsequence``` by default) in byte.|
|Type| File size statistics |
|Timeseries Name| root.stats.file\_size.OVERFLOW |
|Reset After Restarting System| No |
|Example| select OVERFLOW from root.stats.file\_size|
* WAL
|Name| WAL |
|:---:|:---|
|Description| Calculate the sum of all the ```Write-Ahead-Log file``` size (under ```data/wal``` by default) in byte.|
|Type| File size statistics |
|Timeseries Name| root.stats.file\_size.WAL |
|Reset After Restarting System| No |
|Example| select WAL from root.stats.file\_size|
* INFO
|Name| INFO|
|:---:|:---|
|Description| Calculate the sum of all the ```.restore```, etc. file size (under ```data/system/info```) in byte.|
|Type| File size statistics |
|Timeseries Name| root.stats.file\_size.INFO |
|Reset After Restarting System| No |
|Example| select INFO from root.stats.file\_size|
* SCHEMA
|Name| SCHEMA |
|:---:|:---|
|Description| Calculate the sum of all the ```metadata file``` size (under ```data/system/metadata```) in byte.|
|Type| File size statistics |
|Timeseries Name| root.stats.file\_size.SCHEMA |
|Reset After Restarting System| No |
|Example| select SCHEMA from root.stats.file\_size|
## Performance Monitor
......
......@@ -118,7 +118,7 @@ import -> Maven -> Existing Maven Projects
## Debugging Code
* Server main function: ```server/src/main/java/org/apache/iotdb/db/service/IoTDB```, can be started in debug mode
* Client: ```cli/src/main/java/org/apache/iotdb/cli/```; use Client for Linux and WinClint for Windows; it can be started directly and needs the parameters "-h 127.0.0.1 -p 6667 -u root -pw root"
* Command-line interface: ```cli/src/main/java/org/apache/iotdb/cli/```; use Cli for Linux and WinCli for Windows; it can be started directly, with the startup parameters "-h 127.0.0.1 -p 6667 -u root -pw root"
* Server rpc implementation (mainly used for communication between the cli and the server; breakpoints are generally set here): ```server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl```
* all jdbc statements: executeStatement(TSExecuteStatementReq req)
* jdbc query statements: executeQueryStatement(TSExecuteStatementReq req)
......
......@@ -181,15 +181,6 @@
### Engine Layer Configuration
* back\_loop\_period\_in\_second
|Name| back\_loop\_period\_in\_second |
|:---:|:---|
|Description| The frequency, in seconds, at which the system statistics module collects statistics. |
|Type|Int32|
|Default| 5 |
|Effective|After restarting the server|
* data\_dirs
|Name| data\_dirs |
......@@ -397,26 +388,6 @@
|Default| 0 |
|Effective|After restarting the server|
* stat\_monitor\_detect\_freq\_in\_second
|Name| stat\_monitor\_detect\_freq\_in\_second |
|:---:|:---|
|Description| The interval, in seconds, at which the system checks whether the currently recorded statistics exceed stat_monitor_retain_interval, and performs regular cleaning.|
|Type| Int32 |
|Default|600 |
|Effective|After restarting the server|
* stat\_monitor\_retain\_interval\_in\_second
|Name| stat\_monitor\_retain\_interval\_in\_second |
|:---:|:---|
|Description| The retention time, in seconds, of system statistics; statistics older than the retention time are cleaned regularly.|
|Type| Int32 |
|Default|600 |
|Effective|After restarting the server|
* tsfile\_storage\_fs
|Name| tsfile\_storage\_fs |
......
......@@ -370,24 +370,11 @@ enable_last_cache=true
### Statistics Monitor configuration
####################
# Set enable_stat_monitor true(or false) to enable(or disable) the StatMonitor that stores statistics info periodically.
# back_loop_period_sec decides the period when StatMonitor writes statistics info into IoTDB.
# stat_monitor_detect_freq_sec decides how often IoTDB checks for out-of-date statistics info.
# IoTDB just keeps statistics info within stat_monitor_retain_interval_sec seconds before current time.
# Note: IoTDB requires stat_monitor_detect_freq_sec >= 600s and stat_monitor_retain_interval_sec >= 600s.
# The monitor, which writes statistics info to IoTDB periodically, is disabled by default.
# Set enable_stat_monitor true(or false) to enable(or disable) the StatMonitor that stores statistics info.
enable_stat_monitor=false
# The period that StatMonitor stores statistics info, the time unit is seconds.
back_loop_period_in_second=5
# The interval at which StatMonitor starts to check whether statistics info can be deleted due to exceeding the retention volume.
# The time unit is seconds.
stat_monitor_detect_freq_in_second=600
# The minimum age of statistics storage information to be eligible for deletion due to age.
# The time unit is seconds.
stat_monitor_retain_interval_in_second=600
# Set enable_monitor_series_write true (or false) to enable (or disable) the writing monitor time series
enable_monitor_series_write=false
####################
### External sort Configuration
......
......@@ -397,24 +397,15 @@ public class IoTDBConfig {
*/
private boolean lastCacheEnable = true;
/**
* The statMonitor writes statistics info into IoTDB every backLoopPeriodSec secs. The default
* value is 5s.
*/
private int backLoopPeriodSec = 5;
/**
* Set true to enable statistics monitor service, false to disable statistics service.
*/
private boolean enableStatMonitor = false;
/**
* Set the time interval when StatMonitor performs delete detection. The default value is 600s.
*/
private int statMonitorDetectFreqSec = 60 * 10;
/**
* Set the maximum time to keep monitor statistics information in IoTDB. The default value is
* 600s.
* Set true to enable writing monitor time series.
*/
private int statMonitorRetainIntervalSec = 60 * 10;
private boolean enableMonitorSeriesWrite = false;
/**
* Cache size of {@code checkAndGetDataTypeCache} in {@link MManager}.
......@@ -1110,14 +1101,6 @@ public class IoTDBConfig {
this.tsFileSizeThreshold = tsFileSizeThreshold;
}
public int getBackLoopPeriodSec() {
return backLoopPeriodSec;
}
void setBackLoopPeriodSec(int backLoopPeriodSec) {
this.backLoopPeriodSec = backLoopPeriodSec;
}
public boolean isEnableStatMonitor() {
return enableStatMonitor;
}
......@@ -1126,28 +1109,20 @@ public class IoTDBConfig {
this.enableStatMonitor = enableStatMonitor;
}
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
public boolean isEnableMonitorSeriesWrite() {
return enableMonitorSeriesWrite;
}
void setRpcMaxConcurrentClientNum(int rpcMaxConcurrentClientNum) {
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
}
public int getStatMonitorDetectFreqSec() {
return statMonitorDetectFreqSec;
}
void setStatMonitorDetectFreqSec(int statMonitorDetectFreqSec) {
this.statMonitorDetectFreqSec = statMonitorDetectFreqSec;
public void setEnableMonitorSeriesWrite(boolean enableMonitorSeriesWrite) {
this.enableMonitorSeriesWrite = enableMonitorSeriesWrite;
}
public int getStatMonitorRetainIntervalSec() {
return statMonitorRetainIntervalSec;
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
void setStatMonitorRetainIntervalSec(int statMonitorRetainIntervalSec) {
this.statMonitorRetainIntervalSec = statMonitorRetainIntervalSec;
void setRpcMaxConcurrentClientNum(int rpcMaxConcurrentClientNum) {
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
}
public int getmManagerCacheSize() {
......
......@@ -151,29 +151,10 @@ public class IoTDBDescriptor {
conf.setEnableStatMonitor(Boolean
.parseBoolean(properties.getProperty("enable_stat_monitor",
Boolean.toString(conf.isEnableStatMonitor()))));
conf.setBackLoopPeriodSec(Integer
.parseInt(properties.getProperty("back_loop_period_in_second",
Integer.toString(conf.getBackLoopPeriodSec()))));
int statMonitorDetectFreqSec = Integer.parseInt(
properties.getProperty("stat_monitor_detect_freq_in_second",
Integer.toString(conf.getStatMonitorDetectFreqSec())));
int statMonitorRetainIntervalSec = Integer.parseInt(
properties.getProperty("stat_monitor_retain_interval_in_second",
Integer.toString(conf.getStatMonitorRetainIntervalSec())));
// the conf value must > default value, or may cause system unstable
if (conf.getStatMonitorDetectFreqSec() < statMonitorDetectFreqSec) {
conf.setStatMonitorDetectFreqSec(statMonitorDetectFreqSec);
} else {
logger.info("The stat_monitor_detect_freq_sec value is smaller than default,"
+ " use default value");
}
if (conf.getStatMonitorRetainIntervalSec() < statMonitorRetainIntervalSec) {
conf.setStatMonitorRetainIntervalSec(statMonitorRetainIntervalSec);
} else {
logger.info("The stat_monitor_retain_interval_sec value is smaller than default,"
+ " use default value");
}
conf.setEnableMonitorSeriesWrite(Boolean
.parseBoolean(properties.getProperty("enable_monitor_series_write",
Boolean.toString(conf.isEnableMonitorSeriesWrite()))));
conf.setEnableMetricService(Boolean.parseBoolean(properties
.getProperty("enable_metric_service", Boolean.toString(conf.isEnableMetricService()))));
......
......@@ -68,6 +68,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
......@@ -372,6 +375,9 @@ public class StorageEngine implements IService {
// TODO monitor: update statistics
try {
storageGroupProcessor.insert(insertRowPlan);
if (config.isEnableStatMonitor()) {
updateMonitorStatistics(storageGroupProcessor, insertRowPlan);
}
} catch (WriteProcessException e) {
throw new StorageEngineException(e);
}
......@@ -392,6 +398,19 @@ public class StorageEngine implements IService {
// TODO monitor: update statistics
storageGroupProcessor.insertTablet(insertTabletPlan);
if (config.isEnableStatMonitor()) {
updateMonitorStatistics(storageGroupProcessor, insertTabletPlan);
}
}
private void updateMonitorStatistics(StorageGroupProcessor processor, InsertPlan insertPlan) {
StatMonitor monitor = StatMonitor.getInstance();
int successPointsNum =
insertPlan.getMeasurements().length - insertPlan.getFailedMeasurementNumber();
// update to storage group statistics
processor.updateMonitorSeriesValue(successPointsNum);
// update to global statistics
monitor.updateStatGlobalValue(successPointsNum);
}
/**
......
......@@ -20,10 +20,15 @@ package org.apache.iotdb.db.engine.flush;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.ServiceType;
......@@ -33,6 +38,7 @@ import org.slf4j.LoggerFactory;
public class FlushManager implements FlushManagerMBean, IService {
private static final Logger logger = LoggerFactory.getLogger(FlushManager.class);
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private ConcurrentLinkedDeque<TsFileProcessor> tsFileProcessorQueue = new ConcurrentLinkedDeque<>();
......@@ -93,6 +99,14 @@ public class FlushManager implements FlushManagerMBean, IService {
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
}
registerTsFileProcessor(tsFileProcessor);
// update stat monitor cache to system during each flush()
if (config.isEnableStatMonitor() && config.isEnableMonitorSeriesWrite()) {
try {
StatMonitor.getInstance().saveStatValue(tsFileProcessor.getStorageGroupName());
} catch (StorageEngineException | MetadataException e) {
logger.error("Inserting monitor series data error.", e);
}
}
}
}
......
......@@ -120,6 +120,7 @@ import org.slf4j.LoggerFactory;
public class StorageGroupProcessor {
public static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to upgrade folder";
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
......@@ -131,7 +132,6 @@ public class StorageGroupProcessor {
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final boolean enableMemControl = config.isEnableMemControl();
/**
* indicating the file to be loaded already exists locally.
......@@ -242,6 +242,10 @@ public class StorageGroupProcessor {
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
/**
* value of root.stats."root.sg".TOTAL_POINTS
*/
private long monitorSeriesValue;
private StorageGroupInfo storageGroupInfo = new StorageGroupInfo(this);
public boolean isReady() {
......@@ -368,6 +372,19 @@ public class StorageGroupProcessor {
.putAll(endTimeMap);
globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
}
}
public long getMonitorSeriesValue() {
return monitorSeriesValue;
}
public void setMonitorSeriesValue(long monitorSeriesValue) {
this.monitorSeriesValue = monitorSeriesValue;
}
public void updateMonitorSeriesValue(int successPointsNum) {
this.monitorSeriesValue += successPointsNum;
}
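The three accessors above give each `StorageGroupProcessor` a running `TOTAL_POINTS` total: it can be seeded from a recovered value and then incremented by the number of points each successful insertion writes. A self-contained sketch of that accumulator (the class name here is hypothetical):

```java
// Minimal model of the per-storage-group TOTAL_POINTS accumulator:
// set() restores a recovered value, update() adds successful points.
public class MonitorSeriesCounter {
  private long monitorSeriesValue;

  public long getMonitorSeriesValue() {
    return monitorSeriesValue;
  }

  public void setMonitorSeriesValue(long monitorSeriesValue) {
    this.monitorSeriesValue = monitorSeriesValue;
  }

  public void updateMonitorSeriesValue(int successPointsNum) {
    this.monitorSeriesValue += successPointsNum;
  }

  public static void main(String[] args) {
    MonitorSeriesCounter c = new MonitorSeriesCounter();
    c.setMonitorSeriesValue(100);   // e.g. value recovered at startup
    c.updateMonitorSeriesValue(25); // 25 points written successfully
    System.out.println(c.getMonitorSeriesValue()); // prints 125
  }
}
```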
private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
......@@ -797,11 +814,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
* @param sequence whether is sequence
* @param start start index of rows to be inserted in insertTabletPlan
* @param end end index of rows to be inserted in insertTabletPlan
* @param results result array
* @param timePartitionId time partition id
* @param sequence whether is sequence
* @param start start index of rows to be inserted in insertTabletPlan
* @param end end index of rows to be inserted in insertTabletPlan
* @param results result array
* @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
......@@ -959,9 +976,9 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
* @param timeRangeId time partition range
* @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
* @param sequence whether is sequence or not
* @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
......@@ -2092,9 +2109,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
* @param tsfileName origin tsfile name
* @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
* 1]
* 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
......@@ -2158,8 +2175,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
......
......@@ -182,6 +182,7 @@ public class TsFileProcessor {
}
workMemTable.insert(insertRowPlan);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
try {
getLogNode().write(insertRowPlan);
......@@ -242,6 +243,7 @@ public class TsFileProcessor {
tsFileResource
.updateStartTime(insertTabletPlan.getDeviceId().getFullPath(),
insertTabletPlan.getTimes()[start]);
//for sequence tsfile, we update the endTime only when the file is prepared to be closed.
//for unsequence tsfile, we have to update the endTime for each insertion.
if (!sequence) {
......
......@@ -496,7 +496,7 @@ public class MManager {
try {
List<PartialPath> allTimeseries = mtree.getAllTimeseriesPath(prefixPath);
// Monitor storage group seriesPath is not allowed to be deleted
allTimeseries.removeIf(p -> p.startsWith(MonitorConstants.getStatStorageGroupPrefixArray()));
allTimeseries.removeIf(p -> p.startsWith(MonitorConstants.STAT_STORAGE_GROUP_ARRAY));
Set<String> failedNames = new HashSet<>();
for (PartialPath p : allTimeseries) {
......
......@@ -182,6 +182,10 @@ public class PartialPath extends Path implements Comparable<Path> {
return true;
}
public boolean equals(String obj) {
return this.getFullPath().equals(obj);
}
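The new `equals(String)` lets a `PartialPath` be compared directly against its dotted full-path form. A simplified stand-in (not the real `PartialPath` class) showing the behavior:

```java
// Illustration of the equals(String) overload: a path compares
// equal to its dotted full-path string.
public class PathSketch {
  private final String[] nodes;

  PathSketch(String... nodes) {
    this.nodes = nodes;
  }

  String getFullPath() {
    return String.join(".", nodes);
  }

  public boolean equals(String obj) {
    return this.getFullPath().equals(obj);
  }

  public static void main(String[] args) {
    PathSketch p = new PathSketch("root", "stats", "global");
    System.out.println(p.equals("root.stats.global")); // prints true
  }
}
```

Note that `equals(String)` is an overload, not an override of `Object.equals(Object)`; call sites must pass a `String` statically for this variant to be selected.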
@Override
public int hashCode() {
return this.getFullPath().hashCode();
......
......@@ -19,96 +19,36 @@
package org.apache.iotdb.db.monitor;
import java.io.File;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.monitor.collector.FileSize;
public class MonitorConstants {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
public static final String DATA_TYPE_INT64 = "INT64";
static final String STAT_STORAGE_GROUP_PREFIX = "root.stats";
private static final String[] STAT_STORAGE_GROUP_PREFIX_ARRAY = {"root", "stats"};
static final String FILENODE_PROCESSOR_CONST = "FILENODE_PROCESSOR_CONST";
private static final String FILENODE_MANAGER_CONST = "FILENODE_MANAGER_CONST";
static final String FILE_SIZE_CONST = "FILE_SIZE_CONST";
public static final String MONITOR_PATH_SEPARATOR = ".";
// statistic for file size statistic module
private static final String FILE_SIZE = "file_size";
public static final String FILE_SIZE_STORAGE_GROUP_NAME = STAT_STORAGE_GROUP_PREFIX
+ MONITOR_PATH_SEPARATOR + FILE_SIZE;
// statistic for insert module
private static final String FILE_NODE_MANAGER_PATH = "write.global";
public static final String FILE_NODE_PATH = "write";
/**
* Stat information.
*/
static final String STAT_STORAGE_DELTA_NAME = STAT_STORAGE_GROUP_PREFIX
+ MONITOR_PATH_SEPARATOR + FILE_NODE_MANAGER_PATH;
public static final String INT64 = "INT64";
public static final String PATH_SEPARATOR = ".";
// statistic for data inserting module
public static final String STAT_STORAGE_GROUP_NAME = "root.stats";
public static final String[] STAT_STORAGE_GROUP_ARRAY = {"root", "stats"};
public static final String[] STAT_GLOBAL_ARRAY = {"root", "stats", "\"global\""};
/**
* function for initializing stats values.
*
* @param constantsType produce initialization values for Statistics Params
* @return HashMap contains all the Statistics Params
* Stat information.
*/
static HashMap<String, AtomicLong> initValues(String constantsType) {
HashMap<String, AtomicLong> hashMap = new HashMap<>();
switch (constantsType) {
case FILENODE_PROCESSOR_CONST:
for (FileNodeProcessorStatConstants statConstant : FileNodeProcessorStatConstants
.values()) {
hashMap.put(statConstant.name(), new AtomicLong(0));
}
break;
case FILENODE_MANAGER_CONST:
hashMap = (HashMap<String, AtomicLong>) FileSize.getInstance().getStatParamsHashMap();
break;
case FILE_SIZE_CONST:
for (FileSizeConstants kinds : FileSizeConstants.values()) {
hashMap.put(kinds.name(), new AtomicLong(0));
}
break;
default:
public enum StatMeasurementConstants {
TOTAL_POINTS("TOTAL_POINTS"), TOTAL_REQ_SUCCESS("TOTAL_REQ_SUCCESS"),
TOTAL_REQ_FAIL("TOTAL_REQ_FAIL");
break;
StatMeasurementConstants(String measurement) {
this.measurement = measurement;
}
return hashMap;
}
public enum FileNodeManagerStatConstants {
TOTAL_POINTS, TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
}
public static String[] getStatStorageGroupPrefixArray() {
return STAT_STORAGE_GROUP_PREFIX_ARRAY;
}
private String measurement;
public enum FileNodeProcessorStatConstants {
TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL
public String getMeasurement() {
return measurement;
}
}
public enum OsStatConstants {
NETWORK_REC, NETWORK_SEND, CPU_USAGE, MEM_USAGE, IOTDB_MEM_SIZE, DISK_USAGE, DISK_READ_SPEED,
DISK_WRITE_SPEED, DISK_TPS
}
public enum FileSizeConstants {
// TODO add multi data dir monitor
WAL(new File(config.getWalDir()).getAbsolutePath()),
SYS(new File(config.getSystemDir()).getAbsolutePath());
public String getPath() {
return path;
}
private String path;
FileSizeConstants(String path) {
this.path = path;
}
}
}
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......@@ -16,31 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
package org.apache.iotdb.db.monitor;
public interface MonitorMBean {
public interface StatMonitorMBean {
long getDataSizeInByte();
long getGlobalTotalPointsNum();
int getFileNodeNum();
long getGlobalReqSuccessNum();
long getOverflowCacheSize();
long getGlobalReqFailNum();
long getBufferWriteCacheSize();
long getStorageGroupTotalPointsNum(String storageGroupName);
String getSystemDirectory();
boolean getWriteAheadLogStatus();
int getTotalOpenFileNum();
int getDataOpenFileNum();
int getWalOpenFileNum();
int getDigestOpenFileNum();
long getDataSizeInByte();
int getMetadataOpenFileNum();
boolean getWriteAheadLogStatus();
int getSocketOpenFileNum();
boolean getEnableStatMonitor();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.monitor.collector;
import java.io.File;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is to collect some file size statistics.
*/
public class FileSize implements IStatistic {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(FileSize.class);
private static final long ABNORMAL_VALUE = -1L;
private static final long INIT_VALUE_IF_FILE_NOT_EXIST = 0L;
@Override
public Map<String, TSRecord> getAllStatisticsValue() {
long curTime = System.currentTimeMillis();
TSRecord tsRecord = StatMonitor
.convertToTSRecord(getStatParamsHashMap(), MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME,
curTime);
HashMap<String, TSRecord> ret = new HashMap<>();
ret.put(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, tsRecord);
return ret;
}
@Override
public void registerStatMetadata() {
Map<String, String> hashMap = new HashMap<>();
for (FileSizeConstants kind : FileSizeConstants.values()) {
String seriesPath = MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME
+ MonitorConstants.MONITOR_PATH_SEPARATOR
+ kind.name();
hashMap.put(seriesPath, MonitorConstants.DATA_TYPE_INT64);
}
StatMonitor.getInstance().registerStatStorageGroup(hashMap);
}
@Override
public List<String> getAllPathForStatistic() {
List<String> list = new ArrayList<>();
for (FileSizeConstants kind : MonitorConstants.FileSizeConstants.values()) {
list.add(
MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
+ kind.name());
}
return list;
}
@Override
public Map<String, AtomicLong> getStatParamsHashMap() {
Map<FileSizeConstants, Long> fileSizeMap = getFileSizesInByte();
Map<String, AtomicLong> statParamsMap = new HashMap<>();
for (FileSizeConstants kind : MonitorConstants.FileSizeConstants.values()) {
statParamsMap.put(kind.name(), new AtomicLong(fileSizeMap.get(kind)));
}
return statParamsMap;
}
private static class FileSizeHolder {
private static final FileSize INSTANCE = new FileSize();
}
private FileSize() {
if (config.isEnableStatMonitor()) {
StatMonitor statMonitor = StatMonitor.getInstance();
registerStatMetadata();
statMonitor.registerStatistics(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, this);
}
}
public static FileSize getInstance() {
return FileSizeHolder.INSTANCE;
}
/**
* Return a map[FileSizeConstants, Long]. The key is the dir type and the value is the dir size in
* byte.
*
* @return a map[FileSizeConstants, Long] with the dir type and the dir size in byte
*/
public Map<FileSizeConstants, Long> getFileSizesInByte() {
EnumMap<FileSizeConstants, Long> fileSizes = new EnumMap<>(FileSizeConstants.class);
for (FileSizeConstants kinds : MonitorConstants.FileSizeConstants.values()) {
if (kinds.equals(FileSizeConstants.SYS)) {
fileSizes.put(kinds, collectSeqFileSize(fileSizes, kinds));
} else {
File file = SystemFileFactory.INSTANCE.getFile(kinds.getPath());
if (file.exists()) {
try {
fileSizes.put(kinds, FileUtils.sizeOfDirectory(file));
} catch (Exception e) {
logger.error("Failed to get {} size for dir {}.", kinds,
kinds.getPath(), e);
fileSizes.put(kinds, ABNORMAL_VALUE);
}
} else {
fileSizes.put(kinds, INIT_VALUE_IF_FILE_NOT_EXIST);
}
}
}
return fileSizes;
}
private long collectSeqFileSize(EnumMap<FileSizeConstants, Long> fileSizes,
FileSizeConstants kinds) {
long fileSize = INIT_VALUE_IF_FILE_NOT_EXIST;
for (String sequenceDir : config.getDataDirs()) {
if (sequenceDir.contains("unsequence")) {
continue;
}
File settledFile = SystemFileFactory.INSTANCE.getFile(sequenceDir);
if (settledFile.exists()) {
try {
fileSize += FileUtils.sizeOfDirectory(settledFile);
} catch (Exception e) {
logger.error("Failed to get {} size for dir {}.", kinds,
sequenceDir, e);
fileSizes.put(kinds, ABNORMAL_VALUE);
}
}
}
return fileSize;
}
}
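`collectSeqFileSize` above walks every configured data dir, skips any whose path contains `"unsequence"`, and sums the sizes of the remaining (sequence) dirs. A hedged sketch using plain `java.io` in place of the project's `SystemFileFactory` and Commons IO `FileUtils`:

```java
// Sketch of summing sequence-dir sizes: recurse through each data
// dir, skipping paths marked "unsequence"; missing dirs contribute 0.
import java.io.File;

public class SeqDirSizeSketch {
  // recursive directory size, analogous to FileUtils.sizeOfDirectory
  static long sizeOf(File f) {
    if (f.isFile()) {
      return f.length();
    }
    long total = 0;
    File[] children = f.listFiles();
    if (children != null) {
      for (File c : children) {
        total += sizeOf(c);
      }
    }
    return total;
  }

  static long collectSeqFileSize(String[] dataDirs) {
    long fileSize = 0;
    for (String dir : dataDirs) {
      if (dir.contains("unsequence")) {
        continue; // only sequence dirs count here
      }
      File settled = new File(dir);
      if (settled.exists()) {
        fileSize += sizeOf(settled);
      }
    }
    return fileSize;
  }

  public static void main(String[] args) {
    // 0 when neither dir exists on disk
    System.out.println(collectSeqFileSize(
        new String[] {"data/sequence-x", "data/unsequence-x"}));
  }
}
```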
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.monitor.collector;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* This class records the per-thread time cost of memtable write operations.
*/
public class MemTableWriteTimeCost {
public Map<String, Map<MemTableWriteTimeCostType, long[]>> getTimeCostMaps() {
return timeCostMaps;
}
private Map<String, Map<MemTableWriteTimeCostType, long[]>> timeCostMaps = new ConcurrentHashMap<>();
public static MemTableWriteTimeCost getInstance() {
return MemTableWriteTimeCostHolder.INSTANCE;
}
private static class MemTableWriteTimeCostHolder {
private static final MemTableWriteTimeCost INSTANCE = new MemTableWriteTimeCost();
}
private MemTableWriteTimeCost() {
}
public void init() {
if (timeCostMaps.get(Thread.currentThread().getName()) == null) {
Map<MemTableWriteTimeCostType, long[]> map = new ConcurrentHashMap<>();
for (MemTableWriteTimeCostType type : MemTableWriteTimeCostType.values()) {
map.put(type, new long[2]);
}
timeCostMaps.put(Thread.currentThread().getName(), map);
} else {
timeCostMaps.get(Thread.currentThread().getName()).clear();
for (MemTableWriteTimeCostType type : MemTableWriteTimeCostType.values()) {
timeCostMaps.get(Thread.currentThread().getName()).put(type, new long[2]);
}
}
}
public void measure(MemTableWriteTimeCostType type, long start) {
long elapse = System.currentTimeMillis() - start;
long[] a = new long[2];
// long[0] is the count, long[1] is the latency in ms
if (!timeCostMaps.containsKey(Thread.currentThread().getName())) {
return;
}
a[0] = timeCostMaps.get(Thread.currentThread().getName()).get(type)[0] + 1;
a[1] = timeCostMaps.get(Thread.currentThread().getName()).get(type)[1] + elapse;
timeCostMaps.get(Thread.currentThread().getName()).put(type, a);
}
public enum MemTableWriteTimeCostType {
EXPAND_ARRAY_1,
EXPAND_ARRAY_2,
CAPACITY_1,
CAPACITY_2,
WRITE_1,
WRITE_2,
PUT_TIMESTAMP_1,
PUT_TIMESTAMP_2,
}
}
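`MemTableWriteTimeCost` keeps one `long[2]` per (thread, cost type): index 0 counts calls and index 1 accumulates elapsed milliseconds. A compact, self-contained model of that bookkeeping (class and enum names are illustrative):

```java
// Per-thread [count, totalLatencyMs] accumulator, modeled after
// MemTableWriteTimeCost.init()/measure().
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TimeCostSketch {
  enum CostType { WRITE_1, WRITE_2 }

  static final Map<String, Map<CostType, long[]>> costs =
      new ConcurrentHashMap<>();

  static void init() {
    Map<CostType, long[]> map = new ConcurrentHashMap<>();
    for (CostType t : CostType.values()) {
      map.put(t, new long[2]); // [0] = count, [1] = latency in ms
    }
    costs.put(Thread.currentThread().getName(), map);
  }

  static void measure(CostType type, long elapsedMs) {
    Map<CostType, long[]> map = costs.get(Thread.currentThread().getName());
    if (map == null) {
      return; // thread was never initialized; drop the sample
    }
    long[] a = map.get(type);
    a[0] += 1;         // one more call
    a[1] += elapsedMs; // accumulate latency
  }

  public static void main(String[] args) {
    init();
    measure(CostType.WRITE_1, 3);
    measure(CostType.WRITE_1, 5);
    long[] a = costs.get(Thread.currentThread().getName())
        .get(CostType.WRITE_1);
    System.out.println(a[0] + "," + a[1]); // prints 2,8
  }
}
```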
......@@ -66,6 +66,7 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
......@@ -77,6 +78,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
......@@ -894,8 +896,14 @@ public class PlanExecutor implements IPlanExecutor {
"failed to insert points " + insertRowPlan.getFailedMeasurements());
}
}
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} catch (Exception e) {
// update failed statistics
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().updateFailedStatValue();
}
if (e instanceof StorageEngineException || e instanceof MetadataException) {
throw new QueryProcessException((IoTDBException) e);
}
}
}
......@@ -910,8 +918,14 @@ public class PlanExecutor implements IPlanExecutor {
throw new StorageEngineException(
"failed to insert measurements " + insertTabletPlan.getFailedMeasurements());
}
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} catch (Exception e) {
// update failed statistics
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().updateFailedStatValue();
}
if (e instanceof StorageEngineException || e instanceof MetadataException) {
throw new QueryProcessException((IoTDBException) e);
}
}
}
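The two hunks above change the insertion error path: any failure first bumps the failed-request statistic (when monitoring is enabled), and only then are the known exception types rethrown wrapped. A sketch of that order of operations (names such as `totalReqFail` are illustrative, not the real `StatMonitor` fields):

```java
// Error-path sketch: count the failure before rethrowing, so the
// statistic is updated even when the caller sees the exception.
public class FailedStatSketch {
  static long totalReqFail = 0;
  static boolean enableStatMonitor = true;

  static void insert(Runnable op) throws Exception {
    try {
      op.run();
    } catch (Exception e) {
      if (enableStatMonitor) {
        totalReqFail++; // stands in for updateFailedStatValue()
      }
      throw e; // in the real code: wrapped as QueryProcessException
    }
  }

  public static void main(String[] args) {
    try {
      insert(() -> { throw new IllegalStateException("disk full"); });
    } catch (Exception expected) {
      // swallow for the demo
    }
    System.out.println(totalReqFail); // prints 1
  }
}
```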
......
......@@ -101,21 +101,12 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(JMXService.getInstance());
registerManager.register(FlushManager.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
registerManager.register(Monitor.getInstance());
registerManager.register(StatMonitor.getInstance());
registerManager.register(Measurement.INSTANCE);
registerManager.register(TVListAllocator.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());
// When registering statMonitor, we should start recovering some statistics
// with latest values stored
// Warn: registMonitor() method should be called after systemDataRecovery()
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().recovery();
}
registerManager.register(RPCService.getInstance());
if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
registerManager.register(MetricsService.getInstance());
......@@ -136,6 +127,8 @@ public class IoTDB implements IoTDBMBean {
}
}
// Warn: registMonitor() method should be called after systemDataRecovery()
registerManager.register(StatMonitor.getInstance());
registerManager.register(SyncServerManager.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
registerManager.register(MergeManager.getINSTANCE());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.utils.OpenFileNumUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Monitor implements MonitorMBean, IService {
private static final Logger logger = LoggerFactory.getLogger(Monitor.class);
public static final Monitor INSTANCE = new Monitor();
public static Monitor getInstance() {
return INSTANCE;
}
private final String mbeanName = String
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@Override
public long getDataSizeInByte() {
try {
long totalSize = 0;
for (String dataDir : config.getDataDirs()) {
totalSize += FileUtils.sizeOfDirectory(SystemFileFactory.INSTANCE.getFile(dataDir));
}
return totalSize;
} catch (Exception e) {
logger.error("meet error while trying to get data size.", e);
return -1;
}
}
@Override
public int getFileNodeNum() {
// TODO Auto-generated method stub
return 0;
}
@Override
public long getOverflowCacheSize() {
// TODO Auto-generated method stub
return 0;
}
@Override
public long getBufferWriteCacheSize() {
// TODO Auto-generated method stub
return 0;
}
@Override
public String getSystemDirectory() {
try {
File file = SystemFileFactory.INSTANCE.getFile(config.getSystemDir());
return file.getAbsolutePath();
} catch (Exception e) {
logger.error("meet error while trying to get base dir.", e);
return "Unavailable";
}
}
@Override
public boolean getWriteAheadLogStatus() {
return config.isEnableWal();
}
@Override
public int getTotalOpenFileNum() {
return OpenFileNumUtil.getInstance()
.get(OpenFileNumUtil.OpenFileNumStatistics.TOTAL_OPEN_FILE_NUM);
}
@Override
public int getDataOpenFileNum() {
return OpenFileNumUtil.getInstance()
.get(OpenFileNumUtil.OpenFileNumStatistics.SEQUENCE_FILE_OPEN_NUM);
}
@Override
public int getWalOpenFileNum() {
return OpenFileNumUtil.getInstance()
.get(OpenFileNumUtil.OpenFileNumStatistics.WAL_OPEN_FILE_NUM);
}
@Override
public int getMetadataOpenFileNum() {
// TODO
return 0;
}
@Override
public int getDigestOpenFileNum() {
return OpenFileNumUtil.getInstance()
.get(OpenFileNumUtil.OpenFileNumStatistics.DIGEST_OPEN_FILE_NUM);
}
@Override
public int getSocketOpenFileNum() {
return OpenFileNumUtil.getInstance()
.get(OpenFileNumUtil.OpenFileNumStatistics.SOCKET_OPEN_FILE_NUM);
}
@Override
public void start() throws StartupException {
try {
JMXService.registerMBean(INSTANCE, mbeanName);
} catch (Exception e) {
throw new StartupException(this.getID().getName(), e.getMessage());
}
}
@Override
public void stop() {
JMXService.deregisterMBean(mbeanName);
}
@Override
public ServiceType getID() {
return ServiceType.MONITOR_SERVICE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.monitor.collector;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class FileSizeTest {
private static final String TEST_FILE_CONTENT = "FileSize UT test file";
private static final String TEST_FILE_PATH =
FileSizeConstants.SYS.getPath() + File.separatorChar + "schemaFile";
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
}
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
}
@Ignore
@Test
public void testGetFileSizesInByte() {
boolean isWriteSuccess = true;
File testFile = new File(TEST_FILE_PATH);
if (testFile.exists()) {
try {
Files.delete(testFile.toPath());
} catch (IOException e) {
isWriteSuccess = false;
e.printStackTrace();
}
}
try {
if (!testFile.createNewFile()) {
isWriteSuccess = false;
}
} catch (IOException e) {
e.printStackTrace();
}
long dataSizeBefore = FileSize.getInstance().getFileSizesInByte().get(FileSizeConstants.SYS);
byte[] contentInBytes = TEST_FILE_CONTENT.getBytes();
// insert something into the test file under data dir
try (FileOutputStream fileOutputStream = new FileOutputStream(testFile)) {
fileOutputStream.write(contentInBytes);
fileOutputStream.flush();
} catch (IOException e) {
isWriteSuccess = false;
e.printStackTrace();
}
// calculate the delta of data dir file size
long dataSizeAfter = FileSize.getInstance().getFileSizesInByte().get(FileSizeConstants.SYS);
long deltaSize = dataSizeAfter - dataSizeBefore;
if (isWriteSuccess) {
//check whether the delta of the data dir file size equals the written content size in bytes
assertEquals(contentInBytes.length, deltaSize);
} else {
assertEquals(0, deltaSize);
}
}
}