提交 12e8b038 编写于 作者: S shenglian zhou

before debugging

上级 d5ab8d7c
......@@ -272,6 +272,14 @@ TAOS_RES* taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) {
}
//=================================================================================================
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj* tagHash;
SHashObj* fieldHash;
SArray* tags; //SArray<SSchema>
SArray* fields; //SArray<SSchema>
} SSmlSTableSchema;
typedef struct {
char* key;
uint8_t type;
......@@ -279,7 +287,7 @@ typedef struct {
char* value;
//===================================
SSchema* fieldSchema;
SSchema* schema;
} TAOS_SML_KV;
typedef struct {
......@@ -293,17 +301,10 @@ typedef struct {
TAOS_SML_KV* fields;
int fieldNum;
//================================
SSmlSTableSchema* schema;
} TAOS_SML_DATA_POINT;
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj* tagHash;
SHashObj* fieldHash;
SArray* tags; //SArray<SSchema>
SArray* fields; //SArray<TAOS_SFIELD>
} SSmlSTableSchema;
int compareSmlColKv(const void* p1, const void* p2) {
TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1;
TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2;
......@@ -426,7 +427,6 @@ typedef enum {
SCHEMA_ACTION_ADD_TAG,
SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
SCHEMA_ACTION_CHANGE_TAG_SIZE,
SCHEMA_ACTION_CREATE_CTABLE
} ESchemaAction;
typedef struct {
......@@ -440,19 +440,11 @@ typedef struct {
SSchema* field;
} SAlterSTableActionInfo;
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
char cTableName[TSDB_TABLE_NAME_LEN];
TAOS_SML_KV* tags;
int tagNum;
} SCreateCTableActionInfo;
typedef struct {
ESchemaAction action;
union {
SCreateSTableActionInfo createSTable;
SAlterSTableActionInfo alterSTable;
SCreateCTableActionInfo createCTable;
};
} SSchemaAction;
......@@ -505,7 +497,7 @@ int32_t addTaosFieldToHashAndArray(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* a
taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES);
}
smlKv->fieldSchema = pField;
smlKv->schema = pField;
return 0;
}
......@@ -545,96 +537,6 @@ int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool
return 0;
}
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
SHashObj* sname2shema = taosHashInit(32,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
for (int i = 0; i < numPoint; ++i) {
TAOS_SML_DATA_POINT* point = &points[i];
SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN);
SSmlSTableSchema* pStableSchema = NULL;
if (ppStableSchema) {
pStableSchema= *ppStableSchema;
} else {
SSmlSTableSchema schema;
size_t stableNameLen = strlen(point->stableName);
strncpy(schema.sTableName, point->stableName, stableNameLen);
schema.sTableName[stableNameLen] = '\0';
schema.fields = taosArrayInit(64, sizeof(SSchema));
schema.tags = taosArrayInit(8, sizeof(SSchema));
schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pStableSchema = taosArrayPush(stableArray, &schema);
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES);
}
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* tagKv = point->tags + j;
addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags);
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* fieldKv = point->fields + j;
addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields);
}
}
SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction));
size_t numStable = taosArrayGetSize(stableArray);
for (int i = 0; i < numStable; ++i) {
SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i);
SSmlSTableSchema dbSchema = {0};
dbSchema.fields = taosArrayInit(64, sizeof(SSchema));
dbSchema.tags = taosArrayInit(8, sizeof(SSchema));
dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
schemaAction.createSTable.tags = pointSchema->tags;
schemaAction.createSTable.fields = pointSchema->fields;
taosArrayPush(schemaActions, &schemaAction);
}else if (code == TSDB_CODE_SUCCESS) {
size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);
SHashObj* dbTagHash = dbSchema.tagHash;
SHashObj* dbFieldHash = dbSchema.fieldHash;
for (int j = 0; j < pointTagSize; ++j) {
SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
if (actionNeeded) {
taosArrayPush(schemaActions, &schemaAction);
}
}
for (int j = 0; j < pointFieldSize; ++j) {
SSchema* pointCol = taosArrayGet(pointSchema->tags, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded);
if (actionNeeded) {
taosArrayPush(schemaActions, &schemaAction);
}
}
} else {
return code;
}
}
return code;
}
int32_t buildColumnDescription(SSchema* field,
char* buf, int32_t bufSize, int32_t* outBytes) {
uint8_t type = field->type;
......@@ -721,46 +623,7 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
code = taos_errno(res);
break;
}
case SCHEMA_ACTION_CREATE_CTABLE: {
// SCreateCTableActionInfo* pInfo = &action->createCTable;
// SArray* bindParams = taosArrayInit(2 + 2 * pInfo->tagNum, sizeof(TAOS_BIND));
// outBytes = sprintf(result, "create table ? using ?(");
// char* pos = result + outBytes; int32_t freeBytes = capacity-outBytes;
// uintptr_t lenSTableName = strlen(pInfo->sTableName);
// uintptr_t lenCTableName = strlen(pInfo->cTableName);
// TAOS_BIND tbCTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY,
// .buffer = pInfo->cTableName, .length = &lenCTableName};
// TAOS_BIND tbSTableName = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY,
// .buffer = pInfo->sTableName, .length = &lenSTableName};
// taosArrayPush(bindParams, &tbCTableName);
// taosArrayPush(bindParams, &tbSTableName);
// for (int32_t i = 0; i < pInfo->tagNum; ++i) {
// outBytes = snprintf(pos, freeBytes, "?,");
//
// TAOS_SML_KV* tagKv = pInfo->tags + i;
// TAOS_BIND tbTag = {.is_null = NULL, .buffer_type = TSDB_DATA_TYPE_BINARY,
// .buffer = tagKv->key, .length = };
// pos += outBytes; freeBytes -= outBytes;
// }
// --pos; ++freeBytes;
//
// outBytes = snprintf(pos, freeBytes, ") tags (");
// pos += outBytes; freeBytes -= outBytes;
// for (int32_t i = 0; i < pInfo->tagNum; ++i) {
// TAOS_SML_KV* tagKv = pInfo->tags + i;
// outBytes = snprintf(pos, freeBytes, "?,");
// pos += outBytes; freeBytes -= outBytes;
// }
// pos--; ++freeBytes;
// outBytes = snprintf(pos, freeBytes, ")");
//
// TAOS_STMT* stmt = taos_stmt_init(taos);
// taos_stmt_prepare(stmt, result, strlen(result));
//
//
// taos_stmt_bind_param(stmt, (TAOS_BIND*)bindParams);
break;
}
default:
break;
}
......@@ -768,55 +631,221 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
return code;
}
int32_t transformIntoPreparedStatement(SArray* points) {
size_t numPoints = taosArrayGetSize(points);
int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsSchema, char* result, int16_t freeBytes) {
size_t numTags = taosArrayGetSize(tagsSchema);
size_t numCols = taosArrayGetSize(colsSchema);
sprintf(result, "insert into ? using %s(", sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name);
}
snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") tags (");
// SHashObj* tag2bind = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
// SHashObj* field2multiBind = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
for (int i = 0; i < numTags; ++i) {
snprintf(result+strlen(result), freeBytes-strlen(result), "?,");
}
snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") (");
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = taosArrayGet(points, i);
char tableKey[256];
snprintf(tableKey, 256, "%s.%s", point->stableName, point->childTableName);
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", colSchema->name);
}
snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") values (");
for (int i = 0; i < numCols; ++i) {
snprintf(result+strlen(result), freeBytes-strlen(result), "?,");
}
snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")");
return 0;
}
int32_t insertBatch(TAOS* taos, const char* sTableName, char* cTableName, SSchema* tagsSchema, int numTags, TAOS_BIND* tagBind,
SSchema* colsSchema, int numCols, TAOS_MULTI_BIND* colBind) {
int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* rowsBind) {
TAOS_STMT* stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, sql, strlen(sql));
taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
size_t rows = taosArrayGetSize(rowsBind);
for (int32_t i = 0; i < rows; ++i) {
TAOS_BIND* colBind = taosArrayGetP(rowsBind, i);
taos_stmt_bind_param(stmt, colBind);
taos_stmt_add_batch(stmt);
}
char result[TSDB_MAX_BINARY_LEN] = {0};
sprintf(result, "insert into ? using %s(", sTableName);
for (int i = 0; i < numTags; ++i) {
snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", tagsSchema[i].name);
taos_stmt_execute(stmt);
TAOS_RES* res = taos_stmt_use_result(stmt);
return taos_errno(res);
}
int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) {
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
if (!point->childTableName) {
char childTableName[TSDB_TABLE_NAME_LEN];
int32_t tableNameLen;
getChildTableName(point, childTableName, &tableNameLen);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
}
SArray* cTablePoints = NULL;
SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
if (pCTablePoints) {
cTablePoints = *pCTablePoints;
} else {
cTablePoints = taosArrayInit(64, sizeof(point));
taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
}
taosArrayPush(cTablePoints, point);
}
snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") tags (");
for (int i = 0; i < numTags; ++i) {
snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,");
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT * point = taosArrayGet(cTablePoints, 0);
int32_t numTags = taosArrayGetSize(point->schema->tags);
int32_t numCols = taosArrayGetSize(point->schema->fields);
char* stableName = point->stableName;
char* ctableName = point->childTableName;
char sql[TSDB_MAX_BINARY_LEN];
getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
for (int i = 0; i < rows; ++i) {
point = taosArrayGet(cTablePoints, i);
taosArraySetSize(tagBinds, numTags);
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema);
TAOS_BIND* bind = taosArrayGet(tagBinds, idx);
bind->buffer_type = kv->type;
bind->length = (uintptr_t*)&kv->length;
bind->buffer = kv->value;
}
SArray* colBinds = taosArrayInit(numCols, sizeof(TAOS_BIND));
taosArraySetSize(colBinds, numCols);
for (int j = 0; j<point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
int32_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema);
TAOS_BIND* bind = taosArrayGet(colBinds, idx);
bind->buffer_type = kv->type;
bind->length = (uintptr_t*)&kv->length;
bind->buffer = kv->value;
}
taosArrayPush(rowsBind, &colBinds);
}
insertBatch(taos, sql, ctableName, tagBinds, rowsBind);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") (");
return 0;
}
for (int i = 0; i < numCols; ++i) {
snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "%s,", colsSchema[i].name);
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
SHashObj* sname2shema = taosHashInit(32,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
for (int i = 0; i < numPoint; ++i) {
TAOS_SML_DATA_POINT* point = &points[i];
SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, TSDB_TABLE_NAME_LEN);
SSmlSTableSchema* pStableSchema = NULL;
if (ppStableSchema) {
pStableSchema= *ppStableSchema;
} else {
SSmlSTableSchema schema;
size_t stableNameLen = strlen(point->stableName);
strncpy(schema.sTableName, point->stableName, stableNameLen);
schema.sTableName[stableNameLen] = '\0';
schema.fields = taosArrayInit(64, sizeof(SSchema));
schema.tags = taosArrayInit(8, sizeof(SSchema));
schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pStableSchema = taosArrayPush(stableArray, &schema);
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES);
}
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* tagKv = point->tags + j;
addTaosFieldToHashAndArray(tagKv, pStableSchema->tagHash, pStableSchema->tags);
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* fieldKv = point->fields + j;
addTaosFieldToHashAndArray(fieldKv, pStableSchema->fieldHash, pStableSchema->fields);
}
point->schema = pStableSchema;
}
snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ") values (");
for (int i = 0; i < numCols; ++i) {
snprintf(result+strlen(result), TSDB_MAX_BINARY_LEN-strlen(result), "?,");
SArray* schemaActions = taosArrayInit(32, sizeof(SSchemaAction));
size_t numStable = taosArrayGetSize(stableArray);
for (int i = 0; i < numStable; ++i) {
SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i);
SSmlSTableSchema dbSchema = {0};
dbSchema.fields = taosArrayInit(64, sizeof(SSchema));
dbSchema.tags = taosArrayInit(8, sizeof(SSchema));
dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
schemaAction.createSTable.tags = pointSchema->tags;
schemaAction.createSTable.fields = pointSchema->fields;
taosArrayPush(schemaActions, &schemaAction);
}else if (code == TSDB_CODE_SUCCESS) {
size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);
SHashObj* dbTagHash = dbSchema.tagHash;
SHashObj* dbFieldHash = dbSchema.fieldHash;
for (int j = 0; j < pointTagSize; ++j) {
SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
if (actionNeeded) {
taosArrayPush(schemaActions, &schemaAction);
}
}
for (int j = 0; j < pointFieldSize; ++j) {
SSchema* pointCol = taosArrayGet(pointSchema->tags, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded);
if (actionNeeded) {
taosArrayPush(schemaActions, &schemaAction);
}
}
} else {
return code;
}
}
snprintf(result + strlen(result)-1, TSDB_MAX_BINARY_LEN-strlen(result)+1, ")");
int32_t code = 0;
code = taos_stmt_prepare(stmt, result, strlen(result));
for (int i = 0; i < taosArrayGetSize(schemaActions); ++i) {
SSchemaAction* action = taosArrayGet(schemaActions, i);
applySchemaAction(taos, action);
}
code = taos_stmt_set_tbname_tags(stmt, cTableName, tagBind);
code = taos_stmt_bind_param_batch(stmt, colBind);
code = taos_stmt_execute(stmt);
insertPoints(taos, points, numPoint);
return code;
}
//todo: table/column length check
//todo: type check
//todo: taosmbs2ucs4 check
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册