提交 db3cbbf2 编写于 作者: L Liu Jicong

fix(query): column match

上级 3ed1ab6a
......@@ -38,7 +38,8 @@
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
......@@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false;
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableScanInfo* pInfo = pOperator->info;
......@@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
"-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
"-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
......@@ -407,7 +410,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1;
// pInfo->prevGroupId = -1;
pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
......@@ -517,11 +520,11 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) {
taosArrayPush(pInfo->tsArray, ts+i);
taosArrayPush(pInfo->tsArray, ts + i);
}
}
if (taosArrayGetSize(pInfo->tsArray) > 0) {
//TODO(liuyao) get from tsdb
// TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
......@@ -652,8 +655,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t* id = taosArrayGet(pColList, i);
taosArrayPush(pColIds, id);
SColMatchInfo* id = taosArrayGet(pColList, i);
int16_t colId = id->colId;
taosArrayPush(pColIds, &colId);
}
pInfo->pColMatchInfo = pColList;
......@@ -678,8 +682,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
return NULL;
}
pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan
if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
......@@ -701,11 +705,12 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator;
_error:
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
......@@ -774,7 +779,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
if (NULL == pCondition) {
return;
}
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName);
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
}
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
......@@ -854,12 +859,12 @@ static SSDataBlock* buildSysTableMetaBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0;
const SSysTableMeta *pMeta = NULL;
const SSysTableMeta* pMeta = NULL;
getInfosDbMeta(&pMeta, &size);
int32_t index = 0;
for(int32_t i = 0; i < size; ++i) {
if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
for (int32_t i = 0; i < size; ++i) {
if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
index = i;
break;
}
......@@ -867,7 +872,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < pMeta[index].colNum; ++i) {
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = {0};
colInfoData.info.colId = i + 1;
colInfoData.info.type = pMeta[index].schema[i].type;
......@@ -1101,17 +1106,18 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
// blockDataDestroy(p); todo handle memory leak
// blockDataDestroy(p); todo handle memory leak
pInfo->pRes->info.rows = p->info.rows;
return p->info.rows;
}
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) {
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName) {
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t numOfRows = p->info.rows;
for(int32_t i = 0; i < size; ++i) {
for (int32_t i = 0; i < size; ++i) {
const SSysTableMeta* pm = &pSysDbTableMeta[i];
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
......@@ -1132,7 +1138,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);
for(int32_t j = 4; j <= 8; ++j) {
for (int32_t j = 4; j <= 8; ++j) {
pColInfoData = taosArrayGet(p->pDataBlock, j);
colDataAppendNULL(pColInfoData, numOfRows);
}
......@@ -1171,7 +1177,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
tNameAssign(&pInfo->name, pName);
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = *(SReadHandle*) readHandle;
pInfo->readHandle = *(SReadHandle*)readHandle;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
} else {
tsem_init(&pInfo->ready, 0, 0);
......@@ -1207,8 +1213,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
NULL, NULL, NULL);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
......@@ -1355,7 +1361,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, SArray* pColMatchInfo,
STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......
......@@ -152,7 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
blockDebugShowData(pRes);
/*blockDebugShowData(pRes);*/
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid);
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册