From 160fd22802fd5a39d329083dc0e25c8026bcac14 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 8 Feb 2020 14:09:10 +0800 Subject: [PATCH] remove redundant codes --- src/client/src/tscServer.c | 2 +- src/system/detail/src/vnodeQueryImpl.c | 157 +------------------------ src/util/src/tresultBuf.c | 7 +- 3 files changed, 13 insertions(+), 153 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 35a0358f2f..ea064093cd 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1181,7 +1181,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); - if (num > tsMaxNumOfOrderedResults) { + if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64, pPObj, pSql, tsMaxNumOfOrderedResults, num); tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index cec3949eb8..07799775bd 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -4440,15 +4440,6 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } } -// if (FD_VALID(pSupporter->meterOutputFd)) { -// assert(pSupporter->meterOutputMMapBuf != NULL); -// dTrace("QInfo:%p disk-based output buffer during query:%" PRId64 " bytes", pQInfo, pSupporter->bufSize); -// munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); -// tclose(pSupporter->meterOutputFd); - -// unlink(pSupporter->extBufFile); -// } - tSidSetDestroy(&pSupporter->pSidSet); if (pSupporter->pMeterDataInfo != NULL) { @@ -4501,12 +4492,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQuery->lastKey = pQuery->skey; // create runtime environment -// SSchema *pColumnModel = NULL; - tTagSchema *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel; -// if (pTagSchemaInfo != NULL) { -// pColumnModel = pTagSchemaInfo->pSchema; -// } // get one queried meter SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid); @@ -4543,23 +4529,6 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } if (pQuery->nAggTimeInterval != 0) { -// getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile); -// pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666); - -// if (!FD_VALID(pSupporter->meterOutputFd)) { -// dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); -// return TSDB_CODE_SERV_OUT_OF_MEMORY; -// } - -// pSupporter->numOfPages = pSupporter->numOfMeters; - -// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); -// if (ret != TSDB_CODE_SUCCESS) { -// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, -// strerror(errno)); -// return TSDB_CODE_SERV_NO_DISKSPACE; -// } -// // one page for each table at least ret = createResultBuf(&pSupporter->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); if (ret != TSDB_CODE_SUCCESS) { @@ -4567,15 +4536,6 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } pRuntimeEnv->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; -// pSupporter->lastPageId = -1; -// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; - -// pSupporter->meterOutputMMapBuf = -// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); -// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { -// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); -// return TSDB_CODE_SERV_OUT_OF_MEMORY; -// } } // metric query do not invoke interpolation, it will be done at the second-stage merge @@ -5377,12 +5337,8 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; - SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset); + SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset + (pSupporter->subgroupIdx - 1)* 10000); -// char * pStart = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * (pSupporter->lastPageId + 1) + -// pSupporter->groupResultSize * pSupporter->offset; -// uint64_t numOfElem = ((tFilePage *)pStart)->numOfElems; -// assert(numOfElem <= pQuery->pointsToRead); int32_t total = 0; for(int32_t i = 0; i < list.size; ++i) { tFilePage* pData = getResultBufferPageById(pResultBuf, list.pData[i]); @@ -5397,15 +5353,12 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; + char* pDest = pQuery->sdata[i]->data; - memcpy(pQuery->sdata[i]->data + pRuntimeEnv->offset[i] * total + offset * bytes, - pData->data + pRuntimeEnv->offset[i] * pData->numOfElems, - bytes * pData->numOfElems); -// pStart += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage); + memcpy(pDest + offset*bytes, pData->data + pRuntimeEnv->offset[i] * pData->numOfElems, bytes * pData->numOfElems); } offset += pData->numOfElems; -// pQuery->sdata[0]->len += pData->numOfElems; } assert(pQuery->pointsRead == 0); @@ -5466,7 +5419,6 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery tFilePage *pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx); int64_t ts = getCurrentTimestamp(&cs, pos); - printf("++++++++++++++++++++++%d, %d, %lld\n", position->pageIdx, pos, ts); if (ts == lastTimestamp) {// merge with the last one doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true); } else { @@ -5551,64 +5503,8 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery return pSupporter->numOfGroupResultPages; } -//static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { -// assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); -// -// SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); -// -// int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); -// pSupporter->numOfPages = numOfPages; -// -// /* -// * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may -// * be insufficient -// */ -// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); -// if (ret != 0) { -// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, -// strerror(errno)); -// pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; -// pQInfo->killed = 1; -// -// return pQInfo->code; -// } -// -// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; -// pSupporter->meterOutputMMapBuf = -// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); -// -// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { -// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); -// pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; -// pQInfo->killed = 1; -// -// return pQInfo->code; -// } -// -// return TSDB_CODE_SUCCESS; -//} - int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { - printf("group===============%d\n", pSupporter->numOfGroupResultPages); -// int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; -// int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + -// pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); -// -// int32_t requiredPages = pSupporter->numOfPages; -// if (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) { -// while (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) { -// requiredPages += pSupporter->numOfMeters; -// } -// -// if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) { -// return -1; -// } -// } - -// char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages + -// pSupporter->groupResultSize * pSupporter->numOfGroupResultPages; - SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage))/ pQuery->rowSize; @@ -5625,27 +5521,21 @@ int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQue r = capacity; } - tFilePage* buf = getNewDataBuf(pResultBuf, base + pSupporter->numOfGroupResultPages, &pageId); + tFilePage* buf = getNewDataBuf(pResultBuf, base + pSupporter->subgroupIdx*10000 + pSupporter->numOfGroupResultPages, &pageId); //pagewise copy to dest buffer for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; buf->numOfElems = r; - memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char*)pQuery->sdata[i]->data) + offset * bytes, buf->numOfElems * bytes); + memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char*)pQuery->sdata[i]->data) + offset * bytes, + buf->numOfElems * bytes); } offset += r; remain -= r; } -// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { -// int32_t size = pRuntimeEnv->pCtx[i].outputBytes * pQuery->sdata[0]->len + sizeof(tFilePage); -// memcpy(lastPosition, pQuery->sdata[i], size); -// -// lastPosition += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage); -// } - pSupporter->numOfGroupResultPages += 1; return TSDB_CODE_SUCCESS; } @@ -6451,35 +6341,6 @@ void changeMeterQueryInfoForSuppleQuery(SQueryResultBuf* pResultBuf, SMeterQuery } } -//static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { -// if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { -// if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { -// return NULL; -// } -// } -// -// *pageId = (++pSupporter->lastPageId); -// return getFilePage(pSupporter, *pageId); -//} - -//tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo, -// SMeterQuerySupportObj *pSupporter) { -// uint32_t pageId = 0; -// -// tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); -// if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results -// return NULL; -// } -// -// if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) { -// pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1; -// pMeterQueryInfo->pageList = realloc(pMeterQueryInfo->pageList, sizeof(uint32_t) * pMeterQueryInfo->numOfAlloc); -// } -// -// pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages++] = pageId; -// return pPage; -//} - void saveIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -7006,20 +6867,15 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete tFilePage * pData = NULL; SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; -// SQuery *pQuery = pRuntimeEnv->pQuery; // in the first scan, new space needed for results SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); int32_t pageId = -1; if (list.size == 0) { -// pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId); } else { -// int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]; pData = getResultBufferPageById(pResultBuf, getLastPageId(&list)); -// pData = getFilePage(pSupporter, lastPageId); - printf("==============%d\n", pData->numOfElems); if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId); if (pData != NULL) { @@ -7637,7 +7493,6 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue // in handling records occuring around '1970-01-01', the aligned start timestamp may be 0. TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0); - printf("-----------------------%d\n", pData->numOfElems); SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery), diff --git a/src/util/src/tresultBuf.c b/src/util/src/tresultBuf.c index 57aca682fc..036012f0b6 100644 --- a/src/util/src/tresultBuf.c +++ b/src/util/src/tresultBuf.c @@ -173,7 +173,12 @@ tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t* *pageId = (pResultBuf->allocateId++); registerPageId(pResultBuf, groupId, *pageId); - return getResultBufferPageById(pResultBuf, *pageId); + tFilePage* page = getResultBufferPageById(pResultBuf, *pageId); + + // clear memory for the new page + memset(page, 0, DEFAULT_INTERN_BUF_SIZE); + + return page; } int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } -- GitLab