提交 f1ea9905 编写于 作者: wmmhello's avatar wmmhello

fix: error in schemaless_insert if insert with multi threads

上级 f0531a2a
...@@ -303,7 +303,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -303,7 +303,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(10);
} }
break; break;
} }
...@@ -327,7 +327,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -327,7 +327,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(10);
} }
break; break;
} }
...@@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(10);
} }
break; break;
} }
...@@ -373,7 +373,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -373,7 +373,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(10);
} }
break; break;
} }
...@@ -424,7 +424,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -424,7 +424,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(10);
} }
break; break;
} }
...@@ -461,18 +461,18 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH ...@@ -461,18 +461,18 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH
static int32_t smlModifyDBSchemas(SSmlHandle* info) { static int32_t smlModifyDBSchemas(SSmlHandle* info) {
int32_t code = 0; int32_t code = 0;
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
SSmlSTableMeta** tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, NULL); SSmlSTableMeta** tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, NULL);
while (tableMetaSml) { while (tableMetaSml) {
SSmlSTableMeta* sTableData = *tableMetaSml; SSmlSTableMeta* sTableData = *tableMetaSml;
STableMeta *pTableMeta = NULL; STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
size_t superTableLen = 0; size_t superTableLen = 0;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, superTable, superTableLen); memcpy(pName.tname, superTable, superTableLen);
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta); code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
...@@ -487,7 +487,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -487,7 +487,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
code = smlApplySchemaAction(info, &schemaAction); code = smlApplySchemaAction(info, &schemaAction);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName); uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
return code; goto end;
} }
info->cost.numOfCreateSTables++; info->cost.numOfCreateSTables++;
}else if (code == TSDB_CODE_SUCCESS) { }else if (code == TSDB_CODE_SUCCESS) {
...@@ -502,7 +502,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -502,7 +502,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &schemaAction, true); code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &schemaAction, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosHashCleanup(hashTmp); taosHashCleanup(hashTmp);
return code; goto end;
} }
taosHashClear(hashTmp); taosHashClear(hashTmp);
...@@ -512,29 +512,33 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -512,29 +512,33 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false); code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false);
taosHashCleanup(hashTmp); taosHashCleanup(hashTmp);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; goto end;
} }
code = catalogRefreshTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, -1); code = catalogRefreshTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; goto end;
} }
} else { } else {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
return code; goto end;
} }
if(pTableMeta) taosMemoryFree(pTableMeta); if(pTableMeta) taosMemoryFree(pTableMeta);
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta); code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable); uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable);
return code; goto end;
} }
sTableData->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml); tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml);
} }
return 0; return 0;
end:
catalogRefreshTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, 1);
return code;
} }
//========================================================================= //=========================================================================
...@@ -2298,7 +2302,13 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) { ...@@ -2298,7 +2302,13 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
info->cost.numOfCTables = taosHashGetSize(info->childTables); info->cost.numOfCTables = taosHashGetSize(info->childTables);
info->cost.schemaTime = taosGetTimestampUs(); info->cost.schemaTime = taosGetTimestampUs();
code = smlModifyDBSchemas(info);
int32_t retryNum = 0;
do{
code = smlModifyDBSchemas(info);
if (code == 0) break;
} while (retryNum++ < taosHashGetSize(info->superTables));
if (code != 0) { if (code != 0) {
uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
goto cleanup; goto cleanup;
...@@ -2407,6 +2417,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2407,6 +2417,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
info->pRequest->code = smlProcess(info, lines, numLines); info->pRequest->code = smlProcess(info, lines, numLines);
end: end:
info->taos->schemalessType = 0;
uDebug("result:%s", info->msgBuf.buf); uDebug("result:%s", info->msgBuf.buf);
smlDestroyInfo(info); smlDestroyInfo(info);
return (TAOS_RES*)request; return (TAOS_RES*)request;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册