未验证 提交 c0acf73c 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12094 from taosdata/szhou/feature/sml-final

fix: schemaless performance
...@@ -149,7 +149,7 @@ int main(int argc, char* argv[]) { ...@@ -149,7 +149,7 @@ int main(int argc, char* argv[]) {
int assembleSTables = 0; int assembleSTables = 0;
int opt; int opt;
while ((opt = getopt(argc, argv, "s:c:r:f:t:b:p:w:hv")) != -1) { while ((opt = getopt(argc, argv, "s:c:r:f:t:b:p:w:a:hv")) != -1) {
switch (opt) { switch (opt) {
case 's': case 's':
numSuperTables = atoi(optarg); numSuperTables = atoi(optarg);
......
...@@ -58,6 +58,22 @@ typedef enum { ...@@ -58,6 +58,22 @@ typedef enum {
SML_TIME_STAMP_NOW SML_TIME_STAMP_NOW
} SMLTimeStampType; } SMLTimeStampType;
typedef struct SSmlSqlInsertBatch {
uint64_t id;
int32_t index;
char* sql;
int32_t code;
int32_t tryTimes;
tsem_t sem;
int32_t affectedRows;
bool tryAgain;
bool resetQueryCache;
bool sleep;
} SSmlSqlInsertBatch;
#define MAX_SML_SQL_INSERT_BATCHES 512
typedef struct { typedef struct {
uint64_t id; uint64_t id;
SMLProtocolType protocol; SMLProtocolType protocol;
...@@ -65,7 +81,13 @@ typedef struct { ...@@ -65,7 +81,13 @@ typedef struct {
SHashObj* smlDataToSchema; SHashObj* smlDataToSchema;
int32_t affectedRows; int32_t affectedRows;
pthread_mutex_t batchMutex;
pthread_cond_t batchCond;
int32_t numBatches;
SSmlSqlInsertBatch batches[MAX_SML_SQL_INSERT_BATCHES];
} SSmlLinesInfo; } SSmlLinesInfo;
char* addEscapeCharToString(char *str, int32_t len); char* addEscapeCharToString(char *str, int32_t len);
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info); int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info);
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info); bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
......
...@@ -32,10 +32,6 @@ typedef struct { ...@@ -32,10 +32,6 @@ typedef struct {
static uint64_t linesSmlHandleId = 0; static uint64_t linesSmlHandleId = 0;
static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1,
SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info);
static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1,
SSmlLinesInfo* info);
uint64_t genLinesSmlId() { uint64_t genLinesSmlId() {
uint64_t id; uint64_t id;
...@@ -91,16 +87,17 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t ...@@ -91,16 +87,17 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t
*bytes = tDataTypes[kv->type].bytes; *bytes = tDataTypes[kv->type].bytes;
} else { } else {
if (kv->type == TSDB_DATA_TYPE_NCHAR) { if (kv->type == TSDB_DATA_TYPE_NCHAR) {
char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1); // char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
int32_t bytesNeeded = 0; // int32_t bytesNeeded = 0;
bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); // bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
if (!succ) { // if (!succ) {
free(ucs); // free(ucs);
tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value); // tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
return TSDB_CODE_TSC_INVALID_VALUE; // return TSDB_CODE_TSC_INVALID_VALUE;
} // }
free(ucs); // free(ucs);
*bytes = bytesNeeded + VARSTR_HEADER_SIZE; // *bytes = bytesNeeded + VARSTR_HEADER_SIZE;
*bytes = kv->length * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
} else if (kv->type == TSDB_DATA_TYPE_BINARY) { } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
*bytes = kv->length + VARSTR_HEADER_SIZE; *bytes = kv->length + VARSTR_HEADER_SIZE;
} }
...@@ -792,9 +789,23 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu ...@@ -792,9 +789,23 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
return 0; return 0;
} }
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, static int smlSnprintf(char* buf, int32_t *total, int32_t cap, char* fmt, ...) {
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { if (*total > cap) {
int32_t code = TSDB_CODE_SUCCESS; return -1;
}
va_list argp;
va_start(argp, fmt);
int len = vsnprintf(buf + *total, cap - *total, fmt, argp);
if (len < 0 || len >= cap - *total) {
return -2;
}
*total += len;
return 0;
}
static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints,
char* sql, int32_t capacity, int32_t* cTableSqlLen, int fromIndex, int* nextIndex, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields); size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints); size_t rows = taosArrayGetSize(cTablePoints);
...@@ -810,53 +821,79 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa ...@@ -810,53 +821,79 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
} }
} }
char* sql = malloc(tsMaxSQLStringLen + 1); TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*));
if (sql == NULL) { int r = fromIndex;
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1;
int32_t totalLen = 0; int32_t totalLen = 0;
totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName); int ret = 0;
ret = smlSnprintf(sql, &totalLen, capacity, " %s using %s (", cTableName, sTableName);
if (ret != 0) {
goto _cleanup;
}
for (int i = 0; i < numTags; ++i) { for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i); SSchema* tagSchema = taosArrayGet(tagsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name); ret = smlSnprintf(sql, &totalLen, capacity, "%s,", tagSchema->name);
if (ret != 0) {
goto _cleanup;
}
} }
--totalLen; --totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags ("); ret = smlSnprintf(sql, &totalLen, capacity, ") tags (");
if (ret != 0) {
goto _cleanup;
}
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
for (int i = 0; i < numTags; ++i) { for (int i = 0; i < numTags; ++i) {
if (capacity - totalLen < TSDB_MAX_BYTES_PER_ROW) {
goto _cleanup;
}
if (tagKVs[i] == NULL) { if (tagKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); ret = smlSnprintf(sql, &totalLen, capacity, "NULL,");
if (ret != 0) {
goto _cleanup;
}
} else { } else {
TAOS_SML_KV* kv = tagKVs[i]; TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen; size_t beforeLen = totalLen;
int32_t len = 0; int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len; totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
ret = smlSnprintf(sql, &totalLen, capacity, ",");
if (ret != 0) {
goto _cleanup;
}
} }
} }
--totalLen; --totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") ("); ret = smlSnprintf(sql, &totalLen, capacity, ") (");
if (ret != 0) {
goto _cleanup;
}
for (int i = 0; i < numCols; ++i) { for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i); SSchema* colSchema = taosArrayGet(colsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name); ret = smlSnprintf(sql, &totalLen, capacity, "%s,", colSchema->name);
if (ret != 0) {
goto _cleanup;
}
} }
--totalLen; --totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values "); ret = smlSnprintf(sql, &totalLen, capacity, ") values ");
if (ret != 0) {
TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*)); goto _cleanup;
for (int r = 0; r < rows; ++r) { }
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "(");
for (; r < rows; ++r) {
if (capacity - totalLen < TSDB_MAX_BYTES_PER_ROW) {
break;
}
ret = smlSnprintf(sql, &totalLen, capacity, "(");
if (ret != 0) {
goto _cleanup;
}
memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*)); memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r); TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
...@@ -867,372 +904,215 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa ...@@ -867,372 +904,215 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
for (int i = 0; i < numCols; ++i) { for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) { if (colKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); ret = smlSnprintf(sql, &totalLen, capacity, "NULL,");
if (ret != 0) {
goto _cleanup;
}
} else { } else {
TAOS_SML_KV* kv = colKVs[i]; TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen; size_t beforeLen = totalLen;
int32_t len = 0; int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len; totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ","); ret = smlSnprintf(sql, &totalLen, capacity, ",");
if (ret != 0) {
goto _cleanup;
}
} }
} }
--totalLen; --totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")"); ret = smlSnprintf(sql, &totalLen, capacity, ")");
if (ret != 0) {
goto _cleanup;
}
} }
_cleanup:
free(colKVs); free(colKVs);
sql[totalLen] = '\0';
tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName,
sql);
bool tryAgain = false;
int32_t try = 0;
do {
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
}
tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
}
} while (tryAgain);
free(sql); if (r == fromIndex) {
tscError("buffer can not fit one line");
*cTableSqlLen = 0;
} else {
*cTableSqlLen = totalLen;
}
*nextIndex = r;
return code; return 0;
} }
static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, static void insertCallback(void *param, TAOS_RES *res, int32_t notUsedCode) {
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { SSmlSqlInsertBatch *batch = (SSmlSqlInsertBatch *)param;
size_t numTags = taosArrayGetSize(sTableSchema->tags); batch->code = taos_errno(res);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; if (batch->code != 0) {
for (int i= 0; i < rows; ++i) { tscError("SML:0x%"PRIx64 " batch %d , taos_query_a return %d:%s", batch->id, batch->index, batch->code, taos_errstr(res));
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
} }
tscDebug("SML:0x%"PRIx64 " batch %d, taos_query inserted %d rows", batch->id, batch->index, taos_affected_rows(res));
batch->affectedRows = taos_affected_rows(res);
taos_free_result(res);
//tag bind int32_t code = batch->code;
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); batch->tryAgain = false;
taosArraySetSize(tagBinds, numTags); batch->resetQueryCache = false;
int isNullColBind = TSDB_TRUE; batch->sleep = false;
for (int j = 0; j < numTags; ++j) { if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
TAOS_BIND* bind = taosArrayGet(tagBinds, j); || code == TSDB_CODE_VND_INVALID_VGROUP_ID
bind->is_null = &isNullColBind; || code == TSDB_CODE_TDB_TABLE_RECONFIGURE
} || code == TSDB_CODE_APP_NOT_READY
for (int j = 0; j < numTags; ++j) { || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && batch->tryTimes < TSDB_MAX_REPLICA) {
if (tagKVs[j] == NULL) continue; batch->tryAgain = true;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
} }
//rows bind if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); batch->resetQueryCache = true;
for (int i = 0; i < rows; ++i) { if (batch->tryAgain) {
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i); batch->sleep = true;
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
} }
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
taosArrayPush(rowsBind, &colBinds);
}
int32_t code = 0;
code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
} }
//free rows bind if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
for (int i = 0; i < rows; ++i) { if (batch->tryAgain) {
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); batch->sleep = true;
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
free(bind->length);
} }
free(colBinds);
}
taosArrayDestroy(&rowsBind);
//free tag bind
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
free(bind->length);
} }
taosArrayDestroy(&tagBinds);
return code; tsem_post(&batch->sem);
} }
static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName, static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
SArray* tagsSchema, SArray* tagsBind, int32_t code = TSDB_CODE_SUCCESS;
SArray* colsSchema, SArray* rowsBind,
size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(tagsSchema);
size_t numCols = taosArrayGetSize(colsSchema);
char* sql = malloc(tsMaxSQLStringLen+1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1 ; SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
sprintf(sql, "insert into ? using %s (", sTableName); arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i); for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name); info->batches[i].id = info->id;
info->batches[i].index = i;
info->batches[i].sql = NULL;
info->batches[i].tryTimes = 0;
tsem_init(&info->batches[i].sem, 0, 0);
} }
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags ("); info->numBatches = 0;
SSmlSqlInsertBatch *batch = info->batches;
batch->sql = malloc(tsMaxSQLStringLen + 1);
int32_t freeBytes = tsMaxSQLStringLen;
int32_t usedBytes = sprintf(batch->sql, "insert into");
freeBytes -= usedBytes;
for (int i = 0; i < numTags; ++i) { int32_t cTableSqlLen = 0;
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") (");
for (int i = 0; i < numCols; ++i) { SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
SSchema* colSchema = taosArrayGet(colsSchema, i); while (pCTablePoints) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); SArray* cTablePoints = *pCTablePoints;
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values (");
for (int i = 0; i < numCols; ++i) { TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql); int32_t nextIndex = 0;
int32_t fromIndex = nextIndex;
while (nextIndex != taosArrayGetSize(cTablePoints)) {
fromIndex = nextIndex;
code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints,
batch->sql + usedBytes, freeBytes, &cTableSqlLen, fromIndex, &nextIndex,
info);
tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s. range[%d-%d).",
info->id, point->childTableName, point->stableName, fromIndex, nextIndex);
usedBytes += cTableSqlLen;
freeBytes -= cTableSqlLen;
if (nextIndex != taosArrayGetSize(cTablePoints)) {
batch->sql[usedBytes] = '\0';
info->numBatches++;
tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql);
if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) {
tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id);
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto cleanup;
}
size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 2 / 3; batch = &info->batches[info->numBatches];
size_t rows = taosArrayGetSize(rowsBind); batch->sql = malloc(tsMaxSQLStringLen + 1);
size_t batchSize = MIN(maxBatchSize, rows); freeBytes = tsMaxSQLStringLen;
tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu", usedBytes = sprintf(batch->sql, "insert into");
info->id, cTableName, rows, batchSize); freeBytes -= usedBytes;
SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < rows;) {
int j = i;
for (; j < i + batchSize && j<rows; ++j) {
taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
}
if (j > i) {
tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1);
code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info);
if (code != 0) {
taosArrayDestroy(&batchBind);
tfree(sql);
return code;
} }
taosArrayClear(batchBind);
} }
i = j;
}
taosArrayDestroy(&batchBind);
tfree(sql);
return code;
}
static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind,
SSmlLinesInfo* info) {
int32_t code = 0;
TAOS_STMT* stmt = taos_stmt_init(taos); pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
if (stmt == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
batch->sql[usedBytes] = '\0';
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); info->numBatches++;
tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql);
if (code != 0) { if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) {
tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt)); tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id);
taos_stmt_close(stmt); code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return code; goto cleanup;
} }
bool batchesExecuted[MAX_SML_SQL_INSERT_BATCHES] = {false};
bool tryAgain = false; for (int i = 0; i < info->numBatches; ++i) {
int32_t try = 0; SSmlSqlInsertBatch* insertBatch = &info->batches[i];
do { insertBatch->tryTimes = 1;
code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); taos_query_a(taos, insertBatch->sql, insertCallback, insertBatch);
if (code != 0) { batchesExecuted[i] = true;
tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt)); }
int32_t triedBatches = info->numBatches;
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
size_t rows = taosArrayGetSize(batchBind);
for (int32_t i = 0; i < rows; ++i) {
TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i);
code = taos_stmt_bind_param(stmt, colsBinds);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
code = taos_stmt_add_batch(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt); while (triedBatches > 0) {
return code; for (int i = 0; i < info->numBatches; ++i) {
if (batchesExecuted[i]) {
tsem_wait(&info->batches[i].sem);
info->affectedRows += info->batches[i].affectedRows;
} }
} }
code = taos_stmt_execute(stmt); for (int i = 0; i < info->numBatches; ++i) {
if (code != 0) { SSmlSqlInsertBatch* b = info->batches + i;
tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try); if (b->resetQueryCache) {
} TAOS_RES* res = taos_query(taos, "RESET QUERY CACHE");
tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt)); taos_free_result(res);
break;
tryAgain = false; }
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
} }
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { for (int i = 0; i < info->numBatches; ++i) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); SSmlSqlInsertBatch* b = info->batches + i;
int32_t code2 = taos_errno(res2); if (b->sleep) {
if (code2 != TSDB_CODE_SUCCESS) { taosMsleep(100 * (2 << b->tryTimes));
tscError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2)); break;
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
} }
} }
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) { memset(batchesExecuted, 0, sizeof(batchesExecuted));
taosMsleep( 100 * (2 << try)); triedBatches = 0;
for (int i = 0; i < info->numBatches; ++i) {
SSmlSqlInsertBatch* insertBatch = &info->batches[i];
if (insertBatch->tryAgain) {
insertBatch->tryTimes++;
taos_query_a(taos, insertBatch->sql, insertCallback, insertBatch);
batchesExecuted[i] = true;
triedBatches++;
} }
} }
} while (tryAgain);
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
return 0;
}
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t childTableDataPoints = taosArrayGetSize(cTablePoints);
if (childTableDataPoints < 10) {
code = applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
} else {
code = applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
} }
return code;
}
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { code = 0;
int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < info->numBatches; ++i) {
SSmlSqlInsertBatch* b = info->batches + i;
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); if (b->code != 0) {
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info); code = b->code;
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
size_t rowSize = 0;
size_t numCols = taosArrayGetSize(sTableSchema->fields);
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
rowSize += colSchema->bytes;
}
tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu",
info->id, point->childTableName, point->stableName, rowSize);
code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
goto cleanup;
} }
tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
} }
cleanup: cleanup:
for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) {
free(info->batches[i].sql);
info->batches[i].sql = NULL;
tsem_destroy(&info->batches[i].sem);
}
pCTablePoints = taosHashIterate(cname2points, NULL); pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) { while (pCTablePoints) {
SArray* pPoints = *pCTablePoints; SArray* pPoints = *pCTablePoints;
...@@ -1328,7 +1208,7 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine ...@@ -1328,7 +1208,7 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
} }
tscDebug("SML:0x%"PRIx64" apply data points", info->id); tscDebug("SML:0x%"PRIx64" apply data points", info->id);
code = applyDataPoints(taos, points, numPoint, stableSchemas, info); code = applyDataPointsWithSqlInsert(taos, points, numPoint, stableSchemas, info);
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code)); tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
} }
...@@ -2129,7 +2009,7 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash ...@@ -2129,7 +2009,7 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} }
pKV->key = calloc(len + TS_BACKQUOTE_CHAR_SIZE + 1, 1); pKV->key = malloc(len + TS_BACKQUOTE_CHAR_SIZE + 1);
memcpy(pKV->key, key, len + 1); memcpy(pKV->key, key, len + 1);
addEscapeCharToString(pKV->key, len); addEscapeCharToString(pKV->key, len);
tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len); tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
...@@ -2572,13 +2452,13 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, ...@@ -2572,13 +2452,13 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
int32_t capacity = 0; int32_t capacity = 0;
if (isField) { if (isField) {
capacity = 64; capacity = 64;
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); *pKVs = malloc(capacity * sizeof(TAOS_SML_KV));
// leave space for timestamp; // leave space for timestamp;
pkv = *pKVs; pkv = *pKVs;
pkv++; pkv++;
} else { } else {
capacity = 8; capacity = 8;
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); *pKVs = malloc(capacity * sizeof(TAOS_SML_KV));
pkv = *pKVs; pkv = *pKVs;
} }
...@@ -2673,7 +2553,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf ...@@ -2673,7 +2553,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
uint8_t has_tags = 0; uint8_t has_tags = 0;
TAOS_SML_KV *timestamp = NULL; TAOS_SML_KV *timestamp = NULL;
SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); SHashObj *keyHashTable = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
ret = parseSmlMeasurement(smlData, &index, &has_tags, info); ret = parseSmlMeasurement(smlData, &index, &has_tags, info);
if (ret) { if (ret) {
...@@ -2758,8 +2638,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p ...@@ -2758,8 +2638,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
info->tsType = tsType; info->tsType = tsType;
info->protocol = protocol; info->protocol = protocol;
if (numLines <= 0 || numLines > 65536) { if (numLines <= 0 || numLines > 65536*32) {
tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536*32. numLines: %d", info->id, numLines);
tfree(info); tfree(info);
code = TSDB_CODE_TSC_APP_ERROR; code = TSDB_CODE_TSC_APP_ERROR;
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册