提交 5821c1f8 编写于 作者: S shenglian zhou

schemaless improvement with pure sql

上级 5da9d427
...@@ -794,7 +794,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu ...@@ -794,7 +794,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
} }
static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints, static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints,
char* sql, int32_t capacity, int32_t* cTableSqlLen, SSmlLinesInfo* info) { char* sql, int32_t capacity, int32_t* cTableSqlLen, int fromIndex, int* nextIndex, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields); size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints); size_t rows = taosArrayGetSize(cTablePoints);
...@@ -845,7 +845,11 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable ...@@ -845,7 +845,11 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values "); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values ");
TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*)); TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) { int r = fromIndex;
for (; r < rows; ++r) {
if (freeBytes - totalLen < 1024 * 16) {
break;
}
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "("); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "(");
memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*)); memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*));
...@@ -873,6 +877,10 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable ...@@ -873,6 +877,10 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable
} }
free(colKVs); free(colKVs);
if (r == fromIndex) {
tscError("buf can not fit one line");
}
*nextIndex = r;
*cTableSqlLen = totalLen; *cTableSqlLen = totalLen;
return 0; return 0;
...@@ -981,11 +989,12 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi ...@@ -981,11 +989,12 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi
info->numBatches = 0; info->numBatches = 0;
SSmlSqlInsertBatch *batch = info->batches; SSmlSqlInsertBatch *batch = info->batches;
batch->sql = malloc(tsMaxSQLStringLen + 1); batch->sql = malloc(tsMaxSQLStringLen + 1);
//TODO batch->sql allocation errror
int32_t freeBytes = tsMaxSQLStringLen; int32_t freeBytes = tsMaxSQLStringLen;
int32_t usedBytes = sprintf(batch->sql, "insert into"); int32_t usedBytes = sprintf(batch->sql, "insert into");
freeBytes -= usedBytes; freeBytes -= usedBytes;
int32_t cTableSqlLen = 0;
SArray** pCTablePoints = taosHashIterate(cname2points, NULL); SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) { while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints; SArray* cTablePoints = *pCTablePoints;
...@@ -993,37 +1002,49 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi ...@@ -993,37 +1002,49 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s", int32_t nextIndex = 0;
info->id, point->childTableName, point->stableName); int32_t fromIndex = nextIndex;
int32_t cTableSqlLen = 0;
code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info); while (nextIndex != taosArrayGetSize(cTablePoints)) {
int32_t safeBound = 1024 * 24; fromIndex = nextIndex;
if (cTableSqlLen < freeBytes - safeBound) { code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints,
batch->sql + usedBytes, freeBytes, &cTableSqlLen, fromIndex, &nextIndex,
info);
tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s. range[%d-%d).",
info->id, point->childTableName, point->stableName, fromIndex, nextIndex);
usedBytes += cTableSqlLen; usedBytes += cTableSqlLen;
freeBytes -= cTableSqlLen; freeBytes -= cTableSqlLen;
} else { if (nextIndex != taosArrayGetSize(cTablePoints)) {
batch->sql[usedBytes] = '\0'; batch->sql[usedBytes] = '\0';
info->numBatches++; info->numBatches++;
if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) { tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql);
tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id);
code = TSDB_CODE_TSC_OUT_OF_MEMORY; if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) {
goto cleanup; tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id);
} code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto cleanup;
}
batch = &info->batches[info->numBatches]; batch = &info->batches[info->numBatches];
batch->sql = malloc(tsMaxSQLStringLen + 1); batch->sql = malloc(tsMaxSQLStringLen + 1);
freeBytes = tsMaxSQLStringLen; freeBytes = tsMaxSQLStringLen;
usedBytes = sprintf(batch->sql, "insert into"); usedBytes = sprintf(batch->sql, "insert into");
freeBytes -= usedBytes; freeBytes -= usedBytes;
//TODO deal with one child table rows exceeds columns }
code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info);
} }
pCTablePoints = taosHashIterate(cname2points, pCTablePoints); pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
} }
usedBytes += cTableSqlLen;
freeBytes -= cTableSqlLen;
batch->sql[usedBytes] = '\0'; batch->sql[usedBytes] = '\0';
info->numBatches++; info->numBatches++;
tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql);
if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) {
tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id);
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto cleanup;
}
bool batchesExecuted[MAX_SML_SQL_INSERT_BATCHES] = {false}; bool batchesExecuted[MAX_SML_SQL_INSERT_BATCHES] = {false};
for (int i = 0; i < info->numBatches; ++i) { for (int i = 0; i < info->numBatches; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册