提交 0f923bc8 编写于 作者: D dapan1121

feat: insert from query

上级 750903f5
......@@ -154,22 +154,24 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = htobe64(suid);
blkHead->uid = htobe64(uid);
blkHead->schemaLen = htonl(0);
int32_t rows = pDataBlock->info.rows;
int32_t rows = 0;
int32_t dataLen = 0;
STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
for (int32_t j = 0; j < rows; j++) {
int64_t lastTs = TSKEY_MIN;
bool ignoreRow = false;
for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, rowData);
ignoreRow = false;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = NULL;
......@@ -191,18 +193,38 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
}
if (colDataIsNull_s(pColData, j)) {
if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
ignoreRow = true;
break;
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else {
void* data = colDataGetData(pColData, j);
if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
if (*(int64_t*)data == lastTs) {
ignoreRow = true;
break;
} else {
lastTs = *(int64_t*)data;
}
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
}
}
if (ignoreRow) {
continue;
}
rows++;
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
blkHead->dataLen = htonl(dataLen);
blkHead->numOfRows = htons(rows);
ret->length += sizeof(SSubmitBlk) + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
......
......@@ -2463,6 +2463,8 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->binfo.pRes);
cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param);
}
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
......@@ -3504,7 +3506,6 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
}
taosMemoryFreeClear(pOperator->exprSupp.pExprInfo);
taosMemoryFreeClear(pOperator->info);
taosMemoryFreeClear(pOperator);
}
......@@ -3674,11 +3675,15 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
cleanupBasicInfo(pInfo);
taosMemoryFreeClear(param);
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(param);
}
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -3686,6 +3691,8 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(pInfo->p);
taosMemoryFreeClear(param);
}
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -3696,6 +3703,8 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo);
taosMemoryFreeClear(param);
}
void cleanupExprSupp(SExprSupp* pSupp) {
......@@ -3712,6 +3721,8 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -3729,6 +3740,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
}
tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(param);
}
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
......
......@@ -38,6 +38,8 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
}
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
......@@ -724,6 +726,8 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree(pInfo->columnOffset);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
......@@ -806,4 +810,4 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
}
......@@ -104,6 +104,8 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosMemoryFreeClear(param);
}
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
......
......@@ -592,6 +592,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
}
taosMemoryFreeClear(param);
}
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
......@@ -740,6 +742,8 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
taosMemoryFreeClear(param);
}
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
......@@ -1488,6 +1492,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->scanCols);
taosMemoryFreeClear(pInfo->pUser);
taosMemoryFreeClear(param);
}
static int32_t getSysTableDbNameColId(const char* pTable) {
......@@ -2166,6 +2172,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(param);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
......@@ -2656,6 +2664,8 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo);
taosMemoryFreeClear(param);
}
typedef struct STableMergeScanExecInfo {
......@@ -2787,6 +2797,8 @@ static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
blockDataDestroy(pInfo->pRes);
tsdbLastrowReaderClose(pInfo->pLastrowReader);
taosMemoryFreeClear(param);
}
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList,
......
......@@ -235,6 +235,8 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
}
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
......@@ -451,6 +453,8 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
}
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
......@@ -670,6 +674,8 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
}
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
......
......@@ -1557,6 +1557,8 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData);
taosMemoryFreeClear(param);
}
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -1564,6 +1566,8 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pRecycledPages);
taosMemoryFreeClear(param);
}
void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -1586,6 +1590,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
}
}
nodesDestroyNode((SNode*)pInfo->pPhyNode);
taosMemoryFreeClear(param);
}
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
......@@ -2319,6 +2325,8 @@ _error:
void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(param);
}
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......@@ -2995,6 +3003,8 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChInfo);
}
}
taosMemoryFreeClear(param);
}
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
......@@ -3954,6 +3964,8 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChInfo);
}
}
taosMemoryFreeClear(param);
}
int64_t getStateWinTsKey(void* data, int32_t index) {
......@@ -4368,6 +4380,8 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
taosMemoryFreeClear(param);
}
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId,
......@@ -4602,6 +4616,8 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
tdListFree(miaInfo->groupIntervals);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
taosMemoryFreeClear(param);
}
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册