未验证 提交 7beb8cd6 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19875 from taosdata/fix/TS-2524

fix:change async to sync in schemaless
......@@ -149,17 +149,8 @@ typedef struct {
int64_t endTime;
} SSmlCostInfo;
typedef struct {
SRequestObj *request;
tsem_t sem;
int32_t cnt;
int32_t total;
TdThreadSpinlock lock;
} Params;
typedef struct {
int64_t id;
Params *params;
SMLProtocolType protocol;
int8_t precision;
......@@ -178,7 +169,6 @@ typedef struct {
SQuery *pQuery;
SSmlCostInfo cost;
int32_t affectedRows;
SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key
SArray *colsContainer; // for cols parse, if dataFormat == false
......@@ -1513,7 +1503,6 @@ static void smlDestroyInfo(SSmlHandle *info) {
if (!info->dataFormat) {
taosArrayDestroy(info->colsContainer);
}
destroyRequest(info->pRequest);
cJSON_Delete(info->root);
taosMemoryFreeClear(info);
......@@ -2351,20 +2340,11 @@ static int32_t smlInsertData(SSmlHandle *info) {
}
info->cost.insertRpcTime = taosGetTimestampUs();
// launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
// info->affectedRows = taos_affected_rows(info->pRequest);
// return info->pRequest->code;
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pWrapper->pRequest = info->pRequest;
launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
return TSDB_CODE_SUCCESS;
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
return info->pRequest->code;
}
static void smlPrintStatisticInfo(SSmlHandle *info) {
......@@ -2498,48 +2478,13 @@ static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
return TSDB_CODE_SUCCESS;
}
static void smlInsertCallback(void *param, void *res, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)res;
SSmlHandle *info = (SSmlHandle *)param;
int32_t rows = taos_affected_rows(pRequest);
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
Params *pParam = info->params;
// lock
taosThreadSpinLock(&pParam->lock);
pParam->cnt++;
if (code != TSDB_CODE_SUCCESS) {
pParam->request->code = code;
pParam->request->body.resInfo.numOfRows += rows;
} else {
pParam->request->body.resInfo.numOfRows += info->affectedRows;
}
// unlock
taosThreadSpinUnlock(&pParam->lock);
if (pParam->cnt == pParam->total) {
tsem_post(&pParam->sem);
}
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
smlDestroyInfo(info);
}
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
int numLines, int protocol, int precision, int32_t ttl) {
int batchs = 0;
STscObj *pTscObj = request->pTscObj;
SSmlHandle *info = NULL;
pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
Params params = {0};
params.request = request;
tsem_init(&params.sem, 0, 0);
taosThreadSpinInit(&(params.lock), 0);
if (request->pDb == NULL) {
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
......@@ -2573,65 +2518,24 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
goto end;
}
batchs = ceil(((double)numLines) / tsSmlBatchSize);
params.total = batchs;
for (int i = 0; i < batchs; ++i) {
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
if (!req) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null");
goto end;
}
SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
if (!info) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
}
info->isRawLine = (rawLine == NULL);
info->ttl = ttl;
int32_t perBatch = tsSmlBatchSize;
if (numLines > perBatch) {
numLines -= perBatch;
} else {
perBatch = numLines;
numLines = 0;
}
info->params = &params;
info->affectedRows = perBatch;
info->pRequest->body.queryFp = smlInsertCallback;
info->pRequest->body.param = info;
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
if (lines) {
lines += perBatch;
}
if (rawLine) {
int num = 0;
while (rawLine < rawLineEnd) {
if (*(rawLine++) == '\n') {
num++;
}
if (num == perBatch) {
break;
}
}
}
if (code != TSDB_CODE_SUCCESS) {
info->pRequest->body.queryFp(info, req, code);
}
info = smlBuildSmlInfo(pTscObj, request, (SMLProtocolType)protocol, precision);
if (!info) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
}
tsem_wait(&params.sem);
info->isRawLine = (rawLine == NULL);
info->ttl = ttl;
request->code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
end:
taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem);
// ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
uDebug("SML:0x%" PRIx64 " insert finished, code: %d", info->id, request->code);
info->cost.endTime = taosGetTimestampUs();
info->cost.code = request->code;
smlPrintStatisticInfo(info);
smlDestroyInfo(info);
return (TAOS_RES *)request;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册