未验证 提交 8e67dae1 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1585 from taosdata/feature/query

[td-98] fix bugs while query/insert mixed up in sql requests.
......@@ -267,19 +267,19 @@ typedef struct {
int32_t numOfTablesInSubmit;
};
int32_t clauseIndex; // index of multiple subclause query
int8_t isParseFinish;
short numOfCols;
uint32_t allocSize;
char * payload;
int payloadLen;
int32_t clauseIndex; // index of multiple subclause query
int8_t parseFinished;
short numOfCols;
uint32_t allocSize;
char * payload;
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
// submit data blocks branched according to vnode
SDataBlockList *pDataBlocks;
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
// for parameter ('?') binding and batch processing
int32_t batchSize;
int32_t numOfParams;
......@@ -358,8 +358,6 @@ typedef struct SSqlObj {
SSqlCmd cmd;
SSqlRes res;
uint8_t numOfSubs;
char * asyncTblPos;
void * pTableHashList;
struct SSqlObj **pSubs;
struct SSqlObj * prev, *next;
} SSqlObj;
......@@ -422,7 +420,7 @@ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd);
void tscResetSqlCmdObj(SSqlCmd *pCmd);
void tscFreeResData(SSqlObj *pSql);
/**
......
......@@ -67,7 +67,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sqlstr);
tscDump("%p pObj:%p, Async SQL: %s", pSql, pObj, pSql->sqlstr);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -342,7 +342,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
(*pSql->fp)(pSql->param, taosres, code);
if (shouldFree) {
tscTrace("%p Async sql is automatically freed in async res", pSql);
tscTrace("%p sqlObj is automatically freed in async res", pSql);
tscFreeSqlObj(pSql);
}
}
......@@ -463,7 +463,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} else { // normal async query continues
if (pCmd->isParseFinish) {
if (pCmd->parseFinished) {
tscTrace("%p re-send data to vnode in table Meta callback since sql parsed completed", pSql);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
......
......@@ -21,7 +21,6 @@
#include "os.h"
#include "hash.h"
//#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
......@@ -656,7 +655,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
}
}
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd,
int32_t *totalNum) {
SSqlCmd * pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
......@@ -664,7 +663,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -942,7 +941,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
}
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (pSql->asyncTblPos == NULL) {
if (pCmd->curSql == NULL) {
assert(code == TSDB_CODE_ACTION_IN_PROGRESS);
}
}
......@@ -1008,23 +1007,23 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
return code;
}
assert(((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList))
|| ((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList)));
assert(((NULL == pCmd->curSql) && (NULL == pCmd->pTableList))
|| ((NULL != pCmd->curSql) && (NULL != pCmd->pTableList)));
if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) {
pSql->pTableHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
if ((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) {
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_CLI_OUT_OF_MEMORY;
goto _error_clean;
}
} else {
assert((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList));
str = pSql->asyncTblPos;
assert((NULL != pCmd->curSql) && (NULL != pCmd->pTableList));
str = pCmd->curSql;
}
tscTrace("%p create data block list for submit data:%p, asyncTblPos:%p, pTableHashList:%p", pSql, pSql->cmd.pDataBlocks, pSql->asyncTblPos, pSql->pTableHashList);
tscTrace("%p create data block list for submit data:%p, curSql:%p, pTableList:%p", pSql, pSql->cmd.pDataBlocks, pCmd->curSql, pCmd->pTableList);
while (1) {
int32_t index = 0;
......@@ -1052,7 +1051,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
}
}
pSql->asyncTblPos = sToken.z;
pCmd->curSql = sToken.z;
// Check if the table name available or not
if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
......@@ -1064,7 +1063,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr;
ptrdiff_t pos = pCmd->curSql - pSql->sqlstr;
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
/*
......@@ -1075,13 +1074,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
*/
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql,
pos, pSql->asyncTblPos);
pos, pCmd->curSql);
return code;
}
// todo add to return
tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code));
pSql->asyncTblPos = NULL;
pCmd->curSql = NULL;
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
}
......@@ -1115,7 +1114,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
* app here insert data in different vnodes, so we need to set the following
* data in another submit procedure using async insert routines
*/
code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
......@@ -1227,7 +1226,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
......@@ -1257,11 +1256,11 @@ _error_clean:
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
_clean:
taosHashCleanup(pSql->pTableHashList);
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL;
pSql->pTableHashList = NULL;
pSql->asyncTblPos = NULL;
pCmd->isParseFinish = 1;
pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code;
}
......@@ -1305,17 +1304,15 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
tscFreeSqlObjPartial(pSql);
pSql->sqlstr = p;
} else {
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
tscTrace("continue parse sql: %s", pSql->cmd.curSql);
}
if (tscIsInsertOrImportData(pSql->sqlstr)) {
/*
* only for async multi-vnode insertion
* Set the fp before parse the sql string, in case of getmetermeta failed, in which
* the error handle callback function can rightfully restore the user defined function (fp)
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
* the error handle callback function can rightfully restore the user-defined callback function (fp).
*/
if (initialParse) {
// replace user defined callback function with multi-insert proxy function
pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}
......@@ -1335,11 +1332,11 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
}
/*
* the pRes->code may be modified or even released by another thread in tscTableMetaCallBack
* function, so do NOT use pRes->code to determine if the getMeterMeta/getMetricMeta function
* invokes new threads to get data from mnode or simply retrieves data from cache.
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
* so do NOT use pRes->code to determine if the getTableMeta/getMetricMeta function
* invokes new threads to get data from mgmt node or simply retrieves data from cache.
*
* do NOT assign return code to pRes->code for the same reason for it may be released by another thread
* do NOT assign return code to pRes->code for the same reason since it may be released by another thread
* pRes->code = ret;
*/
return ret;
......@@ -1457,7 +1454,6 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
return numOfRows;
}
// multi-vnodes insertion in sync query model
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
if (pCmd->command != TSDB_SQL_INSERT) {
......
......@@ -350,7 +350,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
/*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
......@@ -364,7 +364,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
if (shouldFree) {
tscTrace("%p Async sql is automatically freed", pSql);
tscTrace("%p sqlObj is automatically freed", pSql);
tscFreeSqlObj(pSql);
}
}
......
......@@ -232,15 +232,16 @@ void taos_close(TAOS *taos) {
int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
pRes->numOfRows = 1;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->numOfTotalInCurrentClause = 0;
pSql->asyncTblPos = NULL;
if (NULL != pSql->pTableHashList) {
taosHashCleanup(pSql->pTableHashList);
pSql->pTableHashList = NULL;
pCmd->curSql = NULL;
if (NULL != pCmd->pTableList) {
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL;
}
tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr);
......@@ -767,7 +768,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (tscShouldFreeAsyncSqlObj(pSql)) {
tscTrace("%p Async SqlObj is freed by app", pSql);
tscTrace("%p SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql);
} else {
if (keepCmd) {
......@@ -851,7 +852,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
tscFreeSqlObj(pSql);
tscTrace("%p Async sql result is freed by app", pSql);
tscTrace("%p sql result is freed by app", pSql);
} else {
if (keepCmd) {
tscFreeSqlResult(pSql);
......@@ -1027,8 +1028,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
SSqlObj *pSql = pObj->pSql;
SSqlRes *pRes = &pSql->res;
pRes->numOfRows = 1;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->numOfTotalInCurrentClause = 0;
......@@ -1051,10 +1053,10 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql);
pSql->asyncTblPos = NULL;
if (NULL != pSql->pTableHashList) {
taosHashCleanup(pSql->pTableHashList);
pSql->pTableHashList = NULL;
pCmd->curSql = NULL;
if (NULL != pCmd->pTableList) {
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL;
}
pRes->code = (uint8_t)tsParseSql(pSql, false);
......
......@@ -406,10 +406,16 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
void tscFreeSqlCmdData(SSqlCmd* pCmd) {
pCmd->command = 0;
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
pCmd->command = 0;
pCmd->numOfCols = 0;
pCmd->count = 0;
pCmd->count = 0;
pCmd->curSql = NULL;
pCmd->msgType = 0;
pCmd->parseFinished = 0;
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList= NULL;
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeSubqueryInfo(pCmd);
......@@ -480,14 +486,10 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
tscFreeSqlResult(pSql);
tfree(pSql->pSubs);
taosHashCleanup(pSql->pTableHashList);
pSql->freed = 0;
pSql->numOfSubs = 0;
pSql->pTableHashList = NULL;
pSql->asyncTblPos = NULL;
tscFreeSqlCmdData(pCmd);
tscResetSqlCmdObj(pCmd);
tscTrace("%p partially free sqlObj completed", pSql);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册