提交 cc0110d3 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 40cafb49
...@@ -174,7 +174,7 @@ bool isValidDataType(int32_t type); ...@@ -174,7 +174,7 @@ bool isValidDataType(int32_t type);
void setVardataNull(char* val, int32_t type); void setVardataNull(char* val, int32_t type);
void setNull(char *val, int32_t type, int32_t bytes); void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void* getNullValue(int32_t type); void *getNullValue(int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type);
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
......
...@@ -86,7 +86,7 @@ typedef struct SResultRow { ...@@ -86,7 +86,7 @@ typedef struct SResultRow {
bool closed; // this result status: closed or opened bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current time window union {STimeWindow win; char* key;}; // start key of current result row
} SResultRow; } SResultRow;
typedef struct SGroupResInfo { typedef struct SGroupResInfo {
...@@ -131,7 +131,6 @@ typedef struct SSingleColumnFilterInfo { ...@@ -131,7 +131,6 @@ typedef struct SSingleColumnFilterInfo {
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; TSKEY lastKey;
int32_t groupIndex; // group id in table list int32_t groupIndex; // group id in table list
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
tVariant tag; tVariant tag;
STimeWindow win; STimeWindow win;
STSCursor cur; STSCursor cur;
...@@ -409,6 +408,7 @@ typedef struct SFillOperatorInfo { ...@@ -409,6 +408,7 @@ typedef struct SFillOperatorInfo {
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t colIndex; int32_t colIndex;
char *prevData; // previous group by value
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
void freeParam(SQueryParam *param); void freeParam(SQueryParam *param);
......
...@@ -2167,12 +2167,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2167,12 +2167,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
// do nothing at the first stage // do nothing at the first stage
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) { if (pResInfo->hasResult != DATA_SET_FLAG) {
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->pOutput, pCtx->outputType);
} else {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
}
return; return;
} }
......
...@@ -190,9 +190,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator); ...@@ -190,9 +190,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator);
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* offset);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static bool isPointInterpoQuery(SQuery *pQuery); static bool isPointInterpoQuery(SQuery *pQuery);
...@@ -1283,27 +1282,38 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1283,27 +1282,38 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
} }
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SSDataBlock *pSDataBlock) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pRuntimeEnv->pQuery->current;
STableQueryInfo* item = pQuery->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type; int16_t type = pColInfoData->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
qError("QInfo:%p group by not supported on double/float columns, abort", pRuntimeEnv->qinfo);
return;
}
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char *val = pColInfoData->pData + bytes * j; char* val = pColInfoData->pData + bytes * j;
if (isNull(val, type)) { // TODO: ignore the null value if (isNull(val, type)) {
continue; continue;
} }
// TODO compare with the previous value to speedup the query processing if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) {
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &pInfo->binfo.resultRowInfo, pInfo->binfo.pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->binfo.rowCellInfoOffset); if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
}
memcpy(pInfo->prevData, val, bytes);
int32_t ret =
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
}
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here
...@@ -1315,25 +1325,7 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat ...@@ -1315,25 +1325,7 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat
} }
} }
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* rowCellInfoOffset) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData;
int16_t len = bytes;
if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) {
d = varDataVal(pData);
len = varDataLen(pData);
} else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
SQInfo* pQInfo = pRuntimeEnv->qinfo;
qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo);
return -1;
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex);
assert (pResultRow != NULL);
int64_t v = -1; int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData); GET_TYPED_DATA(v, int64_t, type, pData);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
...@@ -1347,7 +1339,27 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow ...@@ -1347,7 +1339,27 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
pResultRow->win.skey = v; pResultRow->win.skey = v;
pResultRow->win.ekey = v; pResultRow->win.ekey = v;
} }
}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t *rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &pInfo->binfo.resultRowInfo;
SQLFunctionCtx *pCtx = pInfo->binfo.pCtx;
// not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData;
int16_t len = bytes;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
d = varDataVal(pData);
len = varDataLen(pData);
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex);
assert (pResultRow != NULL);
setResultRowKey(pResultRow, pData, type);
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize); int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize);
if (ret != 0) { if (ret != 0) {
...@@ -4654,7 +4666,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { ...@@ -4654,7 +4666,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
} }
static SSDataBlock* doHashGroupbyAgg(void* param) { static SSDataBlock* hashGroupbyAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -4688,16 +4700,14 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { ...@@ -4688,16 +4700,14 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock);
} }
hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock); doHashGroupbyAgg(pOperator, pInfo, pBlock);
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
// todo need finalize the result if (!pRuntimeEnv->pQuery->stableQuery) { // finalize include the update of result rows
if (!pRuntimeEnv->pQuery->stableQuery) {
// finalize include the update of result rows
finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
} else { } else {
updateNumOfRowsInResultRows(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); updateNumOfRowsInResultRows(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
...@@ -4830,6 +4840,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4830,6 +4840,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData);
} }
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -5000,7 +5011,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -5000,7 +5011,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doHashGroupbyAgg; pOperator->exec = hashGroupbyAggregate;
pOperator->cleanup = destroyGroupbyOperatorInfo; pOperator->cleanup = destroyGroupbyOperatorInfo;
return pOperator; return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册