提交 7f0077ca 编写于 作者: H Haojun Liao

feat(query): a new builtin aggregate function is added.

上级 1c94bbb5
...@@ -819,6 +819,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { ...@@ -819,6 +819,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
continue; continue;
} }
numOfElems++;
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
...@@ -828,9 +830,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { ...@@ -828,9 +830,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
break;
} }
numOfElems++;
} }
} else { } else {
// in case of descending order time stamp serial, which usually happens as the results of the nest query, // in case of descending order time stamp serial, which usually happens as the results of the nest query,
...@@ -847,6 +848,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { ...@@ -847,6 +848,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
continue; continue;
} }
numOfElems++;
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
...@@ -855,9 +858,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { ...@@ -855,9 +858,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
*(TSKEY*)(buf + bytes) = cts; *(TSKEY*)(buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
break;
} }
numOfElems++;
} }
} }
...@@ -874,43 +876,55 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { ...@@ -874,43 +876,55 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t bytes = pInputCol->info.bytes;
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true); ASSERT(pInputCol->hasNull == true);
return 0; return 0;
} }
if (pCtx->order == TSDB_ORDER_DESC) { SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue; continue;
} }
char* data = colDataGetData(pInputCol, i);
memcpy(buf, data, pInputCol->info.bytes);
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->complete = true; // set query completed on this column
numOfElems++; numOfElems++;
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
}
break; break;
} }
} else { // ascending order } else { // descending order
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue; continue;
} }
char* data = colDataGetData(pInputCol, i); numOfElems++;
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
if (pResInfo->numOfRes == 0 || (*(TSKEY*)buf) < ts) { char* data = colDataGetData(pInputCol, i);
memcpy(buf, data, pCtx->inputBytes); TSKEY cts = getRowPTs(pInput->pPTS, i);
*(TSKEY*)buf = ts; if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
memcpy(buf, data, bytes);
*(TSKEY*)(buf + bytes) = cts;
pResInfo->numOfRes = 1;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
} }
numOfElems++;
break; break;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册