提交 54813175 编写于 作者: sangshuduo's avatar sangshuduo

refactor stmt functions.

上级 07603323
......@@ -295,6 +295,9 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow;
char* sampleDataBuf;
#if STMT_IFACE_ENABLED == 1
void *sampleBindArray;
#endif
//int sampleRowCount;
//int sampleUsePos;
......@@ -454,7 +457,7 @@ typedef struct SThreadInfo_S {
int64_t start_time;
char* cols;
bool use_metric;
SSuperTable* superTblInfo;
SSuperTable* stbInfo;
char *buffer; // sql cmd buffer
// for async insert
......@@ -3088,7 +3091,7 @@ int createDatabasesAndStables(char *command) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
return -1;
}
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].drop) {
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
......@@ -3099,35 +3102,43 @@ int createDatabasesAndStables(char *command) {
int dataLen = 0;
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "create database if not exists %s", g_Dbs.db[i].dbName);
BUFFER_SIZE - dataLen, "create database if not exists %s",
g_Dbs.db[i].dbName);
if (g_Dbs.db[i].dbCfg.blocks > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " blocks %d", g_Dbs.db[i].dbCfg.blocks);
BUFFER_SIZE - dataLen, " blocks %d",
g_Dbs.db[i].dbCfg.blocks);
}
if (g_Dbs.db[i].dbCfg.cache > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cache %d", g_Dbs.db[i].dbCfg.cache);
BUFFER_SIZE - dataLen, " cache %d",
g_Dbs.db[i].dbCfg.cache);
}
if (g_Dbs.db[i].dbCfg.days > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " days %d", g_Dbs.db[i].dbCfg.days);
BUFFER_SIZE - dataLen, " days %d",
g_Dbs.db[i].dbCfg.days);
}
if (g_Dbs.db[i].dbCfg.keep > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " keep %d", g_Dbs.db[i].dbCfg.keep);
BUFFER_SIZE - dataLen, " keep %d",
g_Dbs.db[i].dbCfg.keep);
}
if (g_Dbs.db[i].dbCfg.quorum > 1) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " quorum %d", g_Dbs.db[i].dbCfg.quorum);
BUFFER_SIZE - dataLen, " quorum %d",
g_Dbs.db[i].dbCfg.quorum);
}
if (g_Dbs.db[i].dbCfg.replica > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " replica %d", g_Dbs.db[i].dbCfg.replica);
BUFFER_SIZE - dataLen, " replica %d",
g_Dbs.db[i].dbCfg.replica);
}
if (g_Dbs.db[i].dbCfg.update > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " update %d", g_Dbs.db[i].dbCfg.update);
BUFFER_SIZE - dataLen, " update %d",
g_Dbs.db[i].dbCfg.update);
}
//if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) {
// dataLen += snprintf(command + dataLen,
......@@ -3135,42 +3146,48 @@ int createDatabasesAndStables(char *command) {
//}
if (g_Dbs.db[i].dbCfg.minRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " minrows %d", g_Dbs.db[i].dbCfg.minRows);
BUFFER_SIZE - dataLen, " minrows %d",
g_Dbs.db[i].dbCfg.minRows);
}
if (g_Dbs.db[i].dbCfg.maxRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " maxrows %d", g_Dbs.db[i].dbCfg.maxRows);
BUFFER_SIZE - dataLen, " maxrows %d",
g_Dbs.db[i].dbCfg.maxRows);
}
if (g_Dbs.db[i].dbCfg.comp > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " comp %d", g_Dbs.db[i].dbCfg.comp);
BUFFER_SIZE - dataLen, " comp %d",
g_Dbs.db[i].dbCfg.comp);
}
if (g_Dbs.db[i].dbCfg.walLevel > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " wal %d", g_Dbs.db[i].dbCfg.walLevel);
BUFFER_SIZE - dataLen, " wal %d",
g_Dbs.db[i].dbCfg.walLevel);
}
if (g_Dbs.db[i].dbCfg.cacheLast > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cachelast %d", g_Dbs.db[i].dbCfg.cacheLast);
BUFFER_SIZE - dataLen, " cachelast %d",
g_Dbs.db[i].dbCfg.cacheLast);
}
if (g_Dbs.db[i].dbCfg.fsync > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" fsync %d", g_Dbs.db[i].dbCfg.fsync);
}
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms")))
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
#if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"ns", strlen("ns")))
"ns", 2))
#endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"us", strlen("us")))) {
"us", 2))) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
}
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
taos_close(taos);
errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
errorPrint( "\ncreate database %s failed!\n\n",
g_Dbs.db[i].dbName);
return -1;
}
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
......@@ -3217,7 +3234,7 @@ int createDatabasesAndStables(char *command) {
static void* createTable(void *sarg)
{
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("createTable");
......@@ -3247,7 +3264,7 @@ static void* createTable(void *sarg)
g_args.tb_prefix, i,
pThreadInfo->cols);
} else {
if (superTblInfo == NULL) {
if (stbInfo == NULL) {
errorPrint("%s() LN%d, use metric, but super table info is NULL\n",
__func__, __LINE__);
free(pThreadInfo->buffer);
......@@ -3260,12 +3277,12 @@ static void* createTable(void *sarg)
buff_len - len, "create table ");
}
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, i);
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, i);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
i % superTblInfo->tagSampleCount);
stbInfo,
i % stbInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
free(pThreadInfo->buffer);
......@@ -3274,14 +3291,14 @@ static void* createTable(void *sarg)
len += snprintf(pThreadInfo->buffer + len,
buff_len - len,
"if not exists %s.%s%"PRIu64" using %s.%s tags %s ",
pThreadInfo->db_name, superTblInfo->childTblPrefix,
pThreadInfo->db_name, stbInfo->childTblPrefix,
i, pThreadInfo->db_name,
superTblInfo->sTblName, tagsValBuf);
stbInfo->sTblName, tagsValBuf);
free(tagsValBuf);
batchNum++;
if ((batchNum < superTblInfo->batchCreateTableNum)
if ((batchNum < stbInfo->batchCreateTableNum)
&& ((buff_len - len)
>= (superTblInfo->lenOfTagOfOneRow + 256))) {
>= (stbInfo->lenOfTagOfOneRow + 256))) {
continue;
}
}
......@@ -3316,7 +3333,7 @@ static void* createTable(void *sarg)
static int startMultiThreadCreateChildTable(
char* cols, int threads, uint64_t tableFrom, int64_t ntables,
char* db_name, SSuperTable* superTblInfo) {
char* db_name, SSuperTable* stbInfo) {
pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
......@@ -3343,7 +3360,7 @@ static int startMultiThreadCreateChildTable(
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->superTblInfo = superTblInfo;
pThreadInfo->stbInfo = stbInfo;
verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
pThreadInfo->taos = taos_connect(
g_Dbs.host,
......@@ -3450,26 +3467,26 @@ static void createChildTables() {
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
static int readTagFromCsvFileToMem(SSuperTable * stbInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
FILE *fp = fopen(superTblInfo->tagsFile, "r");
FILE *fp = fopen(stbInfo->tagsFile, "r");
if (fp == NULL) {
printf("Failed to open tags file: %s, reason:%s\n",
superTblInfo->tagsFile, strerror(errno));
stbInfo->tagsFile, strerror(errno));
return -1;
}
if (superTblInfo->tagDataBuf) {
free(superTblInfo->tagDataBuf);
superTblInfo->tagDataBuf = NULL;
if (stbInfo->tagDataBuf) {
free(stbInfo->tagDataBuf);
stbInfo->tagDataBuf = NULL;
}
int tagCount = 10000;
int count = 0;
char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount);
char* tagDataBuf = calloc(1, stbInfo->lenOfTagOfOneRow * tagCount);
if (tagDataBuf == NULL) {
printf("Failed to calloc, reason:%s\n", strerror(errno));
fclose(fp);
......@@ -3485,20 +3502,20 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
continue;
}
memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen);
memcpy(tagDataBuf + count * stbInfo->lenOfTagOfOneRow, line, readLen);
count++;
if (count >= tagCount - 1) {
char *tmp = realloc(tagDataBuf,
(size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow);
(size_t)tagCount*1.5*stbInfo->lenOfTagOfOneRow);
if (tmp != NULL) {
tagDataBuf = tmp;
tagCount = (int)(tagCount*1.5);
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow,
0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow));
memset(tagDataBuf + count*stbInfo->lenOfTagOfOneRow,
0, (size_t)((tagCount-count)*stbInfo->lenOfTagOfOneRow));
} else {
// exit, if allocate more memory failed
printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile);
printf("realloc fail for save tag val from %s\n", stbInfo->tagsFile);
tmfree(tagDataBuf);
free(line);
fclose(fp);
......@@ -3507,8 +3524,8 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
}
}
superTblInfo->tagDataBuf = tagDataBuf;
superTblInfo->tagSampleCount = count;
stbInfo->tagDataBuf = tagDataBuf;
stbInfo->tagSampleCount = count;
free(line);
fclose(fp);
......@@ -3519,28 +3536,28 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
static int readSampleFromCsvFileToMem(
SSuperTable* superTblInfo) {
SSuperTable* stbInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int getRows = 0;
FILE* fp = fopen(superTblInfo->sampleFile, "r");
FILE* fp = fopen(stbInfo->sampleFile, "r");
if (fp == NULL) {
errorPrint( "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
stbInfo->sampleFile, strerror(errno));
return -1;
}
assert(superTblInfo->sampleDataBuf);
memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
assert(stbInfo->sampleDataBuf);
memset(stbInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * stbInfo->lenOfOneRow);
while(1) {
readLen = tgetline(&line, &n, fp);
if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) {
errorPrint( "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
stbInfo->sampleFile, strerror(errno));
fclose(fp);
return -1;
}
......@@ -3555,13 +3572,13 @@ static int readSampleFromCsvFileToMem(
continue;
}
if (readLen > superTblInfo->lenOfOneRow) {
if (readLen > stbInfo->lenOfOneRow) {
printf("sample row len[%d] overflow define schema len[%"PRIu64"], so discard this row\n",
(int32_t)readLen, superTblInfo->lenOfOneRow);
(int32_t)readLen, stbInfo->lenOfOneRow);
continue;
}
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
memcpy(stbInfo->sampleDataBuf + getRows * stbInfo->lenOfOneRow,
line, readLen);
getRows++;
......@@ -5046,6 +5063,18 @@ static void postFreeResource() {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
}
#if STMT_IFACE_ENABLED == 1
if (g_Dbs.db[i].superTbls[j].sampleBindArray) {
for (int c = 0; c < MAX_SAMPLES_ONCE_FROM_FILE; c++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
g_Dbs.db[i].superTbls[j].sampleBindArray
+ sizeof(uintptr_t *) * c));
tmfree((char *)tmp);
}
}
tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray);
#endif
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf);
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
......@@ -5066,12 +5095,12 @@ static void postFreeResource() {
tmfree(g_randfloat_buff);
tmfree(g_rand_current_buff);
tmfree(g_rand_phase_buff);
tmfree(g_randdouble_buff);
}
static int getRowDataFromSample(
char* dataBuf, int64_t maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int64_t* sampleUsePos)
SSuperTable* stbInfo, int64_t* sampleUsePos)
{
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*sampleUsePos = 0;
......@@ -5083,8 +5112,8 @@ static int getRowDataFromSample(
"(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%s",
superTblInfo->sampleDataBuf
+ superTblInfo->lenOfOneRow * (*sampleUsePos));
stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos));
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++;
......@@ -5257,27 +5286,27 @@ static int64_t generateData(char *recBuf, char **data_type,
return (int32_t)strlen(recBuf);
}
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
static int prepareSampleDataForSTable(SSuperTable *stbInfo) {
char* sampleDataBuf = NULL;
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
}
superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo);
stbInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(stbInfo);
if (0 != ret) {
errorPrint("%s() LN%d, read sample from csv file failed.\n",
__func__, __LINE__);
tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
stbInfo->sampleDataBuf = NULL;
return -1;
}
......@@ -5287,14 +5316,14 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
{
int32_t affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
uint16_t iface;
if (superTblInfo)
iface = superTblInfo->iface;
if (stbInfo)
iface = stbInfo->iface;
else {
if (g_args.iface == INTERFACE_BUT)
iface = TAOSC_IFACE;
......@@ -5342,7 +5371,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
default:
errorPrint("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, superTblInfo->iface);
__func__, __LINE__, stbInfo->iface);
affectedRows = 0;
}
......@@ -5352,24 +5381,24 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
static void getTableName(char *pTblName,
threadInfo* pThreadInfo, uint64_t tableSeq)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) {
if (superTblInfo->childTblLimit > 0) {
SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (stbInfo) {
if (AUTO_CREATE_SUBTBL != stbInfo->autoCreateTable) {
if (stbInfo->childTblLimit > 0) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName +
(tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
stbInfo->childTblName +
(tableSeq - stbInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
superTblInfo->childTblPrefix, tableSeq);
stbInfo->childTblPrefix, tableSeq);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
......@@ -5450,7 +5479,7 @@ static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
}
static int32_t generateStbDataTail(
SSuperTable* superTblInfo,
SSuperTable* stbInfo,
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime,
......@@ -5460,7 +5489,7 @@ static int32_t generateStbDataTail(
char *pstr = buffer;
bool tsRand;
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true;
} else {
tsRand = false;
......@@ -5475,26 +5504,26 @@ static int32_t generateStbDataTail(
int64_t lenOfRow = 0;
if (tsRand) {
if (superTblInfo->disorderRatio > 0) {
lenOfRow = generateStbRowData(superTblInfo, data,
if (stbInfo->disorderRatio > 0) {
lenOfRow = generateStbRowData(stbInfo, data,
remainderBufLen,
startTime + getTSRandTail(
superTblInfo->timeStampStep, k,
superTblInfo->disorderRatio,
superTblInfo->disorderRange)
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange)
);
} else {
lenOfRow = generateStbRowData(superTblInfo, data,
lenOfRow = generateStbRowData(stbInfo, data,
remainderBufLen,
startTime + superTblInfo->timeStampStep * k
startTime + stbInfo->timeStampStep * k
);
}
} else {
lenOfRow = getRowDataFromSample(
data,
(remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE,
startTime + superTblInfo->timeStampStep * k,
superTblInfo,
startTime + stbInfo->timeStampStep * k,
stbInfo,
pSamplePos);
}
......@@ -5550,7 +5579,7 @@ static int generateSQLHeadWithoutStb(char *tableName,
}
static int generateStbSQLHead(
SSuperTable* superTblInfo,
SSuperTable* stbInfo,
char *tableName, int64_t tableSeq,
char *dbName,
char *buffer, int remainderBufLen)
......@@ -5559,14 +5588,14 @@ static int generateStbSQLHead(
char headBuf[HEAD_BUFF_LEN];
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq);
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tableSeq % superTblInfo->tagSampleCount);
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
......@@ -5581,10 +5610,10 @@ static int generateStbSQLHead(
dbName,
tableName,
dbName,
superTblInfo->sTblName,
stbInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
} else if (TBL_ALREADY_EXISTS == stbInfo->childTblExists) {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
......@@ -5609,12 +5638,12 @@ static int generateStbSQLHead(
}
static int32_t generateStbInterlaceData(
SSuperTable *superTblInfo,
threadInfo *pThreadInfo,
char *tableName, uint32_t batchPerTbl,
uint64_t i,
uint32_t batchPerTblTimes,
uint64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
char *buffer,
int64_t insertRows,
int64_t startTime,
uint64_t *pRemainderBufLen)
......@@ -5622,8 +5651,9 @@ static int32_t generateStbInterlaceData(
assert(buffer);
char *pstr = buffer;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
int headLen = generateStbSQLHead(
superTblInfo,
stbInfo,
tableName, tableSeq, pThreadInfo->db_name,
pstr, *pRemainderBufLen);
......@@ -5643,12 +5673,12 @@ static int32_t generateStbInterlaceData(
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
int32_t k = generateStbDataTail(
superTblInfo,
stbInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&(pThreadInfo->samplePos), &dataLen);
......@@ -5911,14 +5941,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
}
static int32_t prepareStmtWithoutStb(
TAOS_STMT *stmt,
threadInfo *pThreadInfo,
char *tableName,
uint32_t batch,
int64_t insertRows,
int64_t recordFrom,
int32_t timePrec,
int64_t startTime)
{
TAOS_STMT *stmt = pThreadInfo->stmt;
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
......@@ -5972,7 +6002,7 @@ static int32_t prepareStmtWithoutStb(
data_type[i],
g_args.len_of_binary,
&ptr,
timePrec,
pThreadInfo->time_precision,
NULL)) {
return -1;
}
......@@ -6000,12 +6030,46 @@ static int32_t prepareStmtWithoutStb(
return k;
}
static int32_t prepareStbStmtBind(
char *bindArray, SSuperTable *stbInfo, bool sourceRand,
static int32_t prepareStbStmtBindTag(
char *bindArray, SSuperTable *stbInfo,
char *tagsVal,
int32_t timePrec)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
&ptr,
timePrec,
NULL)) {
free(bindBuffer);
return -1;
}
}
free(bindBuffer);
return 0;
}
static int32_t prepareStbStmtBindRand(
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
int32_t timePrec,
int64_t samplePos,
bool isColumn)
int32_t timePrec)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
......@@ -6020,126 +6084,139 @@ static int32_t prepareStbStmtBind(
TAOS_BIND *bind;
if (isColumn) {
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) {
int64_t *bind_ts;
if (i == 0) {
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
timePrec,
NULL)) {
tmfree(bindBuffer);
return -1;
}
}
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
tmfree(bindBuffer);
return 0;
}
ptr += bind->buffer_length;
static int32_t prepareStbStmtBindWithSample(
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
int32_t timePrec,
int64_t samplePos)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind;
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) {
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
timePrec,
NULL)) {
free(bindBuffer);
return -1;
}
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * samplePos + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
ptr += bind->buffer_length;
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * samplePos + cursor;
int lengthOfRest = strlen(restStr);
memset(bindBuffer, 0, DOUBLE_BUFF_LEN);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
}
} else {
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
memset(bindBuffer, 0, DOUBLE_BUFF_LEN);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
timePrec,
NULL)) {
bindBuffer)) {
free(bindBuffer);
return -1;
}
}
}
free(bindBuffer);
return 0;
}
static int32_t prepareStbStmt(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
static int32_t prepareStbStmtRand(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos)
int64_t startTime)
{
int ret;
bool sourceRand;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
SSuperTable *stbInfo = pThreadInfo->stbInfo;
TAOS_STMT *stmt = pThreadInfo->stmt;
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
bool tagRand;
if (0 == stbInfo->tagSource) {
tagRand = true;
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagRand = false;
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
......@@ -6159,11 +6236,9 @@ static int32_t prepareStbStmt(
return -1;
}
if (-1 == prepareStbStmtBind(
tagsArray, stbInfo, tagRand, -1, -1,
timePrec,
*pSamplePos,
false /* is tag */)) {
if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf);
tmfree(tagsArray);
return -1;
......@@ -6198,11 +6273,10 @@ static int32_t prepareStbStmt(
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand,
if (-1 == prepareStbStmtBindRand(bindArray, stbInfo,
startTime, k,
timePrec,
*pSamplePos,
true /* is column */)) {
pThreadInfo->time_precision
/* is column */)) {
free(bindArray);
return -1;
}
......@@ -6225,13 +6299,6 @@ static int32_t prepareStbStmt(
k++;
recordFrom ++;
if (!sourceRand) {
(*pSamplePos) ++;
if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*pSamplePos = 0;
}
}
if (recordFrom >= insertRows) {
break;
}
......@@ -6241,56 +6308,129 @@ static int32_t prepareStbStmt(
return k;
}
static int32_t prepareStbStmtInterlace(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
static int32_t prepareStbStmtWithSample(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
tableSeq,
batch,
insertRows, 0, startTime,
timePrec,
pSamplePos);
}
int ret;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
TAOS_STMT *stmt = pThreadInfo->stmt;
static int32_t prepareStbStmtProgressive(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, recordFrom, startTime,
timePrec,
pSamplePos);
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount);
if (NULL == tagsArray) {
tmfree(tagsValBuf);
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf);
tmfree(tagsArray);
return -1;
}
ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
tmfree(tagsValBuf);
tmfree(tagsArray);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_set_tbname_tags() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
} else {
ret = taos_stmt_set_tbname(stmt, tableName);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_set_tbname() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
}
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBindWithSample(
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision,
*pSamplePos
/* is column */)) {
free(bindArray);
return -1;
}
ret = taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_bind_param() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
free(bindArray);
return -1;
}
// if msg > 3MB, break
ret = taos_stmt_add_batch(stmt);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_add_batch() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
free(bindArray);
return -1;
}
k++;
recordFrom ++;
(*pSamplePos) ++;
if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*pSamplePos = 0;
}
if (recordFrom >= insertRows) {
break;
}
}
free(bindArray);
return k;
}
#endif
static int32_t generateStbProgressiveData(
SSuperTable *superTblInfo,
SSuperTable *stbInfo,
char *tableName,
int64_t tableSeq,
char *dbName, char *buffer,
......@@ -6304,7 +6444,7 @@ static int32_t generateStbProgressiveData(
memset(pstr, 0, *pRemainderBufLen);
int64_t headLen = generateStbSQLHead(
superTblInfo,
stbInfo,
tableName, tableSeq, dbName,
buffer, *pRemainderBufLen);
......@@ -6316,7 +6456,7 @@ static int32_t generateStbProgressiveData(
int64_t dataLen;
return generateStbDataTail(superTblInfo,
return generateStbDataTail(stbInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, recordFrom,
startTime,
......@@ -6376,26 +6516,34 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t nTimeStampStep;
uint64_t insert_interval;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
bool sourceRand;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (superTblInfo) {
insertRows = superTblInfo->insertRows;
if (stbInfo) {
insertRows = stbInfo->insertRows;
if ((superTblInfo->interlaceRows == 0)
if ((stbInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows;
} else {
interlaceRows = superTblInfo->interlaceRows;
interlaceRows = stbInfo->interlaceRows;
}
maxSqlLen = stbInfo->maxSqlLen;
nTimeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval;
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else {
insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval;
sourceRand = true;
}
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
......@@ -6475,29 +6623,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t oldRemainderLen = remainderBufLen;
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtInterlace(
superTblInfo,
pThreadInfo->stmt,
tableName,
tableSeq,
batchPerTbl,
insertRows, i,
startTime,
pThreadInfo->time_precision,
&(pThreadInfo->samplePos));
if (sourceRand) {
generated = prepareStbStmtRand(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime,
&(pThreadInfo->samplePos));
}
#else
generated = -1;
#endif
} else {
generated = generateStbInterlaceData(
superTblInfo,
pThreadInfo,
tableName, batchPerTbl, i,
batchPerTblTimes,
tableSeq,
pThreadInfo, pstr,
pstr,
insertRows,
startTime,
&remainderBufLen);
......@@ -6510,10 +6667,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableName, batchPerTbl, startTime);
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt, tableName,
pThreadInfo,
tableName,
batchPerTbl,
insertRows, i,
pThreadInfo->time_precision,
startTime);
#else
generated = -1;
......@@ -6653,12 +6810,12 @@ free_of_interlace:
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
uint64_t maxSqlLen = stbInfo?stbInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:g_args.timestamp_step;
stbInfo?stbInfo->timeStampStep:g_args.timestamp_step;
int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
(stbInfo)?stbInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
......@@ -6677,6 +6834,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
bool sourceRand;
if (stbInfo) {
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
} else {
sourceRand = true;
}
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
......@@ -6707,25 +6875,35 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
remainderBufLen -= len;
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtProgressive(
superTblInfo,
pThreadInfo->stmt,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
pThreadInfo->time_precision,
&(pThreadInfo->samplePos));
if (sourceRand) {
generated = prepareStbStmtRand(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows,
i, start_time
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
}
#else
generated = -1;
#endif
} else {
generated = generateStbProgressiveData(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr,
stbInfo,
tableName, tableSeq,
pThreadInfo->db_name, pstr,
insertRows, i, start_time,
&(pThreadInfo->samplePos),
&remainderBufLen);
......@@ -6734,11 +6912,10 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (g_args.iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt,
pThreadInfo,
tableName,
g_args.num_of_RPR,
insertRows, i,
pThreadInfo->time_precision,
start_time);
#else
generated = -1;
......@@ -6800,9 +6977,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
} // num_of_DPT
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo)
(tableSeq == pThreadInfo->ntables - 1) && (stbInfo)
&& (0 == strncasecmp(
superTblInfo->dataSource,
stbInfo->dataSource,
"sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
......@@ -6818,18 +6995,18 @@ free_of_progressive:
static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("syncWrite");
uint32_t interlaceRows;
if (superTblInfo) {
if ((superTblInfo->interlaceRows == 0)
if (stbInfo) {
if ((stbInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows;
} else {
interlaceRows = superTblInfo->interlaceRows;
interlaceRows = stbInfo->interlaceRows;
}
} else {
interlaceRows = g_args.interlace_rows;
......@@ -6846,10 +7023,10 @@ static void* syncWrite(void *sarg) {
static void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* pThreadInfo = (threadInfo*)param;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
stbInfo?stbInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->et = taosGetTimestampMs();
if ((pThreadInfo->et - pThreadInfo->st) < insert_interval) {
......@@ -6857,13 +7034,13 @@ static void callBack(void *param, TAOS_RES *res, int code) {
}
}
char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen);
char *buffer = calloc(1, pThreadInfo->stbInfo->maxSqlLen);
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values",
pThreadInfo->db_name, pThreadInfo->tb_prefix,
pThreadInfo->start_table_from);
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
// if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) {
if (pThreadInfo->counter >= g_args.num_of_RPR) {
pThreadInfo->start_table_from++;
pThreadInfo->counter = 0;
......@@ -6877,15 +7054,15 @@ static void callBack(void *param, TAOS_RES *res, int code) {
for (int i = 0; i < g_args.num_of_RPR; i++) {
int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
if (0 != pThreadInfo->stbInfo->disorderRatio
&& rand_num < pThreadInfo->stbInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->superTblInfo, data,
- (taosRandom() % pThreadInfo->stbInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->stbInfo, data,
MAX_DATA_SIZE,
d);
} else {
generateStbRowData(pThreadInfo->superTblInfo,
generateStbRowData(pThreadInfo->stbInfo,
data,
MAX_DATA_SIZE,
pThreadInfo->lastTs += 1000);
......@@ -6893,7 +7070,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;
if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) {
break;
}
}
......@@ -6909,7 +7086,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
static void *asyncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("asyncWrite");
......@@ -6918,7 +7095,7 @@ static void *asyncWrite(void *sarg) {
pThreadInfo->lastTs = pThreadInfo->start_time;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
stbInfo?stbInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->st = taosGetTimestampMs();
}
......@@ -6956,15 +7133,86 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
}
#if STMT_IFACE_ENABLED == 1
static void parseSampleFileToStmt()
static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
{
// TODO:
stbInfo->sampleBindArray = calloc(sizeof(char *), MAX_SAMPLES_ONCE_FROM_FILE);
assert(stbInfo->sampleBindArray);
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind;
int cursor = 0;
for (int c = 0; c < stbInfo->columnCount + 1; c++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c));
if (c == 0) {
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * i + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
memset(bindBuffer, 0, DOUBLE_BUFF_LEN);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[c-1].dataType,
stbInfo->columns[c-1].dataLen,
&ptr,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
}
}
*((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray;
}
free(bindBuffer);
return 0;
}
#endif
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision, SSuperTable* superTblInfo) {
char* precision, SSuperTable* stbInfo) {
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) {
......@@ -6983,14 +7231,14 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
int64_t start_time;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
if (stbInfo) {
if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
start_time = taosGetTimestamp(timePrec);
} else {
if (TSDB_CODE_SUCCESS != taosParseTime(
superTblInfo->startTimestamp,
stbInfo->startTimestamp,
&start_time,
strlen(superTblInfo->startTimestamp),
strlen(stbInfo->startTimestamp),
timePrec, 0)) {
ERROR_EXIT("failed to parse time!\n");
}
......@@ -7004,9 +7252,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t start = taosGetTimestampMs();
// read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) {
if (0 != prepareSampleDataForSTable(stbInfo)) {
errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
__func__, __LINE__);
exit(-1);
......@@ -7025,52 +7273,52 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t ntables = 0;
uint64_t tableFrom;
if (superTblInfo) {
if (stbInfo) {
int64_t limit;
uint64_t offset;
if ((NULL != g_args.sqlFile)
&& (superTblInfo->childTblExists == TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset != 0)
|| (superTblInfo->childTblLimit >= 0))) {
&& (stbInfo->childTblExists == TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset != 0)
|| (stbInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
}
if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((superTblInfo->childTblLimit < 0)
|| ((superTblInfo->childTblOffset
+ superTblInfo->childTblLimit)
> (superTblInfo->childTblCount))) {
superTblInfo->childTblLimit =
superTblInfo->childTblCount - superTblInfo->childTblOffset;
if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((stbInfo->childTblLimit < 0)
|| ((stbInfo->childTblOffset
+ stbInfo->childTblLimit)
> (stbInfo->childTblCount))) {
stbInfo->childTblLimit =
stbInfo->childTblCount - stbInfo->childTblOffset;
}
offset = superTblInfo->childTblOffset;
limit = superTblInfo->childTblLimit;
offset = stbInfo->childTblOffset;
limit = stbInfo->childTblLimit;
} else {
limit = superTblInfo->childTblCount;
limit = stbInfo->childTblCount;
offset = 0;
}
ntables = limit;
tableFrom = offset;
if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset + superTblInfo->childTblLimit )
> superTblInfo->childTblCount)) {
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset + stbInfo->childTblLimit)
> stbInfo->childTblCount)) {
printf("WARNING: specified offset + limit > child table count!\n");
prompt();
}
if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
&& (0 == superTblInfo->childTblLimit)) {
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& (0 == stbInfo->childTblLimit)) {
printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n");
prompt();
}
superTblInfo->childTblName = (char*)calloc(1,
stbInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
if (stbInfo->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos0);
exit(-1);
......@@ -7079,8 +7327,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos0,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
db_name, stbInfo->sTblName,
&stbInfo->childTblName, &childTblCount,
limit,
offset);
} else {
......@@ -7101,8 +7349,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
b = ntables % threads;
}
if ((superTblInfo)
&& (superTblInfo->iface == REST_IFACE)) {
if ((stbInfo)
&& (stbInfo->iface == REST_IFACE)) {
if (convertHostToServAddr(
g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
exit(-1);
......@@ -7122,16 +7370,16 @@ static void startMultiThreadInsertData(int threads, char* db_name,
char *stmtBuffer = calloc(1, BUFFER_SIZE);
assert(stmtBuffer);
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) {
|| ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) {
char *pstr = stmtBuffer;
if ((superTblInfo)
if ((stbInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
== stbInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
stbInfo->sTblName);
for (int tag = 0; tag < (stbInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
......@@ -7141,8 +7389,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
if (stbInfo) {
columnCount = stbInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
......@@ -7154,7 +7402,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
parseSampleFileToStmt();
if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) {
parseSampleFileToStmt(stbInfo, timePrec);
}
}
#endif
......@@ -7164,13 +7415,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->time_precision = timePrec;
pThreadInfo->superTblInfo = superTblInfo;
pThreadInfo->stbInfo = stbInfo;
pThreadInfo->start_time = start_time;
pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) ||
(superTblInfo->iface != REST_IFACE)) {
if ((NULL == stbInfo) ||
(stbInfo->iface != REST_IFACE)) {
//t_info->taos = taos;
pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
......@@ -7186,8 +7437,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
#if STMT_IFACE_ENABLED == 1
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) {
|| ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) {
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
......@@ -7216,8 +7467,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->taos = NULL;
}
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
/* if ((NULL == stbInfo)
|| (0 == stbInfo->multiThreadWriteOneTbl)) {
*/
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
......@@ -7225,7 +7476,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tableFrom = pThreadInfo->end_table_to + 1;
/* } else {
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
pThreadInfo->ntables = stbInfo->childTblCount;
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
}
*/
......@@ -7268,9 +7519,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
__func__, __LINE__,
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
if (superTblInfo) {
superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows;
if (stbInfo) {
stbInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
stbInfo->totalInsertRows += pThreadInfo->totalInsertRows;
} else {
g_args.totalAffectedRows += pThreadInfo->totalAffectedRows;
g_args.totalInsertRows += pThreadInfo->totalInsertRows;
......@@ -7291,22 +7542,22 @@ static void startMultiThreadInsertData(int threads, char* db_name,
double tInMs = t/1000.0;
if (superTblInfo) {
if (stbInfo) {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
(tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
(double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
(tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
(double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
}
} else {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
......@@ -7361,8 +7612,8 @@ static void *readTable(void *sarg) {
}
int64_t num_of_DPT;
/* if (pThreadInfo->superTblInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table;
/* if (pThreadInfo->stbInfo) {
num_of_DPT = pThreadInfo->stbInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
......@@ -7436,7 +7687,7 @@ static void *readMetric(void *sarg) {
return NULL;
}
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows;
int64_t num_of_DPT = pThreadInfo->stbInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
......@@ -7575,14 +7826,14 @@ static int insertTestProcess() {
if (g_Dbs.db[i].superTblCount > 0) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
SSuperTable* stbInfo = &g_Dbs.db[i].superTbls[j];
if (superTblInfo && (superTblInfo->insertRows > 0)) {
if (stbInfo && (stbInfo->insertRows > 0)) {
startMultiThreadInsertData(
g_Dbs.threadCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
superTblInfo);
stbInfo);
}
}
}
......@@ -8697,7 +8948,7 @@ static void queryResult() {
if (g_args.use_metric) {
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
pThreadInfo->stbInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, TBNAME_PREFIX_LEN);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册