未验证 提交 3f880627 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #11700 from taosdata/feature/qnode5

Feature/qnode5
......@@ -92,38 +92,14 @@ typedef struct taosField {
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct TAOS_BIND {
int buffer_type;
void *buffer;
uintptr_t buffer_length; // unused
uintptr_t *length;
int *is_null;
int is_unsigned; // unused
int *error; // unused
union {
int64_t ts;
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
unsigned char *bin;
char *nchar;
} u;
unsigned int allocated;
} TAOS_BIND;
typedef struct TAOS_MULTI_BIND {
typedef struct TAOS_BIND_v2 {
int buffer_type;
void *buffer;
uintptr_t buffer_length;
int32_t buffer_length;
int32_t *length;
char *is_null;
int num;
} TAOS_MULTI_BIND;
} TAOS_BIND_v2;
typedef enum {
SET_CONF_RET_SUCC = 0,
......@@ -152,34 +128,34 @@ DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
......
......@@ -684,6 +684,43 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
return TSDB_CODE_SUCCESS;
}
/**
* @brief The invoker is responsible for memory alloc/dealloc.
*
* @param pBuilder
* @param pBuf Output buffer of STSRow
*/
static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
pBuilder->pBuf = (STSRow *)pBuf;
if (!pBuilder->pBuf) {
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
TASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0);
uint32_t len = 0;
switch (pBuilder->rowType) {
case TD_ROW_TP:
#ifdef TD_SUPPORT_BITMAP
pBuilder->pBitmap = tdGetBitmapAddrTp(pBuilder->pBuf, pBuilder->flen);
#endif
break;
case TD_ROW_KV:
#ifdef TD_SUPPORT_BITMAP
pBuilder->pBitmap = tdGetBitmapAddrKv(pBuilder->pBuf, pBuilder->nBoundCols);
#endif
break;
default:
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
*
......
......@@ -283,12 +283,12 @@ typedef struct SVgDataBlocks {
} SVgDataBlocks;
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SVnodeModifOpStmt;
typedef struct SExplainOptions {
......
......@@ -21,6 +21,15 @@ extern "C" {
#endif
#include "querynodes.h"
#include "query.h"
typedef struct SStmtCallback {
TAOS_STMT* pStmt;
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
int32_t (*setBindInfoFn)(TAOS_STMT*, STableMeta*, void*);
int32_t (*setExecInfoFn)(TAOS_STMT*, SHashObj*, SHashObj*);
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
} SStmtCallback;
typedef struct SParseContext {
uint64_t requestId;
......@@ -34,6 +43,7 @@ typedef struct SParseContext {
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog;
SStmtCallback *pStmtCb;
} SParseContext;
typedef struct SCmdMsgInfo {
......@@ -66,11 +76,27 @@ typedef struct SQuery {
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
bool isInsertSql(const char* pStr, size_t length);
void qDestroyQuery(SQuery* pQueryNode);
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
int32_t qResetStmtDataBlock(void* block, bool keepBuf);
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
void qFreeStmtDataBlock(void* pDataBlock);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
void qDestroyStmtDataBlock(void* pBlock);
int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum);
int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields);
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen);
#ifdef __cplusplus
}
#endif
......
......@@ -45,7 +45,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
typedef TAOS_MULTI_BIND TAOS_BIND_v2; // todo remove
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_BIND_v2* pParams);
// Convert to subplan to string for the scheduler to send to the executor
......
......@@ -132,6 +132,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222)
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223)
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224)
#define TSDB_CODE_TSC_STMT_API_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225)
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
// mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
......
......@@ -205,7 +205,8 @@ typedef struct SRequestObj {
char* sqlstr; // sql string
int32_t sqlLen;
int64_t self;
char* msgBuf; // error msg buffer
char* msgBuf;
int32_t msgBufLen;
int32_t code;
SArray* dbList;
SArray* tableList;
......@@ -278,7 +279,8 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
......@@ -302,6 +304,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq
void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
#ifdef __cplusplus
}
#endif
......
......@@ -19,6 +19,9 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "catalog.h"
typedef void STableDataBlocks;
typedef enum {
STMT_TYPE_INSERT = 1,
......@@ -26,17 +29,64 @@ typedef enum {
STMT_TYPE_QUERY,
} STMT_TYPE;
typedef enum {
STMT_INIT = 1,
STMT_PREPARE,
STMT_SETTBNAME,
STMT_SETTAGS,
STMT_FETCH_TAG_FIELDS,
STMT_FETCH_COL_FIELDS,
STMT_BIND,
STMT_BIND_COL,
STMT_ADD_BATCH,
STMT_EXECUTE,
} STMT_STATUS;
typedef struct SStmtTableCache {
STableDataBlocks* pDataBlock;
void* boundTags;
} SStmtTableCache;
typedef struct SStmtBindInfo {
bool needParse;
uint64_t tbUid;
uint64_t tbSuid;
int32_t sBindRowNum;
int32_t sBindLastIdx;
int8_t tbType;
void* boundTags;
char* tbName;
SName sname;
} SStmtBindInfo;
typedef struct SStmtExecInfo {
SRequestObj* pRequest;
SHashObj* pVgHash;
SHashObj* pBlockHash;
} SStmtExecInfo;
typedef struct SStmtSQLInfo {
STMT_TYPE type;
STMT_STATUS status;
bool autoCreate;
uint64_t runTimes;
SHashObj* pTableCache; //SHash<SStmtTableCache>
SQuery* pQuery;
char* sqlStr;
int32_t sqlLen;
} SStmtSQLInfo;
typedef struct STscStmt {
STMT_TYPE type;
//int16_t last;
//STscObj* taos;
//SSqlObj* pSql;
//SMultiTbStmt mtb;
//SNormalStmt normal;
//int numOfRows;
STscObj* taos;
SCatalog* pCatalog;
int32_t affectedRows;
SStmtSQLInfo sql;
SStmtExecInfo exec;
SStmtBindInfo bInfo;
} STscStmt;
#define STMT_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
......@@ -44,16 +94,16 @@ typedef struct STscStmt {
TAOS_STMT *stmtInit(TAOS *taos);
int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt);
char *stmtErrstr(TAOS_STMT *stmt);
const char *stmtErrstr(TAOS_STMT *stmt);
int stmtAffectedRows(TAOS_STMT *stmt);
int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName);
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags);
int stmtIsInsert(TAOS_STMT *stmt, int *insert);
int stmtGetParamNum(TAOS_STMT *stmt, int *nums);
int stmtAddBatch(TAOS_STMT *stmt);
TAOS_RES *stmtUseResult(TAOS_STMT *stmt);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int32_t colIdx);
#ifdef __cplusplus
......
......@@ -186,6 +186,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
pRequest->pTscObj = pObj;
pRequest->body.fp = fp; // not used it yet
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest);
......
......@@ -157,7 +157,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
return TSDB_CODE_SUCCESS;
}
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
......@@ -170,6 +170,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
......@@ -298,16 +299,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code;
}
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = parseSql(pRequest, false, &pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL:
......@@ -331,7 +325,10 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
}
taosArrayDestroy(pNodeList);
qDestroyQuery(pQuery);
if (!keepQuery) {
qDestroyQuery(pQuery);
}
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
}
......@@ -339,6 +336,19 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
return pRequest;
}
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = parseSql(pRequest, false, &pQuery, NULL);
}
return launchQueryImpl(pRequest, pQuery, code, false);
}
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SCatalog* pCatalog = NULL;
int32_t code = 0;
......@@ -383,7 +393,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t code = 0;
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = execQueryImpl(pTscObj, sql, sqlLen);
pRequest = launchQuery(pTscObj, sql, sqlLen);
if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
......@@ -715,7 +725,8 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
ASSERT(len <= bytes);
ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
varDataSetLen(p, len);
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
p += (len + VARSTR_HEADER_SIZE);
......
......@@ -128,6 +128,10 @@ const char *taos_errstr(TAOS_RES *res) {
}
void taos_free_result(TAOS_RES *res) {
if (NULL == res) {
return;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
destroyRequest(pRequest);
......@@ -579,84 +583,111 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
return stmtInit(taos);
}
int taos_stmt_close(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
if (stmt == NULL || sql == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtClose(stmt);
return stmtPrepare(stmt, sql, length);
}
int taos_stmt_execute(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags) {
if (stmt == NULL || name == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtExec(stmt);
int32_t code = stmtSetTbName(stmt, name);
if (code) {
return code;
}
if (tags) {
return stmtSetTbTags(stmt, tags);
}
return TSDB_CODE_SUCCESS;
}
char *taos_stmt_errstr(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
if (stmt == NULL || name == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
return terrno;
}
return stmtErrstr(stmt);
return stmtSetTbName(stmt, name);
}
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return 0;
return terrno;
}
return stmtAffectedRows(stmt);
if (bind->num > 1) {
tscError("invalid bind number %d for %s", bind->num, __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, -1);
}
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) {
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBind(stmt, bind);
if (bind->num <= 0 || bind->num > INT16_MAX) {
tscError("invalid bind num %d", bind->num);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, -1);
}
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
if (stmt == NULL || sql == NULL) {
int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtPrepare(stmt, sql, length);
if (colIdx < 0) {
tscError("invalid bind column idx %d", colIdx);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, colIdx);
}
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
if (stmt == NULL || name == NULL || tags == NULL) {
int taos_stmt_add_batch(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbNameTags(stmt, name, tags);
return stmtAddBatch(stmt);
}
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
if (stmt == NULL || name == NULL) {
int taos_stmt_execute(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbNameTags(stmt, name, NULL);
return stmtExec(stmt);
}
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
......@@ -679,34 +710,38 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
return stmtGetParamNum(stmt, nums);
}
int taos_stmt_add_batch(TAOS_STMT *stmt) {
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
return NULL;
}
return stmtAddBatch(stmt);
return stmtUseResult(stmt);
}
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
char *taos_stmt_errstr(TAOS_STMT *stmt) {
return (char *)stmtErrstr(stmt);
}
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
return 0;
}
return stmtUseResult(stmt);
return stmtAffectedRows(stmt);
}
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
if (stmt == NULL || bind == NULL) {
int taos_stmt_close(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind);
return stmtClose(stmt);
}
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
......
......@@ -4,86 +4,539 @@
#include "clientStmt.h"
#include "tdef.h"
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
switch (newStatus) {
case STMT_SETTBNAME:
break;
default:
break;
}
//STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
pStmt->sql.status = newStatus;
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
if (NULL == pStmt->bInfo.tbName) {
tscError("no table name set");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
}
*tbName = pStmt->bInfo.tbName;
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = tags;
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->exec.pVgHash = pVgHash;
pStmt->exec.pBlockHash = pBlockHash;
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
*pVgHash = pStmt->exec.pVgHash;
*pBlockHash = pStmt->exec.pBlockHash;
return TSDB_CODE_SUCCESS;
}
int32_t stmtCacheBlock(STscStmt *pStmt) {
if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
return TSDB_CODE_SUCCESS;
}
uint64_t uid;
if (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) {
uid = pStmt->bInfo.tbSuid;
} else {
ASSERT(TSDB_NORMAL_TABLE == pStmt->bInfo.tbType);
uid = pStmt->bInfo.tbUid;
}
if (taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid))) {
return TSDB_CODE_SUCCESS;
}
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
STableDataBlocks* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
SStmtTableCache cache = {
.pDataBlock = pDst,
.boundTags = pStmt->bInfo.boundTags,
};
if (taosHashPut(pStmt->sql.pTableCache, &uid, sizeof(uid), &cache, sizeof(cache))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pStmt->bInfo.boundTags = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t stmtParseSql(STscStmt* pStmt) {
SStmtCallback stmtCb = {
.pStmt = pStmt,
.getTbNameFn = stmtGetTbName,
.setBindInfoFn = stmtSetBindInfo,
.setExecInfoFn = stmtSetExecInfo,
.getExecInfoFn = stmtGetExecInfo,
};
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
pStmt->bInfo.needParse = false;
switch (nodeType(pStmt->sql.pQuery->pRoot)) {
case QUERY_NODE_VNODE_MODIF_STMT:
if (0 == pStmt->sql.type) {
pStmt->sql.type = STMT_TYPE_INSERT;
}
break;
case QUERY_NODE_SELECT_STMT:
pStmt->sql.type = STMT_TYPE_QUERY;
break;
default:
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
pStmt->bInfo.tbUid = 0;
pStmt->bInfo.tbSuid = 0;
pStmt->bInfo.tbType = 0;
pStmt->bInfo.needParse = true;
taosMemoryFreeClear(pStmt->bInfo.tbName);
destroyBoundColumnInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags);
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
uint64_t *key = taosHashGetKey(pIter, NULL);
if (keepTable && (*key == pStmt->bInfo.tbUid)) {
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
continue;
}
qFreeStmtDataBlock(pBlocks);
taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
}
if (keepTable) {
return TSDB_CODE_SUCCESS;
}
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
taosMemoryFree(pStmt->sql.sqlStr);
qDestroyQuery(pStmt->sql.pQuery);
void *pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
while (pIter) {
SStmtTableCache* pCache = (SStmtTableCache*)pIter;
qDestroyStmtDataBlock(pCache->pDataBlock);
destroyBoundColumnInfo(pCache->boundTags);
pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
}
taosHashCleanup(pStmt->sql.pTableCache);
pStmt->sql.pTableCache = NULL;
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
STMT_ERR_RET(stmtCleanExecInfo(pStmt, false));
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.needParse = true;
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pStmt->pCatalog) {
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
}
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));
if (pTableMeta->uid == pStmt->bInfo.tbUid) {
pStmt->bInfo.needParse = false;
return TSDB_CODE_SUCCESS;
}
if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (NULL == pCache) {
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
pStmt->bInfo.needParse = false;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
return TSDB_CODE_SUCCESS;
}
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (pCache) {
pStmt->bInfo.needParse = false;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
STableDataBlocks* pNewBlock = NULL;
STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));
if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock, POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtResetStmt(STscStmt* pStmt) {
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == pStmt->sql.pTableCache) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
STMT_ERR_RET(terrno);
}
pStmt->sql.status = STMT_INIT;
return TSDB_CODE_SUCCESS;
}
TAOS_STMT *stmtInit(TAOS *taos) {
STscObj* pObj = (STscObj*)taos;
STscStmt* pStmt = NULL;
#if 0
pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
if (pStmt == NULL) {
if (NULL == pStmt) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to allocate memory for statement");
return NULL;
}
pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
free(pStmt);
pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == pStmt->sql.pTableCache) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to allocate memory for statement");
taosMemoryFree(pStmt);
return NULL;
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
free(pSql);
free(pStmt);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to malloc payload buffer");
return NULL;
pStmt->taos = pObj;
pStmt->bInfo.needParse = true;
pStmt->sql.status = STMT_INIT;
return pStmt;
}
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt->sql.status >= STMT_PREPARE) {
STMT_ERR_RET(stmtResetStmt(pStmt));
}
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA;
pStmt->pSql = pSql;
pStmt->last = STMT_INIT;
pStmt->numOfRows = 0;
registerSqlObj(pSql);
#endif
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));
return pStmt;
}
if (length <= 0) {
length = strlen(sql);
}
pStmt->sql.sqlStr = strndup(sql, length);
pStmt->sql.sqlLen = length;
int stmtClose(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
int stmtExec(TAOS_STMT *stmt) {
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
STMT_ERR_RET(stmtGetFromCache(pStmt));
if (pStmt->bInfo.needParse) {
taosMemoryFree(pStmt->bInfo.tbName);
pStmt->bInfo.tbName = strdup(tbName);
}
return TSDB_CODE_SUCCESS;
}
char *stmtErrstr(TAOS_STMT *stmt) {
return NULL;
}
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, &pStmt->bInfo.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
int stmtAffectedRows(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind) {
int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_TAG_FIELDS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBuildStmtTagFields(*pDataBlock, pStmt->bInfo.boundTags, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
return TSDB_CODE_SUCCESS;
int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_COL_FIELDS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
int stmtBindBatch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int32_t colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (colIdx < 0) {
qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
} else {
if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
tscError("bind column index not in sequence");
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
pStmt->bInfo.sBindLastIdx = colIdx;
if (0 == colIdx) {
pStmt->bInfo.sBindRowNum = bind->num;
}
qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum);
}
return TSDB_CODE_SUCCESS;
}
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
int stmtAddBatch(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS;
}
int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
int stmtExec(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
STMT_ERR_JRET(pStmt->exec.pRequest->code);
pStmt->affectedRows += taos_affected_rows(pStmt->exec.pRequest);
_return:
stmtCleanExecInfo(pStmt, (code ? false : true));
++pStmt->sql.runTimes;
STMT_RET(code);
}
int stmtClose(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_RET(stmtCleanSQLInfo(pStmt));
}
const char *stmtErrstr(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL) {
return (char*) tstrerror(terrno);
}
if (pStmt->exec.pRequest) {
pStmt->exec.pRequest->code = terrno;
}
return taos_errstr(pStmt->exec.pRequest);
}
int stmtAffectedRows(TAOS_STMT *stmt) {
return ((STscStmt*)stmt)->affectedRows;
}
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt->sql.type) {
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
} else {
*insert = isInsertSql(pStmt->sql.sqlStr, 0);
}
return TSDB_CODE_SUCCESS;
}
int stmtAddBatch(TAOS_STMT *stmt) {
int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
return TSDB_CODE_SUCCESS;
}
......@@ -91,9 +544,5 @@ TAOS_RES *stmtUseResult(TAOS_STMT *stmt) {
return NULL;
}
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return TSDB_CODE_SUCCESS;
}
......@@ -615,7 +615,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
int32_t code = 0;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
// todo check for invalid sql statement and return with error code
......
......@@ -519,8 +519,9 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
SKvRowIdx *pIdx = tdKvRowColIdxAt(pRow, rcol);
int16_t colIdx = -1;
if (pIdx) {
colIdx = POINTER_DISTANCE(pRow->data, pIdx) / sizeof(SKvRowIdx);
colIdx = POINTER_DISTANCE(pIdx, TD_ROW_COL_IDX(pRow)) / sizeof(SKvRowIdx);
}
TASSERT(colIdx >= 0);
SCellVal sVal = {0};
if (pIdx->colId == pDataCol->colId) {
if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pIdx->offset, colIdx) < 0) {
......
......@@ -89,7 +89,7 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
(int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
}
static FORCE_INLINE void getSTSRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
int32_t *toffset, col_id_t *colIdx) {
col_id_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) {
......@@ -131,7 +131,6 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
void destroyBlockArrayList(SArray* pDataBlockList);
void destroyBlockHashmap(SHashObj* pDataBlockHash);
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
......@@ -139,5 +138,7 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks);
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq);
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
#endif // TDENGINE_DATABLOCKMGT_H
......@@ -21,6 +21,8 @@ extern "C" {
#endif
#include "parser.h"
#include "parToken.h"
#include "parUtil.h"
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
......
此差异已折叠。
......@@ -18,6 +18,7 @@
#include "catalog.h"
#include "parUtil.h"
#include "querynodes.h"
#include "parInt.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
......@@ -102,7 +103,13 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs) {
}
}
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
void destroyBoundColumnInfo(void* pBoundInfo) {
if (NULL == pBoundInfo) {
return;
}
SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
taosMemoryFreeClear(pColList->boundColumns);
taosMemoryFreeClear(pColList->cols);
taosMemoryFreeClear(pColList->colIdxInfo);
......@@ -149,7 +156,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq);
if (pBlocks->nAllocSize - pBlocks->size < len) {
pBlocks->nAllocSize += len + pBlocks->rowSize;
......@@ -506,6 +513,28 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
return TSDB_CODE_SUCCESS;
}
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
// expand the allocated size
if (remain < allSize) {
pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;
char *tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
if (tmp != NULL) {
pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else {
// do nothing, if allocate more memory failed
pDataBlock->nAllocSize = nAllocSizeOld;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5;
......@@ -541,3 +570,84 @@ int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo
pColInfo->boundNullLen);
return TSDB_CODE_SUCCESS;
}
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
STableDataBlocks* pBlock = (STableDataBlocks*)block;
if (keepBuf) {
taosMemoryFreeClear(pBlock->pData);
pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
if (NULL == pBlock->pData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
} else {
pBlock->pData = NULL;
}
pBlock->ordered = true;
pBlock->prevTS = INT64_MIN;
pBlock->size = sizeof(SSubmitBlk);
pBlock->tsSource = -1;
pBlock->numOfTables = 1;
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
pBlock->headerSize = pBlock->size;
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
return TSDB_CODE_SUCCESS;
}
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
*pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
if (NULL == *pDst) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
((STableDataBlocks*)(*pDst))->cloned = true;
return qResetStmtDataBlock(*pDst, false);
}
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
if (code) {
return code;
}
STableDataBlocks *pBlock = (STableDataBlocks*)*pDst;
pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
if (NULL == pBlock->pData) {
qFreeStmtDataBlock(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
return TSDB_CODE_SUCCESS;
}
void qFreeStmtDataBlock(void* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
taosMemoryFreeClear(pDataBlock);
}
void qDestroyStmtDataBlock(void* pBlock) {
if (pBlock == NULL) {
return;
}
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
pDataBlock->cloned = false;
destroyDataBlock(pDataBlock);
}
......@@ -18,7 +18,11 @@
#include "parInt.h"
#include "parToken.h"
static bool isInsertSql(const char* pStr, size_t length) {
bool isInsertSql(const char* pStr, size_t length) {
if (NULL == pStr) {
return false;
}
int32_t index = 0;
do {
......@@ -68,4 +72,4 @@ void qDestroyQuery(SQuery* pQueryNode) {
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
return extractResultSchema(pRoot, numOfCols, pSchema);
}
\ No newline at end of file
}
......@@ -138,6 +138,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
// mnode-common
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
......
此差异已折叠。
......@@ -4,18 +4,14 @@
ROOT=./
TARGET=exe
LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
CFLAGS = -O0 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
CFLAGS = -O0 -g -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
-Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX \
-Wno-unused-function -D_M_X64 -I/usr/local/taos/include -std=gnu99 \
-fsanitize=address
-Wno-unused-function -D_M_X64 -I/usr/local/taos/include -std=gnu99 -Wno-sign-conversion
all: $(TARGET)
exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stmtBatchTest.c -o $(ROOT)stmtBatchTest $(LFLAGS)
gcc $(CFLAGS) ./stmtTest.c -o $(ROOT)stmtTest $(LFLAGS)
gcc $(CFLAGS) ./stmt_function.c -o $(ROOT)stmt_function $(LFLAGS)
clean:
rm $(ROOT)batchprepare
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册