提交 e60179df 编写于 作者: D dapan1121

Merge branch 'develop' into feature/TD-3295

......@@ -32,11 +32,10 @@ def abort_previous(){
milestone(buildNumber)
}
def pre_test(){
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
sudo rmtaos
'''
}
sh '''
sudo rmtaos || echo "taosd has not installed"
'''
sh '''
cd ${WKC}
......
[![Build Status](https://travis-ci.org/taosdata/TDengine.svg?branch=master)](https://travis-ci.org/taosdata/TDengine)
[![Build status](https://ci.appveyor.com/api/projects/status/kf3pwh2or5afsgl9/branch/master?svg=true)](https://ci.appveyor.com/project/sangshuduo/tdengine-2n8ge/branch/master)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=develop)](https://coveralls.io/github/taosdata/TDengine?branch=develop)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/4201/badge)](https://bestpractices.coreinfrastructure.org/projects/4201)
[![tdengine](https://snapcraft.io//tdengine/badge.svg)](https://snapcraft.io/tdengine)
[![TDengine](TDenginelogo.png)](https://www.taosdata.com)
简体中文 | [English](./README.md)
# TDengine 简介
TDengine是涛思数据专为物联网、车联网、工业互联网、IT运维等设计和优化的大数据平台。除核心的快10倍以上的时序数据库功能外,还提供缓存、数据订阅、流式计算等功能,最大程度减少研发和运维的复杂度,且核心代码,包括集群功能全部开源(开源协议,AGPL v3.0)。
- 10 倍以上性能提升。定义了创新的数据存储结构,单核每秒就能处理至少2万次请求,插入数百万个数据点,读出一千万以上数据点,比现有通用数据库快了十倍以上。
- 硬件或云服务成本降至1/5。由于超强性能,计算资源不到通用大数据方案的1/5;通过列式存储和先进的压缩算法,存储空间不到通用数据库的1/10。
- 全栈时序数据处理引擎。将数据库、消息队列、缓存、流式计算等功能融合一起,应用无需再集成Kafka/Redis/HBase/Spark等软件,大幅降低应用开发和维护成本。
- 强大的分析功能。无论是十年前还是一秒钟前的数据,指定时间范围即可查询。数据可在时间轴上或多个设备上进行聚合。即席查询可通过Shell/Python/R/Matlab随时进行。
- 与第三方工具无缝连接。不用一行代码,即可与Telegraf, Grafana, EMQ X, Prometheus, Matlab, R集成。后续还将支持MQTT, OPC, Hadoop,Spark等, BI工具也将无缝连接。
- 零运维成本、零学习成本。安装、集群一秒搞定,无需分库分表,实时备份。标准SQL,支持JDBC,RESTful,支持Python/Java/C/C++/Go/Node.JS, 与MySQL相似,零学习成本。
# 文档
TDengine是一个高效的存储、查询、分析时序大数据的平台,专为物联网、车联网、工业互联网、运维监测等优化而设计。您可以像使用关系型数据库MySQL一样来使用它,但建议您在使用前仔细阅读一遍下面的文档,特别是 [数据模型](https://www.taosdata.com/cn/documentation/architecture)[数据建模](https://www.taosdata.com/cn/documentation/model)。除本文档之外,欢迎 [下载产品白皮书](https://www.taosdata.com/downloads/TDengine%20White%20Paper.pdf)
# 生成
TDengine目前2.0版服务器仅能在Linux系统上安装和运行,后续会支持Windows、macOS等系统。客户端可以在Windows或Linux上安装和运行。任何OS的应用也可以选择RESTful接口连接服务器taosd。CPU支持X64/ARM64/MIPS64/Alpha64,后续会支持ARM32、RISC-V等CPU架构。用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。本快速指南仅适用于通过源码安装。
## 安装工具
### Ubuntu 16.04 及以上版本 & Debian:
```bash
sudo apt-get install -y gcc cmake build-essential git
```
### Ubuntu 14.04:
```bash
sudo apt-get install -y gcc cmake3 build-essential git binutils-2.26
export PATH=/usr/lib/binutils-2.26/bin:$PATH
```
编译或打包 JDBC 驱动源码,需安装 Java JDK 8 或以上版本和 Apache Maven 2.7 或以上版本。
安装 OpenJDK 8:
```bash
sudo apt-get install -y openjdk-8-jdk
```
安装 Apache Maven:
```bash
sudo apt-get install -y maven
```
### CentOS 7:
```bash
sudo yum install -y gcc gcc-c++ make cmake git
```
安装 OpenJDK 8:
```bash
sudo yum install -y java-1.8.0-openjdk
```
安装 Apache Maven:
```bash
sudo yum install -y maven
```
### CentOS 8 & Fedora:
```bash
sudo dnf install -y gcc gcc-c++ make cmake epel-release git
```
安装 OpenJDK 8:
```bash
sudo dnf install -y java-1.8.0-openjdk
```
安装 Apache Maven:
```bash
sudo dnf install -y maven
```
## 获取源码
首先,你需要从 GitHub 克隆源码:
```bash
git clone https://github.com/taosdata/TDengine.git
cd TDengine
```
Go 连接器和 Grafana 插件在其他独立仓库,如果安装它们的话,需要在 TDengine 目录下通过此命令安装:
```bash
git submodule update --init --recursive
```
## 生成 TDengine
### Linux 系统
```bash
mkdir debug && cd debug
cmake .. && cmake --build .
```
在X86-64、X86、arm64 和 arm32 平台上,TDengine 生成脚本可以自动检测机器架构。也可以手动配置 CPUTYPE 参数来指定 CPU 类型,如 aarch64 或 aarch32 等。
aarch64:
```bash
cmake .. -DCPUTYPE=aarch64 && cmake --build .
```
aarch32:
```bash
cmake .. -DCPUTYPE=aarch32 && cmake --build .
```
### Windows 系统
如果你使用的是 Visual Studio 2013 版本:
打开 cmd.exe,执行 vcvarsall.bat 时,为 64 位操作系统指定“x86_amd64”,为 32 位操作系统指定“x86”。
```bash
mkdir debug && cd debug
"C:\Program Files (x86)\Microsoft Visual Studio 12.0\VC\vcvarsall.bat" < x86_amd64 | x86 >
cmake .. -G "NMake Makefiles"
nmake
```
如果你使用的是 Visual Studio 2019 或 2017 版本:
打开cmd.exe,执行 vcvarsall.bat 时,为 64 位操作系统指定“x64”,为 32 位操作系统指定“x86”。
```bash
mkdir debug && cd debug
"c:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Auxiliary\Build\vcvarsall.bat" < x64 | x86 >
cmake .. -G "NMake Makefiles"
nmake
```
你也可以从开始菜单中找到"Visual Studio < 2019 | 2017 >"菜单项,根据你的系统选择"x64 Native Tools Command Prompt for VS < 2019 | 2017 >"或"x86 Native Tools Command Prompt for VS < 2019 | 2017 >",打开命令行窗口,执行:
```bash
mkdir debug && cd debug
cmake .. -G "NMake Makefiles"
nmake
```
### Mac OS X 系统
安装 Xcode 命令行工具和 cmake. 在 Catalina 和 Big Sur 操作系统上,需要安装 XCode 11.4+ 版本。
```bash
mkdir debug && cd debug
cmake .. && cmake --build .
```
# 安装
如果你不想安装,可以直接在shell中运行。生成完成后,安装 TDengine:
```bash
make install
```
用户可以在[文件目录结构](https://www.taosdata.com/cn/documentation/administrator#directories)中了解更多在操作系统中生成的目录或文件。
安装成功后,在终端中启动 TDengine 服务:
```bash
taosd
```
用户可以使用 TDengine Shell 来连接 TDengine 服务,在终端中,输入:
```bash
taos
```
如果 TDengine Shell 连接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印出错误消息。
## 快速运行
TDengine 生成后,在终端执行以下命令:
```bash
./build/bin/taosd -c test/cfg
```
在另一个终端,使用 TDengine shell 连接服务器:
```bash
./build/bin/taos -c test/cfg
```
"-c test/cfg"指定系统配置文件所在目录。
# 体验 TDengine
在TDengine终端中,用户可以通过SQL命令来创建/删除数据库、表等,并进行插入查询操作。
```bash
create database demo;
use demo;
create table t (ts timestamp, speed int);
insert into t values ('2019-07-15 00:00:00', 10);
insert into t values ('2019-07-15 01:00:00', 20);
select * from t;
ts | speed |
===================================
19-07-15 00:00:00.000| 10|
19-07-15 01:00:00.000| 20|
Query OK, 2 row(s) in set (0.001700s)
```
# 应用开发
## 官方连接器
TDengine 提供了丰富的应用程序开发接口,其中包括C/C++、Java、Python、Go、Node.js、C# 、RESTful 等,便于用户快速开发应用:
- Java
- C/C++
- Python
- Go
- RESTful API
- Node.js
## 第三方连接器
TDengine 社区生态中也有一些非常友好的第三方连接器,可以通过以下链接访问它们的源码。
- [Rust Connector](https://github.com/taosdata/TDengine/tree/master/tests/examples/rust)
- [.Net Core Connector](https://github.com/maikebing/Maikebing.EntityFrameworkCore.Taos)
- [Lua Connector](https://github.com/taosdata/TDengine/tree/develop/tests/examples/lua)
# 运行和添加测试例
TDengine 的测试框架和所有测试例全部开源。
点击[这里](tests/How-To-Run-Test-And-How-To-Add-New-Test-Case.md),了解如何运行测试例和添加新的测试例。
# 成为社区贡献者
点击[这里](https://www.taosdata.com/cn/contributor/),了解如何成为 TDengine 的贡献者。
#加入技术交流群
TDengine官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小T为好友,即可入群。
......@@ -6,6 +6,8 @@
[![TDengine](TDenginelogo.png)](https://www.taosdata.com)
English | [简体中文](./README-CN.md)
# What is TDengine?
TDengine is an open-sourced big data platform under [GNU AGPL v3.0](http://www.gnu.org/licenses/agpl-3.0.html), designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. Besides the 10x faster time-series database, it provides caching, stream computing, message queuing and other functionalities to reduce the complexity and cost of development and operation.
......
......@@ -163,6 +163,7 @@ extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB;
extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB;
extern float tsUsedDataDirGB;
extern float tsMinimalLogDirGB;
extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB;
......
......@@ -210,6 +210,7 @@ float tsTotalTmpDirGB = 0;
float tsTotalDataDirGB = 0;
float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0;
float tsUsedDataDirGB = 0;
float tsReservedTmpDirectorySpace = 1.0f;
float tsMinimalDataDirGB = 1.0f;
int32_t tsTotalMemoryMB = 0;
......
......@@ -187,6 +187,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x0392) //"Invalid topic name)
#define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x0393) //"Invalid topic option)
#define TSDB_CODE_MND_INVALID_TOPIC_PARTITONS TAOS_DEF_ERROR_CODE(0, 0x0394) //"Invalid topic partitons num, valid range: [1, 1000])
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists)
// dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed")
......
......@@ -35,6 +35,7 @@ typedef struct {
// FS APIs ====================================
typedef struct {
int64_t tsize;
int64_t used;
int64_t avail;
} SFSMeta;
......@@ -90,4 +91,4 @@ void tfsClosedir(TDIR *tdir);
}
#endif
#endif
\ No newline at end of file
#endif
......@@ -191,6 +191,7 @@ typedef struct SArguments_S {
bool answer_yes;
bool debug_print;
bool verbose_print;
bool performance_print;
char * output_file;
int mode;
char * datatype[MAX_NUM_DATATYPE + 1];
......@@ -440,7 +441,7 @@ typedef unsigned __int32 uint32_t;
static HANDLE g_stdoutHandle;
static DWORD g_consoleMode;
void setupForAnsiEscape(void) {
static void setupForAnsiEscape(void) {
DWORD mode = 0;
g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
......@@ -462,7 +463,7 @@ void setupForAnsiEscape(void) {
}
}
void resetAfterAnsiEscape(void) {
static void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
......@@ -472,7 +473,7 @@ void resetAfterAnsiEscape(void) {
}
}
int taosRandom()
static int taosRandom()
{
int number;
rand_s(&number);
......@@ -480,14 +481,14 @@ int taosRandom()
return number;
}
#else
void setupForAnsiEscape(void) {}
static void setupForAnsiEscape(void) {}
void resetAfterAnsiEscape(void) {
static void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
}
int taosRandom()
static int taosRandom()
{
return random();
}
......@@ -526,6 +527,7 @@ SArguments g_args = {
false, // insert_only
false, // debug_print
false, // verbose_print
false, // performance statistic print
false, // answer_yes;
"./output.txt", // output_file
0, // mode : sync or async
......@@ -572,6 +574,10 @@ static FILE * g_fpOfInsertResult = NULL;
do { if (g_args.verbose_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
#define performancePrint(fmt, ...) \
do { if (g_args.performance_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
#define errorPrint(fmt, ...) \
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
......@@ -580,7 +586,7 @@ static FILE * g_fpOfInsertResult = NULL;
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
void printHelp() {
static void printHelp() {
char indent[10] = " ";
printf("%s%s%s%s\n", indent, "-f", indent,
"The meta file to the execution procedure. Default is './meta.json'.");
......@@ -642,7 +648,7 @@ void printHelp() {
*/
}
void parse_args(int argc, char *argv[], SArguments *arguments) {
static void parse_args(int argc, char *argv[], SArguments *arguments) {
char **sptr;
wordexp_t full_path;
......@@ -746,6 +752,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->debug_print = true;
} else if (strcmp(argv[i], "-gg") == 0) {
arguments->verbose_print = true;
} else if (strcmp(argv[i], "-pp") == 0) {
arguments->performance_print = true;
} else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) {
......@@ -833,13 +841,13 @@ static bool getInfoFromJsonFile(char* file);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
static void init_rand_data();
void tmfclose(FILE *fp) {
static void tmfclose(FILE *fp) {
if (NULL != fp) {
fclose(fp);
}
}
void tmfree(char *buf) {
static void tmfree(char *buf) {
if (NULL != buf) {
free(buf);
}
......@@ -938,7 +946,7 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName)
taos_free_result(res);
}
double getCurrentTime() {
static double getCurrentTime() {
struct timeval tv;
if (gettimeofday(&tv, NULL) != 0) {
perror("Failed to get current time in ms");
......@@ -992,7 +1000,7 @@ static float rand_float(){
}
static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
void rand_string(char *str, int size) {
static void rand_string(char *str, int size) {
str[0] = 0;
if (size > 0) {
//--size;
......@@ -2625,10 +2633,10 @@ static void createChildTables() {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j],
"NCHAR", strlen("NCHAR")) == 0)) {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s(60)", j, g_args.datatype[j]);
} else {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s", j, g_args.datatype[j]);
}
len = strlen(tblColsBuf);
......@@ -2787,20 +2795,6 @@ static int readSampleFromCsvFileToMem(
return 0;
}
/*
void readSampleFromFileToMem(SSuperTable * supterTblInfo) {
int ret;
if (0 == strncasecmp(supterTblInfo->sampleFormat, "csv", 3)) {
ret = readSampleFromCsvFileToMem(supterTblInfo);
} else if (0 == strncasecmp(supterTblInfo->sampleFormat, "json", 4)) {
ret = readSampleFromJsonFileToMem(supterTblInfo);
}
if (0 != ret) {
exit(-1);
}
}
*/
static bool getColumnAndTagTypeFromInsertJsonFile(
cJSON* stbInfo, SSuperTable* superTbls) {
bool ret = false;
......@@ -3976,10 +3970,6 @@ PARSE_OVER:
static void prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
//if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) {
// readSampleFromFileToMem(&g_Dbs.db[i].superTbls[j]);
//}
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
(void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]);
}
......@@ -4094,7 +4084,7 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
return dataLen;
}
int32_t generateData(char *res, char **data_type,
static int32_t generateData(char *res, char **data_type,
int num_of_cols, int64_t timestamp, int lenOfBinary) {
memset(res, 0, MAX_DATA_SIZE);
char *pstr = res;
......@@ -4227,8 +4217,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
}
static int generateDataTail(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo,
SSuperTable* superTblInfo,
threadInfo* pThreadInfo, SSuperTable* superTblInfo,
int batch, char* buffer, int64_t insertRows,
int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) {
int len = 0;
......@@ -4254,7 +4243,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
retLen = getRowDataFromSample(
buffer + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom,
startTime + superTblInfo->timeStampStep * k,
superTblInfo,
pSamplePos);
} else if (0 == strncasecmp(superTblInfo->dataSource,
......@@ -4262,7 +4251,9 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime - taosRandom() % superTblInfo->disorderRange;
int64_t d = startTime
+ superTblInfo->timeStampStep * k
- taosRandom() % superTblInfo->disorderRange;
retLen = generateRowData(
buffer + len,
superTblInfo->maxSqlLen - len,
......@@ -4272,7 +4263,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
retLen = generateRowData(
buffer + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom,
startTime + superTblInfo->timeStampStep * k,
superTblInfo);
}
}
......@@ -4328,7 +4319,8 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
return k;
}
static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer)
static int generateSQLHead(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer)
{
int len;
if (superTblInfo) {
......@@ -4403,7 +4395,8 @@ static int generateDataBuffer(char *pTblName,
char *pstr = buffer;
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo, buffer);
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo,
buffer);
pstr += headLen;
int k;
......@@ -4482,6 +4475,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int generatedRecPerTbl = 0;
bool flagSleep = true;
int sleepTimeTotal = 0;
int timeShift = 0;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampUs();
......@@ -4519,8 +4513,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, insertRows, 0,
startTime + sleepTimeTotal +
pThreadInfo->totalInsertRows * superTblInfo->timeStampStep,
startTime + timeShift + sleepTimeTotal,
&(pThreadInfo->samplePos), &dataLen);
pstr += dataLen;
recOfBatch += batchPerTbl;
......@@ -4529,6 +4522,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
timeShift ++;
tableSeq ++;
if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
......@@ -4562,7 +4556,20 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, buffer);
startTs = taosGetTimestampUs();
int affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %10.6fms\n",
__func__, __LINE__, delay/1000.0);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if ((affectedRows < 0) || (recOfBatch != affectedRows)) {
......@@ -4574,13 +4581,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->totalAffectedRows += affectedRows;
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
......@@ -4595,8 +4595,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (insert_interval > ((et - st)/1000) ) {
int sleepTime = insert_interval - (et -st)/1000;
// verbosePrint("%s() LN%d sleep: %d ms for insert interval\n",
// __func__, __LINE__, sleepTime);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleepTime);
taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval;
}
......@@ -4638,6 +4638,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int timeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
......@@ -4665,27 +4666,36 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
__func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName);
int generated = generateDataBuffer(tableName, tableSeq, pThreadInfo, buffer, insertRows,
i, start_time, &(pThreadInfo->samplePos));
int generated = generateDataBuffer(
tableName, tableSeq, pThreadInfo, buffer, insertRows,
i, start_time + pThreadInfo->totalInsertRows * timeStampStep,
&(pThreadInfo->samplePos));
if (generated > 0)
i += generated;
else
goto free_and_statistics_2;
int affectedRows = execInsert(pThreadInfo, buffer, generated);
if (affectedRows < 0)
goto free_and_statistics_2;
pThreadInfo->totalInsertRows += generated;
pThreadInfo->totalAffectedRows += affectedRows;
startTs = taosGetTimestampUs();
int affectedRows = execInsert(pThreadInfo, buffer, generated);
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %10.6fms\n",
__func__, __LINE__, delay/1000.0);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (affectedRows < 0)
goto free_and_statistics_2;
pThreadInfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
......@@ -4703,7 +4713,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000;
verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", __func__, __LINE__, sleep_time);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
}
}
......@@ -4743,7 +4754,7 @@ static void* syncWrite(void *sarg) {
}
}
void callBack(void *param, TAOS_RES *res, int code) {
static void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* winfo = (threadInfo*)param;
SSuperTable* superTblInfo = winfo->superTblInfo;
......@@ -4802,7 +4813,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
taos_free_result(res);
}
void *asyncWrite(void *sarg) {
static void *asyncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
......@@ -5084,7 +5095,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
free(infos);
}
void *readTable(void *sarg) {
static void *readTable(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos;
......@@ -5155,7 +5166,7 @@ void *readTable(void *sarg) {
return NULL;
}
void *readMetric(void *sarg) {
static void *readMetric(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos;
......@@ -5318,7 +5329,7 @@ static int insertTestProcess() {
return 0;
}
void *superQueryProcess(void *sarg) {
static void *superQueryProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
//char sqlStr[MAX_TB_NAME_SIZE*2];
......@@ -5477,7 +5488,11 @@ static int queryTestProcess() {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
(void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE);
if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE)) {
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
} else {
t_info->taos = NULL;
}
......@@ -5583,7 +5598,7 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF
return tsub;
}
void *subSubscribeProcess(void *sarg) {
static void *subSubscribeProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
char subSqlstr[1024];
......@@ -5650,7 +5665,7 @@ void *subSubscribeProcess(void *sarg) {
return NULL;
}
void *superSubscribeProcess(void *sarg) {
static void *superSubscribeProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
char sqlStr[MAX_TB_NAME_SIZE*2];
......@@ -5747,22 +5762,27 @@ static int subscribeTestProcess() {
pthread_t *pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from super table
if (g_queryInfo.superQueryInfo.sqlCount > 0
&& g_queryInfo.superQueryInfo.concurrent > 0) {
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
if ((g_queryInfo.superQueryInfo.sqlCount <= 0) ||
(g_queryInfo.superQueryInfo.concurrent <= 0)) {
errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__, g_queryInfo.superQueryInfo.sqlCount,
g_queryInfo.superQueryInfo.concurrent);
exit(-1);
}
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed for create threads\n");
taos_close(taos);
exit(-1);
}
}
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
t_info->taos = taos;
pthread_create(pids + i, NULL, superSubscribeProcess, t_info);
}
}
//==== create sub threads for query from sub table
......@@ -5826,7 +5846,7 @@ static int subscribeTestProcess() {
return 0;
}
void initOfInsertMeta() {
static void initOfInsertMeta() {
memset(&g_Dbs, 0, sizeof(SDbs));
// set default values
......@@ -5838,7 +5858,7 @@ void initOfInsertMeta() {
g_Dbs.use_metric = true;
}
void initOfQueryMeta() {
static void initOfQueryMeta() {
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
// set default values
......@@ -5848,7 +5868,7 @@ void initOfQueryMeta() {
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE);
}
void setParaFromArg(){
static void setParaFromArg(){
if (g_args.host) {
strcpy(g_Dbs.host, g_args.host);
} else {
......@@ -5989,7 +6009,7 @@ static int isCommentLine(char *line) {
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
}
void querySqlFile(TAOS* taos, char* sqlFile)
static void querySqlFile(TAOS* taos, char* sqlFile)
{
FILE *fp = fopen(sqlFile, "r");
if (fp == NULL) {
......@@ -6022,7 +6042,6 @@ void querySqlFile(TAOS* taos, char* sqlFile)
memcpy(cmd + cmd_len, line, read_len);
verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
queryDbExec(taos, cmd, NO_INSERT_TYPE);
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE)) {
printf("queryDbExec %s failed!\n", cmd);
tmfree(cmd);
......
......@@ -35,7 +35,7 @@
#include "mnodeSdb.h"
#define SDB_TABLE_LEN 12
#define MAX_QUEUED_MSG_NUM 10000
#define MAX_QUEUED_MSG_NUM 100000
typedef enum {
SDB_ACTION_INSERT = 0,
......
......@@ -23,6 +23,7 @@ extern "C" {
// TAOS_OS_FUNC_SYSINFO
typedef struct {
int64_t tsize;
int64_t used;
int64_t avail;
} SysDiskSize;
......
......@@ -138,6 +138,8 @@ void taosPrintOsInfo() {
// uInfo(" os streamMax: %" PRId64, tsStreamMax);
uInfo(" os numOfCores: %d", tsNumOfCores);
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
struct utsname buf;
......@@ -222,6 +224,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
} else {
diskSize->tsize = info.f_blocks * info.f_frsize;
diskSize->avail = info.f_bavail * info.f_frsize;
diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize;
return 0;
}
}
......
......@@ -326,6 +326,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
} else {
diskSize->tsize = info.f_blocks * info.f_frsize;
diskSize->avail = info.f_bavail * info.f_frsize;
diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize;
return 0;
}
}
......@@ -506,6 +507,8 @@ void taosPrintOsInfo() {
uInfo(" os streamMax: %" PRId64, tsStreamMax);
uInfo(" os numOfCores: %d", tsNumOfCores);
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
struct utsname buf;
......
......@@ -136,7 +136,8 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
(PULARGE_INTEGER)&i64FreeBytes);
if (fResult) {
diskSize->tsize = (int64_t)(i64TotalBytes);
diskSize->avail = (int64_t)(i64FreeBytes);
diskSize->avail = (int64_t)(i64FreeBytesToCaller);
diskSize->used = (int64_t)(i64TotalBytes - i64FreeBytes);
return 0;
} else {
uError("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
......@@ -205,6 +206,8 @@ void taosGetSystemInfo() {
void taosPrintOsInfo() {
uInfo(" os numOfCores: %d", tsNumOfCores);
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
uInfo("==================================");
}
......
......@@ -292,7 +292,7 @@ static int32_t monBuildCpuSql(char *sql) {
// unit is GB
static int32_t monBuildDiskSql(char *sql) {
return sprintf(sql, ", %f, %d", (tsTotalDataDirGB - tsAvailDataDirGB), (int32_t)tsTotalDataDirGB);
return sprintf(sql, ", %f, %d", tsUsedDataDirGB, (int32_t)tsTotalDataDirGB);
}
// unit is Kb
......
......@@ -41,6 +41,7 @@ extern int fsDebugFlag;
// tdisk.c ======================================================
typedef struct {
int64_t size;
int64_t used;
int64_t free;
} SDiskMeta;
......@@ -56,6 +57,7 @@ typedef struct SDisk {
#define DISK_DIR(pd) ((pd)->dir)
#define DISK_META(pd) ((pd)->dmeta)
#define DISK_SIZE(pd) ((pd)->dmeta.size)
#define DISK_USED_SIZE(pd) ((pd)->dmeta.used)
#define DISK_FREE_SIZE(pd) ((pd)->dmeta.free)
SDisk *tfsNewDisk(int level, int id, const char *dir);
......@@ -65,6 +67,7 @@ int tfsUpdateDiskInfo(SDisk *pDisk);
// ttier.c ======================================================
typedef struct {
int64_t size;
int64_t used;
int64_t free;
int16_t nAvailDisks; // # of Available disks
} STierMeta;
......@@ -96,4 +99,4 @@ void tfsPosNextId(STier *pTier);
}
#endif
#endif
\ No newline at end of file
#endif
......@@ -52,6 +52,7 @@ int tfsUpdateDiskInfo(SDisk *pDisk) {
}
pDisk->dmeta.size = diskSize.tsize;
pDisk->dmeta.used = diskSize.used;
pDisk->dmeta.free = diskSize.avail;
return code;
......
......@@ -134,6 +134,7 @@ void tfsUpdateInfo(SFSMeta *pFSMeta) {
tfsUpdateTierInfo(pTier, &tierMeta);
pFSMeta->tsize += tierMeta.size;
pFSMeta->avail += tierMeta.free;
pFSMeta->used += tierMeta.used;
}
tfsLock();
......@@ -585,6 +586,7 @@ void taosGetDisk() {
if (tscEmbedded) {
tfsUpdateInfo(&fsMeta);
tsTotalDataDirGB = (float)(fsMeta.tsize / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
}
......
......@@ -100,6 +100,7 @@ void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) {
continue;
}
pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id));
pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id));
pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id));
pTierMeta->nAvailDisks++;
}
......@@ -166,4 +167,4 @@ void tfsPosNextId(STier *pTier) {
}
pTier->nextid = nextid;
}
\ No newline at end of file
}
......@@ -842,6 +842,10 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
return INT32_MIN;
}
if (key < 0) {
key -= (daysPerFile * tsMsPerDay[precision]);
}
int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
fid = INT32_MIN;
......
......@@ -199,6 +199,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_KEEP, "Invalid database opti
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Invalid topic option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_PARTITONS, "Invalid topic partitons num, valid range: [1, 1000]")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed")
......
......@@ -24,7 +24,7 @@
#include "dnode.h"
#include "vnodeStatus.h"
#define MAX_QUEUED_MSG_NUM 10000
#define MAX_QUEUED_MSG_NUM 100000
extern void * tsDnodeTmr;
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
......
......@@ -275,7 +275,7 @@ python3 ./test.py -f functions/function_twa.py -r 1
python3 ./test.py -f functions/function_twa_test2.py
python3 ./test.py -f functions/function_stddev_td2555.py
python3 ./test.py -f insert/metadataUpdate.py
#python3 ./test.py -f query/last_cache.py
python3 ./test.py -f query/last_cache.py
python3 ./test.py -f query/last_row_cache.py
python3 ./test.py -f account/account_create.py
python3 ./test.py -f alter/alter_table.py
......
......@@ -5,11 +5,13 @@
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count": 2,
"num_of_records_per_req": 10,
"thread_count_create_tbl": 4,
"databases": [{
"dbinfo": {
"name": "db01",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
......@@ -20,31 +22,23 @@
},
"super_tables": [{
"name": "stb01",
"childtable_count": 100,
"childtable_count": 3,
"childtable_prefix": "stb01_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 1000,
"insert_rows": 20,
"timestamp_step": 1000,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "/home/data/sample.csv",
"tags_file": "",
"columns": [{
"type": "SMALLINT"
}, {
"type": "BOOL"
}, {
"type": "BINARY",
"len": 6
"type": "INT"
}],
"tags": [{
"type": "INT"
},{
"type": "BINARY",
"len": 4
}]
}]
}]
......
......@@ -63,7 +63,7 @@ class TDTestCase:
tdSql.checkRows(2)
tdSql.query(
"select apercentile(col1, 1) from test.meters interval(10s)")
"select apercentile(col1, 1) from test.meters interval(100s)")
tdSql.checkRows(1)
tdSql.error("select loc, count(loc) from test.meters")
......
......@@ -24,9 +24,6 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......@@ -55,7 +52,7 @@ class TDTestCase:
tdSql.execute("use db01")
tdSql.query("select count(*) from stb01")
tdSql.checkData(0, 0, 100000)
tdSql.checkData(0, 0, 60)
def stop(self):
tdSql.close()
......
......@@ -842,4 +842,28 @@ if $rows != 10 then
return -1
endi
sql drop topic t1
print ============== create same name topic
sql create database d2
sql create topic t2
sql_error create topic d2
sql_error create topic if not exists d2
sql_error create topic t2
sql create topic if not exists t2
sql_error create topic t2 partitions 5;
sql_error create topic t2 partitions 6;
sql_error create topic t2 partitions 3;
sql_error alter topic t3 partitions 1
sql_error alter topic d2 partitions 1
sql_error alter topic t2 partitions 0
sql_error alter topic t2 partitions 10000
sql_error drop topic d2
sql_error drop topic d3
sql drop database d2
sql drop database t2
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册