提交 3fc7fa3b 编写于 作者: G Ganlin Zhao

feat(query): add sample function work with selectivity

上级 55a79f25
...@@ -195,13 +195,13 @@ typedef struct SMavgInfo { ...@@ -195,13 +195,13 @@ typedef struct SMavgInfo {
} SMavgInfo; } SMavgInfo;
typedef struct SSampleInfo { typedef struct SSampleInfo {
int32_t samples; int32_t samples;
int32_t totalPoints; int32_t totalPoints;
int32_t numSampled; int32_t numSampled;
uint8_t colType; uint8_t colType;
int16_t colBytes; int16_t colBytes;
char* data; char* data;
int64_t* timestamp; STuplePos* tuplePos;
} SSampleInfo; } SSampleInfo;
typedef struct STailItem { typedef struct STailItem {
...@@ -4350,7 +4350,7 @@ bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { ...@@ -4350,7 +4350,7 @@ bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
int32_t numOfSamples = pVal->datum.i; int32_t numOfSamples = pVal->datum.i;
pEnv->calcMemSize = sizeof(SSampleInfo) + numOfSamples * (pCol->node.resType.bytes + sizeof(int64_t)); pEnv->calcMemSize = sizeof(SSampleInfo) + numOfSamples * (pCol->node.resType.bytes + sizeof(STuplePos));
return true; return true;
} }
...@@ -4371,25 +4371,30 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) ...@@ -4371,25 +4371,30 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
return false; return false;
} }
pInfo->data = (char*)pInfo + sizeof(SSampleInfo); pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
pInfo->timestamp = (int64_t*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes); pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
return true; return true;
} }
/*
 * Store one sampled value into reservoir slot `index`.
 *
 * pInfo  sample state owning the value buffer
 * data   pointer to the source value
 * index  slot to overwrite; the slot starts at pInfo->data + index * colBytes
 */
static void sampleAssignResult(SSampleInfo* pInfo, char* data, int32_t index) {
  char* slot = pInfo->data + index * pInfo->colBytes;
  assignVal(slot, data, pInfo->colBytes, pInfo->colType);
}
static void doReservoirSample(SSampleInfo* pInfo, char* data, TSKEY ts, int32_t index) { static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) {
pInfo->totalPoints++; pInfo->totalPoints++;
if (pInfo->numSampled < pInfo->samples) { if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, ts, pInfo->numSampled); sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, pInfo->tuplePos + pInfo->numSampled * sizeof(STuplePos));
}
pInfo->numSampled++; pInfo->numSampled++;
} else { } else {
int32_t j = taosRand() % (pInfo->totalPoints); int32_t j = taosRand() % (pInfo->totalPoints);
if (j < pInfo->samples) { if (j < pInfo->samples) {
sampleAssignResult(pInfo, data, ts, j); sampleAssignResult(pInfo, data, j);
if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, index, pCtx->pSrcBlock, pInfo->tuplePos + j * sizeof(STuplePos));
}
} }
} }
} }
...@@ -4400,11 +4405,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -4400,11 +4405,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
TSKEY* tsList = NULL;
if (pInput->pPTS != NULL) {
tsList = (int64_t*)pInput->pPTS->pData;
}
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_s(pInputCol, i)) { if (colDataIsNull_s(pInputCol, i)) {
...@@ -4412,7 +4412,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -4412,7 +4412,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
} }
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
doReservoirSample(pInfo, data, /*tsList[i]*/ 0, i); doReservoirSample(pCtx, pInfo, data, i);
} }
SET_VAL(pResInfo, pInfo->numSampled, pInfo->numSampled); SET_VAL(pResInfo, pInfo->numSampled, pInfo->numSampled);
...@@ -4431,6 +4431,7 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -4431,6 +4431,7 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pInfo->numSampled; ++i) { for (int32_t i = 0; i < pInfo->numSampled; ++i) {
colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false);
setSelectivityValue(pCtx, pBlock, pInfo->tuplePos + i * sizeof(STuplePos), currentRow + i);
} }
return pInfo->numSampled; return pInfo->numSampled;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册