提交 0330ab12 编写于 作者: H Haojun Liao

[td-11818] Refactor.

上级 1145d1a0
......@@ -4558,6 +4558,7 @@ void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
void createResultBlock(const SArray* pExprInfo, SExchangeInfo* pInfo, const SOperatorInfo* pOperator, size_t size);
static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
#if 0
......@@ -4930,7 +4931,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree(pMsgBody);
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
assert(pMsg->ahandle != NULL);
......@@ -4958,13 +4959,14 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
*newgroup = false;
if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) {
return NULL;
}
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) { // todo handle malloc error
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _error;
}
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0);
......@@ -4983,6 +4985,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _error;
}
pMsgSendInfo->param = pExchangeInfo;
......@@ -4993,7 +4997,6 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pExchangeInfo->ready);
if (pExchangeInfo->pRsp->numOfRows == 0) {
......@@ -5007,7 +5010,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows);
if (tmp == NULL) {
// todo
goto _error;
}
size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes;
......@@ -5021,8 +5024,17 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
pRes->info.rows = pExchangeInfo->pRsp->numOfRows;
return pExchangeInfo->pResult;
_error:
tfree(pMsg);
tfree(pMsgSendInfo);
terrno = pTaskInfo->code;
return NULL;
}
static SSDataBlock* createResultDataBlock(const SArray* pExprInfo);
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -5038,21 +5050,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
assert(taosArrayGetSize(pInfo->pSources) > 0);
size_t size = taosArrayGetSize(pExprInfo);
pInfo->pResult = calloc(1, sizeof(SSDataBlock));
pInfo->pResult->pDataBlock = taosArrayInit(pOperator->numOfOutput, sizeof(SColumnInfoData));
SArray* pResult = pInfo->pResult->pDataBlock;
for(int32_t i = 0; i < size; ++i) {
SColumnInfoData colInfoData = {0};
SExprInfo* p = taosArrayGetP(pExprInfo, i);
SSchema* pSchema = &p->base.resSchema;
colInfoData.info.type = pSchema->type;
colInfoData.info.colId = pSchema->colId;
colInfoData.info.bytes = pSchema->bytes;
taosArrayPush(pResult, &colInfoData);
}
pInfo->pResult = createResultDataBlock(pExprInfo);
pOperator->name = "ExchangeOperator";
pOperator->operatorType = OP_Exchange;
......@@ -5064,13 +5062,13 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
pOperator->exec = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo;
{
{ // todo refactor
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "TSC";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsgFromServer;
rpcInit.cfp = processRspMsg;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)"root";
......@@ -5088,6 +5086,31 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
return pOperator;
}
SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
SSDataBlock* pResBlock = calloc(1, sizeof(SSDataBlock));
if (pResBlock == NULL) {
return NULL;
}
size_t numOfCols = taosArrayGetSize(pExprInfo);
pResBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
SArray* pResult = pResBlock->pDataBlock;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfoData = {0};
SExprInfo* p = taosArrayGetP(pExprInfo, i);
SSchema* pSchema = &p->base.resSchema;
colInfoData.info.type = pSchema->type;
colInfoData.info.colId = pSchema->colId;
colInfoData.info.bytes = pSchema->bytes;
taosArrayPush(pResult, &colInfoData);
}
return pResBlock;
}
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0 && numOfOutput > 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册