提交 be021333 编写于 作者: H Hui Li

[TD-489]

上级 19d1c87b
...@@ -317,6 +317,7 @@ typedef struct SSqlObj { ...@@ -317,6 +317,7 @@ typedef struct SSqlObj {
SRpcIpSet ipList; SRpcIpSet ipList;
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
uint32_t insertType;
tsem_t rspSem; tsem_t rspSem;
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
...@@ -402,6 +403,7 @@ void tscCloseTscObj(STscObj *pObj); ...@@ -402,6 +403,7 @@ void tscCloseTscObj(STscObj *pObj);
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos); void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
......
...@@ -482,6 +482,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -482,6 +482,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
} else { } else {
code = tsParseSql(pSql, false); code = tsParseSql(pSql, false);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STMT_INSERT) == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL);
(*pSql->fp)(pSql->param, NULL, code);
return;
}
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} }
} }
......
...@@ -1312,6 +1312,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1312,6 +1312,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pSql->insertType);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) { if (sToken.type != TK_INTO) {
...@@ -1339,7 +1340,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { ...@@ -1339,7 +1340,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
* Set the fp before parse the sql string, in case of getTableMeta failed, in which * Set the fp before parse the sql string, in case of getTableMeta failed, in which
* the error handle callback function can rightfully restore the user-defined callback function (fp). * the error handle callback function can rightfully restore the user-defined callback function (fp).
*/ */
if (initialParse) { if (initialParse && (pSql->insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
pSql->fetchFp = pSql->fp; pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert; pSql->fp = (void(*)())tscHandleMultivnodeInsert;
} }
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "taos.h" #include "taos.h"
#include "tsclient.h" #include "tsclient.h"
...@@ -20,6 +21,7 @@ ...@@ -20,6 +21,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tstrbuild.h" #include "tstrbuild.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h"
int tsParseInsertSql(SSqlObj *pSql); int tsParseInsertSql(SSqlObj *pSql);
int taos_query_imp(STscObj* pObj, SSqlObj* pSql); int taos_query_imp(STscObj* pObj, SSqlObj* pSql);
...@@ -262,7 +264,11 @@ static char* normalStmtBuildSql(STscStmt* stmt) { ...@@ -262,7 +264,11 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) { static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
if (bind->is_null != NULL && *(bind->is_null)) { if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data, param->type, param->bytes); if (param->type == TSDB_DATA_TYPE_BINARY || param->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(data + param->offset, param->type);
} else {
setNull(data + param->offset, param->type, param->bytes);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -297,14 +303,17 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) { ...@@ -297,14 +303,17 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
} }
size = (short)*bind->length; size = (short)*bind->length;
break; STR_WITH_SIZE_TO_VARSTR(data + param->offset, bind->buffer, size);
return TSDB_CODE_SUCCESS;
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR: {
if (!taosMbsToUcs4(bind->buffer, *bind->length, data + param->offset, param->bytes, NULL)) { size_t output = 0;
if (!taosMbsToUcs4(bind->buffer, *bind->length, varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
} }
varDataSetLen(data + param->offset, output);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
default: default:
assert(false); assert(false);
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
...@@ -383,14 +392,6 @@ static int insertStmtAddBatch(STscStmt* stmt) { ...@@ -383,14 +392,6 @@ static int insertStmtAddBatch(STscStmt* stmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int insertStmtPrepare(STscStmt* stmt) {
SSqlObj *pSql = stmt->pSql;
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
return tsParseInsertSql(pSql);
}
static int insertStmtReset(STscStmt* pStmt) { static int insertStmtReset(STscStmt* pStmt) {
SSqlCmd* pCmd = &pStmt->pSql->cmd; SSqlCmd* pCmd = &pStmt->pSql->cmd;
if (pCmd->batchSize > 2) { if (pCmd->batchSize > 2) {
...@@ -451,14 +452,16 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -451,14 +452,16 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->insertType = 0;
pSql->fetchFp = waitForQueryRsp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
tscDoQuery(pSql); tscDoQuery(pSql);
// tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); // wait for the callback function to post the semaphore
if (pRes->code != TSDB_CODE_SUCCESS) { tsem_wait(&pSql->rspSem);
tscPartiallyFreeSqlObj(pSql); return pSql->res.code;
}
return pRes->code;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -478,6 +481,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -478,6 +481,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
tscError("failed to allocate memory for statement"); tscError("failed to allocate memory for statement");
return NULL; return NULL;
} }
pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
...@@ -488,8 +492,10 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -488,8 +492,10 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
} }
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->pTscObj->pSql = pSql;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pStmt->pSql = pSql; pStmt->pSql = pSql;
return pStmt; return pStmt;
...@@ -497,22 +503,55 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -497,22 +503,55 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
if (length == 0) {
length = strlen(sql); if (stmt == NULL || pStmt->taos == NULL || pStmt->pSql == NULL) {
terrno = TSDB_CODE_DISCONNECTED;
return TSDB_CODE_DISCONNECTED;
}
SSqlObj* pSql = pStmt->pSql;
size_t sqlLen = strlen(sql);
//doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->param = (void*)pStmt->taos;
pSql->fp = waitForQueryRsp;
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p failed to malloc payload buffer", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
char* sqlstr = (char*)malloc(length + 1);
if (sqlstr == NULL) { pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
tscError("failed to malloc sql string buffer");
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
free(pCmd->payload);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
memcpy(sqlstr, sql, length);
sqlstr[length] = 0; pRes->qhandle = 0;
strtolower(sqlstr, sqlstr); pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sql);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
pStmt->pSql->sqlstr = sqlstr; if (tscIsInsertData(pSql->sqlstr)) {
if (tscIsInsertData(sqlstr)) {
pStmt->isInsert = true; pStmt->isInsert = true;
return insertStmtPrepare(pStmt);
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
return pSql->res.code;
}
return code;
} }
pStmt->isInsert = false; pStmt->isInsert = false;
...@@ -574,7 +613,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) { ...@@ -574,7 +613,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
} else { } else {
tfree(pStmt->pSql->sqlstr); tfree(pStmt->pSql->sqlstr);
pStmt->pSql->sqlstr = sql; pStmt->pSql->sqlstr = sql;
ret = taos_query_imp(pStmt->taos, pStmt->pSql); ret = taos_query(pStmt->taos, pStmt->pSql->sqlstr);
} }
} }
return ret; return ret;
......
...@@ -264,7 +264,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -264,7 +264,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSqlObj *pSql = ((STscObj *)param)->pSql; SSqlObj *pSql = ((STscObj *)param)->pSql;
......
...@@ -331,6 +331,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -331,6 +331,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u #define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type #define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u #define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册