diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 7818765e572bb0b049d9961268fba95df1639dcf..cbc4caad1b045ed67bc55771f0e2401fd11b5c3b 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -380,20 +380,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd 4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision, pQueryInfo->fillType, pFillCol); } - - int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; - - if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 && pReducer->pFillInfo != NULL) { - pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols; - for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { - SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1); - pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1]; - } - } else { - if (pReducer->pFillInfo != NULL) { - assert(pReducer->pFillInfo->pTags == NULL); - } - } } static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1483f0de482c06cc4b98ecff0d11f11468dfe7ad..64a871ff746c157300fa2ee4bccc291354d12702 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2141,43 +2141,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { } } -//void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { -// SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex); -// assert(pInfo->pSqlExpr != NULL); -// -// int32_t type = pInfo->pSqlExpr->resType; -// int32_t bytes = pInfo->pSqlExpr->resBytes; -// -// char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; -// -// if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { -// int32_t realLen = varDataLen(pData); -// assert(realLen <= bytes - VARSTR_HEADER_SIZE); -// -// if (isNull(pData, type)) { -// pRes->tsrow[columnIndex] = NULL; -// } else { -// pRes->tsrow[columnIndex] = ((tstr*)pData)->data; -// } -// -// if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor -// *(pData + realLen + VARSTR_HEADER_SIZE) = 0; -// } -// -// pRes->length[columnIndex] = realLen; -// } else { -// assert(bytes == tDataTypeDesc[type].nSize); -// -// if (isNull(pData, type)) { -// pRes->tsrow[columnIndex] = NULL; -// } else { -// pRes->tsrow[columnIndex] = pData; -// } -// -// pRes->length[columnIndex] = bytes; -// } -//} - void* malloc_throw(size_t size) { void* p = malloc(size); if (p == NULL) { diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index db6a69c2c5a98b4b6716d4b3ae3890adacb6c2a3..6b8dcb0bf9e11636cfb79298902ba74de26c9018 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -30,6 +30,11 @@ typedef struct { int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN union {int64_t i; double d;} fillVal; } SFillColInfo; + +typedef struct { + SSchema col; + char* tagVal; +} SFillTagColInfo; typedef struct SFillInfo { TSKEY start; // start timestamp @@ -44,7 +49,8 @@ typedef struct SFillInfo { int32_t numOfTags; // number of tags int32_t numOfCols; // number of columns, including the tags columns int32_t rowSize; // size of each row - char ** pTags; // tags value for current interpolation +// char ** pTags; // tags value for current interpolation + SFillTagColInfo* pTags; // tags value for filling gap int64_t slidingTime; // sliding value to determine the number of result for a given time window char * prevValues; // previous row of data, to generate the interpolation results char * nextValues; // next row of data diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index d29186ba49d01e1d4494075dc69f669114342b4d..998049fad6686a7af15ba69d1801a30f366d3a04 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -42,19 +42,38 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ pFillInfo->slidingUnit = slidingUnit; pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); - + if (numOfTags > 0) { + pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo)); + for(int32_t i = 0; i < numOfTags; ++i) { + pFillInfo->pTags[i].col.colId = -2; + } + } + int32_t rowsize = 0; + int32_t k = 0; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t bytes = pFillInfo->pFillCol[i].col.bytes; - pFillInfo->pData[i] = calloc(1, bytes * capacity); - - rowsize += bytes; - } - - if (numOfTags > 0) { - pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize); + SFillColInfo* pColInfo = &pFillInfo->pFillCol[i]; + pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity); + + if (pColInfo->flag == TSDB_COL_TAG) { + bool exists = false; + for(int32_t j = 0; j < k; ++j) { + if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) { + exists = true; + break; + } + } + + if (!exists) { + pFillInfo->pTags[k].col.colId = pColInfo->col.colId; + pFillInfo->pTags[k].tagVal = calloc(1, pColInfo->col.bytes); + + k += 1; + } + } + rowsize += pColInfo->col.bytes; } - + pFillInfo->rowSize = rowsize; pFillInfo->capacityInRows = capacity; @@ -129,16 +148,21 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) { assert(pFillInfo->numOfRows == pInput->num); - int32_t t = 0; - + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - - char* s = pInput->data + pCol->col.offset * pInput->num; - memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes); - - if (pCol->flag == TSDB_COL_TAG && t < pFillInfo->numOfTags) { // copy the tag value - memcpy(pFillInfo->pTags[t++], pFillInfo->pData[i], pCol->col.bytes); + + char* data = pInput->data + pCol->col.offset * pInput->num; + memcpy(pFillInfo->pData[i], data, pInput->num * pCol->col.bytes); + + if (pCol->flag == TSDB_COL_TAG) { // copy the tag value to tag value buffer + for (int32_t j = 0; j < pFillInfo->numOfTags; ++j) { + SFillTagColInfo* pTag = &pFillInfo->pTags[j]; + if (pTag->col.colId == pCol->col.colId) { + memcpy(pTag->tagVal, data, pCol->col.bytes); + break; + } + } } } } @@ -224,22 +248,31 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi return 0; } -static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) { - for (int32_t j = 0, i = start; i < pColInfo->numOfCols; ++i, ++j) { - SFillColInfo* pCol = &pColInfo->pFillCol[i]; - - char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); - assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type); +static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, SFillTagColInfo *pTags, int32_t start, int32_t num) { + for(int32_t j = 0; j < pColInfo->numOfCols; ++j) { + SFillColInfo* pCol = &pColInfo->pFillCol[j]; + if (pCol->flag == TSDB_COL_NORMAL) { + continue; + } + + char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num); + + for(int32_t i = 0; i < pColInfo->numOfTags; ++i) { + SFillTagColInfo* pTag = &pColInfo->pTags[i]; + if (pTag->col.colId == pCol->col.colId) { + assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); + break; + } + } } } -static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, - int64_t ts, char** pTags, bool outOfBound) { +static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, + int64_t ts, SFillTagColInfo* pTags, bool outOfBound) { char* prevValues = pFillInfo->prevValues; char* nextValues = pFillInfo->nextValues; SPoint point1, point2, point; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num); @@ -364,17 +397,16 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu char** nextValues = &pFillInfo->nextValues; int32_t numOfTags = pFillInfo->numOfTags; - char** pTags = pFillInfo->pTags; + SFillTagColInfo* pTags = pFillInfo->pTags; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); - if (numOfRows == 0) { /* * These data are generated according to fill strategy, since the current timestamp is out of time window of * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. */ while (num < outputRows) { - doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true); + doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true); } pFillInfo->numOfTotal += pFillInfo->numOfCurrent; @@ -401,7 +433,7 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) { - doInterpoResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false); + doFillResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false); } /* output buffer is full, abort */