From 99d80ddc900b7cae31566782e281aa73bc838693 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 7 Jul 2022 11:38:58 +0800 Subject: [PATCH] feat: insert from query --- source/libs/executor/src/dataInserter.c | 26 ++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 0616f4b12f..edc6143b93 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -38,6 +38,7 @@ typedef struct SDataInserterHandle { SSubmitRes submitRes; SInserterParam* pParam; SArray* pDataBlocks; + SHashObj* pCols; int32_t status; bool queryEnd; uint64_t useconds; @@ -124,6 +125,7 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { int64_t uid = pInserter->pNode->tableId; int64_t suid = pInserter->pNode->stableId; int32_t vgId = pInserter->pNode->vgId; + bool fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols); SSubmitReq* ret = NULL; int32_t sz = taosArrayGetSize(pBlocks); @@ -144,7 +146,7 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { // TODO ret = rpcMallocCont(cap); ret->header.vgId = vgId; - ret->version = htonl(1); + ret->version = htonl(pTSchema->version); ret->length = sizeof(SSubmitReq); ret->numOfBlocks = htonl(sz); @@ -170,9 +172,20 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { for (int32_t k = 0; k < pTSchema->numOfCols; k++) { const STColumn* pColumn = &pTSchema->columns[k]; - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); + SColumnInfoData* pColData = NULL; + int16_t colIdx = k; + if (!fullCol) { + int16_t *slotId = taosHashGet(pInserter->pCols, &pColumn->colId, sizeof(pColumn->colId)); + if (NULL == slotId) { + continue; + } + + colIdx = *slotId; + } + + pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx); if (colDataIsNull_s(pColData, j)) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); } else { void* data = colDataGetData(pColData, j); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); @@ -285,6 +298,13 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat return TSDB_CODE_QRY_OUT_OF_MEMORY; } + inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); + SNode* pNode = NULL; + FOREACH(pNode, pInserterNode->pCols) { + SColumnNode* pCol = (SColumnNode*)pNode; + taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)); + } + tsem_init(&inserter->ready, 0, 0); *pHandle = inserter; -- GitLab