未验证 提交 e2b99145 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #20376 from taosdata/fix/TS-2828

fix:[TS-2828] retry if ver is old
...@@ -1213,6 +1213,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -1213,6 +1213,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
static int32_t smlInsertData(SSmlHandle *info) { static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if(info->pRequest->dbList == NULL){
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
}
taosArrayPush(info->pRequest->dbList, info->pRequest->pDb);
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) { while (oneTable) {
SSmlTableInfo *tableData = *oneTable; SSmlTableInfo *tableData = *oneTable;
...@@ -1221,6 +1226,11 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -1221,6 +1226,11 @@ static int32_t smlInsertData(SSmlHandle *info) {
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName)); memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
if(info->pRequest->tableList == NULL){
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
}
taosArrayPush(info->pRequest->tableList, &pName);
SRequestConnInfo conn = {0}; SRequestConnInfo conn = {0};
conn.pTrans = info->taos->pAppInfo->pTransporter; conn.pTrans = info->taos->pAppInfo->pTransporter;
conn.requestId = info->pRequest->requestId; conn.requestId = info->pRequest->requestId;
...@@ -1422,6 +1432,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL ...@@ -1422,6 +1432,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
do { do {
code = smlModifyDBSchemas(info); code = smlModifyDBSchemas(info);
if (code == 0) break; if (code == 0) break;
taosMsleep(200);
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
if (code != 0) { if (code != 0) {
...@@ -1446,62 +1457,75 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, ...@@ -1446,62 +1457,75 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
SRequestObj *request = NULL;
SSmlHandle *info = NULL;
while(1){
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (request == NULL) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); info = smlBuildSmlInfo(taos);
if (request == NULL) { if (info == NULL) {
uError("SML:taos_schemaless_insert error request is null"); request->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; uError("SML:taos_schemaless_insert error SSmlHandle is null");
} return (TAOS_RES *)request;
}
info->pRequest = request;
info->isRawLine = rawLine != NULL;
info->ttl = ttl;
info->precision = precision;
info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->lineNum = numLines;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
if (request->pDb == NULL) {
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
goto end;
}
SSmlHandle *info = smlBuildSmlInfo(taos); if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
if (info == NULL) { request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
request->code = TSDB_CODE_OUT_OF_MEMORY; smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
uError("SML:taos_schemaless_insert error SSmlHandle is null"); goto end;
return (TAOS_RES *)request; }
}
info->pRequest = request;
info->isRawLine = rawLine != NULL;
info->ttl = ttl;
info->precision = precision;
info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->lineNum = numLines;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
if (request->pDb == NULL) {
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
goto end;
}
if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) { if (protocol == TSDB_SML_LINE_PROTOCOL &&
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL); request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
goto end; smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
} goto end;
}
if (protocol == TSDB_SML_LINE_PROTOCOL && if (protocol == TSDB_SML_JSON_PROTOCOL) {
(precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) { numLines = 1;
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE; } else if (numLines <= 0) {
smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL); request->code = TSDB_CODE_SML_INVALID_DATA;
goto end; smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
} goto end;
}
if (protocol == TSDB_SML_JSON_PROTOCOL) { code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
numLines = 1; request->code = code;
} else if (numLines <= 0) { info->cost.endTime = taosGetTimestampUs();
request->code = TSDB_CODE_SML_INVALID_DATA; info->cost.code = code;
smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL); smlPrintStatisticInfo(info);
goto end; if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){
refreshMeta(request->pTscObj, request);
uInfo("SML:%"PRIx64" ver is old retry or object is creating:%d", info->id, code);
smlDestroyInfo(info);
info = NULL;
taos_free_result(request);
request = NULL;
continue;
}
break;
} }
code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
request->code = code;
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
end: end:
smlDestroyInfo(info); smlDestroyInfo(info);
return (TAOS_RES *)request; return (TAOS_RES *)request;
......
...@@ -436,7 +436,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -436,7 +436,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
// bind data // bind data
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("smlBuildCol error, retry"); uDebug("smlBuildCol error, retry");
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -582,8 +582,10 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine ...@@ -582,8 +582,10 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
.i = ts, .i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if (info->dataFormat) { if (info->dataFormat) {
smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
smlBuildRow(info->currTableDataCtx); if(ret != TSDB_CODE_SUCCESS){return ret;}
ret = smlBuildRow(info->currTableDataCtx);
if(ret != TSDB_CODE_SUCCESS){return ret;}
clearColValArray(info->currTableDataCtx->pValues); clearColValArray(info->currTableDataCtx->pValues);
} else { } else {
taosArraySet(elements->colArray, 0, &kv); taosArraySet(elements->colArray, 0, &kv);
......
...@@ -1110,11 +1110,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1110,11 +1110,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
#ifndef NDEBUG qDebug("switch to next table %" PRId64 " ts %" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0; pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif
bool found = false; bool found = false;
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
......
...@@ -1608,8 +1608,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1608,8 +1608,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) { if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey);
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion);
pTaskInfo->streamInfo.returned = 1; pTaskInfo->streamInfo.returned = 1;
......
...@@ -201,6 +201,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32 ...@@ -201,6 +201,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
SSmlKv* kv = (SSmlKv*)data; SSmlKv* kv = (SSmlKv*)data;
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){ if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){
ret = TSDB_CODE_SML_INVALID_DATA; ret = TSDB_CODE_SML_INVALID_DATA;
uError("SML smlBuildCol error col not same %s", pColSchema->name);
goto end; goto end;
} }
if (kv->type == TSDB_DATA_TYPE_NCHAR) { if (kv->type == TSDB_DATA_TYPE_NCHAR) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册