提交 f414d8a9 编写于 作者: H Haojun Liao

[td-4151]

上级 3dc445f9
...@@ -42,12 +42,6 @@ extern "C" { ...@@ -42,12 +42,6 @@ extern "C" {
struct SSqlInfo; struct SSqlInfo;
struct SLocalMerger; struct SLocalMerger;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct STableComInfo { typedef struct STableComInfo {
...@@ -253,16 +247,15 @@ typedef struct { ...@@ -253,16 +247,15 @@ typedef struct {
typedef struct SInsertStatementParam { typedef struct SInsertStatementParam {
SName **pTableNameList; // all involved tableMeta list of current insert sql statement. SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables; int32_t numOfTables; // number of tables in table name list
SHashObj *pTableBlockHashList; // data block for each table SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttached; // denote if submit block is built with table schema or not int8_t schemaAttached; // denote if submit block is built with table schema or not
STagData tagData; // NOTE: pTagData->data is used as a variant length array STagData tagData; // NOTE: pTagData->data is used as a variant length array
int32_t dataSourceType; // from file or from sql statement
char msg[512]; // error message char msg[512]; // error message
char *sql; // current sql statement position char *sql; // current sql statement position
uint32_t insertType; // TODO remove it uint32_t insertType; // insert data from [file|sql statement| bound statement]
} SInsertStatementParam; } SInsertStatementParam;
// TODO extract sql parser supporter // TODO extract sql parser supporter
...@@ -271,14 +264,11 @@ typedef struct { ...@@ -271,14 +264,11 @@ typedef struct {
uint8_t msgType; uint8_t msgType;
SInsertStatementParam insertParam; SInsertStatementParam insertParam;
char reserve1[3]; // fix bus error on arm32 char reserve1[3]; // fix bus error on arm32
bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
union { union {
int32_t count; int32_t count;
}; };
char * curSql; // current sql, resume position of sql after parsing paused char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
char reserve2[3]; // fix bus error on arm32 char reserve2[3]; // fix bus error on arm32
int16_t numOfCols; int16_t numOfCols;
...@@ -289,22 +279,10 @@ typedef struct { ...@@ -289,22 +279,10 @@ typedef struct {
SHashObj *pTableMetaMap; // local buffer to keep the queried table meta, before validating the AST SHashObj *pTableMetaMap; // local buffer to keep the queried table meta, before validating the AST
SQueryInfo *pQueryInfo; SQueryInfo *pQueryInfo;
SQueryInfo *active; // current active query info SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams; int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
// int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array STagData tagData; // NOTE: pTagData->data is used as a variant length array
// SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
// int32_t numOfTables;
// SHashObj *pTableBlockHashList; // data block for each table
// SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int32_t resColumnId; int32_t resColumnId;
} SSqlCmd; } SSqlCmd;
......
...@@ -336,21 +336,33 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM ...@@ -336,21 +336,33 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta); int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base); SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base);
// update the table uid
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid; pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
if (pExpr->colInfo.colIndex >= 0) { if (pExpr->colInfo.colIndex >= 0) {
int32_t index = pExpr->colInfo.colIndex; int32_t index = pExpr->colInfo.colIndex;
if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) || if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) ||
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < numOfCols || index >= (numOfCols + numOfTags)))) { (TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) {
return pSql->retryReason; return pSql->retryReason;
} }
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) && if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) { if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
return pSql->retryReason; strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) {
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else { // do nothing for udc
} }
} }
} }
...@@ -388,12 +400,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -388,12 +400,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg); tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
// check if it is a sub-query of super table query first, if true, enter another routine // check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send the corresponding query", pSql->self); TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
...@@ -415,42 +427,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -415,42 +427,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
return; return;
} else { // continue to process normal async query } else { // continue to process normal async query
if (pCmd->parseFinished) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send corresponding query", pSql->self); tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
assert(pCmd->command != TSDB_SQL_INSERT);
if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64" redo parse sql string and proceed", pSql->self);
pCmd->parseFinished = false;
tscResetSqlCmd(pCmd, true);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscBuildAndSendRequest(pSql, NULL);
} else { // in all other cases, simple retry
tscBuildAndSendRequest(pSql, NULL);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
tscDebug("0x%"PRIx64" continue parse sql after get table meta", pSql->self);
code = tsParseSql(pSql, false); code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
...@@ -460,8 +438,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -460,8 +438,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error; goto _error;
} }
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)){ if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
...@@ -471,22 +449,38 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -471,22 +449,38 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
(*pSql->fp)(pSql->param, pSql, code); (*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { } else {
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
tscImportDataFromFile(pSql); tscImportDataFromFile(pSql);
} else { } else {
tscHandleMultivnodeInsert(pSql); tscHandleMultivnodeInsert(pSql);
} }
}
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again",
pSql->self);
tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else { } else {
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd); tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
executeQuery(pSql, pQueryInfo1);
} }
taosReleaseRef(tscObjRef, pSql->self); code = tsParseSql(pSql, true);
return; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo1);
} }
}
taosReleaseRef(tscObjRef, pSql->self);
return;
}
} else { // stream computing } else { // stream computing
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
...@@ -510,21 +504,16 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -510,21 +504,16 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pSql->cmd.command); tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) { if (tscNumOfExprs(pQueryInfo) == 0) {
tsParseSql(pSql, false); tsParseSql(pSql, false);
} }
(*pSql->fp)(pSql->param, pSql, code); (*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
return; return;
} }
// tscDoQuery(pSql);
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
return; return;
_error: _error:
......
...@@ -751,8 +751,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -751,8 +751,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
char *sql = *sqlstr; char *sql = *sqlstr;
pSql->cmd.autoCreated = false;
// get the token of specified table // get the token of specified table
index = 0; index = 0;
tableToken = tStrGetToken(sql, &index, false); tableToken = tStrGetToken(sql, &index, false);
...@@ -1015,12 +1013,13 @@ int validateTableName(char *tblName, int len, SStrToken* psTblToken) { ...@@ -1015,12 +1013,13 @@ int validateTableName(char *tblName, int len, SStrToken* psTblToken) {
return tscValidateName(psTblToken); return tscValidateName(psTblToken);
} }
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) { static int32_t validateDataSource(SSqlCmd *pCmd, int32_t type, const char *sql) {
if (pCmd->dataSourceType != 0 && pCmd->dataSourceType != type) { if (pCmd->insertParam.insertType != 0 && !TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, type)) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql); return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
} }
pCmd->dataSourceType = type;
pCmd->insertParam.insertType = type;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1090,7 +1089,6 @@ static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SS ...@@ -1090,7 +1089,6 @@ static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SS
_clean: _clean:
pCmd->curSql = NULL; pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code; return code;
} }
...@@ -1142,7 +1140,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1142,7 +1140,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
* if the data is from the data file, no data has been generated yet. So, there no data to * if the data is from the data file, no data has been generated yet. So, there no data to
* merge or submit, save the file path and parse the file in other routines. * merge or submit, save the file path and parse the file in other routines.
*/ */
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
goto _clean; goto _clean;
} }
...@@ -1203,7 +1201,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1203,7 +1201,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (sToken.type == TK_FILE) { if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, TSDB_QUERY_TYPE_FILE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
...@@ -1236,7 +1234,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1236,7 +1234,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (bindedColumns == NULL) { if (bindedColumns == NULL) {
STableMeta *pTableMeta = pTableMetaInfo->pTableMeta; STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, TSDB_QUERY_TYPE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
...@@ -1256,7 +1254,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1256,7 +1254,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
// insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); // insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta; STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta;
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, TSDB_QUERY_TYPE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
...@@ -1298,7 +1296,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1298,7 +1296,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
} }
// merge according to vgId // merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) { if (!TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
...@@ -1309,7 +1307,6 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1309,7 +1307,6 @@ int tsParseInsertSql(SSqlObj *pSql) {
_clean: _clean:
pCmd->curSql = NULL; pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code; return code;
} }
...@@ -1328,8 +1325,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) { ...@@ -1328,8 +1325,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd->command = TSDB_SQL_INSERT; pCmd->command = TSDB_SQL_INSERT;
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | TSDB_QUERY_TYPE_STMT_INSERT);
sToken = tStrGetToken(pSql->sqlstr, &index, false); sToken = tStrGetToken(pSql->sqlstr, &index, false);
if (sToken.type != TK_INTO) { if (sToken.type != TK_INTO) {
...@@ -1344,11 +1340,11 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1344,11 +1340,11 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
if ((!pCmd->parseFinished) && (!initial)) { if (!initial) {
tscDebug("0x%"PRIx64" resume to parse sql: %s", pSql->self, pCmd->curSql); tscDebug("0x%"PRIx64" resume to parse sql: %s", pSql->self, pCmd->curSql);
} }
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) { if (TSDB_CODE_SUCCESS != ret) {
return ret; return ret;
} }
...@@ -1358,17 +1354,15 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1358,17 +1354,15 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
return ret; return ret;
} }
// make a backup as tsParseInsertSql may modify the string
char* sqlstr = strdup(pSql->sqlstr);
ret = tsParseInsertSql(pSql); ret = tsParseInsertSql(pSql);
if ((sqlstr == NULL) || (pSql->parseRetry >= 1) || assert(ret == TSDB_CODE_SUCCESS || ret == TSDB_CODE_TSC_ACTION_IN_PROGRESS || ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_SQL);
(ret != TSDB_CODE_TSC_SQL_SYNTAX_ERROR && ret != TSDB_CODE_TSC_INVALID_SQL)) {
free(sqlstr); if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_SQL)) {
} else { tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true); tscResetSqlCmd(pCmd, true);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->parseRetry++; pSql->parseRetry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) { if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
ret = tsParseInsertSql(pSql); ret = tsParseInsertSql(pSql);
} }
...@@ -1376,9 +1370,12 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1376,9 +1370,12 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
} else { } else {
SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr); SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo); ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) { if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry < 1 && SQLInfo.type == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true); tscResetSqlCmd(pCmd, true);
pSql->parseRetry++; pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo); ret = tscToSQLCmd(pSql, &SQLInfo);
} }
...@@ -1561,7 +1558,7 @@ void tscImportDataFromFile(SSqlObj *pSql) { ...@@ -1561,7 +1558,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
return; return;
} }
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0); assert(TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT) && strlen(pCmd->payload) != 0);
pCmd->active = pCmd->pQueryInfo; pCmd->active = pCmd->pQueryInfo;
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport)); SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
......
...@@ -1104,14 +1104,14 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -1104,14 +1104,14 @@ static int insertStmtExecute(STscStmt* stmt) {
// data block reset // data block reset
pCmd->batchSize = 0; pCmd->batchSize = 0;
for(int32_t i = 0; i < pCmd->numOfTables; ++i) { for(int32_t i = 0; i < pCmd->insertParam.numOfTables; ++i) {
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) { if (pCmd->insertParam.pTableNameList && pCmd->insertParam.pTableNameList[i]) {
tfree(pCmd->pTableNameList[i]); tfree(pCmd->insertParam.pTableNameList[i]);
} }
} }
pCmd->numOfTables = 0; pCmd->insertParam.numOfTables = 0;
tfree(pCmd->pTableNameList); tfree(pCmd->insertParam.pTableNameList);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
return pSql->res.code; return pSql->res.code;
...@@ -1126,12 +1126,12 @@ static void insertBatchClean(STscStmt* pStmt) { ...@@ -1126,12 +1126,12 @@ static void insertBatchClean(STscStmt* pStmt) {
pCmd->batchSize = 0; pCmd->batchSize = 0;
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) { if (pCmd->insertParam.pTableNameList && pCmd->insertParam.pTableNameList[i]) {
tfree(pCmd->pTableNameList[i]); tfree(pCmd->insertParam.pTableNameList[i]);
} }
} }
tfree(pCmd->pTableNameList); tfree(pCmd->insertParam.pTableNameList);
/* /*
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
...@@ -1155,7 +1155,7 @@ static void insertBatchClean(STscStmt* pStmt) { ...@@ -1155,7 +1155,7 @@ static void insertBatchClean(STscStmt* pStmt) {
*/ */
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
pCmd->numOfTables = 0; pCmd->insertParam.numOfTables = 0;
taosHashEmpty(pCmd->insertParam.pTableBlockHashList); taosHashEmpty(pCmd->insertParam.pTableBlockHashList);
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
...@@ -1174,7 +1174,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { ...@@ -1174,7 +1174,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry
if (taosHashGetSize(pStmt->pSql->cmd.pTableBlockHashList) <= 0) { // merge according to vgId if (taosHashGetSize(pStmt->pSql->cmd.insertParam.pTableBlockHashList) <= 0) { // merge according to vgId
tscError("0x%"PRIx64" no data block to insert", pStmt->pSql->self); tscError("0x%"PRIx64" no data block to insert", pStmt->pSql->self);
return TSDB_CODE_TSC_APP_ERROR; return TSDB_CODE_TSC_APP_ERROR;
} }
...@@ -1261,7 +1261,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -1261,7 +1261,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->fp = waitForQueryRsp; pSql->fp = waitForQueryRsp;
pSql->fetchFp = waitForQueryRsp; pSql->fetchFp = waitForQueryRsp;
pCmd->insertType = TSDB_QUERY_TYPE_STMT_INSERT; pCmd->insertParam.insertType = TSDB_QUERY_TYPE_STMT_INSERT;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p failed to malloc payload buffer", pSql); tscError("%p failed to malloc payload buffer", pSql);
...@@ -1387,7 +1387,6 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { ...@@ -1387,7 +1387,6 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
tscDebug("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); tscDebug("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pSql->cmd.parseFinished = 0;
pSql->cmd.numOfParams = 0; pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0; pSql->cmd.batchSize = 0;
...@@ -1448,8 +1447,8 @@ int taos_stmt_close(TAOS_STMT* stmt) { ...@@ -1448,8 +1447,8 @@ int taos_stmt_close(TAOS_STMT* stmt) {
if (pStmt->multiTbInsert) { if (pStmt->multiTbInsert) {
taosHashCleanup(pStmt->mtb.pTableHash); taosHashCleanup(pStmt->mtb.pTableHash);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true); pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true);
taosHashCleanup(pStmt->pSql->cmd.pTableBlockHashList); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
pStmt->pSql->cmd.pTableBlockHashList = NULL; pStmt->pSql->cmd.insertParam.pTableNameList = NULL;
} }
} }
......
...@@ -674,7 +674,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -674,7 +674,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pCmd->active = pCmd->pQueryInfo; pCmd->active = pCmd->pQueryInfo;
pCmd->command = pCmd->pQueryInfo->command; pCmd->command = pCmd->pQueryInfo->command;
pCmd->parseFinished = 1;
return TSDB_CODE_SUCCESS; // do not build query message here return TSDB_CODE_SUCCESS; // do not build query message here
} }
...@@ -711,7 +710,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -711,7 +710,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression"); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
} }
pSql->cmd.parseFinished = 1;
if (tscBuildMsg[pCmd->command] != NULL) { if (tscBuildMsg[pCmd->command] != NULL) {
return tscBuildMsg[pCmd->command](pSql, pInfo); return tscBuildMsg[pCmd->command](pSql, pInfo);
} else { } else {
......
...@@ -360,7 +360,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -360,7 +360,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1; pSql->cmd.insertParam.schemaAttached = 1;
} }
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
...@@ -477,7 +477,7 @@ int doBuildAndSendMsg(SSqlObj *pSql) { ...@@ -477,7 +477,7 @@ int doBuildAndSendMsg(SSqlObj *pSql) {
pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_INSERT ||
pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_CONNECT ||
pCmd->command == TSDB_SQL_HB || pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META || // pCmd->command == TSDB_SQL_META ||
pCmd->command == TSDB_SQL_STABLEVGROUP) { pCmd->command == TSDB_SQL_STABLEVGROUP) {
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL); pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
} }
...@@ -583,23 +583,6 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -583,23 +583,6 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
// char* pMsg = pSql->cmd.payload;
//
// // NOTE: shell message size should not include SMsgDesc
// int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
//
// SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
// pMsgDesc->numOfVnodes = htonl(1); // always one vnode
//
// pMsg += sizeof(SMsgDesc);
// SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
//
// pShellMsg->header.vgId = htonl(pTableMeta->vgId); // data in current block all routes to the same vgroup
// pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc
// pShellMsg->length = pShellMsg->header.contLen;
//
// pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // the number of tables to be inserted
// pSql->cmd.payloadLen is set during copying data into payload // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
...@@ -1689,6 +1672,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1689,6 +1672,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
#if 0
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
...@@ -1710,6 +1694,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1710,6 +1694,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg); pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META; pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2356,53 +2341,60 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2356,53 +2341,60 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code); void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (NULL == pNew) { if (NULL == pNew) {
tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self); tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META; pNew->cmd.command = TSDB_SQL_META;
tscAddQueryInfo(&pNew->cmd); tscAddQueryInfo(&pNew->cmd);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd); SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd);
pNew->cmd.autoCreated = pSql->cmd.autoCreated; // create table if not exists
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self); tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo); STableMetaInfo *pNewTableMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
assert(pNewQueryInfo->numOfTables == 1); assert(pNewQueryInfo->numOfTables == 1);
tNameAssign(&pNewMeterMetaInfo->name, &pTableMetaInfo->name); tNameAssign(&pNewTableMetaInfo->name, &pTableMetaInfo->name);
if (pSql->cmd.autoCreated) {
int32_t code = copyTagData(&pNew->cmd.tagData, &pSql->cmd.tagData);
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" malloc failed for new tag data to get table meta", pSql->self);
tscFreeSqlObj(pNew);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
registerSqlObj(pNew); registerSqlObj(pNew);
tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get tableMeta, auto create:%d", pSql->self, pNew->self,
pNew->cmd.autoCreated);
pNew->fp = tscTableMetaCallBack; pNew->fp = tscTableMetaCallBack;
pNew->param = (void *)pSql->self; pNew->param = (void *)pSql->self;
tscDebug("0x%"PRIx64" metaRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->metaRid, pNew->self); tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get tableMeta, auto create:%d, metaRid from %"PRId64" to %"PRId64,
pSql->self, pNew->self, autocreate, pSql->metaRid, pNew->self);
pSql->metaRid = pNew->self; pSql->metaRid = pNew->self;
{
STableInfoMsg *pInfoMsg = (STableInfoMsg *)pNew->cmd.payload;
int32_t code = tNameExtractFullName(&pNewTableMetaInfo->name, pInfoMsg->tableFname);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pInfoMsg->createFlag = htons(autocreate? 1 : 0);
char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg);
// tag data exists
if (autocreate && pSql->cmd.tagData.dataLen != 0) {
pMsg = serializeTagData(&pSql->cmd.tagData, pMsg);
}
pNew->cmd.payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
pNew->cmd.msgType = TSDB_MSG_TYPE_CM_TABLE_META;
}
int32_t code = tscBuildAndSendRequest(pNew, NULL); int32_t code = tscBuildAndSendRequest(pNew, NULL);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated
...@@ -2481,7 +2473,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg ...@@ -2481,7 +2473,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
return code; return code;
} }
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
assert(tIsValidName(&pTableMetaInfo->name)); assert(tIsValidName(&pTableMetaInfo->name));
uint32_t size = tscGetTableMetaMaxSize(); uint32_t size = tscGetTableMetaMaxSize();
...@@ -2497,7 +2489,6 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { ...@@ -2497,7 +2489,6 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
memset(pTableMetaInfo->pTableMeta, 0, size); memset(pTableMetaInfo->pTableMeta, 0, size);
pTableMetaInfo->tableMetaSize = size; pTableMetaInfo->tableMetaSize = size;
} else { } else {
//uint32_t s = tscGetTableMetaSize(pTableMetaInfo->pTableMeta);
memset(pTableMetaInfo->pTableMeta, 0, size); memset(pTableMetaInfo->pTableMeta, 0, size);
pTableMetaInfo->tableMetaSize = size; pTableMetaInfo->tableMetaSize = size;
} }
...@@ -2520,23 +2511,26 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { ...@@ -2520,23 +2511,26 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
if (pMeta->tableType == TSDB_CHILD_TABLE) { if (pMeta->tableType == TSDB_CHILD_TABLE) {
int32_t code = tscCreateTableMetaFromCChildMeta(pTableMetaInfo->pTableMeta, name, buf); int32_t code = tscCreateTableMetaFromCChildMeta(pTableMetaInfo->pTableMeta, name, buf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return getTableMetaFromMnode(pSql, pTableMetaInfo); return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return getTableMetaFromMnode(pSql, pTableMetaInfo); return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
}
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
return tscGetTableMetaImpl(pSql, pTableMetaInfo, false);
} }
int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) { int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
pSql->cmd.autoCreated = createIfNotExists; return tscGetTableMetaImpl(pSql, pTableMetaInfo, createIfNotExists);
return tscGetTableMeta(pSql, pTableMetaInfo);
} }
/** /**
* retrieve table meta from mnode, and update the local table meta hashmap. * retrieve table meta from mnode, and then update the local table meta hashmap.
* @param pSql sql object * @param pSql sql object
* @param tableIndex table index * @param tableIndex table index
* @return status code * @return status code
...@@ -2564,7 +2558,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -2564,7 +2558,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
size_t len = strlen(name); size_t len = strlen(name);
taosHashRemove(tscTableMetaInfo, name, len); taosHashRemove(tscTableMetaInfo, name, len);
return getTableMetaFromMnode(pSql, pTableMetaInfo); return getTableMetaFromMnode(pSql, pTableMetaInfo, false);
} }
static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) {
...@@ -2656,7 +2650,7 @@ void tscInitMsgsFp() { ...@@ -2656,7 +2650,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg; // tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
......
...@@ -3020,8 +3020,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3020,8 +3020,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->res.code = pSql->res.code; pParentObj->res.code = pSql->res.code;
// set the flag in the parent sqlObj // set the flag in the parent sqlObj
if (pSql->cmd.submitSchema) { if (pSql->cmd.insertParam.schemaAttached) {
pParentObj->cmd.submitSchema = 1; pParentObj->cmd.insertParam.schemaAttached = 1;
} }
} }
...@@ -3069,15 +3069,15 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3069,15 +3069,15 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
tscError("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self, tscError("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self,
pParentObj->res.numOfRows, numOfFailed, numOfSub); pParentObj->res.numOfRows, numOfFailed, numOfSub);
tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.numOfTables); tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.insertParam.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) { for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(pParentObj->cmd.pTableNameList[i], name); tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
pParentObj->res.code = TSDB_CODE_SUCCESS; pParentObj->res.code = TSDB_CODE_SUCCESS;
pParentObj->cmd.parseFinished = false; // pParentObj->cmd.parseFinished = false;
tscResetSqlCmd(&pParentObj->cmd, false); tscResetSqlCmd(&pParentObj->cmd, false);
...@@ -3112,7 +3112,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { ...@@ -3112,7 +3112,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); assert(pSupporter->index < pSupporter->pSql->subState.numOfSub);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index); STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.insertParam.pDataBlocks, pSupporter->index);
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
......
...@@ -1106,33 +1106,31 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -1106,33 +1106,31 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
} }
void destroyTableNameList(SSqlCmd* pCmd) { void destroyTableNameList(SSqlCmd* pCmd) {
if (pCmd->numOfTables == 0) { if (pCmd->insertParam.numOfTables == 0) {
assert(pCmd->pTableNameList == NULL); assert(pCmd->insertParam.pTableNameList == NULL);
return; return;
} }
for(int32_t i = 0; i < pCmd->numOfTables; ++i) { for(int32_t i = 0; i < pCmd->insertParam.numOfTables; ++i) {
tfree(pCmd->pTableNameList[i]); tfree(pCmd->insertParam.pTableNameList[i]);
} }
pCmd->numOfTables = 0; pCmd->insertParam.numOfTables = 0;
tfree(pCmd->pTableNameList); tfree(pCmd->insertParam.pTableNameList);
} }
void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
pCmd->command = 0; pCmd->command = 0;
pCmd->numOfCols = 0; pCmd->numOfCols = 0;
pCmd->count = 0; pCmd->count = 0;
pCmd->curSql = NULL; pCmd->curSql = NULL;
pCmd->msgType = 0; pCmd->msgType = 0;
pCmd->parseFinished = 0;
pCmd->autoCreated = 0;
destroyTableNameList(pCmd); destroyTableNameList(pCmd);
pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, removeMeta); pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, clearCachedMeta);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
tscFreeQueryInfo(pCmd, removeMeta); tscFreeQueryInfo(pCmd, clearCachedMeta);
if (pCmd->pTableMetaMap != NULL) { if (pCmd->pTableMetaMap != NULL) {
STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL); STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
...@@ -1341,7 +1339,7 @@ void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) { ...@@ -1341,7 +1339,7 @@ void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) {
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
assert(pDataBlock->pTableMeta != NULL); assert(pDataBlock->pTableMeta != NULL && pDataBlock->size <= pDataBlock->nAllocSize && pDataBlock->size > sizeof(SMsgDesc));
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
...@@ -1363,22 +1361,22 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -1363,22 +1361,22 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* the dataBlock only includes the RPC Header buffer and actual submit message body, * the dataBlock only includes the RPC Header buffer and actual submit message body,
* space for digest needs additional space. * space for digest needs additional space.
*/ */
int ret = tscAllocPayload(pCmd, pDataBlock->size + 100); int ret = tscAllocPayload(pCmd, pDataBlock->size);
if (TSDB_CODE_SUCCESS != ret) { if (TSDB_CODE_SUCCESS != ret) {
return ret; return ret;
} }
assert(pDataBlock->size <= pDataBlock->nAllocSize);
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->size); memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->size);
//the payloadLen should be actual message body size, the old value of payloadLen is the allocated payload size //the payloadLen should be actual message body size, the old value of payloadLen is the allocated payload size
pCmd->payloadLen = pDataBlock->size; pCmd->payloadLen = pDataBlock->size;
assert(pCmd->allocSize >= (uint32_t)(pCmd->payloadLen));
// NOTE: shell message size should not include SMsgDesc // NOTE: shell message size should not include SMsgDesc
int32_t size = pCmd->payloadLen - sizeof(SMsgDesc); int32_t size = pCmd->payloadLen - sizeof(SMsgDesc);
SMsgDesc* pMsgDesc = (SMsgDesc*) pCmd->payload; SMsgDesc* pMsgDesc = (SMsgDesc*) pCmd->payload;
pMsgDesc->numOfVnodes = htonl(1); // always for one vnode pMsgDesc->numOfVnodes = htonl(1); // always for one vnode
SSubmitMsg *pShellMsg = (SSubmitMsg *)(pCmd->payload + sizeof(SMsgDesc)); SSubmitMsg *pShellMsg = (SSubmitMsg *)(pCmd->payload + sizeof(SMsgDesc));
pShellMsg->header.vgId = htonl(pDataBlock->pTableMeta->vgId); // data in current block all routes to the same vgroup pShellMsg->header.vgId = htonl(pDataBlock->pTableMeta->vgId); // data in current block all routes to the same vgroup
...@@ -1386,8 +1384,6 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -1386,8 +1384,6 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
pShellMsg->length = pShellMsg->header.contLen; pShellMsg->length = pShellMsg->header.contLen;
pShellMsg->numOfBlocks = htonl(pDataBlock->numOfTables); // the number of tables to be inserted pShellMsg->numOfBlocks = htonl(pDataBlock->numOfTables); // the number of tables to be inserted
assert(pCmd->allocSize >= (uint32_t)(pCmd->payloadLen + 100) && pCmd->payloadLen > 0);
tscDebug("0x%"PRIx64" submit msg built, vgId:%d numOfTables:%d", pSql->self, pDataBlock->pTableMeta->vgId, pDataBlock->numOfTables); tscDebug("0x%"PRIx64" submit msg built, vgId:%d numOfTables:%d", pSql->self, pDataBlock->pTableMeta->vgId, pDataBlock->numOfTables);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1550,20 +1546,20 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -1550,20 +1546,20 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
} }
static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) { static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) {
pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->insertParam.pTableBlockHashList); pCmd->insertParam.numOfTables = (int32_t) taosHashGetSize(pCmd->insertParam.pTableBlockHashList);
if (pCmd->pTableNameList == NULL) { if (pCmd->insertParam.pTableNameList == NULL) {
pCmd->pTableNameList = calloc(pCmd->numOfTables, POINTER_BYTES); pCmd->insertParam.pTableNameList = calloc(pCmd->insertParam.numOfTables, POINTER_BYTES);
} else { } else {
memset(pCmd->pTableNameList, 0, pCmd->numOfTables * POINTER_BYTES); memset(pCmd->insertParam.pTableNameList, 0, pCmd->insertParam.numOfTables * POINTER_BYTES);
} }
STableDataBlocks **p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); STableDataBlocks **p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
int32_t i = 0; int32_t i = 0;
while(p1) { while(p1) {
STableDataBlocks* pBlocks = *p1; STableDataBlocks* pBlocks = *p1;
tfree(pCmd->pTableNameList[i]); tfree(pCmd->insertParam.pTableNameList[i]);
pCmd->pTableNameList[i++] = tNameDup(&pBlocks->tableName); pCmd->insertParam.pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p1); p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p1);
} }
...@@ -1635,7 +1631,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) { ...@@ -1635,7 +1631,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
pBlocks->schemaLen = 0; pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema); int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->insertParam.schemaAttached);
assert(finalLen <= len); assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
...@@ -1693,18 +1689,22 @@ bool tscIsInsertData(char* sqlstr) { ...@@ -1693,18 +1689,22 @@ bool tscIsInsertData(char* sqlstr) {
} }
int tscAllocPayload(SSqlCmd* pCmd, int size) { int tscAllocPayload(SSqlCmd* pCmd, int size) {
assert(size > 0);
if (pCmd->payload == NULL) { if (pCmd->payload == NULL) {
assert(pCmd->allocSize == 0); assert(pCmd->allocSize == 0);
pCmd->payload = (char*)calloc(1, size); pCmd->payload = (char*)calloc(1, size);
if (pCmd->payload == NULL) return TSDB_CODE_TSC_OUT_OF_MEMORY; if (pCmd->payload == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pCmd->allocSize = size; pCmd->allocSize = size;
} else { } else {
if (pCmd->allocSize < (uint32_t)size) { if (pCmd->allocSize < (uint32_t)size) {
char* b = realloc(pCmd->payload, size); char* b = realloc(pCmd->payload, size);
if (b == NULL) return TSDB_CODE_TSC_OUT_OF_MEMORY; if (b == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pCmd->payload = b; pCmd->payload = b;
pCmd->allocSize = size; pCmd->allocSize = size;
} }
...@@ -1712,7 +1712,7 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { ...@@ -1712,7 +1712,7 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
memset(pCmd->payload, 0, pCmd->allocSize); memset(pCmd->payload, 0, pCmd->allocSize);
} }
assert(pCmd->allocSize >= (uint32_t)size); assert(pCmd->allocSize >= (uint32_t)size && size > 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3006,8 +3006,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in ...@@ -3006,8 +3006,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd; pCmd->command = cmd;
pCmd->parseFinished = 1;
pCmd->autoCreated = pSql->cmd.autoCreated;
int32_t code = copyTagData(&pNew->cmd.tagData, &pSql->cmd.tagData); int32_t code = copyTagData(&pNew->cmd.tagData, &pSql->cmd.tagData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -3106,12 +3104,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3106,12 +3104,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pnCmd->pTableMetaMap = NULL; pnCmd->pTableMetaMap = NULL;
pnCmd->pQueryInfo = NULL; pnCmd->pQueryInfo = NULL;
pnCmd->pDataBlocks = NULL; pnCmd->insertParam.pDataBlocks = NULL;
pnCmd->numOfTables = 0; pnCmd->insertParam.numOfTables = 0;
pnCmd->parseFinished = 1; pnCmd->insertParam.pTableNameList = NULL;
pnCmd->pTableNameList = NULL; pnCmd->insertParam.pTableBlockHashList = NULL;
pnCmd->pTableBlockHashList = NULL;
pnCmd->tagData.data = NULL; pnCmd->tagData.data = NULL;
pnCmd->tagData.dataLen = 0; pnCmd->tagData.dataLen = 0;
...@@ -3384,8 +3381,6 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3384,8 +3381,6 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
pCmd->parseFinished = 1;
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
} }
......
...@@ -345,6 +345,7 @@ do { \ ...@@ -345,6 +345,7 @@ do { \
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u #define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type #define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u #define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type #define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册