提交 f3c92ad3 编写于 作者: H Haojun Liao

[td-1206]

上级 50e73ebb
...@@ -2422,24 +2422,14 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2422,24 +2422,14 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
/////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////
static bool percentile_function_setup(SQLFunctionCtx *pCtx) { static bool percentile_function_setup(SQLFunctionCtx *pCtx) {
const int32_t MAX_AVAILABLE_BUFFER_SIZE = 1 << 20; // 1MB
const int32_t NUMOFCOLS = 1;
if (!function_setup(pCtx)) { if (!function_setup(pCtx)) {
return false; return false;
} }
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SSchema field[1] = { { (uint8_t)pCtx->inputType, "dummyCol", 0, pCtx->inputBytes } };
SColumnModel *pModel = createColumnModel(field, 1, 1000);
int32_t orderIdx = 0;
// tOrderDesc object
tOrderDescriptor *pDesc = tOrderDesCreate(&orderIdx, NUMOFCOLS, pModel, TSDB_ORDER_DESC);
((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket = ((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket =
tMemBucketCreate(1024, MAX_AVAILABLE_BUFFER_SIZE, pCtx->inputBytes, pCtx->inputType, pDesc); tMemBucketCreate(pCtx->inputBytes, pCtx->inputType);
return true; return true;
} }
...@@ -2485,15 +2475,13 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2485,15 +2475,13 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
tMemBucket * pMemBucket = ((SPercentileInfo *)pResInfo->interResultBuf)->pMemBucket; tMemBucket * pMemBucket = ((SPercentileInfo *)pResInfo->interResultBuf)->pMemBucket;
if (pMemBucket->numOfElems > 0) { // check for null if (pMemBucket->total > 0) { // check for null
*(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v); *(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v);
} else { } else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
} }
tOrderDescDestroy(pMemBucket->pOrderDesc);
tMemBucketDestroy(pMemBucket); tMemBucketDestroy(pMemBucket);
doFinalizer(pCtx); doFinalizer(pCtx);
} }
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#define TDENGINE_QPERCENTILE_H #define TDENGINE_QPERCENTILE_H
#include "qExtbuffer.h" #include "qExtbuffer.h"
#include "qResultbuf.h"
#include "qTsbuf.h"
typedef struct MinMaxEntry { typedef struct MinMaxEntry {
union { union {
...@@ -31,47 +33,43 @@ typedef struct MinMaxEntry { ...@@ -31,47 +33,43 @@ typedef struct MinMaxEntry {
}; };
} MinMaxEntry; } MinMaxEntry;
typedef struct tMemBucketSegment { typedef struct {
int32_t numOfSlots; int32_t size;
MinMaxEntry * pBoundingEntries; int32_t pageId;
tExtMemBuffer **pBuffer; tFilePage *data;
} tMemBucketSegment; } SSlotInfo;
typedef struct tMemBucketSlot {
SSlotInfo info;
MinMaxEntry range;
} tMemBucketSlot;
struct tMemBucket;
typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value);
typedef struct tMemBucket { typedef struct tMemBucket {
int16_t numOfSegs; int16_t numOfSlots;
int16_t nTotalSlots; int16_t type;
int16_t nSlotsOfSeg; int16_t bytes;
int16_t dataType; int32_t total;
int32_t elemPerPage; // number of elements for each object
int16_t nElemSize; int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result
int32_t numOfElems; int32_t bufPageSize; // disk page size
MinMaxEntry range; // value range
int32_t nTotalBufferSize; int32_t times; // count that has been checked for deciding the correct data value buckets.
int32_t maxElemsCapacity; __compar_fn_t comparFn;
int32_t pageSize; tMemBucketSlot *pSlots;
int16_t numOfTotalPages; SDiskbasedResultBuf *pBuffer;
int16_t numOfAvailPages; /* remain available buffer pages */ __perc_hash_func_t hashFunc;
tMemBucketSegment *pSegs;
tOrderDescriptor * pOrderDesc;
MinMaxEntry nRange;
void (*HashFunc)(struct tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
} tMemBucket; } tMemBucket;
tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType);
tOrderDescriptor *pDesc);
void tMemBucketDestroy(tMemBucket *pBucket); void tMemBucketDestroy(tMemBucket *pBucket);
void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows); void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size);
double getPercentile(tMemBucket *pMemBucket, double percent); double getPercentile(tMemBucket *pMemBucket, double percent);
void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
#endif // TDENGINE_QPERCENTILE_H #endif // TDENGINE_QPERCENTILE_H
...@@ -14,310 +14,291 @@ ...@@ -14,310 +14,291 @@
*/ */
#include "qPercentile.h" #include "qPercentile.h"
#include "qResultbuf.h"
#include "os.h" #include "os.h"
#include "queryLog.h" #include "queryLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h"
#include "tulog.h" #include "tulog.h"
#include "tcompare.h"
tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) { #define DEFAULT_NUM_OF_SLOT 1024
tExtMemBuffer *pBuffer = NULL;
int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) {
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { return (times * numOfSlots) + slotIndex;
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; }
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) {
if (i == segIdx && j == slotIdx) { tFilePage *buffer = (tFilePage *)calloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(tFilePage));
pBuffer = pSeg->pBuffer[j];
} else { int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times);
if (pSeg->pBuffer && pSeg->pBuffer[j]) { SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]);
} int32_t offset = 0;
} for(int32_t i = 0; i < list->size; ++i) {
} SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i);
tFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
memcpy(buffer->data + offset, pg->data, pg->num * pMemBucket->bytes);
offset += pg->num * pMemBucket->bytes;
} }
return pBuffer; qsort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn);
return buffer;
} }
static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx, static void resetBoundingBox(MinMaxEntry* range, int32_t type) {
tOrderDescriptor *pDesc) { switch (type) {
// release all data in other slots case TSDB_DATA_TYPE_BIGINT: {
tExtMemBuffer *pMemBuffer = pMemBucket->pSegs[segIdx].pBuffer[slotIdx]; range->i64MaxVal = INT64_MIN;
tFilePage * buffer = (tFilePage *)calloc(1, pMemBuffer->nElemSize * pMemBuffer->numOfTotalElems + sizeof(tFilePage)); range->i64MinVal = INT64_MAX;
int32_t oldCapacity = pDesc->pColumnModel->capacity; break;
pDesc->pColumnModel->capacity = pMemBuffer->numOfTotalElems; };
case TSDB_DATA_TYPE_INT:
if (!tExtMemBufferIsAllDataInMem(pMemBuffer)) { case TSDB_DATA_TYPE_SMALLINT:
pMemBuffer = releaseBucketsExceptFor(pMemBucket, segIdx, slotIdx); case TSDB_DATA_TYPE_TINYINT: {
assert(pMemBuffer->numOfTotalElems > 0); range->iMaxVal = INT32_MIN;
range->iMinVal = INT32_MAX;
// load data in disk to memory break;
tFilePage *pPage = (tFilePage *)calloc(1, pMemBuffer->pageSize); };
case TSDB_DATA_TYPE_DOUBLE:
for (uint32_t i = 0; i < pMemBuffer->fileMeta.flushoutData.nLength; ++i) { case TSDB_DATA_TYPE_FLOAT: {
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[i]; range->dMaxVal = -DBL_MAX;
range->dMinVal = DBL_MAX;
int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); break;
UNUSED(ret);
for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) {
ret = (int32_t)fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(ret);
assert(pPage->num > 0);
tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, (int32_t)pPage->num, (int32_t)pPage->num);
printf("id: %d count: %" PRIu64 "\n", j, buffer->num);
}
} }
taosTFree(pPage);
assert(buffer->num == pMemBuffer->fileMeta.numOfElemsInFile);
} }
}
// load data in pMemBuffer to buffer
tFilePagesItem *pListItem = pMemBuffer->pHead; static void resetPosInfo(SSlotInfo* pInfo) {
while (pListItem != NULL) { pInfo->size = 0;
tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, (int32_t)pListItem->item.num, pInfo->pageId = -1;
(int32_t)pListItem->item.num); pInfo->data = NULL;
pListItem = pListItem->pNext;
}
tColDataQSort(pDesc, (int32_t)buffer->num, 0, (int32_t)buffer->num - 1, buffer->data, TSDB_ORDER_ASC);
pDesc->pColumnModel->capacity = oldCapacity; // restore value
return buffer;
} }
double findOnlyResult(tMemBucket *pMemBucket) { double findOnlyResult(tMemBucket *pMemBucket) {
assert(pMemBucket->numOfElems == 1); assert(pMemBucket->total == 1);
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
if (pSeg->pBuffer) { if (pSlot->info.size == 0) {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { continue;
tExtMemBuffer *pBuffer = pSeg->pBuffer[j]; }
if (pBuffer) {
assert(pBuffer->numOfTotalElems == 1); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times);
tFilePage *pPage = &pBuffer->pHead->item; SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
if (pBuffer->numOfElemsInBuffer == 1) { assert(list->size == 1);
switch (pMemBucket->dataType) {
case TSDB_DATA_TYPE_INT: SPageInfo* pgInfo = (SPageInfo*) taosArrayGetP(list, 0);
return *(int32_t *)pPage->data; tFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
case TSDB_DATA_TYPE_SMALLINT: assert(pPage->num == 1);
return *(int16_t *)pPage->data;
case TSDB_DATA_TYPE_TINYINT: switch (pMemBucket->type) {
return *(int8_t *)pPage->data; case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT: return *(int32_t *)pPage->data;
return (double)(*(int64_t *)pPage->data); case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_DOUBLE: { return *(int16_t *)pPage->data;
double dv = GET_DOUBLE_VAL(pPage->data); case TSDB_DATA_TYPE_TINYINT:
//return *(double *)pPage->data; return *(int8_t *)pPage->data;
return dv; case TSDB_DATA_TYPE_BIGINT:
} return (double)(*(int64_t *)pPage->data);
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_DOUBLE: {
float fv = GET_FLOAT_VAL(pPage->data); double dv = GET_DOUBLE_VAL(pPage->data);
//return *(float *)pPage->data; return dv;
return fv; }
} case TSDB_DATA_TYPE_FLOAT: {
default: float fv = GET_FLOAT_VAL(pPage->data);
return 0; return fv;
}
}
}
} }
default:
return 0;
} }
} }
return 0; return 0;
} }
void tBucketBigIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) {
int64_t v = *(int64_t *)value; int64_t v = *(int64_t *)value;
int32_t index = -1;
int32_t halfSlot = pBucket->numOfSlots >> 1;
// int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1;
if (pBucket->nRange.i64MaxVal == INT64_MIN) { if (pBucket->range.i64MaxVal == INT64_MIN) {
if (v >= 0) { if (v >= 0) {
*segIdx = ((v >> (64 - 9)) >> 6) + 8; index = (v >> (64 - 9)) + halfSlot;
*slotIdx = (v >> (64 - 9)) & 0x3F;
} else { // v<0 } else { // v<0
*segIdx = ((-v) >> (64 - 9)) >> 6; index = ((-v) >> (64 - 9));
*slotIdx = ((-v) >> (64 - 9)) & 0x3F; index = -index + (halfSlot - 1);
*segIdx = 7 - (*segIdx);
} }
return index;
} else { } else {
// todo hash for bigint and float and double // todo hash for bigint and float and double
int64_t span = pBucket->nRange.i64MaxVal - pBucket->nRange.i64MinVal; int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal;
if (span < pBucket->nTotalSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = (int32_t)(v - pBucket->nRange.i64MinVal); int32_t delta = (int32_t)(v - pBucket->range.i64MinVal);
*segIdx = delta / pBucket->nSlotsOfSeg; index = delta % pBucket->numOfSlots;
*slotIdx = delta % pBucket->nSlotsOfSeg;
} else { } else {
double x = (double)span / pBucket->nTotalSlots; double slotSpan = (double)span / pBucket->numOfSlots;
double posx = (v - pBucket->nRange.i64MinVal) / x; index = (v - pBucket->range.i64MinVal) / slotSpan;
if (v == pBucket->nRange.i64MaxVal) { if (v == pBucket->range.i64MaxVal) {
posx -= 1; index -= 1;
} }
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} }
return index;
} }
} }
// todo refactor to more generic // todo refactor to more generic
void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
int32_t v = *(int32_t *)value; int32_t v = *(int32_t *)value;
int32_t index = -1;
if (pBucket->nRange.iMaxVal == INT32_MIN) { if (pBucket->range.iMaxVal == INT32_MIN) {
/* /*
* taking negative integer into consideration, * taking negative integer into consideration,
* there is only half of pBucket->segs available for non-negative integer * there is only half of pBucket->segs available for non-negative integer
*/ */
// int32_t numOfSlots = pBucket->nTotalSlots>>1; int32_t halfSlot = pBucket->numOfSlots >> 1;
// int32_t bits = bitsOfNumber(numOfSlots)-1; int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1;
if (v >= 0) { if (v >= 0) {
*segIdx = ((v >> (32 - 9)) >> 6) + 8; index = (v >> (bits - 9)) + halfSlot;
*slotIdx = (v >> (32 - 9)) & 0x3F; } else { // v < 0
} else { // v<0 index = ((-v) >> (32 - 9));
*segIdx = ((-v) >> (32 - 9)) >> 6; index = -index + (halfSlot - 1);
*slotIdx = ((-v) >> (32 - 9)) & 0x3F;
*segIdx = 7 - (*segIdx);
} }
return index;
} else { } else {
// divide a range of [iMinVal, iMaxVal] into 1024 buckets // divide a range of [iMinVal, iMaxVal] into 1024 buckets
int32_t span = pBucket->nRange.iMaxVal - pBucket->nRange.iMinVal; int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal;
if (span < pBucket->nTotalSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = v - pBucket->nRange.iMinVal; int32_t delta = v - pBucket->range.iMinVal;
*segIdx = delta / pBucket->nSlotsOfSeg; index = (delta % pBucket->numOfSlots);
*slotIdx = delta % pBucket->nSlotsOfSeg;
} else { } else {
double x = (double)span / pBucket->nTotalSlots; double slotSpan = (double)span / pBucket->numOfSlots;
double posx = (v - pBucket->nRange.iMinVal) / x; index = (v - pBucket->range.iMinVal) / slotSpan;
if (v == pBucket->nRange.iMaxVal) { if (v == pBucket->range.iMaxVal) {
posx -= 1; index -= 1;
} }
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} }
return index;
} }
} }
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
// double v = *(double *)value;
double v = GET_DOUBLE_VAL(value); double v = GET_DOUBLE_VAL(value);
int32_t index = -1;
if (pBucket->nRange.dMinVal == DBL_MAX) { if (pBucket->range.dMinVal == DBL_MAX) {
/* /*
* taking negative integer into consideration, * taking negative integer into consideration,
* there is only half of pBucket->segs available for non-negative integer * there is only half of pBucket->segs available for non-negative integer
*/ */
double x = DBL_MAX / (pBucket->nTotalSlots >> 1); double x = DBL_MAX / (pBucket->numOfSlots >> 1);
double posx = (v + DBL_MAX) / x; double posx = (v + DBL_MAX) / x;
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; return ((int32_t)posx) % pBucket->numOfSlots;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} else { } else {
// divide a range of [dMinVal, dMaxVal] into 1024 buckets // divide a range of [dMinVal, dMaxVal] into 1024 buckets
double span = pBucket->nRange.dMaxVal - pBucket->nRange.dMinVal; double span = pBucket->range.dMaxVal - pBucket->range.dMinVal;
if (span < pBucket->nTotalSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = (int32_t)(v - pBucket->nRange.dMinVal); int32_t delta = (int32_t)(v - pBucket->range.dMinVal);
*segIdx = delta / pBucket->nSlotsOfSeg; index = (delta % pBucket->numOfSlots);
*slotIdx = delta % pBucket->nSlotsOfSeg;
} else { } else {
double x = span / pBucket->nTotalSlots; double slotSpan = span / pBucket->numOfSlots;
double posx = (v - pBucket->nRange.dMinVal) / x; index = (v - pBucket->range.dMinVal) / slotSpan;
if (v == pBucket->nRange.dMaxVal) { if (v == pBucket->range.dMaxVal) {
posx -= 1; index -= 1;
} }
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} }
if (*segIdx < 0 || *segIdx > 16 || *slotIdx < 0 || *slotIdx > 64) { if (index < 0 || index > pBucket->numOfSlots) {
uError("error in hash process. segment is: %d, slot id is: %d\n", *segIdx, *slotIdx); uError("error in hash process. slot id: %d", index);
} }
return index;
} }
} }
tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, static __perc_hash_func_t getHashFunc(int32_t type) {
tOrderDescriptor *pDesc) { switch (type) {
tMemBucket *pBucket = (tMemBucket *)malloc(sizeof(tMemBucket));
pBucket->nTotalSlots = totalSlots;
pBucket->nSlotsOfSeg = 1 << 6; // 64 Segments, 16 slots each seg.
pBucket->dataType = dataType;
pBucket->nElemSize = nElemSize;
pBucket->pageSize = DEFAULT_PAGE_SIZE;
pBucket->numOfElems = 0;
pBucket->numOfSegs = pBucket->nTotalSlots / pBucket->nSlotsOfSeg;
pBucket->nTotalBufferSize = nBufferSize;
pBucket->maxElemsCapacity = pBucket->nTotalBufferSize / pBucket->nElemSize;
pBucket->numOfTotalPages = pBucket->nTotalBufferSize / pBucket->pageSize;
pBucket->numOfAvailPages = pBucket->numOfTotalPages;
pBucket->pSegs = NULL;
pBucket->pOrderDesc = pDesc;
switch (pBucket->dataType) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
pBucket->nRange.iMinVal = INT32_MAX; return tBucketIntHash;
pBucket->nRange.iMaxVal = INT32_MIN;
pBucket->HashFunc = tBucketIntHash;
break;
}; };
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
pBucket->nRange.dMinVal = DBL_MAX; return tBucketDoubleHash;
pBucket->nRange.dMaxVal = -DBL_MAX;
pBucket->HashFunc = tBucketDoubleHash;
break;
}; };
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
pBucket->nRange.i64MinVal = INT64_MAX; return tBucketBigIntHash;
pBucket->nRange.i64MaxVal = INT64_MIN;
pBucket->HashFunc = tBucketBigIntHash;
break;
}; };
default: { default: {
uError("MemBucket:%p,not support data type %d,failed", pBucket, pBucket->dataType);
taosTFree(pBucket);
return NULL; return NULL;
} }
} }
}
int32_t numOfCols = pDesc->pColumnModel->numOfCols; static void resetSlotInfo(tMemBucket* pBucket) {
if (numOfCols != 1) { for (int32_t i = 0; i < pBucket->numOfSlots; ++i) {
uError("MemBucket:%p,only consecutive data is allowed,invalid numOfCols:%d", pBucket, numOfCols); tMemBucketSlot* pSlot = &pBucket->pSlots[i];
taosTFree(pBucket);
return NULL; resetBoundingBox(&pSlot->range, pBucket->type);
resetPosInfo(&pSlot->info);
} }
}
SSchema* pSchema = getColumnModelSchema(pDesc->pColumnModel, 0); tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) {
if (pSchema->type != dataType) { tMemBucket *pBucket = (tMemBucket *)calloc(1, sizeof(tMemBucket));
uError("MemBucket:%p,data type is not consistent,%d in schema, %d in param", pBucket, pSchema->type, dataType); if (pBucket == NULL) {
taosTFree(pBucket);
return NULL; return NULL;
} }
if (pBucket->numOfTotalPages < pBucket->nTotalSlots) { pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT;
uWarn("MemBucket:%p,total buffer pages %d are not enough for all slots", pBucket, pBucket->numOfTotalPages); pBucket->bufPageSize = DEFAULT_PAGE_SIZE * 4; // 4k per page
}
pBucket->pSegs = (tMemBucketSegment *)malloc(pBucket->numOfSegs * sizeof(tMemBucketSegment)); pBucket->type = dataType;
pBucket->bytes = nElemSize;
pBucket->total = 0;
pBucket->times = 1;
for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { pBucket->maxCapacity = 200000;
pBucket->pSegs[i].numOfSlots = pBucket->nSlotsOfSeg;
pBucket->pSegs[i].pBuffer = NULL; pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
pBucket->pSegs[i].pBoundingEntries = NULL; pBucket->comparFn = getKeyComparFunc(pBucket->type);
resetBoundingBox(&pBucket->range, pBucket->type);
pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) {
uError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type);
free(pBucket);
return NULL;
} }
uDebug("MemBucket:%p,created,buffer size:%ld,elem size:%d", pBucket, pBucket->numOfTotalPages * DEFAULT_PAGE_SIZE, pBucket->pSlots = (tMemBucketSlot *)calloc(pBucket->numOfSlots, sizeof(tMemBucketSlot));
pBucket->nElemSize); if (pBucket->pSlots == NULL) {
free(pBucket);
return NULL;
}
resetSlotInfo(pBucket);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bytes, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL);
if (ret != TSDB_CODE_SUCCESS) {
tMemBucketDestroy(pBucket);
return NULL;
}
uDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes);
return pBucket; return pBucket;
} }
...@@ -326,81 +307,11 @@ void tMemBucketDestroy(tMemBucket *pBucket) { ...@@ -326,81 +307,11 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return; return;
} }
if (pBucket->pSegs) { destroyResultBuf(pBucket->pBuffer);
for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { taosTFree(pBucket->pSlots);
tMemBucketSegment *pSeg = &(pBucket->pSegs[i]);
taosTFree(pSeg->pBoundingEntries);
if (pSeg->pBuffer == NULL || pSeg->numOfSlots == 0) {
continue;
}
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) {
if (pSeg->pBuffer[j] != NULL) {
pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]);
}
}
taosTFree(pSeg->pBuffer);
}
}
taosTFree(pBucket->pSegs);
taosTFree(pBucket); taosTFree(pBucket);
} }
/*
* find the slots which accounts for largest proportion of total in-memory buffer
*/
static void tBucketGetMaxMemSlot(tMemBucket *pBucket, int16_t *segIdx, int16_t *slotIdx) {
*segIdx = -1;
*slotIdx = -1;
int32_t val = 0;
for (int32_t k = 0; k < pBucket->numOfSegs; ++k) {
tMemBucketSegment *pSeg = &pBucket->pSegs[k];
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
if (pSeg->pBuffer == NULL || pSeg->pBuffer[i] == NULL) {
continue;
}
if (val < pSeg->pBuffer[i]->numOfInMemPages) {
val = pSeg->pBuffer[i]->numOfInMemPages;
*segIdx = k;
*slotIdx = i;
}
}
}
}
static void resetBoundingBox(tMemBucketSegment *pSeg, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BIGINT: {
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
pSeg->pBoundingEntries[i].i64MaxVal = INT64_MIN;
pSeg->pBoundingEntries[i].i64MinVal = INT64_MAX;
}
break;
};
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: {
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
pSeg->pBoundingEntries[i].iMaxVal = INT32_MIN;
pSeg->pBoundingEntries[i].iMinVal = INT32_MAX;
}
break;
};
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: {
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
pSeg->pBoundingEntries[i].dMaxVal = -DBL_MAX;
pSeg->pBoundingEntries[i].dMinVal = DBL_MAX;
}
break;
}
}
}
void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
switch (dataType) { switch (dataType) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
...@@ -461,7 +372,6 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { ...@@ -461,7 +372,6 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
break; break;
}; };
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
// double val = *(float *)data;
double val = GET_FLOAT_VAL(data); double val = GET_FLOAT_VAL(data);
if (r->dMinVal > val) { if (r->dMinVal > val) {
...@@ -478,116 +388,45 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { ...@@ -478,116 +388,45 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
} }
/* /*
* in memory bucket, we only accept the simple data consecutive put in a row/column * in memory bucket, we only accept data array list
* no column-model in this case.
*/ */
void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) { void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pBucket->numOfElems += numOfRows; assert(pBucket != NULL && data != NULL && size > 0);
int16_t segIdx = 0, slotIdx = 0; pBucket->total += size;
for (int32_t i = 0; i < numOfRows; ++i) {
char *d = (char *)data + i * tDataTypeDesc[pBucket->dataType].nSize;
switch (pBucket->dataType) {
case TSDB_DATA_TYPE_SMALLINT: {
int32_t val = *(int16_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int32_t val = *(int8_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t val = *(int32_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t val = *(int64_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
// double val = *(double *)d;
double val = GET_DOUBLE_VAL(d);
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
// double val = *(float *)d;
double val = GET_FLOAT_VAL(d);
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
}
tMemBucketSegment *pSeg = &pBucket->pSegs[segIdx]; int32_t bytes = pBucket->bytes;
if (pSeg->pBoundingEntries == NULL) {
pSeg->pBoundingEntries = (MinMaxEntry *)malloc(sizeof(MinMaxEntry) * pBucket->nSlotsOfSeg);
resetBoundingBox(pSeg, pBucket->dataType);
}
if (pSeg->pBuffer == NULL) { for (int32_t i = 0; i < size; ++i) {
pSeg->pBuffer = (tExtMemBuffer **)calloc(pBucket->nSlotsOfSeg, sizeof(void *)); char *d = (char *) data + i * bytes;
}
if (pSeg->pBuffer[slotIdx] == NULL) { int32_t slotIdx = (pBucket->hashFunc)(pBucket, d);
pSeg->pBuffer[slotIdx] = createExtMemBuffer(pBucket->numOfTotalPages * pBucket->pageSize, pBucket->nElemSize, assert(slotIdx >= 0);
pBucket->pageSize, pBucket->pOrderDesc->pColumnModel);
pSeg->pBuffer[slotIdx]->flushModel = SINGLE_APPEND_MODEL;
pBucket->pOrderDesc->pColumnModel->capacity = pSeg->pBuffer[slotIdx]->numOfElemsPerPage;
}
tMemBucketUpdateBoundingBox(&pSeg->pBoundingEntries[slotIdx], d, pBucket->dataType); tMemBucketSlot *pSlot = &pBucket->pSlots[slotIdx];
tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type);
// ensure available memory pages to allocate // ensure available memory pages to allocate
int16_t cseg = 0, cslot = 0; int32_t groupId = getGroupId(pBucket->numOfSlots, slotIdx, pBucket->times);
if (pBucket->numOfAvailPages == 0) { int32_t pageId = -1;
uDebug("MemBucket:%p,max avail size:%d, no avail memory pages,", pBucket, pBucket->numOfTotalPages);
tBucketGetMaxMemSlot(pBucket, &cseg, &cslot);
if (cseg == -1 || cslot == -1) {
uError("MemBucket:%p,failed to find appropriated avail buffer", pBucket);
return;
}
if (cseg != segIdx || cslot != slotIdx) {
pBucket->numOfAvailPages += pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages;
int32_t avail = pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages; if (pSlot->info.data == NULL || pSlot->info.data->num >= pBucket->elemPerPage) {
UNUSED(avail); if (pSlot->info.data != NULL) {
tExtMemBufferFlush(pBucket->pSegs[cseg].pBuffer[cslot]); assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0);
uDebug("MemBucket:%p,seg:%d,slot:%d flushed to disk,new avail pages:%d", pBucket, cseg, cslot, // keep the pointer in memory
pBucket->numOfAvailPages); releaseResBufPage(pBucket->pBuffer, pSlot->info.data);
} else { pSlot->info.data = NULL;
uDebug("MemBucket:%p,failed to choose slot to flush to disk seg:%d,slot:%d", pBucket, cseg, cslot);
} }
}
int16_t consumedPgs = pSeg->pBuffer[slotIdx]->numOfInMemPages;
int16_t newPgs = tExtMemBufferPut(pSeg->pBuffer[slotIdx], d, 1); pSlot->info.data = getNewDataBuf(pBucket->pBuffer, groupId, &pageId);
/* pSlot->info.pageId = pageId;
* trigger 1. page re-allocation, to reduce the available pages }
* 2. page flushout, to increase the available pages
*/
pBucket->numOfAvailPages += (consumedPgs - newPgs);
}
}
void releaseBucket(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes);
if (segIdx < 0 || segIdx > pMemBucket->numOfSegs || slotIdx < 0) {
return;
}
tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; pSlot->info.data->num += 1;
if (slotIdx < 0 || slotIdx >= pSeg->numOfSlots || pSeg->pBuffer[slotIdx] == NULL) { pSlot->info.size += 1;
return;
} }
pSeg->pBuffer[slotIdx] = destoryExtMemBuffer(pSeg->pBuffer[slotIdx]);
} }
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
...@@ -595,54 +434,49 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV ...@@ -595,54 +434,49 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV
*minVal = DBL_MAX; *minVal = DBL_MAX;
*maxVal = -DBL_MAX; *maxVal = -DBL_MAX;
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
if (pSeg->pBuffer == NULL) { if (pSlot->info.size == 0) {
continue; continue;
} }
switch (pMemBucket->dataType) {
switch (pMemBucket->type) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { double minv = pSlot->range.iMinVal;
double minv = pSeg->pBoundingEntries[j].iMinVal; double maxv = pSlot->range.iMaxVal;
double maxv = pSeg->pBoundingEntries[j].iMaxVal;
if (*minVal > minv) { if (*minVal > minv) {
*minVal = minv; *minVal = minv;
} }
if (*maxVal < maxv) { if (*maxVal < maxv) {
*maxVal = maxv; *maxVal = maxv;
}
} }
break; break;
} }
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { double minv = pSlot->range.dMinVal;
double minv = pSeg->pBoundingEntries[j].dMinVal; double maxv = pSlot->range.dMaxVal;
double maxv = pSeg->pBoundingEntries[j].dMaxVal;
if (*minVal > minv) { if (*minVal > minv) {
*minVal = minv; *minVal = minv;
} }
if (*maxVal < maxv) { if (*maxVal < maxv) {
*maxVal = maxv; *maxVal = maxv;
}
} }
break; break;
} }
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { double minv = (double)pSlot->range.i64MinVal;
double minv = (double)pSeg->pBoundingEntries[j].i64MinVal; double maxv = (double)pSlot->range.i64MaxVal;
double maxv = (double)pSeg->pBoundingEntries[j].i64MaxVal;
if (*minVal > minv) { if (*minVal > minv) {
*minVal = minv; *minVal = minv;
} }
if (*maxVal < maxv) { if (*maxVal < maxv) {
*maxVal = maxv; *maxVal = maxv;
}
} }
break; break;
} }
...@@ -650,20 +484,6 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV ...@@ -650,20 +484,6 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV
} }
} }
static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBucket, int32_t segIdx) {
int32_t i = segIdx + 1;
while (i < pMemBucket->numOfSegs && pMemBucket->pSegs[i].numOfSlots == 0) ++i;
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i];
assert(pMemBucket->numOfSegs > i && pMemBucket->pSegs[i].pBuffer != NULL);
i = 0;
while (i < pMemBucket->nSlotsOfSeg && pSeg->pBuffer[i] == NULL) ++i;
assert(i < pMemBucket->nSlotsOfSeg);
return pSeg->pBoundingEntries[i];
}
/* /*
* *
* now, we need to find the minimum value of the next slot for * now, we need to find the minimum value of the next slot for
...@@ -671,262 +491,198 @@ static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBuck ...@@ -671,262 +491,198 @@ static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBuck
* j is the last slot of current segment, we need to get the first * j is the last slot of current segment, we need to get the first
* slot of the next segment. * slot of the next segment.
*/ */
static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t slotIdx) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx];
MinMaxEntry next;
if (slotIdx == pSeg->numOfSlots - 1) { // find next segment with data
return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx);
} else {
int32_t j = slotIdx + 1; int32_t j = slotIdx + 1;
for (; j < pMemBucket->nSlotsOfSeg && pMemBucket->pSegs[segIdx].pBuffer[j] == 0; ++j) { while (j < pMemBucket->numOfSlots && (pMemBucket->pSlots[j].info.size == 0)) {
++j;
}
assert(j < pMemBucket->numOfSlots);
return pMemBucket->pSlots[j].range;
}
static bool isIdenticalData(tMemBucket *pMemBucket, int32_t index);
char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage);
static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) {
assert(isIdenticalData(pMemBucket, slotIndex));
tMemBucketSlot *pSlot = &pMemBucket->pSlots[slotIndex];
double finalResult = 0.0;
switch (pMemBucket->type) {
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_INT: {
finalResult = pSlot->range.iMinVal;
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
finalResult = pSlot->range.dMinVal;
break;
}; };
if (j == pMemBucket->nSlotsOfSeg) { // current slot has no available case TSDB_DATA_TYPE_BIGINT: {
// slot,try next segment finalResult = pSlot->range.i64MinVal;
return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx); break;
} else {
next = pSeg->pBoundingEntries[slotIdx + 1];
assert(pSeg->pBuffer[slotIdx + 1] != NULL);
} }
} }
return next; return finalResult;
} }
bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx);
char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage);
double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) { double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) {
int32_t num = 0; int32_t num = 0;
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { if (pSlot->info.size == 0) {
if (pSeg->pBuffer == NULL || pSeg->pBuffer[j] == NULL) { continue;
continue; }
}
// required value in current slot // required value in current slot
if (num < (count + 1) && num + pSeg->pBuffer[j]->numOfTotalElems >= (count + 1)) { if (num < (count + 1) && num + pSlot->info.size >= (count + 1)) {
if (pSeg->pBuffer[j]->numOfTotalElems + num == (count + 1)) { if (pSlot->info.size + num == (count + 1)) {
/* /*
* now, we need to find the minimum value of the next slot for interpolating the percentile value * now, we need to find the minimum value of the next slot for interpolating the percentile value
* j is the last slot of current segment, we need to get the first slot of the next segment. * j is the last slot of current segment, we need to get the first slot of the next segment.
* */
*/ MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i);
MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i, j);
double maxOfThisSlot = 0;
double maxOfThisSlot = 0; double minOfNextSlot = 0;
double minOfNextSlot = 0; switch (pMemBucket->type) {
switch (pMemBucket->dataType) { case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_TINYINT: {
case TSDB_DATA_TYPE_TINYINT: { maxOfThisSlot = pSlot->range.iMaxVal;
maxOfThisSlot = pSeg->pBoundingEntries[j].iMaxVal; minOfNextSlot = next.iMinVal;
minOfNextSlot = next.iMinVal; break;
break;
};
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
maxOfThisSlot = pSeg->pBoundingEntries[j].dMaxVal;
minOfNextSlot = next.dMinVal;
break;
};
case TSDB_DATA_TYPE_BIGINT: {
maxOfThisSlot = (double)pSeg->pBoundingEntries[j].i64MaxVal;
minOfNextSlot = (double)next.i64MinVal;
break;
}
}; };
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
maxOfThisSlot = pSlot->range.dMaxVal;
minOfNextSlot = next.dMinVal;
break;
};
case TSDB_DATA_TYPE_BIGINT: {
maxOfThisSlot = (double)pSlot->range.i64MaxVal;
minOfNextSlot = (double)next.i64MinVal;
break;
}
};
assert(minOfNextSlot > maxOfThisSlot); assert(minOfNextSlot > maxOfThisSlot);
double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot;
return val; return val;
} }
if (pSeg->pBuffer[j]->numOfTotalElems <= pMemBucket->maxElemsCapacity) {
// data in buffer and file are merged together to be processed.
tFilePage *buffer = loadIntoBucketFromDisk(pMemBucket, i, j, pMemBucket->pOrderDesc);
int32_t currentIdx = count - num;
char * thisVal = buffer->data + pMemBucket->nElemSize * currentIdx;
char * nextVal = thisVal + pMemBucket->nElemSize;
double td = 1.0, nd = 1.0;
switch (pMemBucket->dataType) {
case TSDB_DATA_TYPE_SMALLINT: {
td = *(int16_t *)thisVal;
nd = *(int16_t *)nextVal;
break;
}
case TSDB_DATA_TYPE_TINYINT: {
td = *(int8_t *)thisVal;
nd = *(int8_t *)nextVal;
break;
}
case TSDB_DATA_TYPE_INT: {
td = *(int32_t *)thisVal;
nd = *(int32_t *)nextVal;
break;
};
case TSDB_DATA_TYPE_FLOAT: {
// td = *(float *)thisVal;
// nd = *(float *)nextVal;
td = GET_FLOAT_VAL(thisVal);
nd = GET_FLOAT_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
// td = *(double *)thisVal;
td = GET_DOUBLE_VAL(thisVal);
// nd = *(double *)nextVal;
nd = GET_DOUBLE_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
td = (double)*(int64_t *)thisVal;
nd = (double)*(int64_t *)nextVal;
break;
}
}
double val = (1 - fraction) * td + fraction * nd;
taosTFree(buffer);
return val;
} else { // incur a second round bucket split
if (isIdenticalData(pMemBucket, i, j)) {
tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j];
tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize);
char *thisVal = getFirstElemOfMemBuffer(pSeg, j, pPage);
double finalResult = 0.0;
switch (pMemBucket->dataType) {
case TSDB_DATA_TYPE_SMALLINT: {
finalResult = *(int16_t *)thisVal;
break;
}
case TSDB_DATA_TYPE_TINYINT: {
finalResult = *(int8_t *)thisVal;
break;
}
case TSDB_DATA_TYPE_INT: {
finalResult = *(int32_t *)thisVal;
break;
};
case TSDB_DATA_TYPE_FLOAT: {
// finalResult = *(float *)thisVal;
finalResult = GET_FLOAT_VAL(thisVal);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
// finalResult = *(double *)thisVal;
finalResult = GET_DOUBLE_VAL(thisVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
finalResult = (double)(*(int64_t *)thisVal);
break;
}
}
free(pPage);
return finalResult;
}
uDebug("MemBucket:%p,start second round bucketing", pMemBucket); if (pSlot->info.size <= pMemBucket->maxCapacity) {
// data in buffer and file are merged together to be processed.
tFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
int32_t currentIdx = count - num;
if (pSeg->pBuffer[j]->numOfElemsInBuffer != 0) { char *thisVal = buffer->data + pMemBucket->bytes * currentIdx;
uDebug("MemBucket:%p,flush %d pages to disk, clear status", pMemBucket, pSeg->pBuffer[j]->numOfInMemPages); char *nextVal = thisVal + pMemBucket->bytes;
pMemBucket->numOfAvailPages += pSeg->pBuffer[j]->numOfInMemPages; double td = 1.0, nd = 1.0;
tExtMemBufferFlush(pSeg->pBuffer[j]); switch (pMemBucket->type) {
case TSDB_DATA_TYPE_SMALLINT: {
td = *(int16_t *)thisVal;
nd = *(int16_t *)nextVal;
break;
} }
case TSDB_DATA_TYPE_TINYINT: {
tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j]; td = *(int8_t *)thisVal;
pSeg->pBuffer[j] = NULL; nd = *(int8_t *)nextVal;
break;
// release all
for (int32_t tt = 0; tt < pMemBucket->numOfSegs; ++tt) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt];
for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) {
if (pSeg->pBuffer && pSeg->pBuffer[ttx]) {
pSeg->pBuffer[ttx] = destoryExtMemBuffer(pSeg->pBuffer[ttx]);
}
}
} }
case TSDB_DATA_TYPE_INT: {
pMemBucket->nRange.i64MaxVal = pSeg->pBoundingEntries->i64MaxVal; td = *(int32_t *)thisVal;
pMemBucket->nRange.i64MinVal = pSeg->pBoundingEntries->i64MinVal; nd = *(int32_t *)nextVal;
pMemBucket->numOfElems = 0; break;
};
for (int32_t tt = 0; tt < pMemBucket->numOfSegs; ++tt) { case TSDB_DATA_TYPE_FLOAT: {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt]; td = GET_FLOAT_VAL(thisVal);
for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) { nd = GET_FLOAT_VAL(nextVal);
if (pSeg->pBoundingEntries) { break;
resetBoundingBox(pSeg, pMemBucket->dataType); }
} case TSDB_DATA_TYPE_DOUBLE: {
} td = GET_DOUBLE_VAL(thisVal);
nd = GET_DOUBLE_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
td = (double)*(int64_t *)thisVal;
nd = (double)*(int64_t *)nextVal;
break;
} }
}
tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize); double val = (1 - fraction) * td + fraction * nd;
taosTFree(buffer);
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; return val;
assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); } else { // incur a second round bucket split
if (isIdenticalData(pMemBucket, i)) {
return getIdenticalDataVal(pMemBucket, i);
}
int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); // try next round
UNUSED(ret); pMemBucket->times += 1;
uDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times);
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { pMemBucket->range = pSlot->range;
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); pMemBucket->total = 0;
if (sz != pMemBuffer->pageSize) {
uError("MemBucket:%p, read tmp file %s failed", pMemBucket, pMemBuffer->path);
} else {
tMemBucketPut(pMemBucket, pPage->data, (int32_t)pPage->num);
}
}
fclose(pMemBuffer->file); resetSlotInfo(pMemBucket);
if (unlink(pMemBuffer->path) != 0) {
uError("MemBucket:%p, remove tmp file %s failed", pMemBucket, pMemBuffer->path);
}
taosTFree(pMemBuffer);
taosTFree(pPage);
return getPercentileImpl(pMemBucket, count - num, fraction); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
} SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
} else { assert(list->size > 0);
num += pSeg->pBuffer[j]->numOfTotalElems;
for (int32_t f = 0; f < list->size; ++f) {
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);
tFilePage *pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo);
}
return getPercentileImpl(pMemBucket, count - num, fraction);
} }
} else {
num += pSlot->info.size;
} }
} }
return 0; return 0;
} }
double getPercentile(tMemBucket *pMemBucket, double percent) { double getPercentile(tMemBucket *pMemBucket, double percent) {
if (pMemBucket->numOfElems == 0) { if (pMemBucket->total == 0) {
return 0.0; return 0.0;
} }
if (pMemBucket->numOfElems == 1) { // return the only element // if only one elements exists, return it
if (pMemBucket->total == 1) {
return findOnlyResult(pMemBucket); return findOnlyResult(pMemBucket);
} }
percent = fabs(percent); percent = fabs(percent);
// validate the parameters // find the min/max value, no need to scan all data in bucket
if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) {
double minx = 0, maxx = 0; double minx = 0, maxx = 0;
/*
* find the min/max value, no need to scan all data in bucket
*/
findMaxMinValue(pMemBucket, &maxx, &minx); findMaxMinValue(pMemBucket, &maxx, &minx);
return fabs(percent - 100) < DBL_EPSILON ? maxx : minx; return fabs(percent - 100) < DBL_EPSILON ? maxx : minx;
} }
double percentVal = (percent * (pMemBucket->numOfElems - 1)) / ((double)100.0); double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0);
int32_t orderIdx = (int32_t)percentVal; int32_t orderIdx = (int32_t)percentVal;
// do put data by using buckets // do put data by using buckets
...@@ -934,19 +690,18 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { ...@@ -934,19 +690,18 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
} }
/* /*
* check if data in one slot are all identical * check if data in one slot are all identical only need to compare with the bounding box
* only need to compare with the bounding box
*/ */
bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { bool isIdenticalData(tMemBucket *pMemBucket, int32_t index) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; tMemBucketSlot *pSeg = &pMemBucket->pSlots[index];
if (pMemBucket->dataType == TSDB_DATA_TYPE_INT || pMemBucket->dataType == TSDB_DATA_TYPE_BIGINT || if (pMemBucket->type == TSDB_DATA_TYPE_INT || pMemBucket->type == TSDB_DATA_TYPE_BIGINT ||
pMemBucket->dataType == TSDB_DATA_TYPE_SMALLINT || pMemBucket->dataType == TSDB_DATA_TYPE_TINYINT) { pMemBucket->type == TSDB_DATA_TYPE_SMALLINT || pMemBucket->type == TSDB_DATA_TYPE_TINYINT) {
return pSeg->pBoundingEntries[slotIdx].i64MinVal == pSeg->pBoundingEntries[slotIdx].i64MaxVal; return pSeg->range.i64MinVal == pSeg->range.i64MaxVal;
} }
if (pMemBucket->dataType == TSDB_DATA_TYPE_FLOAT || pMemBucket->dataType == TSDB_DATA_TYPE_DOUBLE) { if (pMemBucket->type == TSDB_DATA_TYPE_FLOAT || pMemBucket->type == TSDB_DATA_TYPE_DOUBLE) {
return fabs(pSeg->pBoundingEntries[slotIdx].dMaxVal - pSeg->pBoundingEntries[slotIdx].dMinVal) < DBL_EPSILON; return fabs(pSeg->range.dMaxVal - pSeg->range.dMinVal) < DBL_EPSILON;
} }
return false; return false;
...@@ -956,24 +711,24 @@ bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { ...@@ -956,24 +711,24 @@ bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) {
* get the first element of one slot into memory. * get the first element of one slot into memory.
* if no data of current slot in memory, load it from disk * if no data of current slot in memory, load it from disk
*/ */
char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage) { char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage) {
tExtMemBuffer *pMemBuffer = pSeg->pBuffer[slotIdx]; // STSBuf *pMemBuffer = pSeg->pBuffer[slotIdx];
char * thisVal = NULL; char *thisVal = NULL;
if (pSeg->pBuffer[slotIdx]->numOfElemsInBuffer != 0) { // if (pSeg->pBuffer[slotIdx]->numOfTotal != 0) {
thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data; //// thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data;
} else { // } else {
/* // /*
* no data in memory, load one page into memory // * no data in memory, load one page into memory
*/ // */
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; // tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0];
assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); // assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize);
int32_t ret; // int32_t ret;
ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); // ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET);
UNUSED(ret); // UNUSED(ret);
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); // size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(sz); // UNUSED(sz);
thisVal = pPage->data; // thisVal = pPage->data;
} // }
return thisVal; return thisVal;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册