未验证 提交 2c182e80 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #11208 from taosdata/feature/3.0_wxy

insert using implement
......@@ -277,7 +277,6 @@ typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
......
......@@ -307,7 +307,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
#define TSDB_MIN_DAYS_PER_FILE (1 * 1440) // unit minute
#define TSDB_MIN_DAYS_PER_FILE 60 // unit minute
#define TSDB_MAX_DAYS_PER_FILE (3650 * 1440)
#define TSDB_DEFAULT_DAYS_PER_FILE (10 * 1440)
......
......@@ -16,7 +16,16 @@
#include "command.h"
#include "tdatablock.h"
// #define SET_VARSTR(pData, val, pOffset)
static int32_t getSchemaBytes(const SSchema* pSchema) {
switch (pSchema->type) {
case TSDB_DATA_TYPE_BINARY:
return (pSchema->bytes - VARSTR_HEADER_SIZE);
case TSDB_DATA_TYPE_NCHAR:
return (pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
default:
return pSchema->bytes;
}
}
static void buildRspData(const STableMeta* pMeta, char* pData) {
int32_t* pColSizes = (int32_t*)pData;
......@@ -50,7 +59,7 @@ static void buildRspData(const STableMeta* pMeta, char* pData) {
// Length
pData += BitmapLen(numOfRows);
for (int32_t i = 0; i < numOfRows; ++i) {
*(int32_t*)pData = pMeta->schema[i].bytes;
*(int32_t*)pData = getSchemaBytes(pMeta->schema + i);
pData += sizeof(int32_t);
}
pColSizes[2] = sizeof(int32_t) * numOfRows;
......
......@@ -77,7 +77,7 @@ typedef struct STableDataBlocks {
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData;
bool cloned;
int32_t createTbReqLen;
SParsedDataColInfo boundColumnInfo;
SRowBuilder rowBuilder;
} STableDataBlocks;
......@@ -118,6 +118,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? dataBuf->pTableMeta->uid : dataBuf->pTableMeta->suid);
pBlocks->uid = dataBuf->pTableMeta->uid;
pBlocks->sversion = dataBuf->pTableMeta->sversion;
pBlocks->schemaLen = dataBuf->createTbReqLen;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -136,7 +137,7 @@ void destroyBlockHashmap(SHashObj* pDataBlockHash);
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks);
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks);
#endif // TDENGINE_DATABLOCKMGT_H
......@@ -753,7 +753,7 @@ static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, S
}
// pSql -> tag1_value, ...)
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision, const SName* pName) {
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const SName* pName) {
if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -763,9 +763,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
SSchema* pSchema = &pTagsSchema[pCxt->tags.boundColumns[i]];
param.schema = pSchema;
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
param.schema = pTagSchema;
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
}
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
......@@ -791,6 +791,7 @@ static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, S
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pBackup->uid = tGenIdPI64();
return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
}
......@@ -833,7 +834,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
if (TK_NK_LP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, &name));
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, &name));
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
}
return TSDB_CODE_SUCCESS;
}
......@@ -1015,7 +1020,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
STableDataBlocks *dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq));
if (TK_NK_LP == sToken.type) {
// pSql -> field1_name, ...)
......@@ -1046,7 +1051,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
}
// merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
}
return buildOutput(pCxt);
}
......
......@@ -149,8 +149,28 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq);
if (pBlocks->nAllocSize - pBlocks->size < len) {
pBlocks->nAllocSize += len + pBlocks->rowSize;
char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
if (pTmp != NULL) {
pBlocks->pData = pTmp;
memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size);
} else {
pBlocks->nAllocSize -= len + pBlocks->rowSize;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
char* pBuf = pBlocks->pData + pBlocks->size;
tSerializeSVCreateTbReq((void**)&pBuf, pCreateTbReq);
pBlocks->size += len;
pBlocks->createTbReqLen = len;
return TSDB_CODE_SUCCESS;
}
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) {
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) {
......@@ -163,6 +183,13 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
return ret;
}
if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctbCfg.pTag) {
ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
}
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
if (pBlockList) {
taosArrayPush(pBlockList, dataBlocks);
......@@ -294,7 +321,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
char * pBlockData = pBlocks->data;
char * pBlockData = pBlocks->data + pBlocks->schemaLen;
int n = 0;
while (n < nRows) {
pBlkKeyTuple->skey = TD_ROW_KEY((STSRow *)pBlockData);
......@@ -340,44 +367,26 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey
}
// Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, int8_t schemaAttached, bool isRawPayload) {
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, bool isRawPayload) {
// TODO: optimize this function, handle the case while binary is not presented
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = getTableInfo(pTableMeta);
SSchema* pSchema = getTableColumnSchema(pTableMeta);
int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
SSubmitBlk* pBlock = pDataBlock;
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
pDataBlock = (char*)pDataBlock + sizeof(SSubmitBlk);
memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
pDataBlock = (char*)pDataBlock + nonDataLen;
int32_t flen = 0; // original total length of row
// schema needs to be included into the submit data block
if (schemaAttached) {
int32_t numOfCols = getNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock;
pCol->colId = htons(pSchema[j].colId);
pCol->type = pSchema[j].type;
pCol->bytes = htons(pSchema[j].bytes);
pCol->offset = 0;
pDataBlock = (char*)pDataBlock + sizeof(STColumn);
if (isRawPayload) {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
int32_t schemaSize = sizeof(STColumn) * numOfCols;
pBlock->schemaLen = schemaSize;
} else {
if (isRawPayload) {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
}
pBlock->schemaLen = 0;
}
pBlock->schemaLen = pTableDataBlock->createTbReqLen;
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
char* p = pTableDataBlock->pData + nonDataLen;
pBlock->dataLen = 0;
int32_t numOfRows = pBlock->numOfRows;
......@@ -414,7 +423,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
return pBlock->dataLen + pBlock->schemaLen;
}
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) {
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
......@@ -429,7 +438,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
if (pBlocks->numOfRows > 0) {
STableDataBlocks* dataBuf = NULL;
int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
if (ret != TSDB_CODE_SUCCESS) {
taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList);
......@@ -474,7 +483,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
// erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload);
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
......
......@@ -62,7 +62,7 @@ protected:
void dumpReslut() {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
cout << "schemaAttache:" << (int32_t)pStmt->schemaAttache << ", payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType << ", numOfVgs:" << num << endl;
cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType << ", numOfVgs:" << num << endl;
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
......@@ -81,7 +81,6 @@ protected:
void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->schemaAttache, 0);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
......@@ -168,6 +167,18 @@ TEST_F(InsertTest, multiTableMultiRowTest) {
checkReslut(2, 3, 2);
}
// INSERT INTO
// tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
// tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
TEST_F(InsertTest, autoCreateTableTest) {
setDatabase("root", "test");
bind("insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 3);
}
TEST_F(InsertTest, toleranceTest) {
setDatabase("root", "test");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册