提交 57c1f20f 编写于 作者: S Shuduo Sang

[TD-3192] <feature>: support stb limit and offset. create tables works.

上级 a9e5d2b7
...@@ -1797,21 +1797,21 @@ int postProceSql(char* host, uint16_t port, char* sqlstr) ...@@ -1797,21 +1797,21 @@ int postProceSql(char* host, uint16_t port, char* sqlstr)
return 0; return 0;
} }
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
char* getTagValueFromTagSample( SSuperTable* stbInfo, int tagUsePos) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) { if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1);
return NULL; return NULL;
} }
int dataLen = 0; int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos); dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos);
return dataBuf; return dataBuf;
} }
char* generateTagVaulesForStb(SSuperTable* stbInfo) { static char* generateTagVaulesForStb(SSuperTable* stbInfo) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) { if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1);
...@@ -1821,13 +1821,15 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) { ...@@ -1821,13 +1821,15 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) {
int dataLen = 0; int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "("); dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "(");
for (int i = 0; i < stbInfo->tagCount; i++) { for (int i = 0; i < stbInfo->tagCount; i++) {
if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", 5))) { if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", strlen("binary")))
|| (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", strlen("nchar")))) {
if (stbInfo->tags[i].dataLen > TSDB_MAX_BINARY_LEN) { if (stbInfo->tags[i].dataLen > TSDB_MAX_BINARY_LEN) {
printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); printf("binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
tmfree(dataBuf); tmfree(dataBuf);
return NULL; return NULL;
} }
char* buf = (char*)calloc(stbInfo->tags[i].dataLen+1, 1); char* buf = (char*)calloc(stbInfo->tags[i].dataLen+1, 1);
if (NULL == buf) { if (NULL == buf) {
printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen); printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen);
...@@ -1835,30 +1837,48 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) { ...@@ -1835,30 +1837,48 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) {
return NULL; return NULL;
} }
rand_string(buf, stbInfo->tags[i].dataLen); rand_string(buf, stbInfo->tags[i].dataLen);
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "\'%s\', ", buf); dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"\'%s\', ", buf);
tmfree(buf); tmfree(buf);
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "int", 3)) { } else if (0 == strncasecmp(stbInfo->tags[i].dataType,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_int()); "int", strlen("int"))) {
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bigint", 6)) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint()); "%d, ", rand_int());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "float", 5)) { } else if (0 == strncasecmp(stbInfo->tags[i].dataType,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_float()); "bigint", strlen("bigint"))) {
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "double", 6)) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_double()); "%"PRId64", ", rand_bigint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "smallint", 8)) { } else if (0 == strncasecmp(stbInfo->tags[i].dataType,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_smallint()); "float", strlen("float"))) {
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "tinyint", 7)) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_tinyint()); "%f, ", rand_float());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bool", 4)) { } else if (0 == strncasecmp(stbInfo->tags[i].dataType,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_bool()); "double", strlen("double"))) {
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "timestamp", 4)) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint()); "%f, ", rand_double());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"smallint", strlen("smallint"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d, ", rand_smallint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"tinyint", strlen("tinyint"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d, ", rand_tinyint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"bool", strlen("bool"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d, ", rand_bool());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"timestamp", strlen("timestamp"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%"PRId64", ", rand_bigint());
} else { } else {
printf("No support data type: %s\n", stbInfo->tags[i].dataType); printf("No support data type: %s\n", stbInfo->tags[i].dataType);
tmfree(dataBuf); tmfree(dataBuf);
return NULL; return NULL;
} }
} }
dataLen -= 2; dataLen -= 2;
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")");
return dataBuf; return dataBuf;
...@@ -2012,7 +2032,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, ...@@ -2012,7 +2032,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
TAOS_RES * res; TAOS_RES * res;
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int count = 0; int count = 0;
//get schema use cmd: describe superTblName; //get schema use cmd: describe superTblName;
snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName); snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName);
res = taos_query(taos, command); res = taos_query(taos, command);
...@@ -2423,7 +2443,7 @@ static void* createTable(void *sarg) ...@@ -2423,7 +2443,7 @@ static void* createTable(void *sarg)
} }
int startMultiThreadCreateChildTable( int startMultiThreadCreateChildTable(
char* cols, int threads, int ntables, char* cols, int threads, int startFrom, int ntables,
char* db_name, SSuperTable* superTblInfo) { char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo)); threadInfo *infos = malloc(threads * sizeof(threadInfo));
...@@ -2446,7 +2466,7 @@ int startMultiThreadCreateChildTable( ...@@ -2446,7 +2466,7 @@ int startMultiThreadCreateChildTable(
int b = 0; int b = 0;
b = ntables % threads; b = ntables % threads;
int last = 0; int last = startFrom;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
...@@ -2505,12 +2525,23 @@ static void createChildTables() { ...@@ -2505,12 +2525,23 @@ static void createChildTables() {
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
int startFrom;
if (g_Dbs.db[i].superTbls[j].childTblOffset) {
startFrom = g_Dbs.db[i].superTbls[j].childTblOffset;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblLimit;
} else {
startFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
}
verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__,
g_totalChildTables, startFrom);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl, g_Dbs.threadCountByCreateTbl,
g_Dbs.db[i].superTbls[j].childTblCount, startFrom,
g_totalChildTables,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
} }
} else { } else {
// normal table // normal table
...@@ -2538,6 +2569,7 @@ static void createChildTables() { ...@@ -2538,6 +2569,7 @@ static void createChildTables() {
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
tblColsBuf, tblColsBuf,
g_Dbs.threadCountByCreateTbl, g_Dbs.threadCountByCreateTbl,
0,
g_args.num_of_tables, g_args.num_of_tables,
g_Dbs.db[i].dbName, g_Dbs.db[i].dbName,
NULL); NULL);
...@@ -2545,30 +2577,6 @@ static void createChildTables() { ...@@ -2545,30 +2577,6 @@ static void createChildTables() {
} }
} }
/*
static int taosGetLineNum(const char *fileName)
{
int lineNum = 0;
char cmd[1024] = { 0 };
char buf[1024] = { 0 };
sprintf(cmd, "wc -l %s", fileName);
FILE *fp = popen(cmd, "r");
if (fp == NULL) {
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
return lineNum;
}
if (fgets(buf, sizeof(buf), fp)) {
int index = strchr((const char*)buf, ' ') - buf;
buf[index] = '\0';
lineNum = atoi(buf);
}
pclose(fp);
return lineNum;
}
*/
/* /*
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/ */
...@@ -2655,7 +2663,7 @@ static int readSampleFromCsvFileToMem( ...@@ -2655,7 +2663,7 @@ static int readSampleFromCsvFileToMem(
FILE* fp = fopen(superTblInfo->sampleFile, "r"); FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) { if (fp == NULL) {
fprintf(stderr, "Failed to open sample file: %s, reason:%s\n", fprintf(stderr, "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno)); superTblInfo->sampleFile, strerror(errno));
return -1; return -1;
} }
...@@ -2907,7 +2915,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -2907,7 +2915,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, threads not found\n"); printf("ERROR: failed to read json, threads not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl"); cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl");
if (threads2 && threads2->type == cJSON_Number) { if (threads2 && threads2->type == cJSON_Number) {
g_Dbs.threadCountByCreateTbl = threads2->valueint; g_Dbs.threadCountByCreateTbl = threads2->valueint;
...@@ -4010,7 +4018,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4010,7 +4018,7 @@ static void syncWriteForNumberOfTblInOneSql(
int64_t et = 0xffffffff; int64_t et = 0xffffffff;
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
int32_t tbl_id = 0; int32_t tbl_id = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { for (int tableID = winfo->start_table_id; tableID <= winfo->end_table_id; ) {
int64_t start_time = 0; int64_t start_time = 0;
int inserted = i; int inserted = i;
...@@ -4019,12 +4027,12 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4019,12 +4027,12 @@ static void syncWriteForNumberOfTblInOneSql(
memset(buffer, 0, superTblInfo->maxSqlLen); memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer; char *pstr = buffer;
int32_t end_tbl_id = tID + numberOfTblInOneSql; int32_t end_tbl_id = tableID + numberOfTblInOneSql;
if (end_tbl_id > winfo->end_table_id) { if (end_tbl_id > winfo->end_table_id) {
end_tbl_id = winfo->end_table_id+1; end_tbl_id = winfo->end_table_id+1;
} }
for (tbl_id = tID; tbl_id < end_tbl_id; tbl_id++) { for (tbl_id = tableID; tbl_id < end_tbl_id; tbl_id++) {
sampleUsePos = samplePos; sampleUsePos = samplePos;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
...@@ -4097,10 +4105,10 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4097,10 +4105,10 @@ static void syncWriteForNumberOfTblInOneSql(
int retLen = 0; int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) { "sample", strlen("sample"))) {
retLen = getRowDataFromSample(pstr + len, retLen = getRowDataFromSample(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep, start_time += superTblInfo->timeStampStep,
superTblInfo, superTblInfo,
&sampleUsePos); &sampleUsePos);
if (retLen < 0) { if (retLen < 0) {
goto free_and_statistics; goto free_and_statistics;
...@@ -4111,13 +4119,13 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4111,13 +4119,13 @@ static void syncWriteForNumberOfTblInOneSql(
if (0 != superTblInfo->disorderRatio if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) { && rand_num < superTblInfo->disorderRatio) {
int64_t d = start_time - rand() % superTblInfo->disorderRange; int64_t d = start_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData(pstr + len, retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
d, d,
superTblInfo); superTblInfo);
} else { } else {
retLen = generateRowData(pstr + len, retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep, start_time += superTblInfo->timeStampStep,
superTblInfo); superTblInfo);
} }
...@@ -4132,7 +4140,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4132,7 +4140,7 @@ static void syncWriteForNumberOfTblInOneSql(
if (inserted >= superTblInfo->insertRows || if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tID = tbl_id + 1; tableID = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
superTblInfo->lenOfOneRow); superTblInfo->lenOfOneRow);
goto send_to_server; goto send_to_server;
...@@ -4140,7 +4148,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4140,7 +4148,7 @@ static void syncWriteForNumberOfTblInOneSql(
} }
} }
tID = tbl_id; tableID = tbl_id;
inserted += superTblInfo->rowsPerTbl; inserted += superTblInfo->rowsPerTbl;
send_to_server: send_to_server:
...@@ -4209,7 +4217,7 @@ send_to_server: ...@@ -4209,7 +4217,7 @@ send_to_server:
break; break;
} }
if (tID > winfo->end_table_id) { if (tableID > winfo->end_table_id) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos; samplePos = sampleUsePos;
} }
...@@ -4346,7 +4354,7 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) ...@@ -4346,7 +4354,7 @@ static int execInsert(threadInfo *winfo, char *buffer, int k)
return affectedRows; return affectedRows;
} }
static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *buffer, static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *buffer,
int64_t insertRows, int64_t insertRows,
int64_t startFrom, int64_t startTime, int *pSampleUsePos) int64_t startFrom, int64_t startTime, int *pSampleUsePos)
{ {
...@@ -4362,6 +4370,8 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b ...@@ -4362,6 +4370,8 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b
} }
} }
assert(buffer != NULL);
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
...@@ -4374,7 +4384,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b ...@@ -4374,7 +4384,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b
} else { } else {
tagsValBuf = getTagValueFromTagSample( tagsValBuf = getTagValueFromTagSample(
superTblInfo, superTblInfo,
threadID % superTblInfo->tagSampleCount); tableID % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
fprintf(stderr, "tag buf failed to allocate memory\n"); fprintf(stderr, "tag buf failed to allocate memory\n");
...@@ -4386,7 +4396,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b ...@@ -4386,7 +4396,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b
"insert into %s.%s%d using %s.%s tags %s values", "insert into %s.%s%d using %s.%s tags %s values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo->childTblPrefix, superTblInfo->childTblPrefix,
threadID, tableID,
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo->sTblName, superTblInfo->sTblName,
tagsValBuf); tagsValBuf);
...@@ -4396,14 +4406,14 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b ...@@ -4396,14 +4406,14 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
"insert into %s.%s values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo->childTblName + threadID * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + tableID * TSDB_TABLE_NAME_LEN);
} else { } else {
pstr += snprintf(pstr, pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s%d values", "insert into %s.%s%d values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix,
threadID); tableID);
} }
} else { } else {
pstr += snprintf(pstr, pstr += snprintf(pstr,
...@@ -4411,7 +4421,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b ...@@ -4411,7 +4421,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b
"insert into %s.%s%d values", "insert into %s.%s%d values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix,
threadID); tableID);
} }
int k; int k;
...@@ -4444,7 +4454,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b ...@@ -4444,7 +4454,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b
retLen = generateRowData( retLen = generateRowData(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom, startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo); superTblInfo);
} }
...@@ -4486,7 +4496,6 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b ...@@ -4486,7 +4496,6 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b
k++; k++;
startFrom ++; startFrom ++;
debugPrint("%s() LN%d k=%d startFrom=%ld insertRows=%ld\n", __func__, __LINE__, k, startFrom, insertRows);
if (startFrom >= insertRows) if (startFrom >= insertRows)
break; break;
} }
...@@ -4523,7 +4532,7 @@ static void* syncWrite(void *sarg) { ...@@ -4523,7 +4532,7 @@ static void* syncWrite(void *sarg) {
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) { if (NULL == buffer) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
strerror(errno)); strerror(errno));
tmfree(superTblInfo->sampleDataBuf); tmfree(superTblInfo->sampleDataBuf);
...@@ -4534,7 +4543,8 @@ static void* syncWrite(void *sarg) { ...@@ -4534,7 +4543,8 @@ static void* syncWrite(void *sarg) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
int64_t endTs; int64_t endTs;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; int insert_interval = superTblInfo?superTblInfo->insertInterval:
g_args.insert_interval;
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0xffffffff; uint64_t et = 0xffffffff;
...@@ -4543,12 +4553,12 @@ static void* syncWrite(void *sarg) { ...@@ -4543,12 +4553,12 @@ static void* syncWrite(void *sarg) {
int sampleUsePos; int sampleUsePos;
if (superTblInfo && superTblInfo->childTblLimit ) { if (superTblInfo && superTblInfo->childTblOffset) {
// TODO // TODO
} }
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; for (uint32_t tableID = winfo->start_table_id; tableID <= winfo->end_table_id;
tID++) { tableID++) {
int64_t start_time = winfo->start_time; int64_t start_time = winfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
...@@ -4561,7 +4571,7 @@ static void* syncWrite(void *sarg) { ...@@ -4561,7 +4571,7 @@ static void* syncWrite(void *sarg) {
sampleUsePos = samplePos; sampleUsePos = samplePos;
int generated = generateDataBuffer(tID, winfo, buffer, insertRows, int generated = generateDataBuffer(tableID, winfo, buffer, insertRows,
i, start_time, &sampleUsePos); i, start_time, &sampleUsePos);
if (generated > 0) if (generated > 0)
i += generated; i += generated;
...@@ -4605,12 +4615,12 @@ static void* syncWrite(void *sarg) { ...@@ -4605,12 +4615,12 @@ static void* syncWrite(void *sarg) {
} }
} // num_of_DPT } // num_of_DPT
if ((tID == winfo->end_table_id) && superTblInfo && if ((tableID == winfo->end_table_id) && superTblInfo &&
(0 == strncasecmp( (0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) { superTblInfo->dataSource, "sample", strlen("sample")))) {
samplePos = sampleUsePos; samplePos = sampleUsePos;
} }
} // tID } // tableID
free_and_statistics_2: free_and_statistics_2:
tmfree(buffer); tmfree(buffer);
...@@ -4705,13 +4715,20 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4705,13 +4715,20 @@ static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) { char* precision,SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = malloc(threads * sizeof(threadInfo)); threadInfo *infos = malloc(threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t)); memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo)); memset(infos, 0, threads * sizeof(threadInfo));
int ntables = 0; int ntables = 0;
if (superTblInfo) if (superTblInfo)
ntables = superTblInfo->childTblCount; if (superTblInfo->childTblOffset)
ntables = superTblInfo->childTblLimit;
else
ntables = superTblInfo->childTblCount;
else else
ntables = g_args.num_of_tables; ntables = g_args.num_of_tables;
...@@ -4767,7 +4784,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4767,7 +4784,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
double start = getCurrentTime(); double start = getCurrentTime();
int last = 0; int last;
if ((superTblInfo) && (superTblInfo->childTblOffset))
last = superTblInfo->childTblOffset;
else
last = 0;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
...@@ -5490,7 +5513,7 @@ void *superSubscribeProcess(void *sarg) { ...@@ -5490,7 +5513,7 @@ void *superSubscribeProcess(void *sarg) {
} }
} }
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i],
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
...@@ -5533,12 +5556,12 @@ static int subscribeTestProcess() { ...@@ -5533,12 +5556,12 @@ static int subscribeTestProcess() {
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed for create threads\n"); printf("malloc failed for create threads\n");
taos_close(taos); taos_close(taos);
exit(-1); exit(-1);
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
t_info->taos = taos; t_info->taos = taos;
...@@ -5549,14 +5572,14 @@ static int subscribeTestProcess() { ...@@ -5549,14 +5572,14 @@ static int subscribeTestProcess() {
//==== create sub threads for query from sub table //==== create sub threads for query from sub table
pthread_t *pidsOfSub = NULL; pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL; threadInfo *infosOfSub = NULL;
if ((g_queryInfo.subQueryInfo.sqlCount > 0) if ((g_queryInfo.subQueryInfo.sqlCount > 0)
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) { && (g_queryInfo.subQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt *
sizeof(pthread_t)); sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt *
sizeof(threadInfo)); sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
printf("malloc failed for create threads\n"); printf("malloc failed for create threads\n");
taos_close(taos); taos_close(taos);
exit(-1); exit(-1);
} }
...@@ -5576,7 +5599,7 @@ static int subscribeTestProcess() { ...@@ -5576,7 +5599,7 @@ static int subscribeTestProcess() {
} }
int last = 0; int last = 0;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i; threadInfo *t_info = infosOfSub + i;
t_info->threadID = i; t_info->threadID = i;
...@@ -5593,14 +5616,14 @@ static int subscribeTestProcess() { ...@@ -5593,14 +5616,14 @@ static int subscribeTestProcess() {
} }
tmfree((char*)pids); tmfree((char*)pids);
tmfree((char*)infos); tmfree((char*)infos);
for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL); pthread_join(pidsOfSub[i], NULL);
} }
tmfree((char*)pidsOfSub); tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub); tmfree((char*)infosOfSub);
taos_close(taos); taos_close(taos);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册