提交 85d4ee60 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 c55aae2c
...@@ -421,19 +421,23 @@ typedef struct SSysTableScanInfo { ...@@ -421,19 +421,23 @@ typedef struct SSysTableScanInfo {
SRetrieveTableReq req; SRetrieveTableReq req;
SEpSet epSet; SEpSet epSet;
tsem_t ready; tsem_t ready;
SReadHandle readHandle;
SReadHandle readHandle; int32_t accountId;
int32_t accountId; bool showRewrite;
bool showRewrite; SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SNode* pCondition; // db_name filter condition, to discard data that are not in current database SMTbCursor* pCur; // cursor for iterate the local table meta store.
SMTbCursor* pCur; // cursor for iterate the local table meta store. SArray* scanCols; // SArray<int16_t> scan column id list
SArray* scanCols; // SArray<int16_t> scan column id list SName name;
SName name; SSDataBlock* pRes;
SSDataBlock* pRes; int64_t numOfBlocks; // extract basic running information.
int64_t numOfBlocks; // extract basic running information. SLoadRemoteDataInfo loadInfo;
SLoadRemoteDataInfo loadInfo;
} SSysTableScanInfo; } SSysTableScanInfo;
typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
void* pHandle;
} SBlockDistInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
int32_t* rowCellInfoOffset; // offset value for each row result cell info int32_t* rowCellInfoOffset; // offset value for each row result cell info
......
...@@ -650,38 +650,43 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { ...@@ -650,38 +650,43 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
pBlock->info.rows = 1; pBlock->info.rows = 1;
pBlock->info.numOfCols = 1;
// SBufferWriter bw = tbufInitWriter(NULL, false);
// blockDistInfoToBinary(&blockDistInfo, &bw);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
// int32_t len = (int32_t) tbufTell(&bw); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
// *(int32_t*) pColInfo->pData = len; tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); varDataSetLen(p, len);
//
// tbufCloseWriter(&bw); colDataAppend(pColInfo, 0, p, false);
taosMemoryFree(p);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return pBlock; return pBlock;
} }
static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*) param;
blockDataDestroy(pDistInfo->pResBlock);
}
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pInfo->dataReader = dataReader; pInfo->pHandle = dataReader;
// pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
SColumnInfoData infoData = {0}; SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = 1024; infoData.info.bytes = 1024;
// taosArrayPush(pInfo->block.pDataBlock, &infoData);
taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData);
pOperator->name = "DataBlockInfoScanOperator"; pOperator->name = "DataBlockInfoScanOperator";
// pOperator->operatorType = OP_TableBlockInfoScan; // pOperator->operatorType = OP_TableBlockInfoScan;
...@@ -690,8 +695,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* ...@@ -690,8 +695,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, NULL, NULL, NULL, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
return pOperator; return pOperator;
_error: _error:
......
...@@ -1142,6 +1142,11 @@ static int32_t translateBlockDistFunc(SFunctionNode* pFunc, char* pErrBuf, int32 ...@@ -1142,6 +1142,11 @@ static int32_t translateBlockDistFunc(SFunctionNode* pFunc, char* pErrBuf, int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STableBlockDistInfo);
return true;
}
// clang-format off // clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = { const SBuiltinFuncDefinition funcMgtBuiltins[] = {
...@@ -1877,7 +1882,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1877,7 +1882,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_BLOCK_DIST, .type = FUNCTION_TYPE_BLOCK_DIST,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateBlockDistFunc, .translateFunc = translateBlockDistFunc,
.getEnvFunc = NULL, .getEnvFunc = getBlockDistFuncEnv,
.processFunc = blockDistFunction, .processFunc = blockDistFunction,
.finalizeFunc = blockDistFinalize .finalizeFunc = blockDistFinalize
} }
......
...@@ -4439,8 +4439,29 @@ int32_t blockDistFunction(SqlFunctionCtx *pCtx) { ...@@ -4439,8 +4439,29 @@ int32_t blockDistFunction(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
char *pInfo = GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo);
memcpy(pInfo, pInputCol->pData, varDataTLen(pInputCol->pData));
STableBlockDistInfo p1 = {0};
tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1);
pDistInfo->numOfBlocks += p1.numOfBlocks;
pDistInfo->numOfTables += p1.numOfTables;
pDistInfo->numOfInmemRows += p1.numOfInmemRows;
pDistInfo->totalSize += p1.totalSize;
pDistInfo->totalRows += p1.totalRows;
pDistInfo->numOfFiles += p1.numOfFiles;
if (pDistInfo->minRows > p1.minRows) {
pDistInfo->minRows = p1.minRows;
}
if (pDistInfo->maxRows < p1.maxRows) {
pDistInfo->maxRows = p1.maxRows;
}
for(int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
}
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册