提交 5efd5a3b 编写于 作者: H Haojun Liao

fix(query):adjust api for retrieve lastrow in executor.

上级 18afc81a
......@@ -132,7 +132,10 @@ SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdLis
void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t type, SSDataBlock* pResBlock);
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader);
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds);
int32_t tsdbLastrowReaderClose(void* pReader);
// tq
......
......@@ -18,38 +18,116 @@
#include "tcommon.h"
#include "tsdb.h"
// todo parse the stsrow and set the results
static void keepOneRow(const STSRow* pRow, SSDataBlock* pBlock) {
int32_t rowIndex = pBlock->info.rows;
typedef struct SLastrowReader {
SVnode* pVnode;
STSchema* pSchema;
uint64_t uid;
// int32_t* pSlotIds;
char** transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SArray* pTableList; // table id list
} SLastrowReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t *slotIds) {
int32_t numOfRows = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
SColVal colVal = {0};
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
// todo extract the value of specified column id from STSRow
const char* p = NULL;
colDataAppend(pColInfoData, rowIndex, p, false);
if (slotIds[i] == -1) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
} else {
tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (colVal.isNull) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[i], colVal.value.nData);
memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData);
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull);
}
}
}
pBlock->info.rows += 1;
}
int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t type, SSDataBlock* pResBlock) {
if (pVnode == NULL || pTableIdList == NULL || pResBlock == NULL) {
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader) {
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->type = type;
p->pVnode = pVnode;
p->numOfCols = numOfCols;
p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
p->pTableList = pTableIdList;
#if 0
for(int32_t i = 0; i < p->numOfCols; ++i) {
for(int32_t j = 0; j < p->pSchema->numOfCols; ++j) {
if (colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
p->pSlotIds[i] = -1;
break;
}
if (colId[i] == p->pSchema->columns[j].colId) {
p->pSlotIds[i] = j;
break;
}
}
if (IS_VAR_DATA_TYPE(colId[i])) {
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[p->pSlotIds[i]].bytes);
}
}
#endif
*pReader = p;
return TSDB_CODE_SUCCESS;
}
int32_t tsdbLastrowReaderClose(void* pReader) {
SLastrowReader* p = pReader;
for(int32_t i = 0; i < p->numOfCols; ++i) {
taosMemoryFreeClear(p->transferBuf[i]);
}
taosMemoryFree(p->transferBuf);
taosMemoryFree(pReader);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SVnode* pv = pVnode;
SLastrowReader* pr = pReader;
STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pTableIdList);
size_t numOfTables = taosArrayGetSize(pr->pTableList);
// retrieve the only one last row of all tables in the uid list.
if (type == LASTROW_RETRIEVE_TYPE_SINGLE) {
if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, i);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, pKeyInfo->uid, pv->pTsdb, &pRow);
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -65,16 +143,16 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty
pResBlock->info.rows -= 1;
}
keepOneRow(pRow, pResBlock);
saveOneRow(pRow, pResBlock, pr, slotIds);
internalResult = true;
lastKey = pRow->ts;
}
}
} else if (type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, i);
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, pKeyInfo->uid, pv->pTsdb, &pRow);
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -84,7 +162,12 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty
continue;
}
keepOneRow(pRow, pResBlock);
saveOneRow(pRow, pResBlock, pr, slotIds);
pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) {
return TSDB_CODE_SUCCESS;
}
}
} else {
return TSDB_CODE_INVALID_PARA;
......
......@@ -142,8 +142,8 @@ typedef struct SExecTaskInfo {
struct {
char *tablename;
char *dbname;
int32_t sversion;
int32_t tversion;
SSchemaWrapper*sw;
} schemaVer;
STableListInfo tableqinfoList; // this is a table list
......@@ -296,6 +296,9 @@ typedef struct SLastrowScanInfo {
SSDataBlock *pRes;
SArray *pTableList;
SReadHandle readHandle;
void *pLastrowReader;
SArray *pColMatchInfo;
int32_t *pSlotIds;
} SLastrowScanInfo;
typedef enum EStreamScanMode {
......
......@@ -187,7 +187,7 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
*sversion = pTaskInfo->schemaVer.sversion;
*sversion = pTaskInfo->schemaVer.sw->version;
*tversion = pTaskInfo->schemaVer.tversion;
if (pTaskInfo->schemaVer.dbname) {
strcpy(dbName, pTaskInfo->schemaVer.dbname);
......
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include <vnode.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
......@@ -2803,7 +2805,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) {
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN) {
*order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
......@@ -3886,15 +3888,15 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version;
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) {
tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version;
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
} else {
pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schemaRow.version;
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
}
metaReaderClear(&mr);
......@@ -4177,9 +4179,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
// return NULL;
// }
int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo);
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
int32_t code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList);
code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno;
return NULL;
......
......@@ -13,13 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "executorimpl.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "systable.h"
#include "tglobal.h"
#include "tname.h"
#include "ttime.h"
......@@ -33,8 +32,6 @@
#include "ttypes.h"
#include "vnode.h"
#include "executorInt.h"
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
......@@ -2537,7 +2534,8 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
// check if it is a group by tbname
if (size == taosArrayGetSize(pInfo->pTableList)) {
tsdbRetrieveLastRow(pInfo->readHandle.vnode, pInfo->pTableList, LASTROW_RETRIEVE_TYPE_ALL, pInfo->pRes);
blockDataCleanup(pInfo->pRes);
tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds);
return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes;
} else {
//todo fetch the result for each group
......@@ -2550,9 +2548,10 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*) param;
blockDataDestroy(pInfo->pRes);
tsdbLastrowReaderClose(pInfo->pLastrowReader);
}
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SArray* pTableList, SExecTaskInfo* pTaskInfo) {
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -2562,7 +2561,34 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode,
pInfo->pTableList = pTableList;
pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pTableScanNode->node.pOutputDataBlockDesc);
pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols, COL_MATCH_FROM_COL_ID);
int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t));
for(int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
pCols[i] = pColMatch->colId;
}
pInfo->pSlotIds = taosMemoryMalloc(numOfCols * sizeof(pInfo->pSlotIds[0]));
for(int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
for(int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) {
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pInfo->pSlotIds[pColMatch->targetSlotId] = -1;
break;
}
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) {
pInfo->pSlotIds[pColMatch->targetSlotId] = j;
break;
}
}
}
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols, &pInfo->pLastrowReader);
taosMemoryFree(pCols);
pOperator->name = "LastrowScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
......@@ -2570,8 +2596,10 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode,
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
initResultSizeInfo(pOperator, 1024);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator,
......
......@@ -106,6 +106,8 @@ bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx *pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t lastrowFunction(SqlFunctionCtx* pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
......
......@@ -1873,11 +1873,11 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateLastRow,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastrowFunction,
.finalizeFunc = firstLastFinalize
},
{
.name = "first",
......
......@@ -5503,3 +5503,43 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
if (pInputCol->hasNull && colDataIsNull_s(pInputCol, i)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
if (IS_VAR_DATA_TYPE(type)) {
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
}
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册