提交 0f5dc110 编写于 作者: X Xiaoyu Wang

TD-11819 Parsing insert statement and assembling binary objects.

上级 5e981e43
...@@ -158,9 +158,15 @@ typedef enum { ...@@ -158,9 +158,15 @@ typedef enum {
PAYLOAD_TYPE_RAW = 1, PAYLOAD_TYPE_RAW = 1,
} EPayloadType; } EPayloadType;
typedef struct SVgDataBlocks {
int64_t vgId; // virtual group id
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData;
} SVgDataBlocks;
typedef struct SInsertStmtInfo { typedef struct SInsertStmtInfo {
// SHashObj* pTableBlockHashList; // data block for each table SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
SArray* pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttache; // denote if submit block is built with table schema or not int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement] uint32_t insertType; // insert data from [file|sql statement| bound statement]
......
...@@ -507,22 +507,16 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t ...@@ -507,22 +507,16 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0; int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(payloadType); bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
STableDataBlocks** p = taosHashIterate(pHashObj, NULL); STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
STableDataBlocks* pOneTableBlock = *p; STableDataBlocks* pOneTableBlock = *p;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
while (pOneTableBlock) {
while(pOneTableBlock) {
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) { if (pBlocks->numOfRows > 0) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -532,22 +526,21 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t ...@@ -532,22 +526,21 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
return ret; return ret;
} }
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5); dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize); char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
if (tmp != NULL) { if (tmp != NULL) {
dataBuf->pData = tmp; dataBuf->pData = tmp;
//memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
} else { // failed to allocate memory, free already allocated memory and return error code } else { // failed to allocate memory, free already allocated memory and return error code
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList); destroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData); tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple); tfree(blkKeyInfo.pKeyTuple);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
} }
......
...@@ -233,7 +233,6 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) ...@@ -233,7 +233,6 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start)
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) { if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
pDataBlocks->ordered = false; pDataBlocks->ordered = false;
// tscWarn("NOT ordered input timestamp");
} }
pDataBlocks->prevTS = k; pDataBlocks->prevTS = k;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册