提交 928a9773 编写于 作者: H Haojun Liao

ehn(query): enhance the api of ssdatablock to support copy data while clone a new ssdatablock.

上级 0252fa49
......@@ -203,11 +203,10 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks);
......
......@@ -408,7 +408,11 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
}
SResultColumn *pCol = &pResultInfo->pCol[col];
return colDataIsNull_f(pCol->nullbitmap, row);
if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) {
return (pCol->offset[row] == -1);
} else {
return colDataIsNull_f(pCol->nullbitmap, row);
}
}
bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; }
......
......@@ -1149,10 +1149,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
return NULL;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if(pDataBlock == NULL){
return NULL;
}
int32_t numOfCols = pDataBlock->info.numOfCols;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
......@@ -1160,6 +1161,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
pBlock->info.numOfCols = numOfCols;
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
pBlock->info.rowSize = pDataBlock->info.rows;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
......@@ -1168,6 +1170,23 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
if (copyData) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
colDataAssign(pDst, pSrc, pDataBlock->info.rows);
}
pBlock->info.rows = pDataBlock->info.rows;
pBlock->info.capacity = pDataBlock->info.rows;
}
return pBlock;
}
......
......@@ -468,10 +468,10 @@ typedef struct SFillOperatorInfo {
} SFillOperatorInfo;
typedef struct {
char *pData;
bool isNull;
int16_t type;
int32_t bytes;
char *pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo {
......@@ -497,19 +497,19 @@ typedef struct SDataGroupInfo {
// The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group
SOptrBasicInfo binfo;
SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo;
typedef struct SWindowRowsSup {
......@@ -548,21 +548,21 @@ typedef struct SStateWindowOperatorInfo {
} SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo;
bool hasVarCol;
SOptrBasicInfo binfo;
bool hasVarCol;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf *pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf *pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SAggSupporter aggSup;
} SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo {
......@@ -590,8 +590,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type,
int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
......
......@@ -52,7 +52,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SSDataBlock* p = createOneDataBlock(pDataBlock);
SSDataBlock* p = createOneDataBlock(pDataBlock, false);
p->info = pDataBlock->info;
taosArrayClear(p->pDataBlock);
......
......@@ -1701,9 +1701,8 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
}
}
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type,
int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo,
SAggSupporter* pAggSup) {
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) {
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = binfo->pCtx;
......@@ -3039,7 +3038,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pBlock);
SSDataBlock* px = createOneDataBlock(pBlock, false);
blockDataEnsureCapacity(px, pBlock->info.rows);
// todo extract method
......@@ -4614,7 +4613,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
SSortedMergeOperatorInfo* pInfo = pOperator->info;
SSortHandle* pHandle = pInfo->pSortHandle;
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes);
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity);
while (1) {
......
......@@ -227,7 +227,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
......@@ -244,7 +244,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (num > 0) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret =
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
......
......@@ -590,7 +590,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pInfo->pRes);
SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
// TODO refactor
......
......@@ -99,7 +99,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pIndexMap = pIndexMap;
pSortHandle->pDataBlock = createOneDataBlock(pBlock);
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo;
......@@ -206,7 +206,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
}
......@@ -488,7 +488,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
tMergeTreeDestroy(pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId);
if (code != 0) {
return code;
......@@ -531,7 +531,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
}
if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock);
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册