提交 9e7ed32f 编写于 作者: S Shuduo Sang

[TD-3192] <feature>: support stb limit and offset. refactor.

上级 92dd3e0c
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
"confirm_parameter_prompt": "no", "confirm_parameter_prompt": "no",
"insert_interval": 0, "insert_interval": 0,
"num_of_records_per_req": 100, "num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{ "databases": [{
"dbinfo": { "dbinfo": {
"name": "db", "name": "db",
......
...@@ -194,6 +194,7 @@ typedef struct SArguments_S { ...@@ -194,6 +194,7 @@ typedef struct SArguments_S {
int num_of_threads; int num_of_threads;
int insert_interval; int insert_interval;
int num_of_RPR; int num_of_RPR;
int max_sql_len;
int num_of_tables; int num_of_tables;
int num_of_DPT; int num_of_DPT;
int abort; int abort;
...@@ -513,6 +514,7 @@ SArguments g_args = { ...@@ -513,6 +514,7 @@ SArguments g_args = {
10, // num_of_connections/thread 10, // num_of_connections/thread
0, // insert_interval 0, // insert_interval
100, // num_of_RPR 100, // num_of_RPR
TSDB_PAYLOAD_SIZE, // max_sql_len
10000, // num_of_tables 10000, // num_of_tables
10000, // num_of_DPT 10000, // num_of_DPT
0, // abort 0, // abort
...@@ -759,6 +761,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -759,6 +761,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
printf("# Insertion interval: %d\n", arguments->insert_interval); printf("# Insertion interval: %d\n", arguments->insert_interval);
printf("# Number of records per req: %d\n", arguments->num_of_RPR); printf("# Number of records per req: %d\n", arguments->num_of_RPR);
printf("# Max SQL length: %d\n", arguments->max_sql_len);
printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Threads: %d\n", arguments->num_of_threads);
printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Tables: %d\n", arguments->num_of_tables);
printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT);
...@@ -1005,6 +1008,7 @@ static int printfInsertMeta() { ...@@ -1005,6 +1008,7 @@ static int printfInsertMeta() {
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
printf("insert interval: \033[33m%d\033[0m\n", g_args.insert_interval); printf("insert interval: \033[33m%d\033[0m\n", g_args.insert_interval);
printf("number of records per req: \033[33m%d\033[0m\n", g_args.num_of_RPR); printf("number of records per req: \033[33m%d\033[0m\n", g_args.num_of_RPR);
printf("max sql length: \033[33m%d\033[0m\n", g_args.max_sql_len);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
...@@ -1102,25 +1106,37 @@ static int printfInsertMeta() { ...@@ -1102,25 +1106,37 @@ static int printfInsertMeta() {
}else { }else {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
} }
printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n",
printf(" rowsPerTbl: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); g_Dbs.db[i].superTbls[j].numberOfTblInOneSql);
printf(" disorderRange: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRange); printf(" rowsPerTbl: \033[33m%d\033[0m\n",
printf(" disorderRatio: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRatio); g_Dbs.db[i].superTbls[j].rowsPerTbl);
printf(" maxSqlLen: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].maxSqlLen); printf(" disorderRange: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].disorderRange);
printf(" timeStampStep: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].timeStampStep); printf(" disorderRatio: \033[33m%d\033[0m\n",
printf(" startTimestamp: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].startTimestamp); g_Dbs.db[i].superTbls[j].disorderRatio);
printf(" sampleFormat: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFormat); printf(" maxSqlLen: \033[33m%d\033[0m\n",
printf(" sampleFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFile); g_Dbs.db[i].superTbls[j].maxSqlLen);
printf(" tagsFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].tagsFile); printf(" timeStampStep: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].timeStampStep);
printf(" columnCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].columnCount); printf(" startTimestamp: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].startTimestamp);
printf(" sampleFormat: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sampleFormat);
printf(" sampleFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sampleFile);
printf(" tagsFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n ",
g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType,
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) { "binary", 6))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType,
"nchar", 5))) {
printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k, printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); g_Dbs.db[i].superTbls[j].columns[k].dataType,
g_Dbs.db[i].superTbls[j].columns[k].dataLen);
} else { } else {
printf("column[%d]:\033[33m%s\033[0m ", k, printf("column[%d]:\033[33m%s\033[0m ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType); g_Dbs.db[i].superTbls[j].columns[k].dataType);
...@@ -1135,7 +1151,8 @@ static int printfInsertMeta() { ...@@ -1135,7 +1151,8 @@ static int printfInsertMeta() {
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) { || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) {
printf("tag[%d]:\033[33m%s(%d)\033[0m ", k, printf("tag[%d]:\033[33m%s(%d)\033[0m ", k,
g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); g_Dbs.db[i].superTbls[j].tags[k].dataType,
g_Dbs.db[i].superTbls[j].tags[k].dataLen);
} else { } else {
printf("tag[%d]:\033[33m%s\033[0m ", k, printf("tag[%d]:\033[33m%s\033[0m ", k,
g_Dbs.db[i].superTbls[j].tags[k].dataType); g_Dbs.db[i].superTbls[j].tags[k].dataType);
...@@ -1904,7 +1921,6 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, ...@@ -1904,7 +1921,6 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName,
return getChildNameOfSuperTableWithLimitAndOffset(taos, dbName, sTblName, return getChildNameOfSuperTableWithLimitAndOffset(taos, dbName, sTblName,
childTblNameOfSuperTbl, childTblCountOfSuperTbl, childTblNameOfSuperTbl, childTblCountOfSuperTbl,
-1, -1); -1, -1);
} }
static int getSuperTableFromServer(TAOS * taos, char* dbName, static int getSuperTableFromServer(TAOS * taos, char* dbName,
...@@ -2416,10 +2432,13 @@ static void createChildTables() { ...@@ -2416,10 +2432,13 @@ static void createChildTables() {
int j = 0; int j = 0;
while (g_args.datatype[j]) { while (g_args.datatype[j]) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j], "NCHAR", strlen("NCHAR")) == 0)) { || (strncasecmp(g_args.datatype[j],
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", j, g_args.datatype[j]); "NCHAR", strlen("NCHAR")) == 0)) {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
", COL%d %s(60)", j, g_args.datatype[j]);
} else { } else {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", j, g_args.datatype[j]); len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
", COL%d %s", j, g_args.datatype[j]);
} }
len = strlen(tblColsBuf); len = strlen(tblColsBuf);
j++; j++;
...@@ -2427,7 +2446,8 @@ static void createChildTables() { ...@@ -2427,7 +2446,8 @@ static void createChildTables() {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n",
__func__, __LINE__,
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
tblColsBuf, tblColsBuf,
...@@ -2540,19 +2560,29 @@ int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { ...@@ -2540,19 +2560,29 @@ int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
/* /*
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/ */
int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sampleBuf) { static int readSampleFromCsvFileToMem(
SSuperTable* superTblInfo) {
size_t n = 0; size_t n = 0;
ssize_t readLen = 0; ssize_t readLen = 0;
char * line = NULL; char * line = NULL;
int getRows = 0; int getRows = 0;
memset(sampleBuf, 0, MAX_SAMPLES_ONCE_FROM_FILE* superTblInfo->lenOfOneRow); FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
fprintf(stderr, "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
return -1;
}
memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
while (1) { while (1) {
readLen = tgetline(&line, &n, fp); readLen = tgetline(&line, &n, fp);
if (-1 == readLen) { if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) { if(0 != fseek(fp, 0, SEEK_SET)) {
printf("Failed to fseek file: %s, reason:%s\n", fprintf(stderr, "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno)); superTblInfo->sampleFile, strerror(errno));
fclose(fp);
return -1; return -1;
} }
continue; continue;
...@@ -2572,7 +2602,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample ...@@ -2572,7 +2602,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample
continue; continue;
} }
memcpy(sampleBuf + getRows * superTblInfo->lenOfOneRow, line, readLen); memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen);
getRows++; getRows++;
if (getRows == MAX_SAMPLES_ONCE_FROM_FILE) { if (getRows == MAX_SAMPLES_ONCE_FROM_FILE) {
...@@ -2580,6 +2611,7 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample ...@@ -2580,6 +2611,7 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample
} }
} }
fclose(fp);
tmfree(line); tmfree(line);
return 0; return 0;
} }
...@@ -2810,6 +2842,17 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -2810,6 +2842,17 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
g_args.max_sql_len = maxSqlLen->valueint;
} else if (!maxSqlLen) {
g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
} else {
fprintf(stderr, "ERROR: failed to read json, max_sql_len input mistake\n");
goto PARSE_OVER;
}
cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req");
if (numRecPerReq && numRecPerReq->type == cJSON_Number) { if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
g_args.num_of_RPR = numRecPerReq->valueint; g_args.num_of_RPR = numRecPerReq->valueint;
...@@ -3774,18 +3817,22 @@ void postFreeResource() { ...@@ -3774,18 +3817,22 @@ void postFreeResource() {
} }
} }
int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* superTblInfo, int* sampleUsePos, FILE *fp, char* sampleBuf) { static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int* sampleUsePos) {
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
int ret = readSampleFromCsvFileToMem(fp, superTblInfo, sampleBuf); int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) { if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
return -1; return -1;
} }
*sampleUsePos = 0; *sampleUsePos = 0;
} }
int dataLen = 0; int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%s", sampleBuf + superTblInfo->lenOfOneRow * (*sampleUsePos)); "(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%s", superTblInfo->sampleDataBuf + superTblInfo->lenOfOneRow * (*sampleUsePos));
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++; (*sampleUsePos)++;
...@@ -3840,7 +3887,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* ...@@ -3840,7 +3887,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
} }
static void syncWriteForNumberOfTblInOneSql( static void syncWriteForNumberOfTblInOneSql(
threadInfo *winfo, FILE *fp, char* sampleDataBuf) { threadInfo *winfo, char* sampleDataBuf) {
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = winfo->superTblInfo;
int samplePos = 0; int samplePos = 0;
...@@ -3960,9 +4007,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3960,9 +4007,7 @@ static void syncWriteForNumberOfTblInOneSql(
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep, tmp_time += superTblInfo->timeStampStep,
superTblInfo, superTblInfo,
&sampleUsePos, &sampleUsePos);
fp,
sampleDataBuf);
if (retLen < 0) { if (retLen < 0) {
goto free_and_statistics; goto free_and_statistics;
} }
...@@ -4159,7 +4204,14 @@ static void* syncWrite(void *sarg) { ...@@ -4159,7 +4204,14 @@ static void* syncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
char buffer[BUFFER_SIZE] = "\0"; char* buffer = calloc(g_args.max_sql_len, 1);
if (NULL == buffer) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
g_args.max_sql_len,
strerror(errno));
return NULL;
}
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
char **data_type = g_args.datatype; char **data_type = g_args.datatype;
int len_of_binary = g_args.len_of_binary; int len_of_binary = g_args.len_of_binary;
...@@ -4181,11 +4233,13 @@ static void* syncWrite(void *sarg) { ...@@ -4181,11 +4233,13 @@ static void* syncWrite(void *sarg) {
winfo->totalAffectedRows = 0; winfo->totalAffectedRows = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
int64_t tmp_time = time_counter;
for (int i = 0; i < g_args.num_of_DPT;) { for (int i = 0; i < g_args.num_of_DPT;) {
int tblInserted = i; int tblInserted = i;
int64_t tmp_time = time_counter;
memset(buffer, 0, g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, pstr += sprintf(pstr,
"insert into %s.%s%d values ", "insert into %s.%s%d values ",
...@@ -4264,6 +4318,7 @@ static void* syncWrite(void *sarg) { ...@@ -4264,6 +4318,7 @@ static void* syncWrite(void *sarg) {
} // num_of_DPT } // num_of_DPT
} // tId } // tId
tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->threadID,
winfo->totalRowsInserted, winfo->totalRowsInserted,
...@@ -4272,16 +4327,8 @@ static void* syncWrite(void *sarg) { ...@@ -4272,16 +4327,8 @@ static void* syncWrite(void *sarg) {
return NULL; return NULL;
} }
static int prepareSampleData(SSuperTable *superTblInfo) {
static void* syncWriteWithStb(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
FILE *fp = NULL;
char* sampleDataBuf = NULL; char* sampleDataBuf = NULL;
int samplePos = 0;
// each thread read sample data from csv file // each thread read sample data from csv file
if (0 == strncasecmp(superTblInfo->dataSource, if (0 == strncasecmp(superTblInfo->dataSource,
...@@ -4290,42 +4337,49 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4290,42 +4337,49 @@ static void* syncWriteWithStb(void *sarg) {
sampleDataBuf = calloc( sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) { if (sampleDataBuf == NULL) {
printf("Failed to calloc %d Bytes, reason:%s\n", fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno)); strerror(errno));
return NULL; return -1;
} }
fp = fopen(superTblInfo->sampleFile, "r"); int ret = readSampleFromCsvFileToMem(superTblInfo);
if (fp == NULL) {
printf("Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
tmfree(sampleDataBuf);
return NULL;
}
int ret = readSampleFromCsvFileToMem(fp,
superTblInfo, sampleDataBuf);
if (0 != ret) { if (0 != ret) {
tmfree(sampleDataBuf); tmfree(superTblInfo->sampleDataBuf);
tmfclose(fp); return -1;
return NULL;
} }
} }
superTblInfo->sampleDataBuf = sampleDataBuf;
return 0;
}
static void* syncWriteWithStb(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
int samplePos = 0;
if (superTblInfo) {
if (0 != prepareSampleData(superTblInfo))
return NULL;
if (superTblInfo->numberOfTblInOneSql > 0) { if (superTblInfo->numberOfTblInOneSql > 0) {
syncWriteForNumberOfTblInOneSql(winfo, fp, sampleDataBuf); syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf);
tmfree(sampleDataBuf); tmfree(superTblInfo->sampleDataBuf);
tmfclose(fp);
return NULL; return NULL;
} }
}
char* buffer = calloc(superTblInfo->maxSqlLen, 1); char* buffer = calloc(superTblInfo->maxSqlLen, 1);
if (NULL == buffer) { if (NULL == buffer) {
printf("Failed to calloc %d Bytes, reason:%s\n", fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
strerror(errno)); strerror(errno));
tmfree(sampleDataBuf); tmfree(superTblInfo->sampleDataBuf);
tmfclose(fp);
return NULL; return NULL;
} }
...@@ -4339,6 +4393,10 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4339,6 +4393,10 @@ static void* syncWriteWithStb(void *sarg) {
debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
if (superTblInfo->childTblLimit ) {
// CBD
}
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
tID++) { tID++) {
int64_t start_time = winfo->start_time; int64_t start_time = winfo->start_time;
...@@ -4413,9 +4471,7 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4413,9 +4471,7 @@ static void* syncWriteWithStb(void *sarg) {
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
start_time + superTblInfo->timeStampStep * i, start_time + superTblInfo->timeStampStep * i,
superTblInfo, superTblInfo,
&sampleUsePos, &sampleUsePos);
fp,
sampleDataBuf);
if (retLen < 0) { if (retLen < 0) {
goto free_and_statistics_2; goto free_and_statistics_2;
} }
...@@ -4511,8 +4567,7 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4511,8 +4567,7 @@ static void* syncWriteWithStb(void *sarg) {
free_and_statistics_2: free_and_statistics_2:
tmfree(buffer); tmfree(buffer);
tmfree(sampleDataBuf); tmfree(superTblInfo->sampleDataBuf);
tmfclose(fp);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->threadID,
...@@ -4602,13 +4657,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4602,13 +4657,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
memset(pids, 0, threads * sizeof(pthread_t)); memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo)); memset(infos, 0, threads * sizeof(threadInfo));
/*
xxx getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount);
*/
int ntables = 0; int ntables = 0;
if (superTblInfo) if (superTblInfo)
ntables = superTblInfo->childTblCount; ntables = superTblInfo->childTblCount;
...@@ -4642,7 +4690,7 @@ xxx getAllChildNameOfSuperTable(taos, ...@@ -4642,7 +4690,7 @@ xxx getAllChildNameOfSuperTable(taos,
} else if (0 == strncasecmp(precision, "us", 2)) { } else if (0 == strncasecmp(precision, "us", 2)) {
timePrec = TSDB_TIME_PRECISION_MICRO; timePrec = TSDB_TIME_PRECISION_MICRO;
} else { } else {
printf("No support precision: %s\n", precision); fprintf(stderr, "No support precision: %s\n", precision);
exit(-1); exit(-1);
} }
} }
...@@ -4774,7 +4822,6 @@ xxx getAllChildNameOfSuperTable(taos, ...@@ -4774,7 +4822,6 @@ xxx getAllChildNameOfSuperTable(taos,
free(infos); free(infos);
} }
void *readTable(void *sarg) { void *readTable(void *sarg) {
#if 1 #if 1
threadInfo *rinfo = (threadInfo *)sarg; threadInfo *rinfo = (threadInfo *)sarg;
...@@ -4933,10 +4980,10 @@ static int insertTestProcess() { ...@@ -4933,10 +4980,10 @@ static int insertTestProcess() {
if (NULL == g_fpOfInsertResult) { if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
return -1; return -1;
} {
printfInsertMetaToFile(g_fpOfInsertResult);
} }
printfInsertMetaToFile(g_fpOfInsertResult);
if (!g_args.answer_yes) { if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n"); printf("Press enter key to continue\n\n");
(void)getchar(); (void)getchar();
...@@ -5056,7 +5103,7 @@ void *superQueryProcess(void *sarg) { ...@@ -5056,7 +5103,7 @@ void *superQueryProcess(void *sarg) {
return NULL; return NULL;
} }
void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
char sourceString[32] = "xxxx"; char sourceString[32] = "xxxx";
char subTblName[MAX_TB_NAME_SIZE*3]; char subTblName[MAX_TB_NAME_SIZE*3];
sprintf(subTblName, "%s.%s", sprintf(subTblName, "%s.%s",
...@@ -5078,7 +5125,7 @@ void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { ...@@ -5078,7 +5125,7 @@ void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
//printf("3: %s\n", outSql); //printf("3: %s\n", outSql);
} }
void *subQueryProcess(void *sarg) { static void *subQueryProcess(void *sarg) {
char sqlstr[1024]; char sqlstr[1024];
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
int64_t st = 0; int64_t st = 0;
...@@ -5430,7 +5477,6 @@ static int subscribeTestProcess() { ...@@ -5430,7 +5477,6 @@ static int subscribeTestProcess() {
&g_queryInfo.subQueryInfo.childTblCount); &g_queryInfo.subQueryInfo.childTblCount);
} }
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from super table //==== create sub threads for query from super table
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 100
self.numberOfRecords = 1000
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def run(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/"
os.system("%staosdemo -y -t %d -n %d -x" %
(binPath, self.numberOfTables, self.numberOfRecords))
tdSql.query("show databases")
for i in range(18):
print(tdSql.getData(0, i) )
tdSql.checkData(0, 2, self.numberOfTables)
tdSql.execute("use test")
tdSql.query(
"select count(*) from test.t%d" % (self.numberOfTables -1))
tdSql.checkData(0, 0, self.numberOfRecords)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册