提交 eb712702 编写于 作者: H Haojun Liao

[td-11818]fix bug in select * from super_table.

上级 9ef605fa
...@@ -372,11 +372,14 @@ typedef struct STaskParam { ...@@ -372,11 +372,14 @@ typedef struct STaskParam {
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray *pSources; SArray *pSources;
uint64_t bytes; // total load bytes from remote
tsem_t ready; tsem_t ready;
void *pTransporter; void *pTransporter;
SRetrieveTableRsp *pRsp; SRetrieveTableRsp *pRsp;
SSDataBlock *pResult; SSDataBlock *pResult;
int32_t current;
uint64_t rowsOfCurrentSource;
uint64_t bytes; // total load bytes from remote
uint64_t totalRows;
} SExchangeInfo; } SExchangeInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
......
...@@ -5144,32 +5144,41 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { ...@@ -5144,32 +5144,41 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
*newgroup = false; *newgroup = false;
if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) {
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
if (pExchangeInfo->current >= totalSources) {
return NULL; return NULL;
} }
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); SResFetchReq* pMsg = NULL;
SMsgSendInfo* pMsgSendInfo = NULL;
while(1) {
pMsg = calloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) { // todo handle malloc error if (NULL == pMsg) { // todo handle malloc error
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _error; goto _error;
} }
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0); SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
SEpSet epSet = {0}; SEpSet epSet = {0};
epSet.numOfEps = pSource->addr.numOfEps; epSet.numOfEps = pSource->addr.numOfEps;
epSet.port[0] = pSource->addr.epAddr[0].port; epSet.port[0] = pSource->addr.epAddr[0].port;
tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0]));
qDebug("QID:0x%" PRIx64 " build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId); pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId); pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId); pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques // send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) { if (NULL == pMsgSendInfo) {
qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); qError("QID:0x%" PRIx64 " prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _error; goto _error;
} }
...@@ -5184,21 +5193,32 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { ...@@ -5184,21 +5193,32 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (pExchangeInfo->pRsp->numOfRows == 0) { SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp;
if (pRsp->numOfRows == 0) {
if (pExchangeInfo->current >= taosArrayGetSize(pExchangeInfo->pSources)) {
return NULL; return NULL;
} }
qDebug("QID:0x%"PRIx64" vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows);
pExchangeInfo->rowsOfCurrentSource = 0;
pExchangeInfo->current += 1;
continue;
}
SSDataBlock* pRes = pExchangeInfo->pResult; SSDataBlock* pRes = pExchangeInfo->pResult;
char* pData = pExchangeInfo->pRsp->data; char* pData = pRsp->data;
for(int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows); char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows);
if (tmp == NULL) { if (tmp == NULL) {
goto _error; goto _error;
} }
size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; size_t len = pRsp->numOfRows * pColInfoData->info.bytes;
memcpy(tmp, pData, len); memcpy(tmp, pData, len);
pColInfoData->pData = tmp; pColInfoData->pData = tmp;
...@@ -5206,9 +5226,27 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { ...@@ -5206,9 +5226,27 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
} }
pRes->info.numOfCols = pOperator->numOfOutput; pRes->info.numOfCols = pOperator->numOfOutput;
pRes->info.rows = pExchangeInfo->pRsp->numOfRows; pRes->info.rows = pRsp->numOfRows;
pExchangeInfo->totalRows += pRsp->numOfRows;
pExchangeInfo->bytes += pRsp->compLen;
pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows;
if (pRsp->completed == 1) {
qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes,
pExchangeInfo->current + 1, totalSources);
pExchangeInfo->rowsOfCurrentSource = 0;
pExchangeInfo->current += 1;
} else {
qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes);
}
return pExchangeInfo->pResult; return pExchangeInfo->pResult;
}
_error: _error:
tfree(pMsg); tfree(pMsg);
...@@ -7719,7 +7757,6 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask ...@@ -7719,7 +7757,6 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
} else if (pPhyNode->info.type == OP_StreamScan) { } else if (pPhyNode->info.type == OP_StreamScan) {
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo); return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册