未验证 提交 88f13276 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #15121 from taosdata/feature/3_liaohj

fix(query): set the output column number for subscribe.
......@@ -64,7 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle
* @return
*/
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols);
/**
* Set the input data block for the stream scan.
......
......@@ -88,7 +88,7 @@ typedef struct {
STqExecTb execTb;
STqExecDb execDb;
};
int32_t numOfCols; // number of out pout column, temporarily used
} STqExecHandle;
typedef struct {
......
......@@ -506,7 +506,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true,
.version = ver,
};
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
......
......@@ -15,7 +15,7 @@
#include "tq.h"
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp) {
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
......@@ -29,7 +29,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp)
// TODO enable compress
int32_t actualLen = 0;
blockEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false);
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pRsp->blockDataLen, &actualLen);
......@@ -87,7 +87,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
tqDebug("task execute end, get %p", pDataBlock);
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp);
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
......@@ -195,7 +195,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp);
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
......@@ -213,7 +213,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp);
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
......
......@@ -92,7 +92,8 @@ int32_t tqMetaOpen(STQ* pTq) {
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols);
ASSERT(handle.execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner);
......
......@@ -1961,9 +1961,11 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
SDelIdx idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);
code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
if (pIdx != NULL) {
code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
}
}
......
......@@ -159,27 +159,28 @@ typedef struct {
int64_t recoverEndVer;
} SStreamTaskInfo;
typedef struct {
char* tablename;
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
} SSchemaInfo;
typedef struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
SStreamTaskInfo streamInfo;
struct {
char* tablename;
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
} schemaVer;
STableListInfo tableqinfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STaskIdInfo id;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
STableListInfo tableqinfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
......@@ -248,13 +249,13 @@ typedef struct SLoadRemoteDataInfo {
} SLoadRemoteDataInfo;
typedef struct SLimitInfo {
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SExchangeInfo {
......
......@@ -227,14 +227,14 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i);
for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) {
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId &&
for (int32_t j = 0; j < pTaskInfo->schemaInfo.sw->nCols; ++j) {
if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId &&
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->targetSlotId] = -1;
break;
}
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) {
if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId) {
(*pSlotIds)[pColMatch->targetSlotId] = j;
break;
}
......
......@@ -67,15 +67,15 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
return false;
}
// clang-format off
// data format:
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2
// length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// actual size | |
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// | |sizeof(int32) |sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
// clang-format on
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
int32_t numOfCols = 0;
SNode* pNode;
......
......@@ -104,27 +104,40 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code;
}
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers) {
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols) {
if (msg == NULL) {
// TODO create raw scan
return NULL;
}
struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(msg, &plan);
struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo);
terrno = code;
return NULL;
}
// extract the number of output columns
SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
*numOfCols = 0;
SNode* pNode;
FOREACH(pNode, pDescNode->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
++(*numOfCols);
}
}
return pTaskInfo;
}
......@@ -135,17 +148,18 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
/*qDebugL("stream task string %s", (const char*)msg);*/
struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(msg, &plan);
struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo);
terrno = code;
return NULL;
}
......@@ -227,19 +241,19 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo->schemaVer.sw == NULL) {
if (pTaskInfo->schemaInfo.sw == NULL) {
return TSDB_CODE_SUCCESS;
}
*sversion = pTaskInfo->schemaVer.sw->version;
*tversion = pTaskInfo->schemaVer.tversion;
if (pTaskInfo->schemaVer.dbname) {
strcpy(dbName, pTaskInfo->schemaVer.dbname);
*sversion = pTaskInfo->schemaInfo.sw->version;
*tversion = pTaskInfo->schemaInfo.tversion;
if (pTaskInfo->schemaInfo.dbname) {
strcpy(dbName, pTaskInfo->schemaInfo.dbname);
} else {
dbName[0] = 0;
}
if (pTaskInfo->schemaVer.tablename) {
strcpy(tableName, pTaskInfo->schemaVer.tablename);
if (pTaskInfo->schemaInfo.tablename) {
strcpy(tableName, pTaskInfo->schemaInfo.tablename);
} else {
tableName[0] = 0;
}
......
......@@ -39,6 +39,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
taosThreadOnce(&initPoolOnce, initRefPool);
atexit(cleanupRefPool);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......
......@@ -4123,7 +4123,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->schemaVer.dbname = strdup(dbFName);
pTaskInfo->schemaInfo.dbname = strdup(dbFName);
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model;
......@@ -4149,35 +4149,35 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
return terrno;
}
pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
pTaskInfo->schemaInfo.tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
} else {
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
}
static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) {
taosMemoryFreeClear(pTaskInfo->schemaVer.dbname);
if (pTaskInfo->schemaVer.sw == NULL) {
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pSchemaInfo->dbname);
if (pSchemaInfo->sw == NULL) {
return;
}
taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema);
taosMemoryFree(pTaskInfo->schemaVer.sw);
taosMemoryFree(pTaskInfo->schemaVer.tablename);
taosMemoryFree(pSchemaInfo->tablename);
taosMemoryFree(pSchemaInfo->sw->pSchema);
taosMemoryFree(pSchemaInfo->sw);
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
......@@ -4935,6 +4935,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
(*pTaskInfo)->sql = sql;
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) {
......@@ -4973,7 +4974,9 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(pTaskInfo);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
......
......@@ -1543,6 +1543,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes);
taosArrayDestroy(pInfo->pRecycledPages);
blockDataDestroy(pInfo->pUpdateRes);
if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
......
......@@ -150,7 +150,6 @@ typedef struct SQWTaskCtx {
void *taskHandle;
void *sinkHandle;
SSubplan *plan;
STbVerInfo tbInfo;
} SQWTaskCtx;
......
......@@ -306,11 +306,6 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
dsDestroyDataSinker(ctx->sinkHandle);
ctx->sinkHandle = NULL;
}
if (ctx->plan) {
nodesDestroyNode((SNode*)ctx->plan);
ctx->plan = NULL;
}
}
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
......@@ -327,7 +322,6 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
atomic_store_ptr(&ctx->taskHandle, NULL);
atomic_store_ptr(&ctx->sinkHandle, NULL);
atomic_store_ptr(&ctx->plan, NULL);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
......
......@@ -522,8 +522,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
QW_ERR_JRET(code);
}
ctx->plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
......@@ -928,8 +926,6 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
QW_ERR_JRET(code);
}
ctx.plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册