提交 417fb694 编写于 作者: S shenglian zhou

schemaless remove redundant code

上级 5821c1f8
......@@ -32,10 +32,6 @@ typedef struct {
static uint64_t linesSmlHandleId = 0;
static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1,
SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info);
static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1,
SSmlLinesInfo* info);
uint64_t genLinesSmlId() {
uint64_t id;
......@@ -925,53 +921,6 @@ static void insertCallback(void *param, TAOS_RES *res, int32_t notUsedCode) {
sem_post(&batch->sem);
}
//static int32_t doRunInsertSQL(TAOS* taos, char* sql, SSmlLinesInfo* info) {
// int32_t code = 0;
// bool tryAgain = false;
// int32_t try = 0;
// do {
// TAOS_RES* res = taos_query(taos, sql);
// code = taos_errno(res);
// if (code != 0) {
// tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
// }
//
// tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
// info->affectedRows += taos_affected_rows(res);
// taos_free_result(res);
//
// tryAgain = false;
// if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
// || code == TSDB_CODE_VND_INVALID_VGROUP_ID
// || code == TSDB_CODE_TDB_TABLE_RECONFIGURE
// || code == TSDB_CODE_APP_NOT_READY
// || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
// tryAgain = true;
// }
//
// if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
// TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
// int32_t code2 = taos_errno(res2);
// if (code2 != TSDB_CODE_SUCCESS) {
// tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
// }
// taos_free_result(res2);
// if (tryAgain) {
// taosMsleep(100 * (2 << try));
// }
// }
//
// if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
// if (tryAgain) {
// taosMsleep( 100 * (2 << try));
// }
// }
// } while (tryAgain);
//
// return code;
//
//}
static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -1118,457 +1067,6 @@ cleanup:
return code;
}
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* tagsSchema = sTableSchema->tags;
SArray* colsSchema = sTableSchema->fields;
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
char* sql = malloc(tsMaxSQLStringLen + 1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1;
int32_t totalLen = 0;
totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags (");
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
for (int i = 0; i < numTags; ++i) {
if (tagKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values ");
TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "(");
memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
for (int i = 0; i < point->fieldNum; ++i) {
TAOS_SML_KV* kv = point->fields + i;
colKVs[kv->fieldSchemaIdx] = kv;
}
for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
}
free(colKVs);
sql[totalLen] = '\0';
tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName,
sql);
bool tryAgain = false;
int32_t try = 0;
do {
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
}
tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
}
} while (tryAgain);
free(sql);
return code;
}
static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
//tag bind
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
int isNullColBind = TSDB_TRUE;
for (int j = 0; j < numTags; ++j) {
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
bind->is_null = &isNullColBind;
}
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
//rows bind
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
taosArrayPush(rowsBind, &colBinds);
}
int32_t code = 0;
code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
}
//free rows bind
for (int i = 0; i < rows; ++i) {
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
free(bind->length);
}
free(colBinds);
}
taosArrayDestroy(&rowsBind);
//free tag bind
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
free(bind->length);
}
taosArrayDestroy(&tagBinds);
return code;
}
static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName,
SArray* tagsSchema, SArray* tagsBind,
SArray* colsSchema, SArray* rowsBind,
size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(tagsSchema);
size_t numCols = taosArrayGetSize(colsSchema);
char* sql = malloc(tsMaxSQLStringLen+1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
sprintf(sql, "insert into ? using %s (", sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name);
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags (");
for (int i = 0; i < numTags; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name);
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values (");
for (int i = 0; i < numCols; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql);
size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 2 / 3;
size_t rows = taosArrayGetSize(rowsBind);
size_t batchSize = MIN(maxBatchSize, rows);
tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
info->id, cTableName, rows, batchSize);
SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < rows;) {
int j = i;
for (; j < i + batchSize && j<rows; ++j) {
taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
}
if (j > i) {
tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1);
code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info);
if (code != 0) {
taosArrayDestroy(&batchBind);
tfree(sql);
return code;
}
taosArrayClear(batchBind);
}
i = j;
}
taosArrayDestroy(&batchBind);
tfree(sql);
return code;
}
static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind,
SSmlLinesInfo* info) {
int32_t code = 0;
TAOS_STMT* stmt = taos_stmt_init(taos);
if (stmt == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
return code;
}
bool tryAgain = false;
int32_t try = 0;
do {
code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
size_t rows = taosArrayGetSize(batchBind);
for (int32_t i = 0; i < rows; ++i) {
TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i);
code = taos_stmt_bind_param(stmt, colsBinds);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
code = taos_stmt_add_batch(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try);
}
tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt));
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
tscError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
}
} while (tryAgain);
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
return 0;
}
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t childTableDataPoints = taosArrayGetSize(cTablePoints);
if (childTableDataPoints < 1000) {
code = applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
} else {
code = applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
}
return code;
}
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
size_t rowSize = 0;
size_t numCols = taosArrayGetSize(sTableSchema->fields);
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
rowSize += colSchema->bytes;
}
tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu",
info->id, point->childTableName, point->stableName, rowSize);
code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
goto cleanup;
}
tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
cleanup:
pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* pPoints = *pCTablePoints;
taosArrayDestroy(&pPoints);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
taosHashCleanup(cname2points);
return code;
}
static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -1654,12 +1152,7 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
}
tscDebug("SML:0x%"PRIx64" apply data points", info->id);
bool tableByTable = false;
if (tableByTable) {
code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
} else {
code = applyDataPointsWithSqlInsert(taos, points, numPoint, stableSchemas, info);
}
code = applyDataPointsWithSqlInsert(taos, points, numPoint, stableSchemas, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册