提交 0aadc6c9 编写于 作者: H Haojun Liao

[td-225] refactor.

上级 9277029d
...@@ -658,9 +658,10 @@ typedef struct SJoinStatus { ...@@ -658,9 +658,10 @@ typedef struct SJoinStatus {
} SJoinStatus; } SJoinStatus;
typedef struct SJoinOperatorInfo { typedef struct SJoinOperatorInfo {
SSDataBlock *pRes; SSDataBlock *pRes;
SJoinStatus *status; SJoinStatus *status;
int32_t numOfUpstream; int32_t numOfUpstream;
SRspResultInfo resultInfo; // todo refactor, add this info for each operator
} SJoinOperatorInfo; } SJoinOperatorInfo;
SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
...@@ -796,37 +797,41 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t ...@@ -796,37 +797,41 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) { SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) {
SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo)); SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo));
pInfo->numOfUpstream = numOfUpstream; pInfo->numOfUpstream = numOfUpstream;
pInfo->status = calloc(numOfUpstream, sizeof(SJoinStatus)); pInfo->status = calloc(numOfUpstream, sizeof(SJoinStatus));
SRspResultInfo* pResInfo = &pInfo->resultInfo;
pResInfo->capacity = 4096;
pResInfo->threshold = 4096 * 0.8;
pInfo->pRes = calloc(1, sizeof(SSDataBlock)); pInfo->pRes = calloc(1, sizeof(SSDataBlock));
pInfo->pRes->info.numOfCols = numOfOutput; pInfo->pRes->info.numOfCols = numOfOutput;
pInfo->pRes->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); pInfo->pRes->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData colData = {{0}}; SColumnInfoData colData = {{0}};
colData.info.bytes = pSchema[i].bytes; colData.info.bytes = pSchema[i].bytes;
colData.info.type = pSchema[i].type; colData.info.type = pSchema[i].type;
colData.info.colId = pSchema[i].colId; colData.info.colId = pSchema[i].colId;
colData.pData = calloc(1, colData.info.bytes * 4096); colData.pData = calloc(1, colData.info.bytes * pResInfo->capacity);
taosArrayPush(pInfo->pRes->pDataBlock, &colData); taosArrayPush(pInfo->pRes->pDataBlock, &colData);
} }
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "JoinOperator"; pOperator->name = "JoinOperator";
pOptr->operatorType = OP_Join; pOperator->operatorType = OP_Join;
pOptr->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOptr->blockingOptr = false; pOperator->blockingOptr = false;
pOptr->info = pInfo; pOperator->info = pInfo;
pOptr->exec = doBlockJoin; pOperator->exec = doBlockJoin;
pOptr->cleanup = destroyDummyInputOperator; pOperator->cleanup = destroyDummyInputOperator;
for(int32_t i = 0; i < numOfUpstream; ++i) { for(int32_t i = 0; i < numOfUpstream; ++i) {
appendUpstream(pOptr, pUpstream[i]); appendUpstream(pOperator, pUpstream[i]);
} }
return pOptr; return pOperator;
} }
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册