提交 77db8bf1 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 7153c8e4
......@@ -262,7 +262,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
*/
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT *pQueryHandle);
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type);
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SDataBlockInfo* blockInfo);
/**
* Get current data block information
......
......@@ -345,29 +345,28 @@ typedef struct SQueryParam {
typedef struct STableScanInfo {
SQueryRuntimeEnv *pRuntimeEnv;
void *pQueryHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
int32_t numOfBlockStatis;
int64_t numOfRows;
int32_t order; // scan order
int32_t times; // repeat counts
int32_t current;
int32_t reverseTimes; // 0 by default
SSDataBlock block;
SQLFunctionCtx *pCtx; // next operator query context
void *pQueryHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
int32_t numOfBlockStatis;
int64_t numOfRows;
int32_t order; // scan order
int32_t times; // repeat counts
int32_t current;
int32_t reverseTimes; // 0 by default
SQLFunctionCtx *pCtx; // next operator query context
SResultRowInfo *pResultRowInfo;
int32_t *rowCellInfoOffset;
SExprInfo *pExpr;
SSDataBlock block;
bool loadExternalRows; // load external rows (prev & next rows)
bool externalLoaded; // external rows loaded
int32_t numOfOutput;
int64_t elapsedTime;
int32_t tableIndex;
int32_t tableIndex;
} STableScanInfo;
typedef struct STagScanInfo {
......
......@@ -4087,38 +4087,32 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) {
*(TSKEY *) pCtx->pOutput = pCtx->startTs;
} else {
if (pCtx->start.key == INT64_MIN) {
assert(pCtx->end.key == INT64_MIN);
return;
}
if (type == TSDB_FILL_NULL) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
} else if (type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
} else if (type == TSDB_FILL_PREV) {
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val);
} else {
assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType);
}
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_NEXT) {
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->end.val);
} else {
assignVal(pCtx->pOutput, pCtx->end.ptr, pCtx->outputBytes, pCtx->inputType);
}
char* d = GET_INPUT_DATA(pCtx, 1);
assignVal(pCtx->pOutput, d, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_LINEAR) {
SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val};
SPoint point2 = {.key = pCtx->end.key, .val = &pCtx->end.val};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
char* start = GET_INPUT_DATA(pCtx, 0);
char* end = GET_INPUT_DATA(pCtx, 1);
TSKEY skey = GET_TS_DATA(pCtx, 0);
TSKEY ekey = GET_TS_DATA(pCtx, 1);
SPoint point1 = {.key = skey, .val = start};
SPoint point2 = {.key = ekey, .val = end };
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (IS_NUMERIC_TYPE(srcType)) { // TODO should find the not null data?
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
if (isNull(start, srcType) || isNull(end, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, srcType);
}
} else {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
......@@ -4127,8 +4121,8 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
}
SET_VAL(pCtx, 1, 1);
}
static void interp_function(SQLFunctionCtx *pCtx) {
// at this point, the value is existed, return directly
if (pCtx->size > 0) {
......
......@@ -172,7 +172,7 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery);
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
......@@ -1842,7 +1842,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
if (pQuery->fillType != TSDB_FILL_NONE) {
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput);
}
......@@ -2430,8 +2430,8 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool asc
return TS_JOIN_TS_EQUAL;
}
void filterDataBlock_rv(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo *pFilterInfo,
int32_t numOfFilterCols, SSDataBlock* pBlock, STSBuf* pTsBuf, bool ascQuery) {
void filterDataBlock_rv(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols,
SSDataBlock* pBlock, STSBuf* pTsBuf, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t *p = calloc(numOfRows, sizeof(int8_t));
......@@ -4153,17 +4153,26 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static void setTableQueryHandle(SQueryRuntimeEnv* pRuntimeEnv, int32_t tableIndex) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
// handle the first table
STableQueryInfo* pCheckInfo = taosArrayGetP(group, tableIndex);
int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
STableQueryInfo* pCheckInfo = NULL;
if (numOfGroup == 1) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
pCheckInfo = taosArrayGetP(group, tableIndex);
} else {
assert(numOfGroup == pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, tableIndex);
pCheckInfo = taosArrayGetP(group, 0);
}
// handle the first table
STsdbQueryCond cond = {
.twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey},
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = false,
.loadExternalRows = isPointInterpoQuery(pQuery),
};
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
......@@ -4198,7 +4207,6 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
cond.loadExternalRows = isPointInterpoQuery(pQuery);
if (!isSTableQuery
&& (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)
......@@ -4307,7 +4315,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->resultInfo.capacity = 4096;
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) {
pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, isPointInterpoQuery(pQuery));
} else if (needReverseScan(pQuery)) {
pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
} else {
......@@ -4427,8 +4435,8 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
}
static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
SSDataBlock *pBlock = &pTableScanInfo->block;
SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery;
SSDataBlock* pBlock = &pTableScanInfo->block;
SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery;
STableGroupInfo* pTableGroupInfo = &pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo;
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
......@@ -4438,7 +4446,8 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) {
STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet( pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid));
STableQueryInfo** pTableQueryInfo =
(STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid));
if (pTableQueryInfo == NULL) {
break;
}
......@@ -4459,9 +4468,27 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
continue;
}
if (pTableScanInfo->loadExternalRows) {
pTableScanInfo->externalLoaded = true;
}
return pBlock;
}
if (pTableScanInfo->loadExternalRows && (!pTableScanInfo->externalLoaded)) {
pBlock->pDataBlock = tsdbGetExternalRow(pTableScanInfo->pQueryHandle, &pBlock->info);
pTableScanInfo->externalLoaded = true;
if (pBlock->pDataBlock != NULL) {
STableQueryInfo** pTableQueryInfo =
(STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid));
assert(*pTableQueryInfo != NULL);
pQuery->current = *pTableQueryInfo;
}
return (pBlock->pDataBlock != NULL)? pBlock:NULL;
}
return NULL;
}
......@@ -4537,7 +4564,7 @@ static SSDataBlock* doTableScan(void* param) {
return NULL;
}
static SSDataBlock* doSeqTableBlockScan(void* param) {
static SSDataBlock* doSeqTableBlocksScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
STableScanInfo *pTableScanInfo = pOperator->info;
......@@ -4558,6 +4585,7 @@ static SSDataBlock* doSeqTableBlockScan(void* param) {
setTableQueryHandle(pRuntimeEnv, pTableScanInfo->tableIndex);
pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle;
pTableScanInfo->externalLoaded = false;
}
}
......@@ -4584,7 +4612,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
return pOperator;
}
SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
......@@ -4594,6 +4622,7 @@ SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRun
pInfo->current = 0;
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->loadExternalRows = loadExternalRows;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableBlockSeqScan";
......@@ -4601,7 +4630,7 @@ SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRun
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doSeqTableBlockScan;
pOperator->exec = doSeqTableBlocksScan;
return pOperator;
}
......
......@@ -484,6 +484,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
tsdbCleanupQueryHandle(pQueryHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
}
pQueryHandle->prev = doFreeColumnInfoData(pQueryHandle->prev);
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
}
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) {
......@@ -2445,10 +2448,37 @@ out_of_memory:
return terrno;
}
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type) {
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SDataBlockInfo* blockInfo) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
assert(type == TSDB_PREV_ROW || type == TSDB_NEXT_ROW);
return (type == TSDB_PREV_ROW)? pQueryHandle->prev:pQueryHandle->next;
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, 0);
blockInfo->tid = pCheckInfo->tableId.tid;
blockInfo->uid = pCheckInfo->tableId.uid;
blockInfo->numOfCols = numOfCols;
if (pQueryHandle->prev == NULL || pQueryHandle->next == NULL) {
blockInfo->rows = 0;
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i);
SColumnInfoData* first = taosArrayGet(pQueryHandle->prev, i);
memcpy(pColInfoData->pData, first->pData, pColInfoData->info.bytes);
SColumnInfoData* sec = taosArrayGet(pQueryHandle->next, i);
memcpy(pColInfoData->pData + pColInfoData->info.bytes, sec->pData, pColInfoData->info.bytes);
if (i == 0 && pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
blockInfo->window.skey = *(TSKEY*)pColInfoData->pData;
blockInfo->window.ekey = *(TSKEY*)(pColInfoData->pData + TSDB_KEYSIZE);
}
}
blockInfo->rows = 2;
return pQueryHandle->pColumns;
}
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册