提交 8913b012 编写于 作者: X Xiaoyu Wang

create sma index msg adjust

上级 afe99b1f
......@@ -199,6 +199,7 @@ typedef struct SIndexOptions {
typedef struct SCreateIndexStmt {
ENodeType type;
EIndexType indexType;
bool ignoreExists;
char indexName[TSDB_INDEX_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
SNodeList* pCols;
......@@ -207,6 +208,7 @@ typedef struct SCreateIndexStmt {
typedef struct SDropIndexStmt {
ENodeType type;
bool ignoreNotExists;
char indexName[TSDB_INDEX_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
} SDropIndexStmt;
......
......@@ -1480,7 +1480,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
pNode->datum.p = calloc(1, pNode->node.resType.bytes);
pNode->datum.p = calloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
......
......@@ -150,9 +150,9 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions);
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, SToken* pIndexName, SToken* pTableName);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName, SToken* pTableName);
SNode* createCreateQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
SNode* createDropQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery, const SToken* pSubscribeDbName);
......
......@@ -313,10 +313,11 @@ func_name_list(A) ::= func_name_list(B) NK_COMMA col_name(C).
func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); }
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, &A, &B, NULL, C); }
cmd ::= CREATE FULLTEXT INDEX
index_name(A) ON table_name(B) NK_LP col_name_list(C) NK_RP. { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_FULLTEXT, &A, &B, C, NULL); }
cmd ::= DROP INDEX index_name(A) ON table_name(B). { pCxt->pRootNode = createDropIndexStmt(pCxt, &A, &B); }
cmd ::= CREATE SMA INDEX not_exists_opt(D)
index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, &B, NULL, C); }
cmd ::= CREATE FULLTEXT INDEX not_exists_opt(D)
index_name(A) ON table_name(B) NK_LP col_name_list(C) NK_RP. { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_FULLTEXT, D, &A, &B, C, NULL); }
cmd ::= DROP INDEX exists_opt(C) index_name(A) ON table_name(B). { pCxt->pRootNode = createDropIndexStmt(pCxt, C, &A, &B); }
index_options(A) ::= . { A = NULL; }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
......
......@@ -1160,13 +1160,14 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
return (SNode*)pStmt;
}
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName)) {
return NULL;
}
SCreateIndexStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->indexType = type;
pStmt->ignoreExists = ignoreExists;
strncpy(pStmt->indexName, pIndexName->z, pIndexName->n);
strncpy(pStmt->tableName, pTableName->z, pTableName->n);
pStmt->pCols = pCols;
......@@ -1184,12 +1185,13 @@ SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInt
return (SNode*)pOptions;
}
SNode* createDropIndexStmt(SAstCreateContext* pCxt, SToken* pIndexName, SToken* pTableName) {
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName, SToken* pTableName) {
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName)) {
return NULL;
}
SDropIndexStmt* pStmt = nodesMakeNode(QUERY_NODE_DROP_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->ignoreNotExists = ignoreNotExists;
strncpy(pStmt->indexName, pIndexName->z, pIndexName->n);
strncpy(pStmt->tableName, pTableName->z, pTableName->n);
return (SNode*)pStmt;
......
......@@ -383,7 +383,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
pVal->datum.p = calloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE);
pVal->datum.p = calloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -1406,34 +1406,110 @@ static int32_t translateShowTables(STranslateContext* pCxt) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SVCreateTSmaReq createSmaReq = {0};
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, char* pTableName, int32_t* pVgId) {
SVgroupInfo vg = {0};
int32_t code = getTableHashVgroup(pCxt, pCxt->pParseCxt->db, pTableName, &vg);
if (TSDB_CODE_SUCCESS == code) {
*pVgId = vg.vgId;
}
return code;
}
if (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval) ||
(NULL != pStmt->pOptions->pOffset && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pOffset)) ||
(NULL != pStmt->pOptions->pSliding && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding))) {
return pCxt->errCode;
static int32_t getSmaIndexSql(STranslateContext* pCxt, char** pSql, int32_t* pLen) {
*pSql = strdup(pCxt->pParseCxt->pSql);
if (NULL == *pSql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*pLen = pCxt->pParseCxt->sqlLen + 1;
return TSDB_CODE_SUCCESS;
}
createSmaReq.tSma.intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
createSmaReq.tSma.slidingUnit = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : 0);
strcpy(createSmaReq.tSma.indexName, pStmt->indexName);
static int32_t getSmaIndexExpr(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pExpr, int32_t* pLen) {
return nodesListToString(pStmt->pOptions->pFuncs, false, pExpr, pLen);
}
SName name;
name.type = TSDB_TABLE_NAME_T;
name.acctId = pCxt->pParseCxt->acctId;
static int32_t getSmaIndexBuildAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == pSelect) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
if (NULL == pTable) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pTable->table.dbName, pCxt->pParseCxt->db);
strcpy(pTable->table.tableName, pStmt->tableName);
pSelect->pFromTable = (SNode*)pTable;
pSelect->pProjectionList = nodesCloneList(pStmt->pOptions->pFuncs);
if (NULL == pTable) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
SIntervalWindowNode* pInterval = nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
if (NULL == pInterval) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSelect->pWindow = (SNode*)pInterval;
pInterval->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
pInterval->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
pInterval->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
if (NULL == pInterval->pInterval || (NULL != pStmt->pOptions->pOffset && NULL == pInterval->pOffset) ||
(NULL != pStmt->pOptions->pSliding && NULL == pInterval->pSliding)) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pSelect, false, pAst, pLen);
}
nodesDestroyNode(pSelect);
return code;
}
static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateSmaReq* pReq) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pCxt->pParseCxt->db);
strcpy(name.tname, pStmt->indexName);
tNameExtractFullName(&name, pReq->name);
strcpy(name.tname, pStmt->tableName);
STableMeta* pMeta = NULL;
int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &name, &pMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;
name.tname[strlen(pStmt->tableName)] = '\0';
tNameExtractFullName(&name, pReq->stb);
pReq->igExists = pStmt->ignoreExists;
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
pReq->offset = (NULL != pStmt->pOptions->pOffset ? ((SValueNode*)pStmt->pOptions->pOffset)->datum.i : 0);
pReq->sliding = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pReq->interval);
pReq->slidingUnit = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit);
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->tableName, &pReq->dstVgId);
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexExpr(pCxt, pStmt, &pReq->expr, &pReq->exprLen);
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexBuildAst(pCxt, pStmt, &pReq->ast, &pReq->astLen);
}
createSmaReq.tSma.tableUid = pMeta->uid;
createSmaReq.tSma.interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
createSmaReq.tSma.sliding = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : 0);
code = nodesListToString(pStmt->pOptions->pFuncs, false, &createSmaReq.tSma.expr, &createSmaReq.tSma.exprLen);
return code;
}
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
if (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval) ||
(NULL != pStmt->pOptions->pOffset && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pOffset)) ||
(NULL != pStmt->pOptions->pSliding && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding))) {
return pCxt->errCode;
}
SMCreateSmaReq createSmaReq = {0};
int32_t code = buildCreateSmaReq(pCxt, pStmt, &createSmaReq);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -1444,14 +1520,13 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_VND_CREATE_SMA;
pCxt->pCmdMsg->msgLen = tSerializeSVCreateTSmaReq(NULL, &createSmaReq);
pCxt->pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, &createSmaReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* pBuf = pCxt->pCmdMsg->pMsg;
tSerializeSVCreateTSmaReq(&pBuf, &createSmaReq);
tdDestroyTSma(&createSmaReq.tSma);
tSerializeSMCreateSmaReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &createSmaReq);
tFreeSMCreateSmaReq(&createSmaReq);
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册