提交 690faf1c 编写于 作者: sangshuduo's avatar sangshuduo

cherry pick from develop branch.

上级 39280a5e
...@@ -5067,13 +5067,6 @@ static int getRowDataFromSample( ...@@ -5067,13 +5067,6 @@ static int getRowDataFromSample(
SSuperTable* superTblInfo, int64_t* sampleUsePos) SSuperTable* superTblInfo, int64_t* sampleUsePos)
{ {
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
/* int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
*/
*sampleUsePos = 0; *sampleUsePos = 0;
} }
...@@ -5725,7 +5718,9 @@ static int64_t generateInterlaceDataWithoutStb( ...@@ -5725,7 +5718,9 @@ static int64_t generateInterlaceDataWithoutStb(
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
char *dataType, int32_t dataLen, char **ptr, char *value) char *dataType, int32_t dataLen, char **ptr,
int32_t timePrec,
char *value)
{ {
if (0 == strncasecmp(dataType, if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) { "BINARY", strlen("BINARY"))) {
...@@ -5884,7 +5879,25 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, ...@@ -5884,7 +5879,25 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
int64_t *bind_ts2 = (int64_t *) *ptr; int64_t *bind_ts2 = (int64_t *) *ptr;
if (value) { if (value) {
*bind_ts2 = atoll(value); if (strchr(value, ':') && strchr(value, '-')) {
int i = 0;
while(value[i] != '\0') {
if (value[i] == '\"' || value[i] == '\'') {
value[i] = ' ';
}
i++;
}
int64_t tmpEpoch;
if (TSDB_CODE_SUCCESS != taosParseTime(
value, &tmpEpoch, strlen(value),
timePrec, 0)) {
errorPrint("Input %s, time format error!\n", value);
return -1;
}
*bind_ts2 = tmpEpoch;
} else {
*bind_ts2 = atoll(value);
}
} else { } else {
*bind_ts2 = rand_bigint(); *bind_ts2 = rand_bigint();
} }
...@@ -5909,6 +5922,7 @@ static int32_t prepareStmtWithoutStb( ...@@ -5909,6 +5922,7 @@ static int32_t prepareStmtWithoutStb(
uint32_t batch, uint32_t batch,
int64_t insertRows, int64_t insertRows,
int64_t recordFrom, int64_t recordFrom,
int32_t timePrec,
int64_t startTime) int64_t startTime)
{ {
int ret = taos_stmt_set_tbname(stmt, tableName); int ret = taos_stmt_set_tbname(stmt, tableName);
...@@ -5963,7 +5977,9 @@ static int32_t prepareStmtWithoutStb( ...@@ -5963,7 +5977,9 @@ static int32_t prepareStmtWithoutStb(
bind, bind,
data_type[i], data_type[i],
g_args.len_of_binary, g_args.len_of_binary,
&ptr, NULL)) { &ptr,
timePrec,
NULL)) {
return -1; return -1;
} }
} }
...@@ -5993,6 +6009,8 @@ static int32_t prepareStmtWithoutStb( ...@@ -5993,6 +6009,8 @@ static int32_t prepareStmtWithoutStb(
static int32_t prepareStbStmtBind( static int32_t prepareStbStmtBind(
char *bindArray, SSuperTable *stbInfo, bool sourceRand, char *bindArray, SSuperTable *stbInfo, bool sourceRand,
int64_t startTime, int32_t recSeq, int64_t startTime, int32_t recSeq,
int32_t timePrec,
int64_t samplePos,
bool isColumn) bool isColumn)
{ {
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
...@@ -6041,12 +6059,14 @@ static int32_t prepareStbStmtBind( ...@@ -6041,12 +6059,14 @@ static int32_t prepareStbStmtBind(
stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen, stbInfo->columns[i-1].dataLen,
&ptr, &ptr,
timePrec,
NULL)) { NULL)) {
free(bindBuffer); free(bindBuffer);
return -1; return -1;
} }
} else { } else {
char *restStr = stbInfo->sampleDataBuf + cursor; char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * samplePos + cursor;
int lengthOfRest = strlen(restStr); int lengthOfRest = strlen(restStr);
int index = 0; int index = 0;
...@@ -6056,7 +6076,7 @@ static int32_t prepareStbStmtBind( ...@@ -6056,7 +6076,7 @@ static int32_t prepareStbStmtBind(
} }
} }
memset(bindBuffer, 0, g_args.len_of_binary); memset(bindBuffer, 0, DOUBLE_BUFF_LEN);
strncpy(bindBuffer, restStr, index); strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too cursor += index + 1; // skip ',' too
...@@ -6065,6 +6085,7 @@ static int32_t prepareStbStmtBind( ...@@ -6065,6 +6085,7 @@ static int32_t prepareStbStmtBind(
stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen, stbInfo->columns[i-1].dataLen,
&ptr, &ptr,
timePrec,
bindBuffer)) { bindBuffer)) {
free(bindBuffer); free(bindBuffer);
return -1; return -1;
...@@ -6082,6 +6103,7 @@ static int32_t prepareStbStmtBind( ...@@ -6082,6 +6103,7 @@ static int32_t prepareStbStmtBind(
stbInfo->tags[t].dataType, stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen, stbInfo->tags[t].dataLen,
&ptr, &ptr,
timePrec,
NULL)) { NULL)) {
free(bindBuffer); free(bindBuffer);
return -1; return -1;
...@@ -6103,6 +6125,7 @@ static int32_t prepareStbStmt( ...@@ -6103,6 +6125,7 @@ static int32_t prepareStbStmt(
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
int ret; int ret;
...@@ -6143,7 +6166,10 @@ static int32_t prepareStbStmt( ...@@ -6143,7 +6166,10 @@ static int32_t prepareStbStmt(
} }
if (-1 == prepareStbStmtBind( if (-1 == prepareStbStmtBind(
tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { tagsArray, stbInfo, tagRand, -1, -1,
timePrec,
*pSamplePos,
false /* is tag */)) {
tmfree(tagsValBuf); tmfree(tagsValBuf);
tmfree(tagsArray); tmfree(tagsArray);
return -1; return -1;
...@@ -6179,7 +6205,10 @@ static int32_t prepareStbStmt( ...@@ -6179,7 +6205,10 @@ static int32_t prepareStbStmt(
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */ /* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand, if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand,
startTime, k, true /* is column */)) { startTime, k,
timePrec,
*pSamplePos,
true /* is column */)) {
free(bindArray); free(bindArray);
return -1; return -1;
} }
...@@ -6204,6 +6233,9 @@ static int32_t prepareStbStmt( ...@@ -6204,6 +6233,9 @@ static int32_t prepareStbStmt(
if (!sourceRand) { if (!sourceRand) {
(*pSamplePos) ++; (*pSamplePos) ++;
if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*pSamplePos = 0;
}
} }
if (recordFrom >= insertRows) { if (recordFrom >= insertRows) {
...@@ -6224,6 +6256,7 @@ static int32_t prepareStbStmtInterlace( ...@@ -6224,6 +6256,7 @@ static int32_t prepareStbStmtInterlace(
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
return prepareStbStmt( return prepareStbStmt(
...@@ -6233,6 +6266,7 @@ static int32_t prepareStbStmtInterlace( ...@@ -6233,6 +6266,7 @@ static int32_t prepareStbStmtInterlace(
tableSeq, tableSeq,
batch, batch,
insertRows, 0, startTime, insertRows, 0, startTime,
timePrec,
pSamplePos); pSamplePos);
} }
...@@ -6245,6 +6279,7 @@ static int32_t prepareStbStmtProgressive( ...@@ -6245,6 +6279,7 @@ static int32_t prepareStbStmtProgressive(
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
return prepareStbStmt( return prepareStbStmt(
...@@ -6254,6 +6289,7 @@ static int32_t prepareStbStmtProgressive( ...@@ -6254,6 +6289,7 @@ static int32_t prepareStbStmtProgressive(
tableSeq, tableSeq,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, recordFrom, startTime, insertRows, recordFrom, startTime,
timePrec,
pSamplePos); pSamplePos);
} }
...@@ -6459,6 +6495,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6459,6 +6495,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
startTime, startTime,
pThreadInfo->time_precision,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
#else #else
generated = -1; generated = -1;
...@@ -6485,6 +6522,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6485,6 +6522,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->stmt, tableName, pThreadInfo->stmt, tableName,
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
pThreadInfo->time_precision,
startTime); startTime);
#else #else
generated = -1; generated = -1;
...@@ -6698,6 +6736,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6698,6 +6736,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
tableSeq, tableSeq,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, i, start_time, insertRows, i, start_time,
pThreadInfo->time_precision,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
#else #else
generated = -1; generated = -1;
...@@ -6718,6 +6757,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6718,6 +6757,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
tableName, tableName,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, i, insertRows, i,
pThreadInfo->time_precision,
start_time); start_time);
#else #else
generated = -1; generated = -1;
...@@ -6941,6 +6981,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * ...@@ -6941,6 +6981,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
return 0; return 0;
} }
#if STMT_IFACE_ENABLED == 1
static void parseSampleFileToStmt()
{
// TODO:
}
#endif
static void startMultiThreadInsertData(int threads, char* db_name, static void startMultiThreadInsertData(int threads, char* db_name,
char* precision, SSuperTable* superTblInfo) { char* precision, SSuperTable* superTblInfo) {
...@@ -7096,6 +7144,46 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7096,6 +7144,46 @@ static void startMultiThreadInsertData(int threads, char* db_name,
memset(pids, 0, threads * sizeof(pthread_t)); memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo)); memset(infos, 0, threads * sizeof(threadInfo));
#if STMT_IFACE_ENABLED == 1
char *stmtBuffer = calloc(1, BUFFER_SIZE);
assert(stmtBuffer);
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) {
char *pstr = stmtBuffer;
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
parseSampleFileToStmt();
}
#endif
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i; threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i; pThreadInfo->threadID = i;
...@@ -7127,12 +7215,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7127,12 +7215,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|| ((superTblInfo) || ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) { && (superTblInfo->iface == STMT_IFACE))) {
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) { if (NULL == pThreadInfo->stmt) {
...@@ -7145,35 +7227,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7145,35 +7227,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit(-1); exit(-1);
} }
char buffer[BUFFER_SIZE]; int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0);
char *pstr = buffer;
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer);
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
if (ret != 0){ if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_stmt_errstr(pThreadInfo->stmt)); ret, taos_stmt_errstr(pThreadInfo->stmt));
free(pids); free(pids);
free(infos); free(infos);
free(stmtBuffer);
exit(-1); exit(-1);
} }
} }
...@@ -7203,6 +7263,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7203,6 +7263,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
} }
#if STMT_IFACE_ENABLED == 1
free(stmtBuffer);
#endif
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册