未验证 提交 5834f5f2 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #7670 from taosdata/hotfix/td-6396

[TD-6396]<hotfix>:modify test script line that generate stb with more cloumns
......@@ -32,6 +32,8 @@ typedef struct {
uint8_t type;
int16_t length;
char* value;
uint32_t fieldSchemaIdx;
} TAOS_SML_KV;
typedef struct {
......@@ -44,6 +46,8 @@ typedef struct {
// first kv must be timestamp
TAOS_SML_KV* fields;
int32_t fieldNum;
uint32_t schemaIdx;
} TAOS_SML_DATA_POINT;
typedef enum {
......@@ -56,7 +60,6 @@ typedef enum {
typedef struct {
uint64_t id;
SHashObj* smlDataToSchema;
} SSmlLinesInfo;
//=================================================================================================
......@@ -175,8 +178,7 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
}
uintptr_t valPointer = (uintptr_t)smlKv;
taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &fieldIdx, sizeof(fieldIdx));
smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;
return 0;
}
......@@ -270,8 +272,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
}
uintptr_t valPointer = (uintptr_t)point;
taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &stableIdx, sizeof(stableIdx));
point->schemaIdx = (uint32_t)stableIdx;
}
size_t numStables = taosArrayGetSize(stableSchemas);
......@@ -598,19 +599,19 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pSql->cmd.payload, "table name is invalid");
tscFreeRegisteredSqlObj(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
SName sname = {0};
if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) {
tscFreeRegisteredSqlObj(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
memset(fullTableName, 0, tListLen(fullTableName));
tNameExtractFullName(&sname, fullTableName);
tscFreeRegisteredSqlObj(pSql);
taosReleaseRef(tscObjRef, pSql->self);
size_t size = 0;
taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
......@@ -884,19 +885,20 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu", info->id, cTableName, taosArrayGetSize(rowsBind));
size_t rows = taosArrayGetSize(rowsBind);
size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5;
size_t batchSize = MIN(maxBatchSize, rows);
tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
info->id, cTableName, rows, batchSize);
SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
for (int i=0; i<rows;) {
for (int i = 0; i < rows;) {
int j = i;
for (; j < i + batchSize && j<rows; ++j) {
taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
}
if (j>=i) {
if (j > i) {
tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1);
code = doInsertChildTableWithStmt(taos, sql, cTableName, batchBind, info);
if (code != 0) {
taosArrayDestroy(batchBind);
......@@ -916,10 +918,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
uintptr_t valPointer = (uintptr_t)point;
size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pSchemaIndex != NULL);
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
......@@ -963,10 +962,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
tagKVs[*pFieldSchemaIdx] = kv;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
......@@ -980,10 +976,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx);
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -1026,10 +1019,7 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema,
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
TAOS_BIND* bind = colBinds + *pFieldSchemaIdx;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -1067,10 +1057,7 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
uintptr_t valPointer = (uintptr_t)point;
size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pSchemaIndex != NULL);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName);
code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info);
......@@ -1113,7 +1100,6 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);
int32_t code = TSDB_CODE_SUCCESS;
info->smlDataToSchema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, false);
tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
......@@ -1143,11 +1129,10 @@ clean_up:
taosArrayDestroy(schema->tags);
}
taosArrayDestroy(stableSchemas);
taosHashCleanup(info->smlDataToSchema);
return code;
}
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
info->id = genLinesSmlId();
int code = tscSmlInsert(taos, points, numPoint, info);
......
......@@ -1567,8 +1567,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pRes->qId = 0;
pRes->numOfRows = 1;
registerSqlObj(pSql);
strtolower(pSql->sqlstr, sql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
......
......@@ -8,8 +8,9 @@
#include <time.h>
#include <unistd.h>
int numThreads = 8;
int numSuperTables = 8;
int numChildTables = 4;
int numChildTables = 4; // per thread, per super table
int numRowsPerChildTable = 2048;
void shuffle(char** lines, size_t n) {
......@@ -24,12 +25,39 @@ void shuffle(char** lines, size_t n) {
}
}
void printThreadId(pthread_t id, char* buf)
{
size_t i;
for (i = sizeof(i); i; --i)
sprintf(buf + strlen(buf), "%02x", *(((unsigned char*) &id) + i - 1));
}
static int64_t getTimeInUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}
typedef struct {
TAOS* taos;
char** lines;
int numLines;
int64_t costTime;
} SThreadInsertArgs;
static void* insertLines(void* args) {
SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
char tidBuf[32] = {0};
printThreadId(pthread_self(), tidBuf);
printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(insertArgs->taos, insertArgs->lines, insertArgs->numLines);
int64_t end = getTimeInUs();
insertArgs->costTime = end-begin;
printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
return NULL;
}
int main(int argc, char* argv[]) {
TAOS_RES* result;
const char* host = "127.0.0.1";
......@@ -60,25 +88,88 @@ int main(int argc, char* argv[]) {
int64_t ts = ct * 1000;
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, i, j, ts + 10 * l);
lines[l] = line;
++l;
{
char** linesStb = calloc(numSuperTables, sizeof(char*));
for (int i = 0; i < numSuperTables; i++) {
char* lineStb = calloc(512, 1);
snprintf(lineStb, 512, lineFormat, i,
numThreads * numSuperTables * numChildTables,
ts + numThreads * numSuperTables * numChildTables * numRowsPerChildTable);
linesStb[i] = lineStb;
}
SThreadInsertArgs args = {0};
args.taos = taos;
args.lines = linesStb;
args.numLines = numSuperTables;
insertLines(&args);
for (int i = 0; i < numSuperTables; ++i) {
free(linesStb[i]);
}
free(linesStb);
}
printf("generate lines...\n");
char*** linesThread = calloc(numThreads, sizeof(char**));
for (int i = 0; i < numThreads; ++i) {
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
linesThread[i] = lines;
}
for (int t = 0; t < numThreads; ++t) {
int l = 0;
char** lines = linesThread[t];
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
int stIdx = i;
int ctIdx = t*numSuperTables*numChildTables + j;
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l);
lines[l] = line;
++l;
}
}
}
}
//shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_insert_lines");
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable);
int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %" PRId64 "\n", code, tstrerror(code), end - begin);
printf("shuffle lines...\n");
for (int t = 0; t < numThreads; ++t) {
shuffle(linesThread[t], numSuperTables * numChildTables * numRowsPerChildTable);
}
printf("begin multi-thread insertion...\n");
int64_t begin = taosGetTimestampUs();
pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
for (int i=0; i < numThreads; ++i) {
argsThread[i].lines = linesThread[i];
argsThread[i].taos = taos;
argsThread[i].numLines = numSuperTables * numChildTables * numRowsPerChildTable;
pthread_create(tids+i, NULL, insertLines, argsThread+i);
}
for (int i = 0; i < numThreads; ++i) {
pthread_join(tids[i], NULL);
}
int64_t end = taosGetTimestampUs();
int totalLines = numThreads*numSuperTables*numChildTables*numRowsPerChildTable;
printf("TOTAL LINES: %d\n", totalLines);
printf("THREADS: %d\n", numThreads);
int64_t sumTime = 0;
for (int i=0; i<numThreads; ++i) {
sumTime += argsThread[i].costTime;
}
printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
printf("THROUGHPUT:%d/s\n", (int)throughput);
free(argsThread);
free(tids);
for (int i = 0; i < numThreads; ++i) {
free(linesThread[i]);
}
free(linesThread);
taos_close(taos);
return 0;
}
......@@ -14,7 +14,6 @@
import random
import time
from copy import deepcopy
import numpy as np
from util.log import *
from util.cases import *
from util.sql import *
......@@ -109,7 +108,9 @@ class TDTestCase:
create 1 stb
'''
input_sql = self.getPerfSql(count=count, init=True)
self._conn.insertLines([input_sql])
print(threading.current_thread().name, "create stb line:", input_sql)
self._conn.insert_lines([input_sql])
print(threading.current_thread().name, "create stb end")
def batchCreateTable(self, batch_list):
'''
......@@ -118,23 +119,26 @@ class TDTestCase:
print(threading.current_thread().name, "length=", len(batch_list))
print(threading.current_thread().name, 'firstline', batch_list[0][0:50], '...', batch_list[0][-50:-1])
print(threading.current_thread().name, 'lastline:', batch_list[-1][0:50], '...', batch_list[-1][-50:-1])
self._conn.insertLines(batch_list)
print(threading.current_thread().name, 'end')
begin = time.time_ns();
self._conn.insert_lines(batch_list)
end = time.time_ns();
print(threading.current_thread().name, 'end time:', (end-begin)/10**9)
def splitGenerator(self, table_list, sub_list_len):
def splitGenerator(self, table_list, thread_count):
'''
split a list to n piece of sub_list
[a, b, c, d] ---> [[a, b], [c, d]]
yield type ---> generator
'''
sub_list_len = int(len(table_list)/thread_count)
for i in range(0, len(table_list), sub_list_len):
yield table_list[i:i + sub_list_len]
def genTbListGenerator(self, table_list, sub_list_len):
def genTbListGenerator(self, table_list, thread_count):
'''
split table_list, after split, every sub_list len is sub_list_len
split table_list, after split
'''
table_list_generator = self.splitGenerator(table_list, sub_list_len)
table_list_generator = self.splitGenerator(table_list, thread_count)
return table_list_generator
def genTableList(self, count=4, table_count=10000):
......@@ -190,14 +194,14 @@ class TDTestCase:
for t in threads:
t.join()
def createTables(self, count, table_count=10000, sub_list_len=1000, thread_count=10):
def createTables(self, count, table_count=10000, thread_count=10):
'''
create stb and tb
'''
table_list = self.genTableList(count=count, table_count=table_count)
create_tables_start_time = time.time()
self.createStb()
table_list_generator = self.genTbListGenerator(table_list, sub_list_len)
self.createStb(count=count)
table_list_generator = self.genTbListGenerator(table_list, thread_count)
create_tables_generator, insert_rows_generator = itertools.tee(table_list_generator, 2)
self.multiThreadRun(self.threadCreateTables(table_list_generator=create_tables_generator, thread_count=thread_count))
create_tables_end_time = time.time()
......@@ -216,44 +220,45 @@ class TDTestCase:
return_str = f'insert rows\' time of {count} columns ---> {insert_rows_time}s'
return insert_rows_time, return_str
def schemalessPerfTest(self, count, table_count=10000, sub_list_len=1000, thread_count=10):
def schemalessPerfTest(self, count, table_count=10000, thread_count=10, rows_count=1000):
'''
get performance
'''
insert_rows_generator = self.createTables(count=count, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
return self.insertRows(count=count, rows_generator=insert_rows_generator, rows_count=1000, thread_count=10)
insert_rows_generator = self.createTables(count=count, table_count=table_count, thread_count=thread_count)[0]
return self.insertRows(count=count, rows_generator=insert_rows_generator, rows_count=rows_count, thread_count=thread_count)
def getPerfResults(self, test_times=3, table_count=10000, sub_list_len=1000, thread_count=10):
def getPerfResults(self, test_times=3, table_count=10000, thread_count=10):
col4_time = 0
col1000_time = 0
col4000_time = 0
# for i in range(test_times):
# time_used = self.schemalessPerfTest(count=4, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
# col4_time += time_used
# col4_time /= test_times
# print(col4_time)
tdCom.cleanTb()
for i in range(test_times):
time_used = self.schemalessPerfTest(count=1000, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
col1000_time += time_used
col1000_time /= test_times
print(col1000_time)
tdCom.cleanTb()
time_used = self.schemalessPerfTest(count=4, table_count=table_count, thread_count=thread_count)[0]
col4_time += time_used
col4_time /= test_times
print(col4_time)
# for i in range(test_times):
# tdCom.cleanTb()
# time_used = self.schemalessPerfTest(count=1000, table_count=table_count, thread_count=thread_count)[0]
# col1000_time += time_used
# col1000_time /= test_times
# print(col1000_time)
tdCom.cleanTb()
for i in range(test_times):
time_used = self.schemalessPerfTest(count=4000, table_count=table_count, sub_list_len=sub_list_len, thread_count=thread_count)[0]
col4000_time += time_used
col4000_time /= test_times
print(col4000_time)
# for i in range(test_times):
# tdCom.cleanTb()
# time_used = self.schemalessPerfTest(count=4000, table_count=table_count, thread_count=thread_count)[0]
# col4000_time += time_used
# col4000_time /= test_times
# print(col4000_time)
return col4_time, col1000_time, col4000_time
def run(self):
print("running {}".format(__file__))
tdSql.prepare()
result = self.getPerfResults(test_times=1, table_count=1000, sub_list_len=100, thread_count=10)
result = self.getPerfResults(test_times=1, table_count=1000, thread_count=10)
print(result)
def stop(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册