提交 e7c8ada2 编写于 作者: X Xiaoyu Wang

fix: insert with auto create table

上级 f8d51bc9
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include "ttime.h" #include "ttime.h"
#include "ttypes.h" #include "ttypes.h"
// clang-format off
#define NEXT_TOKEN(pSql, sToken) \ #define NEXT_TOKEN(pSql, sToken) \
do { \ do { \
int32_t index = 0; \ int32_t index = 0; \
...@@ -248,12 +247,11 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool ...@@ -248,12 +247,11 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool
} else { } else {
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
&pCxt->pTableMeta)); &pCxt->pTableMeta));
SVgroupInfo vg;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
} }
SVgroupInfo vg;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -828,12 +826,21 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { ...@@ -828,12 +826,21 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, STableMeta* pMeta) { static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
int32_t len, STableMeta* pMeta) {
SVgroupInfo vg;
SParseContext* pBasicCtx = pCxt->pComCxt;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pMeta->uid = tGenIdPI64();
pMeta->vgId = vg.vgId;
STableMeta* pBackup = NULL; STableMeta* pBackup = NULL;
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) { if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pBackup->uid = tGenIdPI64();
return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES); return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
} }
...@@ -856,7 +863,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) ...@@ -856,7 +863,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
} }
CHECK_CODE(storeTableMeta(pCxt->pSubTableHashObj, tbFName, len, pCxt->pTableMeta)); CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, &name, tbFName, len, pCxt->pTableMeta));
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
...@@ -1261,9 +1268,10 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash ...@@ -1261,9 +1268,10 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen){ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, SName* pName, TAOS_MULTI_BIND* bind,
STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; char* msgBuf, int32_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) { if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
...@@ -1311,11 +1319,10 @@ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *p ...@@ -1311,11 +1319,10 @@ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder; SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder}; SMemParam param = {.rb = pBuilder};
...@@ -1390,10 +1397,11 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in ...@@ -1390,10 +1397,11 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum) { int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; int32_t rowNum) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
int32_t extendedRowSize = getExtendedRowSize(pDataBlock); SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder; SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder}; SMemParam param = {.rb = pBuilder};
...@@ -1458,7 +1466,7 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBu ...@@ -1458,7 +1466,7 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBu
} }
#ifdef TD_DEBUG_PRINT_ROW #ifdef TD_DEBUG_PRINT_ROW
if(rowEnd) { if (rowEnd) {
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols); STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
tdSRowPrint(row, pSTSchema, __func__); tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema); taosMemoryFree(pSTSchema);
......
...@@ -498,14 +498,9 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p ...@@ -498,14 +498,9 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
} }
int32_t len = pBlocks->numOfRows *
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = int32_t finalLen =
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload); trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize); assert(dataBuf->size <= dataBuf->nAllocSize);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册