提交 850f9d62 编写于 作者: sangshuduo's avatar sangshuduo

[TD-5872]<fix>: taosdemo stmt csv perf improve.

上级 dab65838
......@@ -444,6 +444,7 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int64_t *bind_ts;
int threadID;
char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision;
......@@ -5058,10 +5059,15 @@ static void postFreeResource() {
}
#if STMT_IFACE_ENABLED == 1
if (g_Dbs.db[i].superTbls[j].sampleBindArray) {
for (int c = 0; c < MAX_SAMPLES_ONCE_FROM_FILE; c++) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
g_Dbs.db[i].superTbls[j].sampleBindArray
+ sizeof(uintptr_t *) * c));
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
}
tmfree((char *)tmp);
}
}
......@@ -5747,8 +5753,9 @@ static int64_t generateInterlaceDataWithoutStb(
}
#if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
char *dataType, int32_t dataLen, char **ptr,
static int32_t prepareStmtBindArrayByType(
TAOS_BIND *bind,
char *dataType, int32_t dataLen,
int32_t timePrec,
char *value)
{
......@@ -5759,13 +5766,15 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_binary = (char *)*ptr;
char *bind_binary;
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
if (value) {
bind_binary = calloc(1, strlen(value) + 1);
strncpy(bind_binary, value, strlen(value));
bind->buffer_length = strlen(bind_binary);
} else {
bind_binary = calloc(1, dataLen + 1);
rand_string(bind_binary, dataLen);
bind->buffer_length = dataLen;
}
......@@ -5773,8 +5782,6 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->length = &bind->buffer_length;
bind->buffer = bind_binary;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"NCHAR", strlen("NCHAR"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
......@@ -5782,12 +5789,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_nchar = (char *)*ptr;
char *bind_nchar;
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
if (value) {
bind_nchar = calloc(1, strlen(value) + 1);
strncpy(bind_nchar, value, strlen(value));
} else {
bind_nchar = calloc(1, dataLen + 1);
rand_string(bind_nchar, dataLen);
}
......@@ -5795,11 +5804,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = (int32_t *)*ptr;
int32_t *bind_int = malloc(sizeof(int32_t));
if (value) {
*bind_int = atoi(value);
......@@ -5811,11 +5818,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_int;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = (int64_t *)*ptr;
int64_t *bind_bigint = malloc(sizeof(int64_t));
if (value) {
*bind_bigint = atoll(value);
......@@ -5827,11 +5832,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_bigint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *) *ptr;
float *bind_float = malloc(sizeof(float));
if (value) {
*bind_float = (float)atof(value);
......@@ -5843,11 +5846,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_float;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = (double *)*ptr;
double *bind_double = malloc(sizeof(double));
if (value) {
*bind_double = atof(value);
......@@ -5859,11 +5860,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_double;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = (int16_t *)*ptr;
int16_t *bind_smallint = malloc(sizeof(int16_t));
if (value) {
*bind_smallint = (int16_t)atoi(value);
......@@ -5875,11 +5874,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_smallint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = (int8_t *)*ptr;
int8_t *bind_tinyint = malloc(sizeof(int8_t));
if (value) {
*bind_tinyint = (int8_t)atoi(value);
......@@ -5891,22 +5888,28 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = (int8_t *)*ptr;
int8_t *bind_bool = malloc(sizeof(int8_t));
*bind_bool = rand_bool();
if (value) {
if (strncasecmp(value, "true", 4)) {
*bind_bool = true;
} else {
*bind_bool = false;
}
} else {
*bind_bool = rand_bool();
}
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *) *ptr;
int64_t *bind_ts2 = malloc(sizeof(int64_t));
if (value) {
if (strchr(value, ':') && strchr(value, '-')) {
......@@ -5936,8 +5939,6 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_ts2;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else {
errorPrint( "No support data type: %s\n", dataType);
return -1;
......@@ -5974,15 +5975,11 @@ static int32_t prepareStmtWithoutStb(
int32_t k = 0;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts;
int64_t *bind_ts = pThreadInfo->bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (g_args.disorderRatio) {
......@@ -5998,8 +5995,6 @@ static int32_t prepareStmtWithoutStb(
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < g_args.num_of_CPR; i ++) {
bind = (TAOS_BIND *)((char *)bindArray
+ (sizeof(TAOS_BIND) * (i + 1)));
......@@ -6007,7 +6002,6 @@ static int32_t prepareStmtWithoutStb(
bind,
data_type[i],
g_args.len_of_binary,
&ptr,
pThreadInfo->time_precision,
NULL)) {
return -1;
......@@ -6048,10 +6042,6 @@ static int32_t prepareStbStmtBindTag(
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
......@@ -6060,7 +6050,6 @@ static int32_t prepareStbStmtBindTag(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
&ptr,
timePrec,
NULL)) {
free(bindBuffer);
......@@ -6073,6 +6062,7 @@ static int32_t prepareStbStmtBindTag(
}
static int32_t prepareStbStmtBindRand(
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
int32_t timePrec)
......@@ -6084,19 +6074,14 @@ static int32_t prepareStbStmtBindRand(
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) {
int64_t *bind_ts;
int64_t *bind_ts = ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
......@@ -6111,12 +6096,10 @@ static int32_t prepareStbStmtBindRand(
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
timePrec,
NULL)) {
tmfree(bindBuffer);
......@@ -6129,78 +6112,32 @@ static int32_t prepareStbStmtBindRand(
}
static int32_t prepareStbStmtBindWithSample(
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
int32_t timePrec,
int64_t samplePos)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind;
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) {
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * samplePos + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
bind = (TAOS_BIND *)bindArray;
memset(bindBuffer, 0, DOUBLE_BUFF_LEN);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
int64_t *bind_ts = ts;
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
}
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
free(bindBuffer);
return 0;
}
......@@ -6279,7 +6216,9 @@ static int32_t prepareStbStmtRand(
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBindRand(bindArray, stbInfo,
if (-1 == prepareStbStmtBindRand(
pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision
/* is column */)) {
......@@ -6380,30 +6319,24 @@ static int32_t prepareStbStmtWithSample(
}
}
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
uint32_t k;
for (k = 0; k < batch;) {
char *bindArray = (char *)(*((uintptr_t *)
(stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos))));
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBindWithSample(
pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision,
*pSamplePos
/* is column */)) {
free(bindArray);
return -1;
}
ret = taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_bind_param() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
free(bindArray);
return -1;
}
// if msg > 3MB, break
......@@ -6411,7 +6344,6 @@ static int32_t prepareStbStmtWithSample(
if (0 != ret) {
errorPrint("%s() LN%d, stmt_add_batch() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
free(bindArray);
return -1;
}
......@@ -6428,11 +6360,8 @@ static int32_t prepareStbStmtWithSample(
}
}
free(bindArray);
return k;
}
#endif
static int32_t generateStbProgressiveData(
......@@ -7161,17 +7090,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
#if STMT_IFACE_ENABLED == 1
static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
{
// TODO:
stbInfo->sampleBindArray = calloc(sizeof(char *), MAX_SAMPLES_ONCE_FROM_FILE);
assert(stbInfo->sampleBindArray);
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (stbInfo->sampleBindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n",
__func__, __LINE__, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
return -1;
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
......@@ -7180,9 +7106,6 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind;
int cursor = 0;
......@@ -7191,16 +7114,11 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c));
if (c == 0) {
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->buffer = NULL; //bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * i + cursor;
......@@ -7213,26 +7131,31 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
}
}
memset(bindBuffer, 0, DOUBLE_BUFF_LEN);
char *bindBuffer = calloc(1, index + 1);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, DOUBLE_BUFF_LEN);
return -1;
}
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
if (-1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[c-1].dataType,
stbInfo->columns[c-1].dataLen,
&ptr,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
free(bindBuffer);
}
}
*((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray;
}
free(bindBuffer);
return 0;
}
#endif
......@@ -7487,6 +7410,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
free(stmtBuffer);
exit(-1);
}
pThreadInfo->bind_ts = malloc(sizeof(int64_t));
}
#endif
} else {
......@@ -7531,11 +7455,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(pThreadInfo->lock_sem));
#if STMT_IFACE_ENABLED == 1
if (pThreadInfo->stmt) {
taos_stmt_close(pThreadInfo->stmt);
tmfree((char *)pThreadInfo->bind_ts);
}
#endif
tsem_destroy(&(pThreadInfo->lock_sem));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册