未验证 提交 b352f0d1 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13658 from taosdata/feature/TD-14761

fix:error in schemaless
......@@ -17,6 +17,9 @@
#include "tname.h"
#include "cJSON.h"
#include "tglobal.h"
#include "osSemaphore.h"
#include "osThread.h"
//=================================================================================================
#define SPACE ' '
......@@ -67,6 +70,9 @@ for (int i = 1; i < keyLen; ++i) { \
#define BINARY_ADD_LEN 2 // "binary" 2 means " "
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
#define MAX_RETRY_TIMES 5
#define LINE_BATCH 20
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
......@@ -153,8 +159,17 @@ typedef struct {
int64_t endTime;
} SSmlCostInfo;
typedef struct{
SRequestObj* request;
SCatalog* catalog;
tsem_t sem;
TdThreadSpinlock lock;
} Params;
typedef struct {
int64_t id;
Params *params;
bool isLast;
SMLProtocolType protocol;
int8_t precision;
......@@ -303,7 +318,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(10);
taosMsleep(500);
}
break;
}
......@@ -327,7 +342,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(10);
taosMsleep(500);
}
break;
}
......@@ -350,7 +365,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(10);
taosMsleep(500);
}
break;
}
......@@ -373,7 +388,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(10);
taosMsleep(500);
}
break;
}
......@@ -424,7 +439,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(10);
taosMsleep(500);
}
break;
}
......@@ -541,56 +556,6 @@ end:
return code;
}
//=========================================================================
/* Field Escape charaters
1: measurement Comma,Space
2: tag_key, tag_value, field_key Comma,Equal Sign,Space
3: field_value Double quote,Backslash
*/
//static void escapeSpecialCharacter(uint8_t field, const char **pos) {
// const char *cur = *pos;
// if (*cur != '\\') {
// return;
// }
// switch (field) {
// case 1:
// switch (*(cur + 1)) {
// case ',':
// case ' ':
// cur++;
// break;
// default:
// break;
// }
// break;
// case 2:
// switch (*(cur + 1)) {
// case ',':
// case ' ':
// case '=':
// cur++;
// break;
// default:
// break;
// }
// break;
// case 3:
// switch (*(cur + 1)) {
// case '"':
// case '\\':
// cur++;
// break;
// default:
// break;
// }
// break;
// default:
// break;
// }
// *pos = cur;
//}
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
const char *pVal = kvVal->value;
int32_t len = kvVal->length;
......@@ -1426,6 +1391,7 @@ static void smlDestroyInfo(SSmlHandle* info){
if(!info->dataFormat){
taosArrayDestroy(info->colsContainer);
}
destroyRequest(info->pRequest);
taosMemoryFreeClear(info);
}
......@@ -1453,11 +1419,6 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = (STscObj *)taos;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
goto cleanup;
}
info->precision = precision;
info->protocol = protocol;
......@@ -2206,7 +2167,6 @@ end:
return ret;
}
static int32_t smlInsertData(SSmlHandle* info) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -2248,10 +2208,12 @@ static int32_t smlInsertData(SSmlHandle* info) {
}
info->cost.insertRpcTime = taosGetTimestampUs();
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
//launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
// info->affectedRows = taos_affected_rows(info->pRequest);
// return info->pRequest->code;
info->affectedRows = taos_affected_rows(info->pRequest);
return info->pRequest->code;
launchAsyncQuery(info->pRequest, info->pQuery);
return TSDB_CODE_SUCCESS;
}
static void smlPrintStatisticInfo(SSmlHandle *info){
......@@ -2311,7 +2273,7 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
do{
code = smlModifyDBSchemas(info);
if (code == 0) break;
} while (retryNum++ < taosHashGetSize(info->superTables));
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
if (code != 0) {
uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
......@@ -2332,30 +2294,48 @@ cleanup:
return code;
}
static int32_t isSchemalessDb(SSmlHandle* info){
static int32_t isSchemalessDb(STscObj *taos, SCatalog *catalog){
SName name;
tNameSetDbName(&name, info->taos->acctId, info->taos->db, strlen(info->taos->db));
tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
SDbCfgInfo pInfo = {0};
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SEpSet ep = getEpSet_s(&taos->pAppInfo->mgmtEp);
int32_t code = catalogGetDBCfg(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo);
int32_t code = catalogGetDBCfg(catalog, taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
info->pRequest->code = code;
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
return code;
}
taosArrayDestroy(pInfo.pRetensions);
if (!pInfo.schemaless){
info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname);
return TSDB_CODE_SML_INVALID_DB_CONF;
}
return TSDB_CODE_SUCCESS;
}
static void smlInsertCallback(void* param, void* res, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)res;
SSmlHandle* info = (SSmlHandle *)param;
// lock
if(code != TSDB_CODE_SUCCESS){
taosThreadSpinLock(&info->params->lock);
info->params->request->code = code;
taosThreadSpinUnlock(&info->params->lock);
}
// unlock
printf("SML:0x%" PRIx64 " insert finished, code: %d, total: %d\n", info->id, code, info->affectedRows);
Params *pParam = info->params;
bool isLast = info->isLast;
smlDestroyInfo(info);
if(isLast){
tsem_post(&pParam->sem);
}
}
/**
* taos_schemaless_insert() parse and insert data points into database according to
* different protocol.
......@@ -2384,48 +2364,95 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL;
}
SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision);
if(!info){
return (TAOS_RES*)request;
((STscObj *)taos)->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
int cnt = ceil(((double)numLines)/LINE_BATCH);
Params params;
params.request = request;
tsem_init(&params.sem, 0, 0);
taosThreadSpinInit(&(params.lock), 0);
int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &params.catalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML get catalog error %d", code);
request->code = code;
goto end;
}
info->taos->schemalessType = 1;
if(request->pDb == NULL){
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&info->msgBuf, "Database not specified", NULL);
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
goto end;
}
if(isSchemalessDb(info) != TSDB_CODE_SUCCESS){
if(isSchemalessDb(((STscObj *)taos), params.catalog) != TSDB_CODE_SUCCESS){
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&info->msgBuf, "Cannot write data to a non schemaless database", NULL);
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
goto end;
}
if (!lines) {
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&info->msgBuf, "lines is null", NULL);
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
goto end;
}
if(protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL){
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
smlBuildInvalidDataMsg(&info->msgBuf, "protocol invalidate", NULL);
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
goto end;
}
if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
smlBuildInvalidDataMsg(&info->msgBuf, "precision invalidate for line protocol", NULL);
smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
goto end;
}
info->pRequest->code = smlProcess(info, lines, numLines);
for (int i = 0; i < cnt; ++i) {
SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null");
goto end;
}
SSmlHandle* info = smlBuildSmlInfo(taos, req, (SMLProtocolType)protocol, precision);
if(!info){
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
}
int32_t perBatch = LINE_BATCH;
if(numLines > perBatch){
numLines -= perBatch;
info->isLast = false;
}else{
perBatch = numLines;
numLines = 0;
info->isLast = true;
}
info->params = &params;
info->pCatalog = params.catalog;
info->affectedRows = perBatch;
info->pRequest->body.queryFp = smlInsertCallback;
info->pRequest->body.param = info;
code = smlProcess(info, lines, perBatch);
lines += perBatch;
if (code != TSDB_CODE_SUCCESS){
info->pRequest->body.queryFp(info, req, code);
}
}
tsem_wait(&params.sem);
end:
info->taos->schemalessType = 0;
uDebug("result:%s", info->msgBuf.buf);
smlDestroyInfo(info);
taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem);
((STscObj *)taos)->schemalessType = 0;
uDebug("result:%s", request->msgBuf);
return (TAOS_RES*)request;
}
......@@ -1325,7 +1325,7 @@ TEST(testCase, sml_oom_Test) {
pRes = taos_query(taos, "use oom");
taos_free_result(pRes);
TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, 100, TSDB_SML_LINE_PROTOCOL, 0);
TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
ASSERT_EQ(taos_errno(res), 0);
taos_free_result(pRes);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册