提交 e4d61953 编写于 作者: S Shuduo Sang

[TD-3192] <feature>: support stb limit and offset. verified.

上级 57c1f20f
...@@ -1982,7 +1982,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -1982,7 +1982,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
exit(-1); exit(-1);
} }
int childTblCount = 10000; int childTblCount = (limit < 0)?10000:limit;
int count = 0; int count = 0;
childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
char* pTblName = childTblName; char* pTblName = childTblName;
...@@ -2525,14 +2525,8 @@ static void createChildTables() { ...@@ -2525,14 +2525,8 @@ static void createChildTables() {
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
int startFrom; int startFrom = 0;
if (g_Dbs.db[i].superTbls[j].childTblOffset) {
startFrom = g_Dbs.db[i].superTbls[j].childTblOffset;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblLimit;
} else {
startFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
}
verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__, verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__,
g_totalChildTables, startFrom); g_totalChildTables, startFrom);
...@@ -3882,7 +3876,7 @@ PARSE_OVER: ...@@ -3882,7 +3876,7 @@ PARSE_OVER:
return ret; return ret;
} }
void prePareSampleData() { void prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
//if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) {
...@@ -4018,7 +4012,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4018,7 +4012,7 @@ static void syncWriteForNumberOfTblInOneSql(
int64_t et = 0xffffffff; int64_t et = 0xffffffff;
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
int32_t tbl_id = 0; int32_t tbl_id = 0;
for (int tableID = winfo->start_table_id; tableID <= winfo->end_table_id; ) { for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) {
int64_t start_time = 0; int64_t start_time = 0;
int inserted = i; int inserted = i;
...@@ -4027,12 +4021,12 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4027,12 +4021,12 @@ static void syncWriteForNumberOfTblInOneSql(
memset(buffer, 0, superTblInfo->maxSqlLen); memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer; char *pstr = buffer;
int32_t end_tbl_id = tableID + numberOfTblInOneSql; int32_t end_tbl_id = tableSeq + numberOfTblInOneSql;
if (end_tbl_id > winfo->end_table_id) { if (end_tbl_id > winfo->end_table_id) {
end_tbl_id = winfo->end_table_id+1; end_tbl_id = winfo->end_table_id+1;
} }
for (tbl_id = tableID; tbl_id < end_tbl_id; tbl_id++) { for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) {
sampleUsePos = samplePos; sampleUsePos = samplePos;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
...@@ -4140,7 +4134,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4140,7 +4134,7 @@ static void syncWriteForNumberOfTblInOneSql(
if (inserted >= superTblInfo->insertRows || if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tableID = tbl_id + 1; tableSeq = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
superTblInfo->lenOfOneRow); superTblInfo->lenOfOneRow);
goto send_to_server; goto send_to_server;
...@@ -4148,7 +4142,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4148,7 +4142,7 @@ static void syncWriteForNumberOfTblInOneSql(
} }
} }
tableID = tbl_id; tableSeq = tbl_id;
inserted += superTblInfo->rowsPerTbl; inserted += superTblInfo->rowsPerTbl;
send_to_server: send_to_server:
...@@ -4217,7 +4211,7 @@ send_to_server: ...@@ -4217,7 +4211,7 @@ send_to_server:
break; break;
} }
if (tableID > winfo->end_table_id) { if (tableSeq > winfo->end_table_id) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos; samplePos = sampleUsePos;
} }
...@@ -4231,7 +4225,8 @@ send_to_server: ...@@ -4231,7 +4225,8 @@ send_to_server:
free_and_statistics: free_and_statistics:
tmfree(buffer); tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows); printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows);
return; return;
} }
...@@ -4293,7 +4288,7 @@ int32_t generateData(char *res, char **data_type, ...@@ -4293,7 +4288,7 @@ int32_t generateData(char *res, char **data_type,
return (int32_t)(pstr - res); return (int32_t)(pstr - res);
} }
static int prepareSampleData(SSuperTable *superTblInfo) { static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
char* sampleDataBuf = NULL; char* sampleDataBuf = NULL;
// each thread read sample data from csv file // each thread read sample data from csv file
...@@ -4330,7 +4325,6 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) ...@@ -4330,7 +4325,6 @@ static int execInsert(threadInfo *winfo, char *buffer, int k)
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
} else { } else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
...@@ -4354,7 +4348,8 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) ...@@ -4354,7 +4348,8 @@ static int execInsert(threadInfo *winfo, char *buffer, int k)
return affectedRows; return affectedRows;
} }
static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *buffer, static int generateDataBuffer(int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows, int64_t insertRows,
int64_t startFrom, int64_t startTime, int *pSampleUsePos) int64_t startFrom, int64_t startTime, int *pSampleUsePos)
{ {
...@@ -4372,6 +4367,26 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu ...@@ -4372,6 +4367,26 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu
assert(buffer != NULL); assert(buffer != NULL);
char *pChildTblName;
int childTblCount;
if (superTblInfo && (superTblInfo->childTblOffset > 0)) {
// TODO
// select tbname from stb limit 1 offset tableSeq
getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos,
pThreadInfo->db_name, superTblInfo->sTblName,
&pChildTblName, &childTblCount,
1, tableSeq);
} else {
pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (NULL == pChildTblName) {
fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN);
return -1;
}
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d",
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq);
}
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
...@@ -4384,19 +4399,19 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu ...@@ -4384,19 +4399,19 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu
} else { } else {
tagsValBuf = getTagValueFromTagSample( tagsValBuf = getTagValueFromTagSample(
superTblInfo, superTblInfo,
tableID % superTblInfo->tagSampleCount); tableSeq % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
fprintf(stderr, "tag buf failed to allocate memory\n"); fprintf(stderr, "tag buf failed to allocate memory\n");
free(pChildTblName);
return -1; return -1;
} }
pstr += snprintf(pstr, pstr += snprintf(pstr,
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
"insert into %s.%s%d using %s.%s tags %s values", "insert into %s.%s using %s.%s tags %s values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo->childTblPrefix, pChildTblName,
tableID,
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo->sTblName, superTblInfo->sTblName,
tagsValBuf); tagsValBuf);
...@@ -4406,22 +4421,20 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu ...@@ -4406,22 +4421,20 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
"insert into %s.%s values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo->childTblName + tableID * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
} else { } else {
pstr += snprintf(pstr, pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s%d values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, pChildTblName);
tableID);
} }
} else { } else {
pstr += snprintf(pstr, pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s%d values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, pChildTblName);
tableID);
} }
int k; int k;
...@@ -4432,14 +4445,16 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu ...@@ -4432,14 +4445,16 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu
if (superTblInfo) { if (superTblInfo) {
int retLen = 0; int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample( retLen = getRowDataFromSample(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom, startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo, superTblInfo,
pSampleUsePos); pSampleUsePos);
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { } else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100; int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) { && rand_num < superTblInfo->disorderRatio) {
...@@ -4459,6 +4474,7 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu ...@@ -4459,6 +4474,7 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu
} }
if (retLen < 0) { if (retLen < 0) {
free(pChildTblName);
return -1; return -1;
} }
...@@ -4518,7 +4534,7 @@ static void* syncWrite(void *sarg) { ...@@ -4518,7 +4534,7 @@ static void* syncWrite(void *sarg) {
if (superTblInfo) { if (superTblInfo) {
if (0 != prepareSampleData(superTblInfo)) if (0 != prepareSampleDataForSTable(superTblInfo))
return NULL; return NULL;
if (superTblInfo->numberOfTblInOneSql > 0) { if (superTblInfo->numberOfTblInOneSql > 0) {
...@@ -4553,12 +4569,8 @@ static void* syncWrite(void *sarg) { ...@@ -4553,12 +4569,8 @@ static void* syncWrite(void *sarg) {
int sampleUsePos; int sampleUsePos;
if (superTblInfo && superTblInfo->childTblOffset) { for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id;
// TODO tableSeq ++) {
}
for (uint32_t tableID = winfo->start_table_id; tableID <= winfo->end_table_id;
tableID++) {
int64_t start_time = winfo->start_time; int64_t start_time = winfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
...@@ -4571,7 +4583,7 @@ static void* syncWrite(void *sarg) { ...@@ -4571,7 +4583,7 @@ static void* syncWrite(void *sarg) {
sampleUsePos = samplePos; sampleUsePos = samplePos;
int generated = generateDataBuffer(tableID, winfo, buffer, insertRows, int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows,
i, start_time, &sampleUsePos); i, start_time, &sampleUsePos);
if (generated > 0) if (generated > 0)
i += generated; i += generated;
...@@ -4615,12 +4627,12 @@ static void* syncWrite(void *sarg) { ...@@ -4615,12 +4627,12 @@ static void* syncWrite(void *sarg) {
} }
} // num_of_DPT } // num_of_DPT
if ((tableID == winfo->end_table_id) && superTblInfo && if ((tableSeq == winfo->end_table_id) && superTblInfo &&
(0 == strncasecmp( (0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) { superTblInfo->dataSource, "sample", strlen("sample")))) {
samplePos = sampleUsePos; samplePos = sampleUsePos;
} }
} // tableID } // tableSeq
free_and_statistics_2: free_and_statistics_2:
tmfree(buffer); tmfree(buffer);
...@@ -5069,7 +5081,7 @@ static int insertTestProcess() { ...@@ -5069,7 +5081,7 @@ static int insertTestProcess() {
} }
// pretreatement // pretreatement
prePareSampleData(); prepareSampleData();
double start; double start;
double end; double end;
...@@ -5297,7 +5309,8 @@ static int queryTestProcess() { ...@@ -5297,7 +5309,8 @@ static int queryTestProcess() {
pthread_t *pidsOfSub = NULL; pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL; threadInfo *infosOfSub = NULL;
//==== create sub threads for query from all sub table of the super table //==== create sub threads for query from all sub table of the super table
if ((g_queryInfo.subQueryInfo.sqlCount > 0) && (g_queryInfo.subQueryInfo.threadCnt > 0)) { if ((g_queryInfo.subQueryInfo.sqlCount > 0)
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册