提交 f171e803 编写于 作者: S shenglian zhou

[TD-xxxx]<enhance>:begin coding taos_sml_insert

上级 a8506a2d
......@@ -32,9 +32,6 @@ typedef struct {
uint8_t type;
int16_t length;
char* value;
//===================================
uint32_t fieldSchemaIdx;
} TAOS_SML_KV;
typedef struct {
......@@ -47,9 +44,6 @@ typedef struct {
// first kv must be timestamp
TAOS_SML_KV* fields;
int32_t fieldNum;
//================================
uint32_t schemaIdx;
} TAOS_SML_DATA_POINT;
typedef enum {
......@@ -62,10 +56,23 @@ typedef enum {
typedef struct {
uint64_t id;
SHashObj* smlDataToSchema;
} SSmlLinesInfo;
//=================================================================================================
static uint64_t linesSmlHandleId = 0;
uint64_t genLinesSmlId() {
uint64_t id;
do {
id = atomic_add_fetch_64(&linesSmlHandleId, 1);
} while (id == 0);
return id;
}
int compareSmlColKv(const void* p1, const void* p2) {
TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1;
TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2;
......@@ -168,11 +175,46 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
}
smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;
uintptr_t valPointer = (uintptr_t)smlKv;
taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &fieldIdx, sizeof(fieldIdx));
return 0;
}
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
char sTableName[TSDB_TABLE_NAME_LEN] = {0};
strtolower(sTableName, point->stableName);
taosStringBuilderAppendString(&sb, sTableName);
for (int j = 0; j < point->tagNum; ++j) {
taosStringBuilderAppendChar(&sb, ',');
TAOS_SML_KV* tagKv = point->tags + j;
char tagName[TSDB_COL_NAME_LEN] = {0};
strtolower(tagName, tagKv->key);
taosStringBuilderAppendString(&sb, tagName);
taosStringBuilderAppendChar(&sb, '=');
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
MD5_CTX context;
MD5Init(&context);
MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
MD5Final(&context);
*tableNameLen = snprintf(tableName, *tableNameLen,
"t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
taosStringBuilderDestroy(&sb);
tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
return 0;
}
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = 0;
SHashObj* sname2shema = taosHashInit(32,
......@@ -203,6 +245,15 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* tagKv = point->tags + j;
if (!point->childTableName) {
char childTableName[TSDB_TABLE_NAME_LEN];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
}
code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key);
......@@ -219,7 +270,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
}
point->schemaIdx = (uint32_t)stableIdx;
uintptr_t valPointer = (uintptr_t)point;
taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &stableIdx, sizeof(stableIdx));
}
size_t numStables = taosArrayGetSize(stableSchemas);
......@@ -567,41 +619,6 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo*
return 0;
}
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
char sTableName[TSDB_TABLE_NAME_LEN] = {0};
strtolower(sTableName, point->stableName);
taosStringBuilderAppendString(&sb, sTableName);
for (int j = 0; j < point->tagNum; ++j) {
taosStringBuilderAppendChar(&sb, ',');
TAOS_SML_KV* tagKv = point->tags + j;
char tagName[TSDB_COL_NAME_LEN] = {0};
strtolower(tagName, tagKv->key);
taosStringBuilderAppendString(&sb, tagName);
taosStringBuilderAppendChar(&sb, '=');
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
MD5_CTX context;
MD5Init(&context);
MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
MD5Final(&context);
*tableNameLen = snprintf(tableName, *tableNameLen,
"t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
taosStringBuilderDestroy(&sb);
tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
return 0;
}
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind, SSmlLinesInfo* info) {
char sql[512];
sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName);
......@@ -611,25 +628,25 @@ static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, cons
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) {
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code));
return code;
}
code = taos_stmt_bind_param(stmt, bind);
if (code != 0) {
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code));
return code;
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s", info->id, code, tstrerror(code));
return code;
}
code = taos_stmt_close(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code));
return code;
}
return code;
......@@ -674,27 +691,27 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co
if (code != 0) {
tfree(stmt);
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_prepare returns %d:%s", info->id, code, tstrerror(code));
return code;
}
code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind));
if (code != 0) {
tfree(stmt);
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_bind_param returns %d:%s", info->id, code, tstrerror(code));
return code;
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tfree(stmt);
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_execute returns %d:%s", info->id, code, tstrerror(code));
return code;
}
code = taos_stmt_close(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code));
return code;
}
return code;
......@@ -738,7 +755,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
if (code != 0) {
tfree(stmt);
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code));
return code;
}
......@@ -746,7 +763,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
code = taos_stmt_set_tbname(stmt, cTableName);
if (code != 0) {
tfree(stmt);
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, tstrerror(code));
return code;
}
......@@ -756,25 +773,25 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
code = taos_stmt_bind_param(stmt, colsBinds);
if (code != 0) {
tfree(stmt);
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code));
return code;
}
code = taos_stmt_add_batch(stmt);
if (code != 0) {
tfree(stmt);
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, tstrerror(code));
return code;
}
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s", info->id, code, tstrerror(code));
}
} while (code == TSDB_CODE_TDB_TABLE_RECONFIGURE && try++ < TSDB_MAX_REPLICA);
if (code != 0) {
tscError("SML:0x%"PRIx64" %s", info->id, taos_stmt_errstr(stmt));
tscError("SML:0x%"PRIx64" %d:%s", info->id, code, tstrerror(code));
taos_stmt_close(stmt);
} else {
taos_stmt_close(stmt);
......@@ -787,16 +804,10 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
if (!point->childTableName) {
char childTableName[TSDB_TABLE_NAME_LEN];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
}
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
uintptr_t valPointer = (uintptr_t)point;
size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pSchemaIndex != NULL);
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
......@@ -840,7 +851,10 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
tagKVs[*pFieldSchemaIdx] = kv;
}
}
......@@ -863,7 +877,10 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -912,7 +929,10 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
assert(tagKV->value);
if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) {
TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx);
uintptr_t valPointer = (uintptr_t)tagKV;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx);
code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" change child table tag failed. table name %s, tag %s", info->id, cTableName, tagKV->key);
......@@ -963,7 +983,10 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema,
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
TAOS_BIND* bind = colBinds + *pFieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -1000,9 +1023,11 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t
while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
uintptr_t valPointer = (uintptr_t)point;
size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pSchemaIndex != NULL);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName);
code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info);
......@@ -1034,10 +1059,11 @@ cleanup:
return code;
}
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);
int32_t code = TSDB_CODE_SUCCESS;
info->smlDataToSchema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, false);
tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
......@@ -1067,6 +1093,15 @@ clean_up:
taosArrayDestroy(schema->tags);
}
taosArrayDestroy(stableSchemas);
taosHashCleanup(info->smlDataToSchema);
return code;
}
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
info->id = genLinesSmlId();
int code = tscSmlInsert(taos, points, numPoint, info);
free(info);
return code;
}
......@@ -2076,18 +2111,6 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
//=========================================================================
static uint64_t linesSmlHandleId = 0;
uint64_t genLinesSmlId() {
uint64_t id;
do {
id = atomic_add_fetch_64(&linesSmlHandleId, 1);
} while (id == 0);
return id;
}
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
for (int i=0; i<point->tagNum; ++i) {
free((point->tags+i)->key);
......@@ -2157,7 +2180,7 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
}
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = taos_sml_insert(taos, points, (int)numPoints, info);
code = tscSmlInsert(taos, points, (int)numPoints, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册