未验证 提交 30e52ca9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2377 from taosdata/feature/query

Feature/query
......@@ -349,7 +349,7 @@ typedef struct SSqlStream {
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
int tsParseSql(SSqlObj *pSql, bool initialParse);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet);
int tscProcessSql(SSqlObj *pSql);
......
......@@ -39,41 +39,26 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
int doAsyncParseSql(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to malloc payload");
pSql->res.code = code;
tscQueueAsyncRes(pSql);
return code;
}
pRes->qhandle = 0;
pRes->numOfRows = 1;
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
return tsParseSql(pSql, true);
}
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return;
}
strtolower(pSql->sqlstr, sqlstr);
int32_t code = doAsyncParseSql(pSql);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
pSql->cmd.curSql = pSql->sqlstr;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -1014,42 +1014,37 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
* @param pSql
* @return
*/
int doParseInsertSql(SSqlObj *pSql, char *str) {
int tsParseInsertSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
char* str = pCmd->curSql;
int32_t totalNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
STableMetaInfo *pTableMetaInfo = NULL;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
assert(pQueryInfo != NULL);
STableMetaInfo *pTableMetaInfo = NULL;
if (pQueryInfo->numOfTables == 0) {
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
} else {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
}
// TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536
// but TSDB_PAYLOAD_SIZE is 65380
// TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536, but TSDB_PAYLOAD_SIZE is 65380
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE + 2048)) != TSDB_CODE_SUCCESS) {
return code;
}
assert(((NULL == pCmd->curSql) && (NULL == pCmd->pTableList))
|| ((NULL != pCmd->curSql) && (NULL != pCmd->pTableList)));
if ((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) {
if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error_clean;
goto _error;
}
} else {
assert((NULL != pCmd->curSql) && (NULL != pCmd->pTableList));
str = pCmd->curSql;
}
......@@ -1075,7 +1070,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
*/
if (totalNum == 0) {
code = TSDB_CODE_TSC_INVALID_SQL;
goto _error_clean;
goto _error;
} else {
break;
}
......@@ -1086,11 +1081,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
// Check if the table name available or not
if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
goto _error_clean;
goto _error;
}
if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
ptrdiff_t pos = pCmd->curSql - pSql->sqlstr;
......@@ -1103,20 +1098,19 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
* interrupted position.
*/
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql,
tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 ", %s", pSql,
pos, pCmd->curSql);
return code;
}
// todo add to return
tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code));
tscError("%p async insert parse error, code:%d, reason:%s", pSql, code, tstrerror(code));
pCmd->curSql = NULL;
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
goto _error;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
goto _error_clean;
goto _error;
}
index = 0;
......@@ -1125,7 +1119,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
goto _error_clean;
goto _error;
}
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -1137,7 +1131,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
/*
......@@ -1146,11 +1140,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
*/
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
} else if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
index = 0;
......@@ -1158,7 +1152,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
str += index;
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error_clean;
goto _error;
}
char fname[PATH_MAX] = {0};
......@@ -1168,7 +1162,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
wordexp_t full_path;
if (wordexp(fname, &full_path, 0) != 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
goto _error_clean;
goto _error;
}
strcpy(fname, full_path.we_wordv[0]);
wordfree(&full_path);
......@@ -1179,7 +1173,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
pTableMeta, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
......@@ -1190,7 +1184,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
SSchema * pSchema = tscGetTableSchema(pTableMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
SParsedDataColInfo spd = {0};
......@@ -1226,7 +1220,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (spd.hasVal[t] == true) {
code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z);
goto _error_clean;
goto _error;
}
spd.hasVal[t] = true;
......@@ -1237,13 +1231,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (!findColumnIndex) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z);
goto _error_clean;
goto _error;
}
}
if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) {
code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z);
goto _error_clean;
goto _error;
}
index = 0;
......@@ -1252,16 +1246,16 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (sToken.type != TK_VALUES) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z);
goto _error_clean;
goto _error;
}
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
} else {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
goto _error_clean;
goto _error;
}
}
......@@ -1272,7 +1266,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
goto _error;
}
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
......@@ -1281,7 +1275,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
code = TSDB_CODE_SUCCESS;
goto _clean;
_error_clean:
_error:
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
_clean:
......@@ -1294,7 +1288,7 @@ _clean:
return code;
}
int tsParseInsertSql(SSqlObj *pSql) {
int tsInsertInitialCheck(SSqlObj *pSql) {
if (!pSql->pTscObj->writeAuth) {
return TSDB_CODE_TSC_NO_WRITE_AUTH;
}
......@@ -1312,30 +1306,24 @@ int tsParseInsertSql(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pCmd->insertType);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
}
return doParseInsertSql(pSql, pSql->sqlstr + index);
pCmd->curSql = sToken.z + sToken.n;
return TSDB_CODE_SUCCESS;
}
int tsParseSql(SSqlObj *pSql, bool initialParse) {
int32_t ret = TSDB_CODE_SUCCESS;
if (initialParse) {
assert(!pSql->cmd.parseFinished);
SSqlCmd* pCmd = &pSql->cmd;
char* p = pSql->sqlstr;
pSql->sqlstr = NULL;
tscPartiallyFreeSqlObj(pSql);
pSql->sqlstr = p;
} else if (!pSql->cmd.parseFinished) {
tscTrace("continue parse sql: %s", pSql->cmd.curSql);
tscTrace("------------------%p, initial:%d, sqlstr:%s", pSql, initialParse, pSql->sqlstr);
if (!pCmd->parseFinished) {
tscTrace("%p resume to parse sql: %s", pSql, pCmd->curSql);
}
if (tscIsInsertData(pSql->sqlstr)) {
......@@ -1347,7 +1335,11 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}
if (initialParse && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
return ret;
}
ret = tsParseInsertSql(pSql);
} else {
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
......@@ -1362,11 +1354,9 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
/*
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
* so do NOT use pRes->code to determine if the getTableMeta/getMetricMeta function
* invokes new threads to get data from mnode or simply retrieves data from cache.
*
* do NOT assign return code to pRes->code for the same reason since it may be released by another thread
* pRes->code = ret;
* so do NOT use pRes->code to determine if the getTableMeta function
* invokes new threads to get data from mgmt node or simply retrieves data from cache.
* do NOT assign return code to pRes->code for the same reason since it may be released by another thread already.
*/
return ret;
}
......
......@@ -1690,7 +1690,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
if (pItem->pNode->pParam->nExpr > 1 && strlen(pItem->aliasName) > 0) {
if (pItem->pNode->pParam->nExpr > 1 && (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg8);
}
......@@ -1761,7 +1761,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
int32_t numOfFields = 0;
// multicolumn selection does not support alias name
if (strlen(pItem->aliasName) != 0) {
if (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg8);
}
......
......@@ -488,7 +488,7 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p start to send msg to free qhandle in dnode, command:%s", pSql, sqlCmd[pCmd->command]);
tscTrace("%p send msg to dnode to free qhandle ASAP, command:%s", pSql, sqlCmd[pCmd->command]);
pSql->freed = 1;
tscProcessSql(pSql);
......@@ -510,18 +510,17 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
void taos_free_result(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
tscTrace("%p start to free result", res);
if (pSql == NULL || pSql->signature != pSql) {
tscTrace("%p result has been freed", pSql);
tscTrace("%p sqlObj has been freed", pSql);
return;
}
// The semaphore can not be changed while freeing async sub query objects.
SSqlRes *pRes = &pSql->res;
if (pRes == NULL || pRes->qhandle == 0) {
tscTrace("%p SqlObj is freed by app, qhandle is null", pSql);
tscFreeSqlObj(pSql);
tscTrace("%p SqlObj is freed by app, qhandle is null", pSql);
return;
}
......@@ -529,6 +528,7 @@ void taos_free_result(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
tscFreeSqlObj(pSql);
tscTrace("%p SqlObj is freed by app", pSql);
return;
}
......@@ -713,7 +713,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
......
......@@ -503,8 +503,10 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower(pSql->sqlstr, sqlstr);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
int32_t code = doAsyncParseSql(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
sem_wait(&pSql->rspSem);
}
......
......@@ -1165,8 +1165,8 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
assert(pSql->res.numOfRows == 0);
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
if (pSql->pSubs == NULL) {
......@@ -1364,7 +1364,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tOrderDescriptor *pDesc = NULL;
SColumnModel * pModel = NULL;
pRes->qhandle = 1; // hack the qhandle check
pRes->qhandle = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 16); // 64KB
......@@ -1845,34 +1845,41 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
}
}
static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
// increase the total inserted rows
// record the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
if (tres != pParentObj) {
pParentObj->res.numOfRows += numOfRows;
}
} else {
SSqlObj* pSql = (SSqlObj*) tres;
assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code;
}
taos_free_result(tres);
// it is not the initial sqlObj, free it
if (tres != pParentObj) {
taos_free_result(tres);
} else {
assert(pParentObj->pSubs[0] == tres);
}
tfree(pSupporter);
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
return;
}
tscTrace("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows);
tfree(pState);
tfree(pSupporter);
// release data block data
tfree(pState);
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
......@@ -1886,9 +1893,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->qhandle = 1; // hack the qhandle check
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES);
pSql->numOfSubs = pDataBlocks->nSize;
assert(pDataBlocks->nSize > 0);
......@@ -1899,27 +1904,50 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pState->numOfRemain = pState->numOfTotal;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
pSupporter->pSql = pSql;
pSupporter->pState = pState;
pSql->fp = multiVnodeInsertFinalize;
pSql->param = pSupporter;
pSql->pSubs[0] = pSql; // the first sub insert points back to itself
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pSql, 0);
int32_t code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[0]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, 0,
pDataBlocks->nSize, code);
goto _error;
}
int32_t i = 1;
for (; i < pSql->numOfSubs; ++i) {
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
pSupporter->pSql = pSql;
pSupporter->pState = pState;
SInsertSupporter* pSupporter1 = calloc(1, sizeof(SInsertSupporter));
pSupporter1->pSql = pSql;
pSupporter1->pState = pState;
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, TSDB_SQL_INSERT, NULL);
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
goto _error;
}
/*
* assign the callback function to fetchFp to make sure that the error process function can restore
* the callback function (multiVnodeInsertMerge) correctly.
* the callback function (multiVnodeInsertFinalize) correctly.
*/
pNew->fetchFp = pNew->fp;
pSql->pSubs[i] = pNew;
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[i]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, i,
pDataBlocks->nSize, code);
goto _error;
} else {
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
}
}
if (i < pSql->numOfSubs) {
......@@ -1927,21 +1955,30 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code; // free all allocated resource
}
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
// use the local variable
int32_t numOfSub = pSql->numOfSubs;
for (int32_t j = 0; j < numOfSub; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j,
pDataBlocks->nSize, code);
}
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
_error:
// restore the udf fp
pSql->fp = pSql->fetchFp;
tfree(pState);
tfree(pSql->param);
for(int32_t j = 1; j < i; ++j) {
tfree(pSql->pSubs[j]->param);
taos_free_result(pSql->pSubs[j]);
}
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册