diff --git a/src/query/inc/qTsbuf.h b/src/query/inc/qTsbuf.h index 90bd64336fdeed91deb68b9b490224a7fb29bc80..5d055782c9b82a1444c97a62d429cc2ba9a53986 100644 --- a/src/query/inc/qTsbuf.h +++ b/src/query/inc/qTsbuf.h @@ -88,6 +88,7 @@ typedef struct STSBuf { STSList tsData; // uncompressed raw ts data uint64_t numOfTotal; bool autoDelete; + bool remainOpen; int32_t tsOrder; // order of timestamp in ts comp buffer STSCursor cur; } STSBuf; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 2997d56326b664488a1942df0150343bb5b0f6f9..4b4c8b1426e027b508af846fbfe3914989ce7c0d 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3836,8 +3836,10 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { STSBuf * pTSbuf = pInfo->pTSBuf; tsBufFlush(pTSbuf); - strcpy(pCtx->aOutputBuf, pTSbuf->path); + *(FILE **)pCtx->aOutputBuf = pTSbuf->f; + + pTSbuf->remainOpen = true; tsBufDestroy(pTSbuf); doFinalizer(pCtx); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9d96d31cec55281703bfbda8bceeb95de0c11dd5..fc7b3bdd6d115920a8ab693b38b6ff56af54e326 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2010,6 +2010,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) { assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL); } + static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { if (pRuntimeEnv->pQuery == NULL) { return; @@ -2021,6 +2022,16 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { qDebug("QInfo:%p teardown runtime env", pQInfo); cleanupResultRowInfo(&pRuntimeEnv->windowResInfo); + if (isTSCompQuery(pQuery)) { + FILE *f = *(FILE **)pQuery->sdata[0]->data; + + if (f) { + fclose(f); + *(FILE **)pQuery->sdata[0]->data = NULL; + } + } + + if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -6949,10 +6960,10 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { * TODO handle the case that the file is too large to send back one time */ if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { - struct stat fstat; - if (stat(pQuery->sdata[0]->data, &fstat) == 0) { - *numOfRows = fstat.st_size; - return fstat.st_size; + struct stat fStat; + if (fstat(fileno(*(FILE **)pQuery->sdata[0]->data), &fStat) == 0) { + *numOfRows = fStat.st_size; + return fStat.st_size; } else { qError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); return 0; @@ -6968,15 +6979,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // load data from file to msg buffer if (isTSCompQuery(pQuery)) { - int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); + + FILE *f = *(FILE **)pQuery->sdata[0]->data; // make sure file exist - if (FD_VALID(fd)) { - uint64_t s = lseek(fd, 0, SEEK_END); + if (f) { + off_t s = lseek(fileno(f), 0, SEEK_END); - qDebug("QInfo:%p ts comp data return, file:%s, size:%"PRId64, pQInfo, pQuery->sdata[0]->data, s); - if (lseek(fd, 0, SEEK_SET) >= 0) { - size_t sz = read(fd, data, (uint32_t) s); + qDebug("QInfo:%p ts comp data return, file:%p, size:%"PRId64, pQInfo, f, s); + if (fseek(f, 0, SEEK_SET) >= 0) { + size_t sz = fread(data, 1, s, f); if(sz < s) { // todo handle error assert(0); } @@ -6984,15 +6996,8 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { UNUSED(s); } - close(fd); - unlink(pQuery->sdata[0]->data); - } else { - // todo return the error code to client and handle invalid fd - qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, - pQuery->sdata[0]->data, strerror(errno)); - if (fd != -1) { - close(fd); - } + fclose(f); + *(FILE **)pQuery->sdata[0]->data = NULL; } // all data returned, set query over diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c index d0c59fe5efe08aa1c9dbe3bbe65db139b074ebee..a5d4690a8e101ccd87985ace36b2abbe1412a2e2 100644 --- a/src/query/src/qTsbuf.c +++ b/src/query/src/qTsbuf.c @@ -19,6 +19,8 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { if (pTSBuf == NULL) { return NULL; } + + pTSBuf->autoDelete = autoDelete; taosGetTmpfilePath("join", pTSBuf->path); pTSBuf->f = fopen(pTSBuf->path, "w+"); @@ -26,6 +28,10 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { free(pTSBuf); return NULL; } + + if (!autoDelete) { + unlink(pTSBuf->path); + } if (NULL == allocResForTSBuf(pTSBuf)) { return NULL; @@ -37,8 +43,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { tsBufResetPos(pTSBuf); pTSBuf->cur.order = TSDB_ORDER_ASC; - - pTSBuf->autoDelete = autoDelete; + pTSBuf->tsOrder = order; return pTSBuf; @@ -49,6 +54,8 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { if (pTSBuf == NULL) { return NULL; } + + pTSBuf->autoDelete = autoDelete; tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path)); @@ -129,7 +136,6 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { // ascending by default pTSBuf->cur.order = TSDB_ORDER_ASC; - pTSBuf->autoDelete = autoDelete; // tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), // pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete); @@ -147,8 +153,10 @@ void* tsBufDestroy(STSBuf* pTSBuf) { tfree(pTSBuf->pData); tfree(pTSBuf->block.payload); - - fclose(pTSBuf->f); + + if (!pTSBuf->remainOpen) { + fclose(pTSBuf->f); + } if (pTSBuf->autoDelete) { // ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);