提交 94451c84 编写于 作者: S shenglian zhou

schemaless merge from master to develop

上级 7f76cef5
......@@ -25,6 +25,7 @@ typedef struct {
uint8_t type;
int16_t length;
char* value;
uint32_t fieldSchemaIdx;
} TAOS_SML_KV;
typedef struct {
......@@ -37,6 +38,8 @@ typedef struct {
// first kv must be timestamp
TAOS_SML_KV* fields;
int32_t fieldNum;
uint32_t schemaIdx;
} TAOS_SML_DATA_POINT;
typedef enum {
......@@ -55,7 +58,6 @@ typedef enum {
typedef struct {
uint64_t id;
SHashObj* smlDataToSchema;
} SSmlLinesInfo;
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info);
......
......@@ -28,50 +28,7 @@ typedef struct {
uint8_t precision;
} SSmlSTableSchema;
<<<<<<< HEAD
//=================================================================================================
=======
typedef struct {
char* key;
uint8_t type;
int16_t length;
char* value;
uint32_t fieldSchemaIdx;
} TAOS_SML_KV;
typedef struct {
char* stableName;
char* childTableName;
TAOS_SML_KV* tags;
int32_t tagNum;
// first kv must be timestamp
TAOS_SML_KV* fields;
int32_t fieldNum;
uint32_t schemaIdx;
} TAOS_SML_DATA_POINT;
>>>>>>> origin/master
static uint64_t linesSmlHandleId = 0;
uint64_t genLinesSmlId() {
uint64_t id;
<<<<<<< HEAD
do {
id = atomic_add_fetch_64(&linesSmlHandleId, 1);
} while (id == 0);
return id;
}
=======
} SSmlLinesInfo;
//=================================================================================================
>>>>>>> origin/master
static uint64_t linesSmlHandleId = 0;
......@@ -187,43 +144,8 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
}
uintptr_t valPointer = (uintptr_t)smlKv;
taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &fieldIdx, sizeof(fieldIdx));
return 0;
}
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
char sTableName[TSDB_TABLE_NAME_LEN] = {0};
strtolower(sTableName, point->stableName);
taosStringBuilderAppendString(&sb, sTableName);
for (int j = 0; j < point->tagNum; ++j) {
taosStringBuilderAppendChar(&sb, ',');
TAOS_SML_KV* tagKv = point->tags + j;
char tagName[TSDB_COL_NAME_LEN] = {0};
strtolower(tagName, tagKv->key);
taosStringBuilderAppendString(&sb, tagName);
taosStringBuilderAppendChar(&sb, '=');
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
MD5_CTX context;
MD5Init(&context);
MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
MD5Final(&context);
*tableNameLen = snprintf(tableName, *tableNameLen,
"t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
taosStringBuilderDestroy(&sb);
tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
return 0;
}
......@@ -316,8 +238,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
}
uintptr_t valPointer = (uintptr_t)point;
taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &stableIdx, sizeof(stableIdx));
point->schemaIdx = (uint32_t)stableIdx;
}
size_t numStables = taosArrayGetSize(stableSchemas);
......@@ -432,10 +353,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
<<<<<<< HEAD
=======
taosMsleep(500);
>>>>>>> origin/master
}
break;
}
......@@ -460,10 +378,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
<<<<<<< HEAD
=======
taosMsleep(500);
>>>>>>> origin/master
}
break;
}
......@@ -485,10 +400,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
<<<<<<< HEAD
=======
taosMsleep(500);
>>>>>>> origin/master
}
break;
}
......@@ -510,10 +422,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
<<<<<<< HEAD
=======
taosMsleep(500);
>>>>>>> origin/master
}
break;
}
......@@ -555,10 +464,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
<<<<<<< HEAD
=======
taosMsleep(500);
>>>>>>> origin/master
}
break;
}
......@@ -655,48 +561,28 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl
registerSqlObj(pSql);
SStrToken tableToken = {.z = tableNameLowerCase, .n = (uint32_t)strlen(tableNameLowerCase), .type = TK_ID};
tGetToken(tableNameLowerCase, &tableToken.type);
<<<<<<< HEAD
bool dbIncluded = false;
// Check if the table name available or not
if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pSql->cmd.payload, "table name is invalid");
tscFreeRegisteredSqlObj(pSql);
=======
// Check if the table name available or not
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pSql->cmd.payload, "table name is invalid");
taosReleaseRef(tscObjRef, pSql->self);
>>>>>>> origin/master
return code;
}
SName sname = {0};
<<<<<<< HEAD
if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) {
tscFreeRegisteredSqlObj(pSql);
=======
if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) {
taosReleaseRef(tscObjRef, pSql->self);
>>>>>>> origin/master
return code;
}
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
memset(fullTableName, 0, tListLen(fullTableName));
tNameExtractFullName(&sname, fullTableName);
<<<<<<< HEAD
size_t size = 0;
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
tscFreeRegisteredSqlObj(pSql);
=======
taosReleaseRef(tscObjRef, pSql->self);
size_t size = 0;
taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
>>>>>>> origin/master
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
}
if (tableMeta != NULL) {
......@@ -914,17 +800,10 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
<<<<<<< HEAD
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
=======
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
>>>>>>> origin/master
tryAgain = true;
}
......@@ -936,24 +815,11 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
}
taos_free_result(res2);
if (tryAgain) {
<<<<<<< HEAD
taosMsleep(50 * (2 << try));
=======
taosMsleep(100 * (2 << try));
>>>>>>> origin/master
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
<<<<<<< HEAD
taosMsleep( 50 * (2 << try));
}
}
} while (tryAgain);
taos_stmt_close(stmt);
=======
taosMsleep( 100 * (2 << try));
}
}
......@@ -1013,7 +879,6 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
}
taosArrayDestroy(batchBind);
tfree(sql);
>>>>>>> origin/master
return code;
}
......@@ -1021,14 +886,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
<<<<<<< HEAD
uintptr_t valPointer = (uintptr_t)point;
size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pSchemaIndex != NULL);
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
=======
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
>>>>>>> origin/master
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
......@@ -1072,14 +930,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
<<<<<<< HEAD
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
tagKVs[*pFieldSchemaIdx] = kv;
=======
tagKVs[kv->fieldSchemaIdx] = kv;
>>>>>>> origin/master
}
}
......@@ -1093,10 +944,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx);
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -1139,10 +987,7 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema,
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
uintptr_t valPointer = (uintptr_t)kv;
size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pFieldSchemaIdx != NULL);
TAOS_BIND* bind = colBinds + *pFieldSchemaIdx;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -1180,10 +1025,7 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
uintptr_t valPointer = (uintptr_t)point;
size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
assert(pSchemaIndex != NULL);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName);
code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info);
......@@ -1226,7 +1068,6 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);
int32_t code = TSDB_CODE_SUCCESS;
info->smlDataToSchema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, false);
tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
......@@ -1256,15 +1097,6 @@ clean_up:
taosArrayDestroy(schema->tags);
}
taosArrayDestroy(stableSchemas);
taosHashCleanup(info->smlDataToSchema);
return code;
}
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
info->id = genLinesSmlId();
int code = tscSmlInsert(taos, points, numPoint, info);
free(info);
return code;
}
......
......@@ -43,7 +43,7 @@ static void* insertLines(void* args) {
SThreadLinesBatch* batch = insertArgs->batches + i;
printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines);
int32_t code = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, 0);
int64_t end = getTimeInUs();
insertArgs->costTime += end - begin;
printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
......@@ -160,30 +160,6 @@ int main(int argc, char* argv[]) {
time_t ct = time(0);
int64_t ts = ct * 1000;
<<<<<<< HEAD
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, i, j, ts + 10 * l);
lines[l] = line;
++l;
}
}
}
//shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_schemaless_insert");
int64_t begin = getTimeInUs();
int32_t code = taos_schemaless_insert(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable, 0);
int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin);
=======
char* lineTemplate = calloc(65536, sizeof(char));
getLineTemplate(lineTemplate, 65535, numFields);
......@@ -277,6 +253,5 @@ int main(int argc, char* argv[]) {
free(lineTemplate);
taos_close(taos);
>>>>>>> origin/master
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册