提交 c6dd4d4a 编写于 作者: Z zhaoyanggh

[TD-10456]<feature>taosdemo support schemaless

上级 d182e031
......@@ -62,6 +62,7 @@ extern char configDir[];
#define STR_INSERT_INTO "INSERT INTO "
// NOTE(review): presumably the upper bound on records sent in one request - confirm against users
#define MAX_RECORDS_PER_REQ 32766
// Size in bytes of every buffer that holds one schemaless line (see insertSchemaless)
#define MAX_LINE_SIZE 16384
// Parenthesised so the macro expands as a single operand inside larger expressions
// (e.g. x / HEAD_BUFF_LEN or x % HEAD_BUFF_LEN).
#define HEAD_BUFF_LEN (TSDB_MAX_COLUMNS*24) // 16*MAX_COLUMNS + (192+32)*2 + insert into ..
......@@ -316,6 +317,7 @@ typedef struct SSuperTable_S {
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
bool schemaless;
} SSuperTable;
typedef struct {
......@@ -2137,6 +2139,17 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
return 0;
}
/*
 * Submit a batch of schemaless records through taos_insert_lines().
 *
 * taos     - connection handle handed to taos_insert_lines()
 * lines    - array of record strings
 * numLines - number of entries in lines
 * quiet    - when true a failure is not logged
 *
 * Returns 0 when taos_insert_lines() reports 0, -1 otherwise.
 */
static int execSmlLines(TAOS* taos, char* lines[], int numLines, bool quiet) {
    const int32_t rc = taos_insert_lines(taos, lines, numLines);

    if (0 == rc) {
        return 0;
    }
    if (quiet) {
        return -1;
    }
    errorPrint2("Failed to execute schemaless line, reason: %s\n", tstrerror(rc));
    return -1;
}
static void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo)
{
pThreadInfo->fp = fopen(pThreadInfo->filePath, "at");
......@@ -2696,6 +2709,8 @@ static int printfInsertMeta() {
printf(" stbName: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].stbName);
printf(" schemaless: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].schemaless ? "yes" : "no");
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
} else if (AUTO_CREATE_SUBTBL ==
......@@ -4454,6 +4469,10 @@ int createDatabasesAndStables(char *command) {
int validStbCount = 0;
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (g_Dbs.db[i].superTbls[j].schemaless)
{
goto skip;
}
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].stbName);
ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);
......@@ -4475,6 +4494,7 @@ int createDatabasesAndStables(char *command) {
continue;
}
}
skip:
validStbCount ++;
}
g_Dbs.db[i].superTblCount = validStbCount;
......@@ -4484,6 +4504,250 @@ int createDatabasesAndStables(char *command) {
return 0;
}
static void* insertSchemaless(void *sargs) {
threadInfo *pThreadInfo = (threadInfo *)sargs;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
int64_t timeStampStep = stbInfo?stbInfo->timeStampStep:g_args.timestamp_step;
int64_t insertRows = stbInfo?stbInfo->insertRows:g_args.insertRows;
setThreadName("insertSchemaless");
int count = 0;
int batch = min(g_args.reqPerReq, insertRows*(pThreadInfo->ntables));
char *lines[batch];
int64_t timestamp = pThreadInfo->start_time;
char *smlHead[pThreadInfo->ntables];
for (int t = 0; t < pThreadInfo->ntables; t++) {
int64_t dataLen = 0;
smlHead[t] = (char *)calloc(MAX_LINE_SIZE, 1);
if ( NULL == smlHead[t]) {
errorPrint2("calloc failed! size:%d\n", MAX_LINE_SIZE);
exit(EXIT_FAILURE);
}
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"%s,id=\"%s%ld\"", stbInfo->stbName, stbInfo->childTblPrefix, t + pThreadInfo->start_table_from);
for (uint64_t j = 0; j < stbInfo->tagCount; j++) {
tstrncpy(smlHead[t] + dataLen, ",", 2);
dataLen += 1;
switch (stbInfo->tags[j].data_type) {
case TSDB_DATA_TYPE_TIMESTAMP:
errorPrint2("%s() LN%d, Does not support data type %s as tag\n",
__func__, __LINE__,
stbInfo->tags[j].dataType);
exit(EXIT_FAILURE);
case TSDB_DATA_TYPE_BOOL:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%s", j, rand_bool_str());
break;
case TSDB_DATA_TYPE_TINYINT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%si8", j, rand_tinyint_str());
break;
case TSDB_DATA_TYPE_UTINYINT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%su8", j, rand_utinyint_str());
break;
case TSDB_DATA_TYPE_SMALLINT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%si16", j, rand_smallint_str());
break;
case TSDB_DATA_TYPE_USMALLINT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%su16", j, rand_usmallint_str());
break;
case TSDB_DATA_TYPE_INT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%si32", j, rand_int_str());
break;
case TSDB_DATA_TYPE_UINT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%su32", j, rand_uint_str());
break;
case TSDB_DATA_TYPE_BIGINT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%si64", j, rand_bigint_str());
break;
case TSDB_DATA_TYPE_UBIGINT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%su64", j, rand_ubigint_str());
break;
case TSDB_DATA_TYPE_FLOAT:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%sf32", j, rand_float_str());
break;
case TSDB_DATA_TYPE_DOUBLE:
dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen,
"t%ld=%sf64", j, rand_double_str());
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (stbInfo->tags[j].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2("binary or nchar length overflow, maxsize:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
exit(EXIT_FAILURE);
}
char* buf = (char*)calloc(stbInfo->tags[j].dataLen+1, 1);
if (NULL == buf) {
errorPrint2("calloc failed! size:%d\n",
stbInfo->tags[j].dataLen);
exit(EXIT_FAILURE);
}
rand_string(buf, stbInfo->tags[j].dataLen);
if(stbInfo->tags[j].data_type == TSDB_DATA_TYPE_BINARY) {
dataLen += snprintf(smlHead[t] + dataLen,
MAX_DATA_SIZE - dataLen, "t%ld=\"%s\"", j, buf);
} else {
dataLen += snprintf(smlHead[t] + dataLen,
MAX_DATA_SIZE - dataLen, "t%ld=L\"%s\"", j, buf);
}
tmfree(buf);
break;
default:
errorPrint2("%s() LN%d, Unknown data type %s\n",
__func__, __LINE__,
stbInfo->tags[j].dataType);
exit(EXIT_FAILURE);
}
}
}
int currentPercent = 0;
int percentComplete = 0;
int totalAffectedRows = 0;
for (int64_t i = 0; i < insertRows; i++) {
timestamp = timestamp + i * timeStampStep;
for (uint64_t j = 0; j < pThreadInfo->ntables; j++) {
int64_t dataLen = 0;
lines[count] = calloc(MAX_LINE_SIZE, 1);
if (NULL == lines[count]) {
errorPrint2("calloc failed! size:%d\n", MAX_LINE_SIZE);
return NULL;
}
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"%s ", smlHead[j]);
for (uint32_t c = 0; c < stbInfo->columnCount; c++) {
if (c != 0) {
tstrncpy(lines[count] + dataLen, ",", 2);
dataLen += 1;
}
switch (stbInfo->columns[c].data_type) {
case TSDB_DATA_TYPE_TIMESTAMP:
errorPrint2("%s() LN%d, Does not support data type %s as tag\n",
__func__, __LINE__,
stbInfo->columns[c].dataType);
return NULL;
case TSDB_DATA_TYPE_BOOL:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%s", c, rand_bool_str());
break;
case TSDB_DATA_TYPE_TINYINT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%si8", c, rand_tinyint_str());
break;
case TSDB_DATA_TYPE_UTINYINT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%su8", c, rand_utinyint_str());
break;
case TSDB_DATA_TYPE_SMALLINT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%si16", c, rand_smallint_str());
break;
case TSDB_DATA_TYPE_USMALLINT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%su16", c, rand_usmallint_str());
break;
case TSDB_DATA_TYPE_INT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%si32", c, rand_int_str());
break;
case TSDB_DATA_TYPE_UINT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%su32", c, rand_uint_str());
break;
case TSDB_DATA_TYPE_BIGINT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%si64", c, rand_bigint_str());
break;
case TSDB_DATA_TYPE_UBIGINT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%su64", c, rand_ubigint_str());
break;
case TSDB_DATA_TYPE_FLOAT:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%sf32", c, rand_float_str());
break;
case TSDB_DATA_TYPE_DOUBLE:
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
"c%d=%sf64", c, rand_double_str());
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (stbInfo->columns[c].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2("binary or nchar length overflow, maxsize:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
exit(EXIT_FAILURE);
}
char* buf = (char*)calloc(stbInfo->columns[c].dataLen + 1, 1);
if (NULL == buf) {
errorPrint2("calloc failed! size:%d\n",
stbInfo->columns[c].dataLen);
exit(EXIT_FAILURE);
}
rand_string(buf, stbInfo->columns[c].dataLen);
if(stbInfo->columns[c].data_type == TSDB_DATA_TYPE_BINARY) {
dataLen += snprintf(lines[count] + dataLen,
MAX_DATA_SIZE - dataLen, "c%d=\"%s\"", c, buf);
} else {
dataLen += snprintf(lines[count] + dataLen,
MAX_DATA_SIZE - dataLen, "c%d=L\"%s\"", c, buf);
}
tmfree(buf);
break;
default:
errorPrint2("%s() LN%d, Unknown data type %s\n",
__func__, __LINE__,
stbInfo->columns[j].dataType);
return NULL;
}
}
dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen,
" %ld%s", timestamp, pThreadInfo->time_precision == TSDB_TIME_PRECISION_MILLI ?
"ms":(pThreadInfo->time_precision == TSDB_TIME_PRECISION_MICRO ? "us":"ns"));
count++;
if (count == batch) {
execSmlLines(pThreadInfo->taos, lines, batch, false);
totalAffectedRows += batch;
currentPercent = totalAffectedRows * g_Dbs.threadCount / insertRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
count = 0;
for (int index = 0; index < batch; index++) {
free(lines[index]);
}
}
}
}
if(count != 0) {
execSmlLines(pThreadInfo->taos, lines, count, false);
totalAffectedRows += count;
currentPercent = totalAffectedRows * g_Dbs.threadCount / insertRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
for (int index = 0; index < count; index++) {
free(lines[index]);
}
}
for (int index = 0; index < pThreadInfo->ntables; index++) {
free(smlHead[index]);
}
return NULL;
}
static void* createTable(void *sarg)
{
threadInfo *pThreadInfo = (threadInfo *)sarg;
......@@ -4593,6 +4857,105 @@ static void* createTable(void *sarg)
return NULL;
}
/*
 * Runs the schemaless insert of one super table on a pool of worker threads.
 *
 * threads   - requested number of workers; values below 1 are treated as 1 and
 *             the count is reduced to ntables when there are fewer tables
 * ntables   - number of child tables, split as evenly as possible over the workers
 * db_name   - database every worker connects to
 * precision - "ms", "us" or "ns" (case-insensitive); an empty string means "ms"
 * stbInfo   - super table description handed to every worker; its
 *             startTimestamp ("now" or a parsable time) gives the first
 *             timestamp, DEFAULT_START_TIME is used when stbInfo is NULL
 *
 * Each worker gets its own connection and executes insertSchemaless(). The
 * function waits for all started workers before it releases anything, also
 * when a later connection or thread start fails.
 *
 * Returns 0 on success (including ntables < 1, where nothing is started) and
 * -1 when a connection or a thread could not be created. An unsupported
 * precision, an unparsable start time or an allocation failure terminates
 * the process.
 */
static int startMultiThreadInsertSchemaless(int threads, int64_t ntables, char* db_name,
        char* precision, SSuperTable* stbInfo) {
    /* without tables there is no work; this also keeps the split below from
     * dividing by a thread count of zero */
    if (ntables < 1) {
        return 0;
    }
    if (threads < 1) {
        threads = 1;
    }
    if (ntables < threads) {
        threads = (int)ntables;
    }

    int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
    if (0 != precision[0]) {
        if (0 == strncasecmp(precision, "ms", 2)) {
            timePrec = TSDB_TIME_PRECISION_MILLI;
        } else if (0 == strncasecmp(precision, "us", 2)) {
            timePrec = TSDB_TIME_PRECISION_MICRO;
        } else if (0 == strncasecmp(precision, "ns", 2)) {
            timePrec = TSDB_TIME_PRECISION_NANO;
        } else {
            errorPrint2("Not support precision: %s\n", precision);
            exit(EXIT_FAILURE);
        }
    }

    int64_t startTime;
    if (stbInfo) {
        if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
            startTime = taosGetTimestamp(timePrec);
        } else {
            if (TSDB_CODE_SUCCESS != taosParseTime(
                        stbInfo->startTimestamp,
                        &startTime,
                        strlen(stbInfo->startTimestamp),
                        timePrec, 0)) {
                ERROR_EXIT("failed to parse time!\n");
            }
        }
    } else {
        startTime = DEFAULT_START_TIME;
    }
    debugPrint("%s() LN%d, startTime= %"PRId64"\n",
            __func__, __LINE__, startTime);

    /* sized after the thread count has been settled */
    pthread_t *pids = calloc((size_t)threads, sizeof(pthread_t));
    threadInfo *infos = calloc((size_t)threads, sizeof(threadInfo));
    if ((NULL == pids) || (NULL == infos)) {
        ERROR_EXIT("startMultiThreadInsertSchemaless malloc failed\n");
    }

    /* every worker gets a tables, the first b workers one more */
    const int64_t a = ntables / threads;
    const int64_t b = ntables % threads;
    uint64_t tableFrom = 0;
    int started = 0;            /* workers that are actually running */
    int result = 0;

    for (int i = 0; i < threads; i++) {
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID = i;
        tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
        pThreadInfo->stbInfo = stbInfo;
        verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);

        pThreadInfo->taos = taos_connect(
                g_Dbs.host,
                g_Dbs.user,
                g_Dbs.password,
                db_name,
                g_Dbs.port);
        if (pThreadInfo->taos == NULL) {
            errorPrint2("%s() LN%d, Failed to connect to TDengine, reason:%s\n",
                    __func__, __LINE__, taos_errstr(NULL));
            result = -1;
            break;
        }

        pThreadInfo->time_precision = timePrec;
        pThreadInfo->start_table_from = tableFrom;
        pThreadInfo->ntables = i < b ? a + 1 : a;
        pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
        tableFrom = pThreadInfo->end_table_to + 1;
        pThreadInfo->start_time = startTime;
        pThreadInfo->use_metric = true;
        pThreadInfo->minDelay = UINT64_MAX;
        pThreadInfo->tables_created = 0;

        if (0 != pthread_create(pids + i, NULL, insertSchemaless, pThreadInfo)) {
            errorPrint2("%s() LN%d, Failed to create schemaless insert thread %d\n",
                    __func__, __LINE__, i);
            taos_close(pThreadInfo->taos);
            pThreadInfo->taos = NULL;
            result = -1;
            break;
        }
        started++;
    }

    /* running workers still use infos, so wait for them before freeing */
    for (int i = 0; i < started; i++) {
        pthread_join(pids[i], NULL);
    }
    for (int i = 0; i < started; i++) {
        threadInfo *pThreadInfo = infos + i;
        taos_close(pThreadInfo->taos);
        g_actualChildTables += pThreadInfo->tables_created;
    }

    free(pids);
    free(infos);
    return result;
}
static int startMultiThreadCreateChildTable(
char* cols, int threads, uint64_t tableFrom, int64_t ntables,
char* db_name, SSuperTable* stbInfo) {
......@@ -5513,6 +5876,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring,
TBNAME_PREFIX_LEN);
cJSON *schemaless = cJSON_GetObjectItem(stbInfo, "schemaless");
if (schemaless
&& schemaless->type == cJSON_String
&& schemaless->valuestring != NULL) {
if (0 == strncasecmp(schemaless->valuestring, "yes", 3)) {
g_Dbs.db[i].superTbls[j].schemaless = true;
} else {
g_Dbs.db[i].superTbls[j].schemaless = false;
}
} else if (!schemaless) {
g_Dbs.db[i].superTbls[j].schemaless = false;
} else {
errorPrint("%s", "failed to read json, schemaless not found\n");
goto PARSE_OVER;
}
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table");
if (autoCreateTbl
&& autoCreateTbl->type == cJSON_String
......@@ -10852,6 +11231,46 @@ static int insertTestProcess() {
double start;
double end;
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* stbInfo = &g_Dbs.db[i].superTbls[j];
if (stbInfo->schemaless) {
fprintf(stderr,
"start schemaless insert into %"PRId64" table(s) for %ld records each with %d thread(s)\n\n",
g_totalChildTables, stbInfo->insertRows, g_Dbs.threadCount);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"start schemaless insert into %"PRId64" table(s) for %ld records each with %d thread(s)\n\n",
g_totalChildTables, stbInfo->insertRows, g_Dbs.threadCount);
}
start = taosGetTimestampMs();
startMultiThreadInsertSchemaless(
g_Dbs.threadCount,
g_Dbs.db[i].superTbls[j].childTblCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
stbInfo);
end = taosGetTimestampMs();
fprintf(stderr,
"\nSpent %.4f seconds schemaless insert into %"PRId64" table(s) with %d thread(s) and %"PRId64" records each\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCount, stbInfo->insertRows);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"\nSpent %.4f seconds schemaless insert into %"PRId64" table(s) with %d thread(s),and %"PRId64" records each\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCount, stbInfo->insertRows);
}
return 0;
}
}
}
}
}
if (g_totalChildTables > 0) {
fprintf(stderr,
"creating %"PRId64" table(s) with %d thread(s)\n\n",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册