提交 62856b04 编写于 作者: X Xiaoyu Wang

insert using implement

上级 3b9e868e
...@@ -77,7 +77,6 @@ typedef struct STableDataBlocks { ...@@ -77,7 +77,6 @@ typedef struct STableDataBlocks {
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData; char *pData;
bool cloned; bool cloned;
STagData tagData;
SParsedDataColInfo boundColumnInfo; SParsedDataColInfo boundColumnInfo;
SRowBuilder rowBuilder; SRowBuilder rowBuilder;
......
...@@ -550,11 +550,12 @@ column_reference(A) ::= column_name(B). ...@@ -550,11 +550,12 @@ column_reference(A) ::= column_name(B).
column_reference(A) ::= table_name(B) NK_DOT column_name(C). { A = createRawExprNodeExt(pCxt, &B, &C, createColumnNode(pCxt, &B, &C)); } column_reference(A) ::= table_name(B) NK_DOT column_name(C). { A = createRawExprNodeExt(pCxt, &B, &C, createColumnNode(pCxt, &B, &C)); }
//pseudo_column(A) ::= NK_NOW. { A = createFunctionNode(pCxt, NULL, NULL); } //pseudo_column(A) ::= NK_NOW. { A = createFunctionNode(pCxt, NULL, NULL); }
pseudo_column(A) ::= NK_UNDERLINE(B) ROWTS(C). { //pseudo_column(A) ::= NK_UNDERLINE(B) ROWTS(C). {
SToken t = B; // SToken t = B;
t.n = (C.z + C.n) - B.z; // t.n = (C.z + C.n) - B.z;
A = createRawExprNode(pCxt, &t, createFunctionNode(pCxt, &t, NULL)); // A = createRawExprNode(pCxt, &t, createFunctionNode(pCxt, &t, NULL));
} // }
pseudo_column(A) ::= ROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= TBNAME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } pseudo_column(A) ::= TBNAME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= NK_UNDERLINE(B) QSTARTTS(C). { pseudo_column(A) ::= NK_UNDERLINE(B) QSTARTTS(C). {
SToken t = B; SToken t = B;
......
...@@ -55,8 +55,10 @@ typedef struct SInsertParseContext { ...@@ -55,8 +55,10 @@ typedef struct SInsertParseContext {
STableMeta* pTableMeta; // each table STableMeta* pTableMeta; // each table
SParsedDataColInfo tags; // each table SParsedDataColInfo tags; // each table
SKVRowBuilder tagsBuilder; // each table SKVRowBuilder tagsBuilder; // each table
SVCreateTbReq createTblReq; // each table
SHashObj* pVgroupsHashObj; // global SHashObj* pVgroupsHashObj; // global
SHashObj* pTableBlockHashObj; // global SHashObj* pTableBlockHashObj; // global
SHashObj* pSubTableHashObj; // global
SArray* pTableDataBlocks; // global SArray* pTableDataBlocks; // global
SArray* pVgDataBlocks; // global SArray* pVgDataBlocks; // global
int32_t totalNum; int32_t totalNum;
...@@ -738,8 +740,20 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, voi ...@@ -738,8 +740,20 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, voi
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, SKVRow row) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName);
pCxt->createTblReq.type = TD_CHILD_TABLE;
pCxt->createTblReq.dbFName = strdup(dbFName);
pCxt->createTblReq.name = strdup(pName->tname);
pCxt->createTblReq.ctbCfg.suid = pCxt->pTableMeta->suid;
pCxt->createTblReq.ctbCfg.pTag = row;
return TSDB_CODE_SUCCESS;
}
// pSql -> tag1_value, ...) // pSql -> tag1_value, ...)
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision) { static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision, const SName* pName) {
if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) { if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -760,23 +774,46 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, ...@@ -760,23 +774,46 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
} }
tdSortKVRowByColIdx(row); tdSortKVRowByColIdx(row);
// todo construct payload return buildCreateTbReq(pCxt, pName, row);
}
taosMemoryFreeClear(row); static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
*pDst = taosMemoryMalloc(TABLE_META_SIZE(pSrc));
if (NULL == *pDst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(*pDst, pSrc, TABLE_META_SIZE(pSrc));
return TSDB_CODE_SUCCESS;
}
return 0; static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, STableMeta* pMeta) {
STableMeta* pBackup = NULL;
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
} }
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) { static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
SToken sToken; SName name;
createSName(&name, pTbnameToken, pCxt->pComCxt, &pCxt->msg);
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&name, tbFName);
int32_t len = strlen(tbFName);
STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
if (NULL != pMeta) {
return cloneTableMeta(*pMeta, &pCxt->pTableMeta);
}
SToken sToken;
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(getTableMeta(pCxt, &sToken)); CHECK_CODE(getTableMeta(pCxt, &sToken));
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
} }
CHECK_CODE(storeTableMeta(pCxt->pSubTableHashObj, tbFName, len, pCxt->pTableMeta));
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
...@@ -796,7 +833,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) ...@@ -796,7 +833,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
if (TK_NK_LP != sToken.type) { if (TK_NK_LP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
} }
CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision)); CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, &name));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -904,10 +941,17 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da ...@@ -904,10 +941,17 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
taosMemoryFreeClear(pReq->dbFName);
taosMemoryFreeClear(pReq->name);
taosMemoryFreeClear(pReq->ctbCfg.pTag);
}
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
taosMemoryFreeClear(pCxt->pTableMeta); taosMemoryFreeClear(pCxt->pTableMeta);
destroyBoundColumnInfo(&pCxt->tags); destroyBoundColumnInfo(&pCxt->tags);
tdDestroyKVRowBuilder(&pCxt->tagsBuilder); tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
destroyCreateSubTbReq(&pCxt->createTblReq);
} }
static void destroyDataBlock(STableDataBlocks* pDataBlock) { static void destroyDataBlock(STableDataBlocks* pDataBlock) {
...@@ -972,7 +1016,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { ...@@ -972,7 +1016,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
STableDataBlocks *dataBuf = NULL; STableDataBlocks *dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL)); sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));
if (TK_NK_LP == sToken.type) { if (TK_NK_LP == sToken.type) {
// pSql -> field1_name, ...) // pSql -> field1_name, ...)
CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta))); CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
...@@ -1021,11 +1065,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1021,11 +1065,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pTableMeta = NULL, .pTableMeta = NULL,
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false), .pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
.totalNum = 0, .totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT) .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)
}; };
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) { if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
NULL == context.pSubTableHashObj || NULL == context.pOutput) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
......
...@@ -138,7 +138,7 @@ static SKeyword keywordTable[] = { ...@@ -138,7 +138,7 @@ static SKeyword keywordTable[] = {
{"RESET", TK_RESET}, {"RESET", TK_RESET},
{"RETENTIONS", TK_RETENTIONS}, {"RETENTIONS", TK_RETENTIONS},
{"ROLLUP", TK_ROLLUP}, {"ROLLUP", TK_ROLLUP},
{"ROWTS", TK_ROWTS}, {"_ROWTS", TK_ROWTS},
{"SCORES", TK_SCORES}, {"SCORES", TK_SCORES},
{"SELECT", TK_SELECT}, {"SELECT", TK_SELECT},
{"SESSION", TK_SESSION}, {"SESSION", TK_SESSION},
...@@ -440,10 +440,10 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) { ...@@ -440,10 +440,10 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
*tokenId = TK_NK_QUESTION; *tokenId = TK_NK_QUESTION;
return 1; return 1;
} }
case '_': { // case '_': {
*tokenId = TK_NK_UNDERLINE; // *tokenId = TK_NK_UNDERLINE;
return 1; // return 1;
} // }
case '`': case '`':
case '\'': case '\'':
case '"': { case '"': {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册