提交 69f09fd0 编写于 作者: D dapan1121

stmt

上级 a823535f
......@@ -280,13 +280,22 @@ typedef struct SVgDataBlocks {
char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SStmtDataCtx {
uint64_t tbUid;
SHashObj* pVgroupsHashObj; // global
SHashObj* pTableBlockHashObj; // global
SHashObj* pSubTableHashObj; // global
SArray* pTableDataBlocks; // global
} SStmtDataCtx;
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
SStmtDataCtx stmtCtx;
} SVnodeModifOpStmt;
typedef struct SExplainOptions {
......
......@@ -22,6 +22,11 @@ extern "C" {
#include "querynodes.h"
typedef struct SStmtCallback {
TAOS_STMT* pStmt;
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
} SStmtCallback;
typedef struct SParseContext {
uint64_t requestId;
int32_t acctId;
......@@ -34,6 +39,7 @@ typedef struct SParseContext {
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog;
SStmtCallback *pStmtCb;
} SParseContext;
typedef struct SCmdMsgInfo {
......
......@@ -132,6 +132,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222)
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223)
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224)
#define TSDB_CODE_TSC_STMT_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225)
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
// mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
......
......@@ -213,6 +213,7 @@ typedef struct SRequestObj {
int32_t sqlLen;
int64_t self;
char* msgBuf;
int32_t msgBufLen;
void* pInfo; // sql parse info, generated by parser module
int32_t code;
SArray* dbList;
......
......@@ -26,21 +26,51 @@ typedef enum {
STMT_TYPE_QUERY,
} STMT_TYPE;
typedef enum {
STMT_INIT = 1,
STMT_PREPARE,
STMT_SETTBNAME,
STMT_BIND,
STMT_BIND_COL,
STMT_ADD_BATCH,
STMT_EXECUTE
} STMT_STATUS;
typedef struct STscStmt {
STMT_TYPE type;
//int16_t last;
//STscObj* taos;
//SSqlObj* pSql;
STMT_TYPE type;
STMT_STATUS status;
STscObj* taos;
SRequestObj* pRequest;
SQuery* pQuery;
char* sql;
int32_t sqlLen;
char* tbName;
TAOS_BIND* bindTags;
//SMultiTbStmt mtb;
//SNormalStmt normal;
//int numOfRows;
} STscStmt;
#define STMT_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define STMT_CHK_STATUS(_stmt, _status, _v) do {
switch (_status) {
case STMT_INIT:
if ((_stmt)->status != 0) return (_v);
break;
case STMT_PREPARE:
if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_v);
break;
case STMT_SETTBNAME:
break;
}
} while (0)
TAOS_STMT *stmtInit(TAOS *taos);
int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt);
......
......@@ -186,6 +186,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
pRequest->pTscObj = pObj;
pRequest->body.fp = fp; // not used it yet
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest);
......
......@@ -143,7 +143,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
return TSDB_CODE_SUCCESS;
}
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
......@@ -156,6 +156,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
......@@ -289,7 +290,7 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = parseSql(pRequest, false, &pQuery);
code = parseSql(pRequest, false, &pQuery, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -569,84 +569,80 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
return stmtInit(taos);
}
int taos_stmt_close(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
if (stmt == NULL || sql == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtClose(stmt);
return stmtPrepare(stmt, sql, length);
}
int taos_stmt_execute(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
if (stmt == NULL || name == NULL || tags == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtExec(stmt);
return stmtSetTbNameTags(stmt, name, tags);
}
char *taos_stmt_errstr(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
if (stmt == NULL || name == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
return terrno;
}
return stmtErrstr(stmt);
return stmtSetTbNameTags(stmt, name, NULL);
}
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return 0;
return terrno;
}
return stmtAffectedRows(stmt);
return stmtBind(stmt, bind);
}
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) {
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBind(stmt, bind);
}
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
if (stmt == NULL || sql == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
if (bind->num <= 0 || bind->num > INT16_MAX) {
tscError("invalid bind num %d", bind->num);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtPrepare(stmt, sql, length);
return stmtBindBatch(stmt, bind);
}
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
if (stmt == NULL || name == NULL || tags == NULL) {
int taos_stmt_add_batch(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbNameTags(stmt, name, tags);
return stmtAddBatch(stmt);
}
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
if (stmt == NULL || name == NULL) {
int taos_stmt_execute(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbNameTags(stmt, name, NULL);
return stmtExec(stmt);
}
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
......@@ -669,34 +665,47 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
return stmtGetParamNum(stmt, nums);
}
int taos_stmt_add_batch(TAOS_STMT *stmt) {
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
return NULL;
}
return stmtAddBatch(stmt);
return stmtUseResult(stmt);
}
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
char *taos_stmt_errstr(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
return stmtUseResult(stmt);
return stmtErrstr(stmt);
}
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
if (stmt == NULL || bind == NULL) {
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return 0;
}
return stmtAffectedRows(stmt);
}
int taos_stmt_close(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind);
return stmtClose(stmt);
}
......
......@@ -4,11 +4,23 @@
#include "clientStmt.h"
#include "tdef.h"
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
if (NULL == pStmt->tbName) {
tscError("no table name set");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
}
*tbName = pStmt->tbName;
return TSDB_CODE_SUCCESS;
}
TAOS_STMT *stmtInit(TAOS *taos) {
STscObj* pObj = (STscObj*)taos;
STscStmt* pStmt = NULL;
#if 0
pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
if (pStmt == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -16,38 +28,64 @@ TAOS_STMT *stmtInit(TAOS *taos) {
return NULL;
}
pStmt->taos = pObj;
pStmt->status = STMT_INIT;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
return pStmt;
}
if (pSql == NULL) {
free(pStmt);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to allocate memory for statement");
return NULL;
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_STATUS_ERROR);
pStmt->sql = strndup(sql, length);
pStmt->sqlLen = length;
return TSDB_CODE_SUCCESS;
}
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_STATUS_ERROR);
if (tbName) {
pStmt->tbName = strdup(tbName);
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
free(pSql);
free(pStmt);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to malloc payload buffer");
return NULL;
if (tags) {
pStmt->bindTags = tags;
}
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA;
pStmt->pSql = pSql;
pStmt->last = STMT_INIT;
pStmt->numOfRows = 0;
registerSqlObj(pSql);
#endif
return TSDB_CODE_SUCCESS;
}
return pStmt;
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR);
if (NULL == pStmt->pRequest) {
SStmtCallback stmtCb = {.pStmt = stmt, .getTbNameFn = stmtGetTbName};
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql, pStmt->sqlLen, &pStmt->pRequest));
STMT_ERR_RET(parseSql(pStmt->pRequest, false, &pStmt->pQuery, &stmtCb));
}
qBindStmtData(pStmt->pQuery, bind, pStmt->pRequest->msgBuf, pStmt->pRequest->msgBufLen);
return TSDB_CODE_SUCCESS;
}
int stmtClose(TAOS_STMT *stmt) {
int stmtAddBatch(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHK_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_STATUS_ERROR);
qBuildStmtOutput(pStmt->pQuery);
return TSDB_CODE_SUCCESS;
}
......@@ -55,23 +93,22 @@ int stmtExec(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
char *stmtErrstr(TAOS_STMT *stmt) {
return NULL;
}
int stmtAffectedRows(TAOS_STMT *stmt) {
int stmtClose(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind) {
return TSDB_CODE_SUCCESS;
}
char *stmtErrstr(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
return TSDB_CODE_SUCCESS;
if (stmt == NULL) {
return (char*) tstrerror(terrno);
}
return taos_errstr(pStmt->pRequest);
}
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
int stmtAffectedRows(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
......@@ -83,17 +120,9 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
return TSDB_CODE_SUCCESS;
}
int stmtAddBatch(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
TAOS_RES *stmtUseResult(TAOS_STMT *stmt) {
return NULL;
}
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return TSDB_CODE_SUCCESS;
}
......@@ -89,7 +89,7 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
(int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
}
static FORCE_INLINE void getSTSRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
int32_t *toffset, col_id_t *colIdx) {
col_id_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) {
......
......@@ -63,6 +63,7 @@ typedef struct SInsertParseContext {
SArray* pVgDataBlocks; // global
int32_t totalNum;
SVnodeModifOpStmt* pOutput;
SStmtCallback* pStmtCb;
} SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param);
......@@ -453,8 +454,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
int64_t tmpVal = 0;
return func(pMsgBuf, &tmpVal, pSchema->bytes, param);
return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
}
return func(pMsgBuf, NULL, 0, param);
......@@ -872,8 +872,22 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
for (int i = 0; i < spd->numOfBound; ++i) {
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
SSchema* pSchema = &schema[spd->boundColumns[i] - 1];
if (sToken.type == TK_NK_QUESTION) {
isParseBindParam = true;
if (NULL == pCxt->pStmtCb) {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
}
continue;
}
if (isParseBindParam) {
return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
}
param.schema = pSchema;
getSTSRowAppendInfo(schema, pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
getSTSRowAppendInfo(pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));
if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
......@@ -1005,11 +1019,15 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0;
// for each table
while (1) {
destroyInsertParseContextForTable(pCxt);
SToken sToken;
char *tbName = NULL;
// pSql -> tb_name ...
NEXT_TOKEN(pCxt->pSql, sToken);
......@@ -1021,6 +1039,21 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
break;
}
if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && tbNum > 0) {
return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");;
}
if (TK_NK_QUESTION == sToken.type) {
if (pCxt->pStmtCb) {
CHECK_CODE((*pCxt->pStmtCb->getTbNameFn)(pCxt->pStmtCb->pStmt, &tbName));
sToken.z = tbName;
sToken.n = strlen(tbName);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
}
}
SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
......@@ -1046,6 +1079,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
CHECK_CODE(parseValuesClause(pCxt, dataBuf));
pCxt->pOutput->insertType = TSDB_QUERY_TYPE_INSERT;
tbNum++;
continue;
}
......@@ -1058,13 +1093,31 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
}
// todo
pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT;
tbNum++;
continue;
}
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
}
if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
pCxt->pOutput->stmtCtx.tbUid = pCxt->pTableMeta->uid;
pCxt->pOutput->stmtCtx.pVgroupsHashObj = pCxt->pVgroupsHashObj;
pCxt->pOutput->stmtCtx.pTableBlockHashObj = pCxt->pTableBlockHashObj;
pCxt->pOutput->stmtCtx.pSubTableHashObj = pCxt->pSubTableHashObj;
pCxt->pOutput->stmtCtx.pTableDataBlocks = pCxt->pTableDataBlocks;
pCxt->pVgroupsHashObj = NULL;
pCxt->pTableBlockHashObj = NULL;
pCxt->pSubTableHashObj = NULL;
pCxt->pTableDataBlocks = NULL;
return TSDB_CODE_SUCCESS;
}
// merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
}
return buildOutput(pCxt);
......@@ -1086,7 +1139,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
.totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb
};
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
......@@ -1094,6 +1148,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (pContext->pStmtCb) {
TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
}
*pQuery = taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -506,6 +506,28 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
return TSDB_CODE_SUCCESS;
}
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
// expand the allocated size
if (remain < allSize) {
pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;
char *tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
if (tmp != NULL) {
pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else {
// do nothing, if allocate more memory failed
pDataBlock->nAllocSize = nAllocSizeOld;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5;
......
......@@ -51,6 +51,98 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) {
return code;
}
int32_t qBindStmtData(SQuery* pQuery, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pCtx->pTableBlockHashObj, (const char*)&pCtx->tbUid, sizeof(pCtx->tbUid));
if (NULL == pDataBlock) {
return TSDB_CODE_QRY_APP_ERROR;
}
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder};
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num);
for (int32_t r = 0; r < bind->num; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
param.schema = pColSchema;
getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);
if (bind[c].is_null && bind[c].is_null[r]) {
CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
} else {
int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = bind[c].length[r];
}
CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, &param));
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
checkTimestamp(pDataBlock, (const char *)&tsKey);
}
}
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE
tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
spd->cols[i].toffset);
}
}
}
pDataBlock->size += extendedRowSize;
}
SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
}
return TSDB_CODE_SUCCESS;
}
int32_t qBuildStmtOutput(SQuery* pQuery) {
SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot;
SStmtDataCtx *pCtx = &modifyNode->stmtCtx;
int32_t code = 0;
SInsertParseContext insertCtx = {
.pVgroupsHashObj = pCtx->pVgroupsHashObj;
.pTableBlockHashObj = pCtx->pTableBlockHashObj;
.pSubTableHashObj = pCtx->pSubTableHashObj;
.pTableDataBlocks = pCtx->pTableDataBlocks;
};
// merge according to vgId
if (taosHashGetSize(pCtx->pTableBlockHashObj) > 0) {
CHECK_CODE_GOTO(mergeTableDataBlocks(pCtx->pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return);
}
code = buildOutput(&insertCtx);
_return:
destroyInsertParseContext(&insertCtx);
return code;
}
void qDestroyQuery(SQuery* pQueryNode) {
if (NULL == pQueryNode) {
return;
......@@ -68,4 +160,4 @@ void qDestroyQuery(SQuery* pQueryNode) {
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
return extractResultSchema(pRoot, numOfCols, pSchema);
}
\ No newline at end of file
}
......@@ -138,6 +138,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_STATUS_ERROR, "Stmt API usage error")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
// mnode-common
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册