提交 d4a5104c 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/wal

......@@ -4,16 +4,99 @@
### 物联网典型场景
在典型的物联网、车联网、运维监测场景中,往往有多种不同类型的数据采集设备,采集一个到多个不同的物理量。而同一种采集设备类型,往往又有多个具体的采集设备分布在不同的地点。大数据处理系统就是要将各种采集的数据汇总,然后进行计算和分析。对于同一类设备,其采集的数据都是很规则的。以智能电表为例,假设每个智能电表采集电流、电压、相位三个量,其采集的数据类似如下的表格:
| Device ID | Time Stamp | current | voltage | phase | location | groupId |
| :-------: | :-----------: | :-----: | :-----: | :---: | :--------------: | :-----: |
| d1001 | 1538548685000 | 10.3 | 219 | 0.31 | Beijing.Chaoyang | 2 |
| d1002 | 1538548684000 | 10.2 | 220 | 0.23 | Beijing.Chaoyang | 3 |
| d1003 | 1538548686500 | 11.5 | 221 | 0.35 | Beijing.Haidian | 3 |
| d1004 | 1538548685500 | 13.4 | 223 | 0.29 | Beijing.Haidian | 2 |
| d1001 | 1538548695000 | 12.6 | 218 | 0.33 | Beijing.Chaoyang | 2 |
| d1004 | 1538548696600 | 11.8 | 221 | 0.28 | Beijing.Haidian | 2 |
| d1002 | 1538548696650 | 10.3 | 218 | 0.25 | Beijing.Chaoyang | 3 |
| d1001 | 1538548696800 | 12.3 | 221 | 0.31 | Beijing.Chaoyang | 2 |
<figure><table>
<thead><tr>
<th style="text-align:center;">设备ID</th>
<th style="text-align:center;">时间戳</th>
<th style="text-align:center;" colspan="3">采集量</th>
<th style="text-align:center;" colspan="2">标签</th>
</tr>
<tr>
<th style="text-align:center;">Device ID</th>
<th style="text-align:center;">Time Stamp</th>
<th style="text-align:center;">current</th>
<th style="text-align:center;">voltage</th>
<th style="text-align:center;">phase</th>
<th style="text-align:center;">location</th>
<th style="text-align:center;">groupId</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align:center;">d1001</td>
<td style="text-align:center;">1538548685000</td>
<td style="text-align:center;">10.3</td>
<td style="text-align:center;">219</td>
<td style="text-align:center;">0.31</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1002</td>
<td style="text-align:center;">1538548684000</td>
<td style="text-align:center;">10.2</td>
<td style="text-align:center;">220</td>
<td style="text-align:center;">0.23</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">3</td>
</tr>
<tr>
<td style="text-align:center;">d1003</td>
<td style="text-align:center;">1538548686500</td>
<td style="text-align:center;">11.5</td>
<td style="text-align:center;">221</td>
<td style="text-align:center;">0.35</td>
<td style="text-align:center;">Beijing.Haidian</td>
<td style="text-align:center;">3</td>
</tr>
<tr>
<td style="text-align:center;">d1004</td>
<td style="text-align:center;">1538548685500</td>
<td style="text-align:center;">13.4</td>
<td style="text-align:center;">223</td>
<td style="text-align:center;">0.29</td>
<td style="text-align:center;">Beijing.Haidian</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1001</td>
<td style="text-align:center;">1538548695000</td>
<td style="text-align:center;">12.6</td>
<td style="text-align:center;">218</td>
<td style="text-align:center;">0.33</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1004</td>
<td style="text-align:center;">1538548696600</td>
<td style="text-align:center;">11.8</td>
<td style="text-align:center;">221</td>
<td style="text-align:center;">0.28</td>
<td style="text-align:center;">Beijing.Haidian</td>
<td style="text-align:center;">2</td>
</tr>
<tr>
<td style="text-align:center;">d1002</td>
<td style="text-align:center;">1538548696650</td>
<td style="text-align:center;">10.3</td>
<td style="text-align:center;">218</td>
<td style="text-align:center;">0.25</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">3</td>
</tr>
<tr>
<td style="text-align:center;">d1001</td>
<td style="text-align:center;">1538548696800</td>
<td style="text-align:center;">12.3</td>
<td style="text-align:center;">221</td>
<td style="text-align:center;">0.31</td>
<td style="text-align:center;">Beijing.Chaoyang</td>
<td style="text-align:center;">2</td>
</tr>
</tbody>
</table></figure>
<center> 表1:智能电表数据示例</center>
......
......@@ -1238,8 +1238,7 @@ void tscColumnListDestroy(SArray* pColumnList) {
*
*/
static int32_t validateQuoteToken(SStrToken* pToken) {
strdequote(pToken->z);
pToken->n = (uint32_t)strtrim(pToken->z);
tscDequoteAndTrimToken(pToken);
int32_t k = tSQLGetToken(pToken->z, &pToken->type);
......@@ -1254,8 +1253,6 @@ static int32_t validateQuoteToken(SStrToken* pToken) {
}
void tscDequoteAndTrimToken(SStrToken* pToken) {
assert(pToken->type == TK_STRING);
uint32_t first = 0, last = pToken->n;
// trim leading spaces
......@@ -1367,7 +1364,8 @@ int32_t tscValidateName(SStrToken* pToken) {
} else {
pStr[firstPartLen] = TS_PATH_DELIMITER[0];
memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
pStr[firstPartLen + sizeof(TS_PATH_DELIMITER[0]) + pToken->n] = 0;
uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1));
memset(pToken->z + pToken->n - offset, ' ', offset);
}
pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0]));
pToken->z = pStr;
......
......@@ -475,6 +475,7 @@ typedef struct {
tsem_t mutex_sem;
int notFinished;
tsem_t lock_sem;
int counter;
} info;
typedef struct {
......@@ -766,6 +767,7 @@ int main(int argc, char *argv[]) {
t_info->data_of_rate = rate;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->counter = 0;
tsem_init(&(t_info->mutex_sem), 0, 1);
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
......@@ -788,14 +790,14 @@ int main(int argc, char *argv[]) {
printf("ASYNC Insert with %d connections:\n", threads);
}
fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n",
ntables * nrecords_per_table, ntables * nrecords_per_table / t,
(ntables * nrecords_per_table) / (t * nrecords_per_request),
fprintf(fp, "|%"PRIu64" | %10.2f | %10.2f | %10.4f |\n\n",
(int64_t)ntables * nrecords_per_table, ntables * nrecords_per_table / t,
((int64_t)ntables * nrecords_per_table) / (t * nrecords_per_request),
t * 1000);
printf("Spent %.4f seconds to insert %lld records with %d record(s) per request: %.2f records/second\n",
t, (long long int)ntables * nrecords_per_table, nrecords_per_request,
((long long int)ntables * nrecords_per_table) / t);
printf("Spent %.4f seconds to insert %"PRIu64" records with %d record(s) per request: %.2f records/second\n",
t, (int64_t)ntables * nrecords_per_table, nrecords_per_request,
(int64_t)ntables * nrecords_per_table / t);
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
......@@ -879,6 +881,7 @@ int main(int argc, char *argv[]) {
taos_close(rInfo->taos);
}
taos_cleanup();
return 0;
}
......@@ -1283,68 +1286,39 @@ void *syncWrite(void *sarg) {
void *asyncWrite(void *sarg) {
info *winfo = (info *)sarg;
sTable *tb_infos = (sTable *)malloc(sizeof(sTable) * (winfo->end_table_id - winfo->start_table_id + 1));
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
sTable *tb_info = tb_infos + tID - winfo->start_table_id;
tb_info->data_type = winfo->datatype;
tb_info->ncols_per_record = winfo->ncols_per_record;
tb_info->taos = winfo->taos;
sprintf(tb_info->tb_name, "%s.%s%d", winfo->db_name, winfo->tb_prefix, tID);
tb_info->timestamp = winfo->start_time;
tb_info->counter = 0;
tb_info->target = winfo->nrecords_per_table;
tb_info->len_of_binary = winfo->len_of_binary;
tb_info->nrecords_per_request = winfo->nrecords_per_request;
tb_info->mutex_sem = &(winfo->mutex_sem);
tb_info->notFinished = &(winfo->notFinished);
tb_info->lock_sem = &(winfo->lock_sem);
tb_info->data_of_order = winfo->data_of_order;
tb_info->data_of_rate = winfo->data_of_rate;
/* char buff[BUFFER_SIZE] = "\0"; */
/* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */
/* queryDB(tb_info->taos,buff); */
taos_query_a(winfo->taos, "show databases", callBack, tb_info);
}
taos_query_a(winfo->taos, "show databases", callBack, winfo);
tsem_wait(&(winfo->lock_sem));
free(tb_infos);
return NULL;
}
void callBack(void *param, TAOS_RES *res, int code) {
sTable *tb_info = (sTable *)param;
char **datatype = tb_info->data_type;
int ncols_per_record = tb_info->ncols_per_record;
int len_of_binary = tb_info->len_of_binary;
int64_t tmp_time = tb_info->timestamp;
if (code < 0) {
fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res));
exit(EXIT_FAILURE);
}
info* winfo = (info*)param;
char **datatype = winfo->datatype;
int ncols_per_record = winfo->ncols_per_record;
int len_of_binary = winfo->len_of_binary;
// If finished;
if (tb_info->counter >= tb_info->target) {
tsem_wait(tb_info->mutex_sem);
(*(tb_info->notFinished))--;
if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem);
tsem_post(tb_info->mutex_sem);
int64_t tmp_time = winfo->start_time;
char *buffer = calloc(1, BUFFER_SIZE);
char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
if (winfo->counter >= winfo->nrecords_per_table) {
winfo->start_table_id++;
winfo->counter = 0;
}
if (winfo->start_table_id > winfo->end_table_id) {
tsem_post(&winfo->lock_sem);
free(buffer);
free(data);
taos_free_result(res);
return;
}
char buffer[BUFFER_SIZE] = "\0";
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name);
for (int i = 0; i < tb_info->nrecords_per_request; i++) {
for (int i = 0; i < winfo->nrecords_per_request; i++) {
int rand_num = rand() % 100;
if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate)
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate)
{
int64_t d = tmp_time - rand() % 1000000 + rand_num;
generateData(data, datatype, ncols_per_record, d, len_of_binary);
......@@ -1353,15 +1327,15 @@ void callBack(void *param, TAOS_RES *res, int code) {
generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary);
}
pstr += sprintf(pstr, "%s", data);
tb_info->counter++;
winfo->counter++;
if (tb_info->counter >= tb_info->target) {
if (winfo->counter >= winfo->nrecords_per_table) {
break;
}
}
tb_info->timestamp = tmp_time;
taos_query_a(tb_info->taos, buffer, callBack, tb_info);
taos_query_a(winfo->taos, buffer, callBack, winfo);
free(buffer);
free(data);
taos_free_result(res);
}
......
......@@ -2348,7 +2348,8 @@ void filterPrepare(void* expr, void* param) {
if (size < (uint32_t)pSchema->bytes) {
size = pSchema->bytes;
}
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
// to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
tVariantDump(pCond, pInfo->q, pSchema->type, true);
}
}
......
......@@ -68,6 +68,7 @@ static void queryDB(TAOS *taos, char *command) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
taos_cleanup();
exit(EXIT_FAILURE);
}
......@@ -176,6 +177,7 @@ void taos_error(TAOS *con)
{
fprintf(stderr, "TDengine error: %s\n", taos_errstr(con));
taos_close(con);
taos_cleanup();
exit(1);
}
......@@ -211,6 +213,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
}
}
taos_free_result(tres);
}
void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
......@@ -222,7 +226,7 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
for (int i = 0; i<numOfRows; ++i) {
// synchronous API to retrieve a row from batch of records
/*TAOS_ROW row = */taos_fetch_row(tres);
/*TAOS_ROW row = */(void)taos_fetch_row(tres);
// process row
}
......@@ -236,7 +240,7 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
if (numOfRows < 0)
printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
taos_free_result(tres);
//taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesProcessed++;
......@@ -246,6 +250,8 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
}
}
taos_free_result(tres);
}
void taos_select_call_back(void *param, TAOS_RES *tres, int code)
......@@ -261,6 +267,10 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code)
}
else {
printf("%s select failed, code:%d\n", pTable->name, code);
taos_free_result(tres);
taos_cleanup();
exit(1);
}
taos_free_result(tres);
}
......@@ -154,7 +154,7 @@ python3 ./test.py -f query/queryConnection.py
python3 ./test.py -f query/queryCountCSVData.py
python3 ./test.py -f query/natualInterval.py
python3 ./test.py -f query/bug1471.py
python3 ./test.py -f query/dataLossTest.py
#python3 ./test.py -f query/dataLossTest.py
#stream
python3 ./test.py -f stream/metric_1.py
......
{
"filetype":"insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 1,
"databases": [{
"dbinfo": {
"name": "db01",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"update": 0,
"maxtablesPerVnode": 1000
},
"super_tables": [{
"name": "stb01",
"childtable_count": 100,
"childtable_prefix": "stb01_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 1000,
"timestamp_step": 1000,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "/home/data/sample.csv",
"tags_file": "",
"columns": [{
"type": "SMALLINT"
}, {
"type": "BOOL"
}, {
"type": "BINARY",
"len": 6
}],
"tags": [{
"type": "INT"
},{
"type": "BINARY",
"len": 4
}]
}]
}]
}
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
os.system("yes | %slowa -f tools/insert.json" % binPath)
tdSql.execute("use db01")
tdSql.query("select count(*) from stb01")
tdSql.checkData(0, 0, 100000)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册