未验证 提交 ec3c5d5b 编写于 作者: Y Yang Zhao 提交者: GitHub

[TD-10465]<feature>taosdemo support schemaless insertion (#8341)

* [TD-10465]<feature>taosdemo support schemaless

* fix performance output

* tasodemo sml interlace

* sml interlace

* sml interlace

* fix memory leak

* taosdemo sml interlace

* fix core dump
上级 f5eed75d
......@@ -147,6 +147,7 @@ enum enum_TAOS_INTERFACE {
TAOSC_IFACE,
REST_IFACE,
STMT_IFACE,
SML_IFACE,
INTERFACE_BUT
};
......@@ -504,6 +505,7 @@ typedef struct SThreadInfo_S {
uint64_t querySeq; // sequence number of sql command
TAOS_SUB* tsub;
char** lines;
int sockfd;
} threadInfo;
......@@ -1055,6 +1057,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->iface = REST_IFACE;
} else if (0 == strcasecmp(argv[i+1], "stmt")) {
arguments->iface = STMT_IFACE;
} else if (0 == strcasecmp(argv[i+1], "sml")) {
arguments->iface = SML_IFACE;
} else {
errorWrongValue(argv[0], "-I", argv[i+1]);
exit(EXIT_FAILURE);
......@@ -1067,6 +1071,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->iface = REST_IFACE;
} else if (0 == strcasecmp((char *)(argv[i] + strlen("--interface=")), "stmt")) {
arguments->iface = STMT_IFACE;
} else if (0 == strcasecmp((char *)(argv[i] + strlen("--interface=")), "sml")) {
arguments->iface = SML_IFACE;
} else {
errorPrintReqArg3(argv[0], "--interface");
exit(EXIT_FAILURE);
......@@ -1078,6 +1084,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->iface = REST_IFACE;
} else if (0 == strcasecmp((char *)(argv[i] + strlen("-I")), "stmt")) {
arguments->iface = STMT_IFACE;
} else if (0 == strcasecmp((char *)(argv[i] + strlen("-I")), "sml")) {
arguments->iface = SML_IFACE;
} else {
errorWrongValue(argv[0], "-I",
(char *)(argv[i] + strlen("-I")));
......@@ -1094,6 +1102,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->iface = REST_IFACE;
} else if (0 == strcasecmp(argv[i+1], "stmt")) {
arguments->iface = STMT_IFACE;
} else if (0 == strcasecmp(argv[i+1], "sml")) {
arguments->iface = SML_IFACE;
} else {
errorWrongValue(argv[0], "--interface", argv[i+1]);
exit(EXIT_FAILURE);
......@@ -2611,7 +2621,8 @@ static int printfInsertMeta() {
// first time if no iface specified
printf("interface: \033[33m%s\033[0m\n",
(g_args.iface==TAOSC_IFACE)?"taosc":
(g_args.iface==REST_IFACE)?"rest":"stmt");
(g_args.iface==REST_IFACE)?"rest":
(g_args.iface==STMT_IFACE)?"stmt":"sml");
}
printf("host: \033[33m%s:%u\033[0m\n",
......@@ -2737,7 +2748,8 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].dataSource);
printf(" iface: \033[33m%s\033[0m\n",
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":
(g_Dbs.db[i].superTbls[j].iface==STMT_IFACE)?"stmt":"sml");
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n",
g_Dbs.db[i].superTbls[j].childTblLimit);
......@@ -2936,7 +2948,8 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " iface: %s\n",
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":
(g_Dbs.db[i].superTbls[j].iface==STMT_IFACE)?"stmt":"sml");
fprintf(fp, " insertRows: %"PRId64"\n",
g_Dbs.db[i].superTbls[j].insertRows);
fprintf(fp, " interlace rows: %u\n",
......@@ -4467,6 +4480,10 @@ int createDatabasesAndStables(char *command) {
int validStbCount = 0;
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (g_Dbs.db[i].superTbls[j].iface == SML_IFACE) {
goto skip;
}
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].stbName);
ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);
......@@ -4488,6 +4505,7 @@ int createDatabasesAndStables(char *command) {
continue;
}
}
skip:
validStbCount ++;
}
g_Dbs.db[i].superTblCount = validStbCount;
......@@ -5667,6 +5685,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
} else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
} else if (0 == strcasecmp(stbIface->valuestring, "sml")) {
g_Dbs.db[i].superTbls[j].iface= SML_IFACE;
g_args.iface = SML_IFACE;
} else {
errorPrint("failed to read json, insert_mode %s not recognized\n",
stbIface->valuestring);
......@@ -7007,7 +7028,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
{
int32_t affectedRows;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
int32_t code;
uint16_t iface;
if (stbInfo)
iface = stbInfo->iface;
......@@ -7059,7 +7080,19 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
}
affectedRows = k;
break;
case SML_IFACE:
code = taos_schemaless_insert(pThreadInfo->taos, pThreadInfo->lines, k, 0, pThreadInfo->time_precision == TSDB_TIME_PRECISION_MILLI
? "ms"
: (pThreadInfo->time_precision == TSDB_TIME_PRECISION_MICRO
? "us"
: "ns"));
if (code) {
errorPrint2("%s() LN%d, failed to execute schemaless insert. reason: %s\n",
__func__, __LINE__, tstrerror(code));
exit(EXIT_FAILURE);
}
affectedRows = k;
break;
default:
errorPrint2("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, stbInfo->iface);
......@@ -9545,6 +9578,441 @@ free_of_interlace_stmt:
#endif
static void generateSmlHead(char* smlHead, SSuperTable* stbInfo, threadInfo* pThreadInfo, int tbSeq) {
int64_t dataLen = 0;
dataLen += snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"%s,id=%s%" PRIu64 "", stbInfo->stbName,
stbInfo->childTblPrefix,
tbSeq + pThreadInfo->start_table_from);
for (int j = 0; j < stbInfo->tagCount; j++) {
tstrncpy(smlHead + dataLen, ",", 2);
dataLen += 1;
switch (stbInfo->tags[j].data_type) {
case TSDB_DATA_TYPE_TIMESTAMP:
errorPrint2(
"%s() LN%d, Does not support data type %s as tag\n",
__func__, __LINE__, stbInfo->tags[j].dataType);
exit(EXIT_FAILURE);
case TSDB_DATA_TYPE_BOOL:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%s", j, rand_bool_str());
break;
case TSDB_DATA_TYPE_TINYINT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%si8", j, rand_tinyint_str());
break;
case TSDB_DATA_TYPE_UTINYINT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%su8", j, rand_utinyint_str());
break;
case TSDB_DATA_TYPE_SMALLINT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%si16", j, rand_smallint_str());
break;
case TSDB_DATA_TYPE_USMALLINT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%su16", j, rand_usmallint_str());
break;
case TSDB_DATA_TYPE_INT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%si32", j, rand_int_str());
break;
case TSDB_DATA_TYPE_UINT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%su32", j, rand_uint_str());
break;
case TSDB_DATA_TYPE_BIGINT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%si64", j, rand_bigint_str());
break;
case TSDB_DATA_TYPE_UBIGINT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%su64", j, rand_ubigint_str());
break;
case TSDB_DATA_TYPE_FLOAT:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%sf32", j, rand_float_str());
break;
case TSDB_DATA_TYPE_DOUBLE:
dataLen +=
snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen,
"T%d=%sf64", j, rand_double_str());
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (stbInfo->tags[j].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2(
"binary or nchar length overflow, maxsize:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
exit(EXIT_FAILURE);
}
char *buf = (char *)calloc(stbInfo->tags[j].dataLen + 1, 1);
if (NULL == buf) {
errorPrint2("calloc failed! size:%d\n",
stbInfo->tags[j].dataLen);
exit(EXIT_FAILURE);
}
rand_string(buf, stbInfo->tags[j].dataLen);
if (stbInfo->tags[j].data_type == TSDB_DATA_TYPE_BINARY) {
dataLen += snprintf(smlHead + dataLen,
HEAD_BUFF_LEN - dataLen,
"T%d=\"%s\"", j, buf);
} else {
dataLen += snprintf(smlHead + dataLen,
HEAD_BUFF_LEN - dataLen,
"T%d=L\"%s\"", j, buf);
}
tmfree(buf);
break;
default:
errorPrint2("%s() LN%d, Unknown data type %s\n", __func__,
__LINE__, stbInfo->tags[j].dataType);
exit(EXIT_FAILURE);
}
}
}
static void generateSmlTail(char* line, char* smlHead, SSuperTable* stbInfo,
threadInfo* pThreadInfo, int64_t timestamp) {
int dataLen = 0;
dataLen = snprintf(line, BUFFER_SIZE, "%s ", smlHead);
for (uint32_t c = 0; c < stbInfo->columnCount; c++) {
if (c != 0) {
tstrncpy(line + dataLen, ",", 2);
dataLen += 1;
}
switch (stbInfo->columns[c].data_type) {
case TSDB_DATA_TYPE_TIMESTAMP:
errorPrint2(
"%s() LN%d, Does not support data type %s as tag\n",
__func__, __LINE__, stbInfo->columns[c].dataType);
exit(EXIT_FAILURE);
case TSDB_DATA_TYPE_BOOL:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen, "c%d=%s",
c, rand_bool_str());
break;
case TSDB_DATA_TYPE_TINYINT:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen, "c%d=%si8",
c, rand_tinyint_str());
break;
case TSDB_DATA_TYPE_UTINYINT:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen, "c%d=%su8",
c, rand_utinyint_str());
break;
case TSDB_DATA_TYPE_SMALLINT:
dataLen += snprintf(
line + dataLen, BUFFER_SIZE - dataLen,
"c%d=%si16", c, rand_smallint_str());
break;
case TSDB_DATA_TYPE_USMALLINT:
dataLen += snprintf(
line + dataLen, BUFFER_SIZE - dataLen,
"c%d=%su16", c, rand_usmallint_str());
break;
case TSDB_DATA_TYPE_INT:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=%si32", c, rand_int_str());
break;
case TSDB_DATA_TYPE_UINT:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=%su32", c, rand_uint_str());
break;
case TSDB_DATA_TYPE_BIGINT:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=%si64", c, rand_bigint_str());
break;
case TSDB_DATA_TYPE_UBIGINT:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=%su64", c, rand_ubigint_str());
break;
case TSDB_DATA_TYPE_FLOAT:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=%sf32", c, rand_float_str());
break;
case TSDB_DATA_TYPE_DOUBLE:
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=%sf64", c, rand_double_str());
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (stbInfo->columns[c].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2(
"binary or nchar length overflow, maxsize:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
exit(EXIT_FAILURE);
}
char *buf =
(char *)calloc(stbInfo->columns[c].dataLen + 1, 1);
if (NULL == buf) {
errorPrint2("calloc failed! size:%d\n",
stbInfo->columns[c].dataLen);
exit(EXIT_FAILURE);
}
rand_string(buf, stbInfo->columns[c].dataLen);
if (stbInfo->columns[c].data_type ==
TSDB_DATA_TYPE_BINARY) {
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=\"%s\"", c, buf);
} else {
dataLen += snprintf(line + dataLen,
BUFFER_SIZE - dataLen,
"c%d=L\"%s\"", c, buf);
}
tmfree(buf);
break;
default:
errorPrint2("%s() LN%d, Unknown data type %s\n",
__func__, __LINE__,
stbInfo->columns[c].dataType);
exit(EXIT_FAILURE);
}
}
dataLen += snprintf(line + dataLen, BUFFER_SIZE - dataLen," %" PRId64 "", timestamp);
}
static void* syncWriteInterlaceSml(threadInfo *pThreadInfo, uint32_t interlaceRows) {
debugPrint("[%d] %s() LN%d: ### interlace schemaless write\n",
pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows;
uint64_t maxSqlLen;
int64_t timeStampStep;
uint64_t insert_interval;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (stbInfo) {
insertRows = stbInfo->insertRows;
maxSqlLen = stbInfo->maxSqlLen;
timeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval;
} else {
insertRows = g_args.insertRows;
maxSqlLen = g_args.max_sql_len;
timeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval;
}
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
if (interlaceRows > g_args.reqPerReq)
interlaceRows = g_args.reqPerReq;
uint32_t batchPerTbl = interlaceRows;
uint32_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.reqPerReq / interlaceRows;
} else {
batchPerTblTimes = 1;
}
char *smlHead[pThreadInfo->ntables];
for (int t = 0; t < pThreadInfo->ntables; t++) {
smlHead[t] = (char *)calloc(HEAD_BUFF_LEN, 1);
if (NULL == smlHead[t]) {
errorPrint2("calloc failed! size:%d\n", HEAD_BUFF_LEN);
exit(EXIT_FAILURE);
}
generateSmlHead(smlHead[t], stbInfo, pThreadInfo, t);
}
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
uint64_t st = 0;
uint64_t et = UINT64_MAX;
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
uint64_t tableSeq = pThreadInfo->start_table_from;
int64_t startTime = pThreadInfo->start_time;
uint64_t generatedRecPerTbl = 0;
bool flagSleep = true;
uint64_t sleepTimeTotal = 0;
int percentComplete = 0;
int64_t totalRows = insertRows * pThreadInfo->ntables;
pThreadInfo->lines = calloc(g_args.reqPerReq, sizeof(char *));
if (NULL == pThreadInfo->lines) {
errorPrint2("Failed to alloc %"PRIu64" bytes, reason:%s\n",
g_args.reqPerReq * sizeof(char *),
strerror(errno));
return NULL;
}
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampMs();
flagSleep = false;
}
// generate data
uint32_t recOfBatch = 0;
for (uint64_t i = 0; i < batchPerTblTimes; i++) {
int64_t timestamp = startTime;
for (int j = recOfBatch; j < recOfBatch + batchPerTbl; j++) {
pThreadInfo->lines[j] = calloc(BUFFER_SIZE, 1);
if (NULL == pThreadInfo->lines[j]) {
errorPrint2("Failed to alloc %d bytes, reason:%s\n",
BUFFER_SIZE, strerror(errno));
}
generateSmlTail(pThreadInfo->lines[j], smlHead[i], stbInfo, pThreadInfo, timestamp);
timestamp += timeStampStep;
}
tableSeq ++;
recOfBatch += batchPerTbl;
pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
generatedRecPerTbl += batchPerTbl;
startTime = pThreadInfo->start_time
+ generatedRecPerTbl * timeStampStep;
flagSleep = true;
if (generatedRecPerTbl >= insertRows)
break;
int64_t remainRows = insertRows - generatedRecPerTbl;
if ((remainRows > 0) && (batchPerTbl > remainRows))
batchPerTbl = remainRows;
if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq)
break;
}
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__,
generatedRecPerTbl, insertRows);
if ((g_args.reqPerReq - recOfBatch) < batchPerTbl)
break;
}
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
startTs = taosGetTimestampUs();
if (recOfBatch == 0) {
errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl);
if (batchPerTbl > 0) {
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
batchPerTbl, maxSqlLen / batchPerTbl);
}
errorPrint("\tPlease check if the buffer length(%"PRId64") or batch(%d) is set with proper value!\n",
maxSqlLen, batchPerTbl);
goto free_of_interlace;
}
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
endTs = taosGetTimestampUs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %10.2f ms\n",
__func__, __LINE__, delay / 1000.0);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (recOfBatch != affectedRows) {
errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, pThreadInfo->buffer);
goto free_of_interlace;
}
pThreadInfo->totalAffectedRows += affectedRows;
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
if ((insert_interval) && flagSleep) {
et = taosGetTimestampMs();
if (insert_interval > (et - st) ) {
uint64_t sleepTime = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
__func__, __LINE__, sleepTime);
taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval;
}
}
for (int index = 0; index < g_args.reqPerReq; index++) {
free(pThreadInfo->lines[index]);
}
}
if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
free_of_interlace:
tmfree(pThreadInfo->lines);
for (int index = 0; index < pThreadInfo->ntables; index++) {
free(smlHead[index]);
}
printStatPerThread(pThreadInfo);
return NULL;
}
// sync write interlace data
static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
debugPrint("[%d] %s() LN%d: ### interlace write\n",
......@@ -9947,6 +10415,120 @@ free_of_stmt_progressive:
printStatPerThread(pThreadInfo);
return NULL;
}
static void* syncWriteProgressiveSml(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### sml progressive write\n", __func__, __LINE__);
SSuperTable* stbInfo = pThreadInfo->stbInfo;
int64_t timeStampStep =
stbInfo?stbInfo->timeStampStep:g_args.timestamp_step;
int64_t insertRows =
(stbInfo)?stbInfo->insertRows:g_args.insertRows;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
uint64_t lastPrintTime = taosGetTimestampMs();
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
char *smlHead[pThreadInfo->ntables];
for (int t = 0; t < pThreadInfo->ntables; t++) {
smlHead[t] = (char *)calloc(HEAD_BUFF_LEN, 1);
if (NULL == smlHead[t]) {
errorPrint2("calloc failed! size:%d\n", HEAD_BUFF_LEN);
exit(EXIT_FAILURE);
}
generateSmlHead(smlHead[t], stbInfo, pThreadInfo, t);
}
int currentPercent = 0;
int percentComplete = 0;
if (insertRows < g_args.reqPerReq) {
g_args.reqPerReq = insertRows;
}
pThreadInfo->lines = calloc(g_args.reqPerReq, sizeof(char *));
if (NULL == pThreadInfo->lines) {
errorPrint2("Failed to alloc %"PRIu64" bytes, reason:%s\n",
g_args.reqPerReq * sizeof(char *),
strerror(errno));
return NULL;
}
for (uint64_t i = 0; i < pThreadInfo->ntables; i++) {
int64_t timestamp = pThreadInfo->start_time;
for (uint64_t j = 0; j < insertRows;) {
for (int k = 0; k < g_args.reqPerReq; k++) {
pThreadInfo->lines[k] = calloc(BUFFER_SIZE, 1);
if (NULL == pThreadInfo->lines[k]) {
errorPrint2("Failed to alloc %d bytes, reason:%s\n",
BUFFER_SIZE, strerror(errno));
}
generateSmlTail(pThreadInfo->lines[k], smlHead[i], stbInfo, pThreadInfo, timestamp);
timestamp += timeStampStep;
j++;
if (j == insertRows) {
break;
}
}
uint64_t startTs = taosGetTimestampUs();
int32_t affectedRows = execInsert(pThreadInfo, g_args.reqPerReq);
uint64_t endTs = taosGetTimestampUs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %10.f ms\n",
__func__, __LINE__, delay/1000.0);
verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay){
pThreadInfo->maxDelay = delay;
}
if (delay < pThreadInfo->minDelay){
pThreadInfo->minDelay = delay;
}
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
pThreadInfo->totalAffectedRows += affectedRows;
pThreadInfo->totalInsertRows += g_args.reqPerReq;
currentPercent =
pThreadInfo->totalAffectedRows * g_Dbs.threadCount / insertRows;
if (currentPercent > percentComplete) {
printf("[%d]:%d%%\n", pThreadInfo->threadID,
currentPercent);
percentComplete = currentPercent;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
for (int index = 0; index < g_args.reqPerReq; index++) {
free(pThreadInfo->lines[index]);
}
if (j == insertRows) {
break;
}
}
}
tmfree(pThreadInfo->lines);
for (int index = 0; index < pThreadInfo->ntables; index++) {
free(smlHead[index]);
}
return NULL;
}
// sync insertion progressive data
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
......@@ -10152,6 +10734,8 @@ static void* syncWrite(void *sarg) {
#else
return syncWriteInterlaceStmt(pThreadInfo, interlaceRows);
#endif
} else if (SML_IFACE == stbInfo->iface) {
return syncWriteInterlaceSml(pThreadInfo, interlaceRows);
} else {
return syncWriteInterlace(pThreadInfo, interlaceRows);
}
......@@ -10161,6 +10745,9 @@ static void* syncWrite(void *sarg) {
if (((stbInfo) && (STMT_IFACE == stbInfo->iface))
|| (STMT_IFACE == g_args.iface)) {
return syncWriteProgressiveStmt(pThreadInfo);
} else if (((stbInfo) && (SML_IFACE == stbInfo->iface))
|| (SML_IFACE == g_args.iface)) {
return syncWriteProgressiveSml(pThreadInfo);
} else {
return syncWriteProgressive(pThreadInfo);
}
......@@ -10318,7 +10905,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
// read sample data from file first
int ret;
if (stbInfo) {
if (stbInfo && stbInfo->iface != SML_IFACE) {
ret = prepareSampleForStb(stbInfo);
} else {
ret = prepareSampleForNtb();
......@@ -10341,72 +10928,76 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t ntables = 0;
uint64_t tableFrom;
if (stbInfo) {
int64_t limit;
uint64_t offset;
if (stbInfo->iface != SML_IFACE) {
int64_t limit;
uint64_t offset;
if ((NULL != g_args.sqlFile)
&& (stbInfo->childTblExists == TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset != 0)
|| (stbInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
}
if ((NULL != g_args.sqlFile)
&& (stbInfo->childTblExists == TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset != 0)
|| (stbInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
}
if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((stbInfo->childTblLimit < 0)
|| ((stbInfo->childTblOffset
+ stbInfo->childTblLimit)
> (stbInfo->childTblCount))) {
if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((stbInfo->childTblLimit < 0)
|| ((stbInfo->childTblOffset
+ stbInfo->childTblLimit)
> (stbInfo->childTblCount))) {
if (stbInfo->childTblCount < stbInfo->childTblOffset) {
printf("WARNING: offset will not be used since the child tables count is less then offset!\n");
if (stbInfo->childTblCount < stbInfo->childTblOffset) {
printf("WARNING: offset will not be used since the child tables count is less then offset!\n");
stbInfo->childTblOffset = 0;
stbInfo->childTblOffset = 0;
}
stbInfo->childTblLimit =
stbInfo->childTblCount - stbInfo->childTblOffset;
}
stbInfo->childTblLimit =
stbInfo->childTblCount - stbInfo->childTblOffset;
offset = stbInfo->childTblOffset;
limit = stbInfo->childTblLimit;
} else {
limit = stbInfo->childTblCount;
offset = 0;
}
offset = stbInfo->childTblOffset;
limit = stbInfo->childTblLimit;
} else {
limit = stbInfo->childTblCount;
offset = 0;
}
ntables = limit;
tableFrom = offset;
ntables = limit;
tableFrom = offset;
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset + stbInfo->childTblLimit)
> stbInfo->childTblCount)) {
printf("WARNING: specified offset + limit > child table count!\n");
prompt();
}
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset + stbInfo->childTblLimit)
> stbInfo->childTblCount)) {
printf("WARNING: specified offset + limit > child table count!\n");
prompt();
}
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& (0 == stbInfo->childTblLimit)) {
printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n");
prompt();
}
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& (0 == stbInfo->childTblLimit)) {
printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n");
prompt();
}
stbInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN);
if (stbInfo->childTblName == NULL) {
taos_close(taos0);
errorPrint2("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
exit(EXIT_FAILURE);
}
stbInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN);
if (stbInfo->childTblName == NULL) {
taos_close(taos0);
errorPrint2("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
exit(EXIT_FAILURE);
int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos0,
db_name, stbInfo->stbName,
&stbInfo->childTblName, &childTblCount,
limit,
offset, stbInfo->escapeChar);
ntables = childTblCount;
} else {
ntables = stbInfo->childTblCount;
}
int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos0,
db_name, stbInfo->stbName,
&stbInfo->childTblName, &childTblCount,
limit,
offset, stbInfo->escapeChar);
ntables = childTblCount;
} else {
ntables = g_args.ntables;
tableFrom = 0;
......@@ -10990,33 +11581,34 @@ static int insertTestProcess() {
double start;
double end;
if (g_totalChildTables > 0) {
fprintf(stderr,
"creating %"PRId64" table(s) with %d thread(s)\n\n",
g_totalChildTables, g_Dbs.threadCountForCreateTbl);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"creating %"PRId64" table(s) with %d thread(s)\n\n",
g_totalChildTables, g_Dbs.threadCountForCreateTbl);
}
if (g_args.iface != SML_IFACE) {
if (g_totalChildTables > 0) {
fprintf(stderr,
"creating %"PRId64" table(s) with %d thread(s)\n\n",
g_totalChildTables, g_Dbs.threadCountForCreateTbl);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"creating %"PRId64" table(s) with %d thread(s)\n\n",
g_totalChildTables, g_Dbs.threadCountForCreateTbl);
}
// create child tables
start = taosGetTimestampMs();
createChildTables();
end = taosGetTimestampMs();
// create child tables
start = taosGetTimestampMs();
createChildTables();
end = taosGetTimestampMs();
fprintf(stderr,
"\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCountForCreateTbl, g_actualChildTables);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCountForCreateTbl, g_actualChildTables);
fprintf(stderr,
"\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCountForCreateTbl, g_actualChildTables);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCountForCreateTbl, g_actualChildTables);
}
}
}
// create sub threads for inserting data
//start = taosGetTimestampMs();
for (int i = 0; i < g_Dbs.dbCount; i++) {
......@@ -12069,10 +12661,12 @@ static void setParaFromArg() {
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType,
"INT", min(DATATYPE_BUFF_LEN, strlen("INT") + 1));
g_Dbs.db[0].superTbls[0].tags[0].data_type = TSDB_DATA_TYPE_INT;
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType,
"BINARY", min(DATATYPE_BUFF_LEN, strlen("BINARY") + 1));
g_Dbs.db[0].superTbls[0].tags[1].data_type = TSDB_DATA_TYPE_BINARY;
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.binwidth;
g_Dbs.db[0].superTbls[0].tagCount = 2;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册